diff --git a/bin/pt-agent b/bin/pt-agent index 1e003604..e62916ae 100755 --- a/bin/pt-agent +++ b/bin/pt-agent @@ -4822,29 +4822,16 @@ use Percona::WebAPI::Resource::LogEntry; Transformers->import(qw(ts)); -has 'client' => ( - is => 'rw', - isa => 'Object', - required => 0, -); - -has 'log_link' => ( - is => 'rw', - isa => 'Str', - required => 0, -); - has 'exit_status' => ( is => 'rw', isa => 'ScalarRef', required => 1, ); -has 'queue_wait' => ( - is => 'rw', +has 'pid' => ( + is => 'ro', isa => 'Int', - required => 0, - default => sub { return 3; }, + required => 1, ); has 'service' => ( @@ -4859,13 +4846,6 @@ has 'data_ts' => ( required => 0, ); -has '_local_q' => ( - is => 'rw', - isa => 'ArrayRef', - required => 0, - default => sub { return []; }, -); - has 'online_logging' => ( is => 'rw', isa => 'Bool', @@ -4873,13 +4853,20 @@ has 'online_logging' => ( default => sub { return 0 }, ); +has '_buffer' => ( + is => 'rw', + isa => 'ArrayRef', + required => 0, + default => sub { return []; }, +); + has '_pipe_write' => ( is => 'rw', isa => 'Maybe[FileHandle]', required => 0, ); -sub read_timeout { +sub read_stdin { my ( $t ) = @_; POSIX::sigaction( @@ -4891,7 +4878,6 @@ sub read_timeout { eval { alarm $t; while(defined(my $line = )) { - chomp $line; push @lines, $line; } push @lines, undef; # stop @@ -4905,11 +4891,11 @@ sub read_timeout { return \@lines; } - -sub enable_online_logging { +sub start_online_logging { my ($self, %args) = @_; - my $client = $args{client}; - my $log_link = $args{log_link}; + my $client = $args{client}; + my $log_link = $args{log_link}; + my $read_timeout = $args{read_timeout} || 3; my $pid = open(my $pipe_write, "|-"); @@ -4924,7 +4910,7 @@ sub enable_online_logging { my $oktorun = 1; QUEUE: while ($oktorun) { - my $lines = read_timeout($self->queue_wait); + my $lines = read_stdin($read_timeout); LINE: foreach my $line ( @$lines ) { if ( !defined $line ) { @@ -4933,8 +4919,10 @@ sub enable_online_logging { } my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s; + chomp $msg; push @log_entries, Percona::WebAPI::Resource::LogEntry->new( + pid => $self->pid, entry_ts => $ts, log_level => $level, message => $msg, @@ -4957,10 +4945,6 @@ sub enable_online_logging { @log_entries = (); } } # have log entries - - if ( $oktorun ) { - sleep $self->queue_wait; - } } # QUEUE oktorun if ( scalar @log_entries ) { @@ -5043,14 +5027,14 @@ sub _log { my $level_number = level_number($level); if ( $self->online_logging ) { - foreach my $log_entry ( shift @{$self->_local_q} ) { + foreach my $log_entry ( shift @{$self->_buffer} ) { last unless defined $log_entry; $self->_queue_log_entry(@$log_entry); } $self->_queue_log_entry($ts, $level_number, $msg); } else { - push @{$self->_local_q}, [$ts, $level_number, $msg]; + push @{$self->_buffer}, [$ts, $level_number, $msg]; my $ts = ts(time, 0); # 0=local time if ( $level_number >= 3 ) { # warning @@ -5080,6 +5064,11 @@ sub stop_online_logging { return; } +sub DESTROY { + my $self = shift; + $self->stop_online_logging(); + return; +} sub _d { my ($package, undef, $line) = caller 0; @@ -5281,6 +5270,7 @@ sub main { ); $logger = Percona::Agent::Logger->new( exit_status => \$exit_status, + pid => $PID, ); # ######################################################################## @@ -5460,9 +5450,6 @@ sub main { $logger->info("pt-agent exit $exit_status, oktorun $oktorun"); - $logger->info('Waiting for logging thread to stop...'); - $logger->stop_online_logging(); - return $exit_status; } @@ -5855,7 +5842,9 @@ These values can change if a different configuration is received. # simple info("Hello world!") to STDOUT won't block if the API isn't # responding. -- Both client and log_link are required to enable this. if ( $agent->links->{log} && $logger_client ) { - $logger->enable_online_logging( + $logger->info("Starting online logging. No more log entries will be printed here. " + . "Agent logs are accessible through the web interface."); + $logger->start_online_logging( client => $logger_client, log_link => $agent->links->{log}, ); @@ -7111,8 +7100,10 @@ sub send_data { $logger->info("Sending $service service data"); # Connect to Percona, get entry links. + my $entry_links; + my $logger_client; if ( !$client ) { - ($client) = get_api_client( + ($client, $entry_links, $logger_client) = get_api_client( api_key => $api_key, tries => 3, interval => sub { sleep 10 }, @@ -7139,6 +7130,31 @@ sub send_data { } } + $agent = eval { + $client->get( + link => $entry_links->{agents} . '/' . $agent->uuid, + ); + }; + if ( $EVAL_ERROR ) { + $logger->fatal("Failed to get the agent: $EVAL_ERROR"); + } + my $log_link = eval { + $client->get( + link => $agent->links->{log}, + ); + }; + if ( $EVAL_ERROR ) { + $logger->warning("Failed to get the agent log link: $EVAL_ERROR"); + } + else { + $logger->info("Starting online logging. No more log entries will be printed here. " + . "Agent logs are accessible through the web interface."); + $logger->start_online_logging( + client => $logger_client, + log_link => $log_link, + ); + } + # XXX # Load the Service object from local service JSON file. # $service changes from a string scalar to a Service object. diff --git a/lib/Percona/Agent/Logger.pm b/lib/Percona/Agent/Logger.pm index ab9fc42a..3341ce90 100644 --- a/lib/Percona/Agent/Logger.pm +++ b/lib/Percona/Agent/Logger.pm @@ -33,29 +33,16 @@ use Percona::WebAPI::Resource::LogEntry; Transformers->import(qw(ts)); -has 'client' => ( - is => 'rw', - isa => 'Object', - required => 0, -); - -has 'log_link' => ( - is => 'rw', - isa => 'Str', - required => 0, -); - has 'exit_status' => ( is => 'rw', isa => 'ScalarRef', required => 1, ); -has 'queue_wait' => ( - is => 'rw', +has 'pid' => ( + is => 'ro', isa => 'Int', - required => 0, - default => sub { return 3; }, + required => 1, ); has 'service' => ( @@ -70,13 +57,6 @@ has 'data_ts' => ( required => 0, ); -has '_local_q' => ( - is => 'rw', - isa => 'ArrayRef', - required => 0, - default => sub { return []; }, -); - has 'online_logging' => ( is => 'rw', isa => 'Bool', @@ -84,13 +64,20 @@ has 'online_logging' => ( default => sub { return 0 }, ); +has '_buffer' => ( + is => 'rw', + isa => 'ArrayRef', + required => 0, + default => sub { return []; }, +); + has '_pipe_write' => ( is => 'rw', isa => 'Maybe[FileHandle]', required => 0, ); -sub read_timeout { +sub read_stdin { my ( $t ) = @_; # Set the SIGALRM handler. @@ -103,7 +90,6 @@ sub read_timeout { eval { alarm $t; while(defined(my $line = )) { - chomp $line; push @lines, $line; } push @lines, undef; # stop @@ -117,15 +103,16 @@ sub read_timeout { return \@lines; } - -sub enable_online_logging { +sub start_online_logging { my ($self, %args) = @_; - my $client = $args{client}; - my $log_link = $args{log_link}; + my $client = $args{client}; + my $log_link = $args{log_link}; + my $read_timeout = $args{read_timeout} || 3; my $pid = open(my $pipe_write, "|-"); if ($pid) { + # parent select $pipe_write; $OUTPUT_AUTOFLUSH = 1; $self->_pipe_write($pipe_write); @@ -137,7 +124,7 @@ sub enable_online_logging { my $oktorun = 1; QUEUE: while ($oktorun) { - my $lines = read_timeout($self->queue_wait); + my $lines = read_stdin($read_timeout); LINE: foreach my $line ( @$lines ) { if ( !defined $line ) { @@ -147,8 +134,10 @@ sub enable_online_logging { # $line = ts,level,message my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s; + chomp $msg; push @log_entries, Percona::WebAPI::Resource::LogEntry->new( + pid => $self->pid, entry_ts => $ts, log_level => $level, message => $msg, @@ -171,10 +160,6 @@ sub enable_online_logging { @log_entries = (); } } # have log entries - - if ( $oktorun ) { - sleep $self->queue_wait; - } } # QUEUE oktorun if ( scalar @log_entries ) { @@ -258,14 +243,14 @@ sub _log { my $level_number = level_number($level); if ( $self->online_logging ) { - foreach my $log_entry ( shift @{$self->_local_q} ) { + foreach my $log_entry ( shift @{$self->_buffer} ) { last unless defined $log_entry; $self->_queue_log_entry(@$log_entry); } $self->_queue_log_entry($ts, $level_number, $msg); } else { - push @{$self->_local_q}, [$ts, $level_number, $msg]; + push @{$self->_buffer}, [$ts, $level_number, $msg]; my $ts = ts(time, 0); # 0=local time if ( $level_number >= 3 ) { # warning @@ -295,11 +280,11 @@ sub stop_online_logging { return; } -#sub DESTROY { -# my $self = shift; -# $self->stop_online_logging(); -# return; -#} +sub DESTROY { + my $self = shift; + $self->stop_online_logging(); + return; +} sub _d { my ($package, undef, $line) = caller 0; diff --git a/lib/Percona/WebAPI/Resource/LogEntry.pm b/lib/Percona/WebAPI/Resource/LogEntry.pm index f44cd30e..b686fcbc 100644 --- a/lib/Percona/WebAPI/Resource/LogEntry.pm +++ b/lib/Percona/WebAPI/Resource/LogEntry.pm @@ -22,6 +22,12 @@ package Percona::WebAPI::Resource::LogEntry; use Lmo; +has 'pid' => ( + is => 'ro', + isa => 'Int', + required => 1, +); + has 'service' => ( is => 'ro', isa => 'Str',