Queue and log all entries online.

This commit is contained in:
Daniel Nichter
2013-06-09 11:54:10 -07:00
parent bcb3758f64
commit e26a8744ef
2 changed files with 217 additions and 171 deletions

View File

@@ -72,6 +72,13 @@ has 'data_ts' => (
required => 0,
);
has '_local_q' => (
is => 'rw',
isa => 'ArrayRef',
required => 0,
default => sub { return []; },
);
has '_message_queue' => (
is => 'rw',
isa => 'Object',
@@ -98,64 +105,65 @@ sub enable_online_logging {
$self->_message_queue(Thread::Queue->new());
$self->_thread(
threads::async {
my @log_entries;
my $oktorun = 1;
QUEUE:
while ( $oktorun ) {
my $max_log_entries = 1_000; # for each POST + backlog
while ( $self->_message_queue
&& $self->_message_queue->pending()
&& $max_log_entries--
&& (my $entry = $self->_message_queue->dequeue()) )
{
# $entry = [ ts, level, "message" ]
if ( defined $entry->[0] ) {
push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
entry_ts => $entry->[0],
log_level => $entry->[1],
message => $entry->[2],
($self->service ? (service => $self->service) : ()),
($self->data_ts ? (data_ts => $self->data_ts) : ()),
);
}
else {
# Got "stop" entry: [ undef, undef, undef ]
$oktorun = 0;
}
} # read log entries from queue
if ( scalar @log_entries ) {
eval {
$client->post(
link => $log_link,
resources => \@log_entries,
);
};
if ( my $e = $EVAL_ERROR ) {
warn "$e";
}
else {
@log_entries = ();
}
} # have log entries
if ( $oktorun ) {
sleep $self->queue_wait;
my $thread = threads::async {
my @log_entries;
my $oktorun = 1;
QUEUE:
while ( $oktorun ) {
my $max_log_entries = 1_000; # for each POST + backlog
while ( $self->_message_queue
&& $self->_message_queue->pending()
&& $max_log_entries--
&& (my $entry = $self->_message_queue->dequeue()) )
{
# $entry = [ ts, level, "message" ]
if ( defined $entry->[0] ) {
push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
entry_ts => $entry->[0],
log_level => $entry->[1],
message => $entry->[2],
($self->service ? (service => $self->service) : ()),
($self->data_ts ? (data_ts => $self->data_ts) : ()),
);
}
} # QUEUE oktorun
if ( scalar @log_entries ) {
my $ts = ts(time, 0); # 0=local time
warn "$ts WARNING Failed to send these log entries (timestamps are UTC):\n";
foreach my $entry ( @log_entries ) {
warn sprintf("%s %s %s\n", $entry->[0], level_name($entry->[1]), $entry->[2]);
else {
# Got "stop" entry: [ undef, undef, undef ]
$oktorun = 0;
}
} # read log entries from queue
if ( scalar @log_entries ) {
eval {
$client->post(
link => $log_link,
resources => \@log_entries,
);
};
if ( my $e = $EVAL_ERROR ) {
warn "$e";
}
else {
@log_entries = ();
}
} # have log entries
if ( $oktorun ) {
sleep $self->queue_wait;
}
} # QUEUE oktorun
} # threads::async
);
if ( scalar @log_entries ) {
my $ts = ts(time, 0); # 0=local time
warn "$ts WARNING Failed to send these log entries (timestamps are UTC):\n";
foreach my $entry ( @log_entries ) {
warn sprintf("%s %s %s\n", $entry->[0], level_name($entry->[1]), $entry->[2]);
}
}
return;
}; # threads::async
$self->_thread($thread);
$self->online_logging(1);
@@ -228,11 +236,19 @@ sub _log {
chomp($msg);
my $ts = ts(time, 1); # 1=UTC
my $level_number = level_number($level);
my @event :shared = ($ts, $level_number, $msg);
$self->_message_queue->enqueue(\@event);
if ( !$self->online_logging ) {
if ( $self->online_logging ) {
foreach my $log_entry ( shift @{$self->_local_q} ) {
last unless defined $log_entry;
my @event :shared = (@$log_entry);
$self->_message_queue->enqueue(\@event);
}
my @event :shared = ($ts, $level_number, $msg);
$self->_message_queue->enqueue(\@event);
}
else {
push @{$self->_local_q}, [$ts, $level_number, $msg];
my $ts = ts(time, 0); # 0=local time
if ( $level_number >= 3 ) { # warning
print STDERR "$ts $level $msg\n";
@@ -245,13 +261,20 @@ sub _log {
return;
}
sub DESTROY {
sub stop_online_logging {
my $self = shift;
if ( $self->_thread && $self->_thread->is_running() ) {
my @stop :shared = (undef, undef);
$self->_message_queue->enqueue(\@stop); # stop the thread
$self->_thread->join();
}
$self->online_logging(0);
return;
}
sub DESTROY {
my $self = shift;
$self->stop_online_logging();
return;
}