Always build message queue, don't dequeue until it exists though.

This commit is contained in:
Daniel Nichter
2013-05-17 09:45:09 -07:00
parent e2aab02bee
commit ea481471c0
2 changed files with 73 additions and 73 deletions

View File

@@ -4846,44 +4846,44 @@ has '_thread' => (
sub BUILD { sub BUILD {
my $self = shift; my $self = shift;
if ( $self->client && $self->log_link ) { $self->_message_queue(Thread::Queue->new());
$self->_message_queue(Thread::Queue->new());
$self->_thread( $self->_thread(
threads::async { threads::async {
my @log_entries; my @log_entries;
my $oktorun = 1; my $oktorun = 1;
QUEUE: QUEUE:
while ( $oktorun ) { while ( $oktorun ) {
my $max_log_entries = 1_000; # for each POST + backlog my $max_log_entries = 1_000; # for each POST + backlog
while ( $self->message_queue->pending() while ( $self->_message_queue
&& $max_log_entries-- && $self->_message_queue->pending()
&& (my $entry = $self->message_queue->dequeue()) ) && $max_log_entries--
{ && (my $entry = $self->message_queue->dequeue()) )
$oktorun = 0 if !defined $entry; {
push @log_entries, Percona::WebAPI::Resource::LogEntry->new( $oktorun = 0 if !defined $entry;
log_level => $entry->[0], push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
message => $entry->[1], log_level => $entry->[0],
message => $entry->[1],
);
}
if ( scalar @log_entries ) {
eval {
$self->client->post(
link => $self->log_link,
resources => \@log_entries,
); );
};
if ( my $e = $EVAL_ERROR ) {
warn "$e";
} }
if ( scalar @log_entries ) { else {
eval { @log_entries = ();
$self->client->post( }
link => $self->log_link, } # have log entries
resources => \@log_entries, sleep ($self->_message_queue ? 3 : 5);
); } # QUEUE
}; } # threads::async
if ( my $e = $EVAL_ERROR ) { );
warn "$e";
}
else {
@log_entries = ();
}
} # have log entries
sleep 3;
} # QUEUE
} # threads::async
);
}
return; return;
} }

View File

@@ -75,45 +75,45 @@ has '_thread' => (
sub BUILD { sub BUILD {
my $self = shift; my $self = shift;
if ( $self->client && $self->log_link ) { $self->_message_queue(Thread::Queue->new());
$self->_message_queue(Thread::Queue->new());
$self->_thread( $self->_thread(
threads::async { threads::async {
my @log_entries; my @log_entries;
my $oktorun = 1; my $oktorun = 1;
QUEUE: QUEUE:
while ( $oktorun ) { while ( $oktorun ) {
my $max_log_entries = 1_000; # for each POST + backlog my $max_log_entries = 1_000; # for each POST + backlog
while ( $self->message_queue->pending() while ( $self->_message_queue
&& $max_log_entries-- && $self->_message_queue->pending()
&& (my $entry = $self->message_queue->dequeue()) ) && $max_log_entries--
{ && (my $entry = $self->message_queue->dequeue()) )
$oktorun = 0 if !defined $entry; {
# $event = [ level, "message" ] $oktorun = 0 if !defined $entry;
push @log_entries, Percona::WebAPI::Resource::LogEntry->new( # $event = [ level, "message" ]
log_level => $entry->[0], push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
message => $entry->[1], log_level => $entry->[0],
message => $entry->[1],
);
}
if ( scalar @log_entries ) {
eval {
$self->client->post(
link => $self->log_link,
resources => \@log_entries,
); );
};
if ( my $e = $EVAL_ERROR ) {
warn "$e";
} }
if ( scalar @log_entries ) { else {
eval { @log_entries = ();
$self->client->post( }
link => $self->log_link, } # have log entries
resources => \@log_entries, sleep ($self->_message_queue ? 3 : 5);
); } # QUEUE
}; } # threads::async
if ( my $e = $EVAL_ERROR ) { );
warn "$e";
}
else {
@log_entries = ();
}
} # have log entries
sleep 3;
} # QUEUE
} # threads::async
);
}
return; return;
} }