From ea481471c00dcf33143c0fc1aa25f00ee74cbae1 Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Fri, 17 May 2013 09:45:09 -0700 Subject: [PATCH] Always build message queue, don't dequeue until it exists though. --- bin/pt-agent | 72 ++++++++++++++++++------------------ lib/Percona/Agent/Logger.pm | 74 ++++++++++++++++++------------------- 2 files changed, 73 insertions(+), 73 deletions(-) diff --git a/bin/pt-agent b/bin/pt-agent index 7c6b1314..bf1bf62f 100755 --- a/bin/pt-agent +++ b/bin/pt-agent @@ -4846,44 +4846,44 @@ has '_thread' => ( sub BUILD { my $self = shift; - if ( $self->client && $self->log_link ) { - $self->_message_queue(Thread::Queue->new()); - $self->_thread( - threads::async { - my @log_entries; - my $oktorun = 1; - QUEUE: - while ( $oktorun ) { - my $max_log_entries = 1_000; # for each POST + backlog - while ( $self->message_queue->pending() - && $max_log_entries-- - && (my $entry = $self->message_queue->dequeue()) ) - { - $oktorun = 0 if !defined $entry; - push @log_entries, Percona::WebAPI::Resource::LogEntry->new( - log_level => $entry->[0], - message => $entry->[1], + $self->_message_queue(Thread::Queue->new()); + + $self->_thread( + threads::async { + my @log_entries; + my $oktorun = 1; + QUEUE: + while ( $oktorun ) { + my $max_log_entries = 1_000; # for each POST + backlog + while ( $self->_message_queue + && $self->_message_queue->pending() + && $max_log_entries-- + && (my $entry = $self->message_queue->dequeue()) ) + { + $oktorun = 0 if !defined $entry; + push @log_entries, Percona::WebAPI::Resource::LogEntry->new( + log_level => $entry->[0], + message => $entry->[1], + ); + } + if ( scalar @log_entries ) { + eval { + $self->client->post( + link => $self->log_link, + resources => \@log_entries, ); + }; + if ( my $e = $EVAL_ERROR ) { + warn "$e"; } - if ( scalar @log_entries ) { - eval { - $self->client->post( - link => $self->log_link, - resources => \@log_entries, - ); - }; - if ( my $e = $EVAL_ERROR ) { - warn "$e"; - } - else { - @log_entries = (); - } - } # have log entries - sleep 3; - } # QUEUE - } # threads::async - ); - } + else { + @log_entries = (); + } + } # have log entries + sleep ($self->_message_queue ? 3 : 5); + } # QUEUE + } # threads::async + ); return; } diff --git a/lib/Percona/Agent/Logger.pm b/lib/Percona/Agent/Logger.pm index 8a1674b1..ff4eae48 100644 --- a/lib/Percona/Agent/Logger.pm +++ b/lib/Percona/Agent/Logger.pm @@ -75,45 +75,45 @@ has '_thread' => ( sub BUILD { my $self = shift; - if ( $self->client && $self->log_link ) { - $self->_message_queue(Thread::Queue->new()); - $self->_thread( - threads::async { - my @log_entries; - my $oktorun = 1; - QUEUE: - while ( $oktorun ) { - my $max_log_entries = 1_000; # for each POST + backlog - while ( $self->message_queue->pending() - && $max_log_entries-- - && (my $entry = $self->message_queue->dequeue()) ) - { - $oktorun = 0 if !defined $entry; - # $event = [ level, "message" ] - push @log_entries, Percona::WebAPI::Resource::LogEntry->new( - log_level => $entry->[0], - message => $entry->[1], + $self->_message_queue(Thread::Queue->new()); + + $self->_thread( + threads::async { + my @log_entries; + my $oktorun = 1; + QUEUE: + while ( $oktorun ) { + my $max_log_entries = 1_000; # for each POST + backlog + while ( $self->_message_queue + && $self->_message_queue->pending() + && $max_log_entries-- + && (my $entry = $self->message_queue->dequeue()) ) + { + $oktorun = 0 if !defined $entry; + # $event = [ level, "message" ] + push @log_entries, Percona::WebAPI::Resource::LogEntry->new( + log_level => $entry->[0], + message => $entry->[1], + ); + } + if ( scalar @log_entries ) { + eval { + $self->client->post( + link => $self->log_link, + resources => \@log_entries, ); + }; + if ( my $e = $EVAL_ERROR ) { + warn "$e"; } - if ( scalar @log_entries ) { - eval { - $self->client->post( - link => $self->log_link, - resources => \@log_entries, - ); - }; - if ( my $e = $EVAL_ERROR ) { - warn "$e"; - } - else { - @log_entries = (); - } - } # have log entries - sleep 3; - } # QUEUE - } # threads::async - ); - } + else { + @log_entries = (); + } + } # have log entries + sleep ($self->_message_queue ? 3 : 5); + } # QUEUE + } # threads::async + ); return; }