From df6ba9f3a5f3c549fb0cb7e7f107249811827d96 Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Fri, 17 May 2013 09:32:32 -0700 Subject: [PATCH] First implementation of online logging. --- bin/pt-agent | 137 +++++++++++++++++++++++++++--------- lib/Percona/Agent/Logger.pm | 53 ++++++++------ 2 files changed, 138 insertions(+), 52 deletions(-) diff --git a/bin/pt-agent b/bin/pt-agent index 8e76ac1a..c3dd5fd0 100755 --- a/bin/pt-agent +++ b/bin/pt-agent @@ -27,6 +27,7 @@ BEGIN { Percona::WebAPI::Resource::Config Percona::WebAPI::Resource::Service Percona::WebAPI::Resource::Task + Percona::WebAPI::Resource::LogEntry VersionCheck DSNParser OptionParser @@ -1479,6 +1480,38 @@ no Lmo; # End Percona::WebAPI::Resource::Task package # ########################################################################### +# ########################################################################### +# Percona::WebAPI::Resource::LogEntry package +# This package is a copy without comments from the original. The original +# with comments and its test file can be found in the Bazaar repository at, +# lib/Percona/WebAPI/Resource/LogEntry.pm +# t/lib/Percona/WebAPI/Resource/LogEntry.t +# See https://launchpad.net/percona-toolkit for more information. +# ########################################################################### +{ +package Percona::WebAPI::Resource::LogEntry; + +use Lmo; + +has 'log_level' => ( + is => 'ro', + isa => 'Int', + required => 1, +); + +has 'message' => ( + is => 'ro', + isa => 'Str', + required => 1, +); + +no Lmo; +1; +} +# ########################################################################### +# End Percona::WebAPI::Resource::LogEntry package +# ########################################################################### + # ########################################################################### # VersionCheck package # This package is a copy without comments from the original. The original @@ -4769,6 +4802,7 @@ use Thread::Queue; use Lmo; use Transformers; +use Percona::WebAPI::Resource::LogEntry; Transformers->import(qw(ts)); @@ -4785,7 +4819,7 @@ has 'client' => ( required => 0, ); -has 'status_link' => ( +has 'log_link' => ( is => 'rw', isa => 'Str', required => 0, @@ -4812,27 +4846,40 @@ has '_thread' => ( sub BUILD { my $self = shift; - if ( $self->client && $self->status_link ) { + if ( $self->client && $self->log_link ) { $self->_message_queue(Thread::Queue->new()); $self->_thread( threads::async { - EVENT: - while ( my $event = $self->_message_queue->dequeue() ) { - last unless defined $event; - my $status = { - log_level => $event->[0], - message => $event->[1], - }; - eval { - $self->client->post( - link => $self->status_link, - resources => encode_json($status), + my @log_entries; + my $oktorun = 1; + QUEUE: + while ( $oktorun ) { + my $max_log_entries = 1_000; # for each POST + backlog + while ( $self->message_queue->pending() + && $max_log_entries-- + && (my $entry = $self->message_queue->dequeue()) ) + { + $oktorun = 0 if !defined $entry; + push @log_entries, Percona::WebAPI::Resource::LogEntry->new( + log_level => $entry->[0], + message => $entry->[1], ); - }; - if ( my $e = $EVAL_ERROR ) { - warn "$e"; } - } # EVENT + if ( scalar @log_entries ) { + eval { + $self->client->post( + link => $self->log_link, + resources => \@log_entries, + ); + }; + if ( my $e = $EVAL_ERROR ) { + warn "$e"; + } + else { + @log_entries = (); + } + } # have log entries + } # QUEUE } # threads::async ); } @@ -4903,7 +4950,7 @@ sub _log { else { print "$ts $level $msg\n"; } - if ( $self->client && $self->status_link ) { + if ( $self->client && $self->log_link ) { my @event :shared = ($level_number, $msg); $self->_message_queue->enqueue(\@event); } @@ -5364,7 +5411,21 @@ sub get_api_client { $interval->(); # failure, try again } - return $client, $entry_links; + # Create another client for Percona::Agent::Logger. If the primary + # client was created, then the API key and entry link worked, so + # just duplicate them for the new logger client. We don't need to + # connect the logger client because clients are stateless so knowing + # the primary client connected ensures that the logger client can/will + # connect to with the same API and entry link. + my $logger_client; + if ( $client && $entry_links ) { + $logger_client = Percona::WebAPI::Client->new( + api_key => $api_key, + ($entry_link ? (entry_link => $entry_link) : ()), + ); + } + + return $client, $entry_links, $logger_client; } sub load_local_agent { @@ -5543,17 +5604,18 @@ sub start_agent { my $cxn = $args{Cxn}; # Optional args - my $agent_uuid = $args{agent_uuid}; - my $daemonize = $args{daemonize}; - my $pid_file = $args{pid_file}; - my $log_file = $args{log_file}; - my $_oktorun = $args{oktorun} || sub { return $oktorun }; - my $tries = $args{tries}; - my $interval = $args{interval} || sub { sleep 60; }; - my $actions = $args{actions}; - my $versions = $args{versions}; # for testing - my $client = $args{client}; # for testing - my $entry_links = $args{entry_links}; # for testing + my $agent_uuid = $args{agent_uuid}; + my $daemonize = $args{daemonize}; + my $pid_file = $args{pid_file}; + my $log_file = $args{log_file}; + my $_oktorun = $args{oktorun} || sub { return $oktorun }; + my $tries = $args{tries}; + my $interval = $args{interval} || sub { sleep 60; }; + my $actions = $args{actions}; + my $versions = $args{versions}; # for testing + my $client = $args{client}; # for testing + my $entry_links = $args{entry_links}; # for testing + my $logger_client = $args{logger_client}; # for testing # Daemonize first so all output goes to the --log. my $daemon; @@ -5600,10 +5662,10 @@ These values can change if a different configuration is received. . "to use a writeable --lib directory."); } - # Connect to https://api.pws.percona.com and get entry links. + # Connect to the Percona Web API and get entry links. # Don't return until successful. if ( !$client || !$entry_links ) { - ($client, $entry_links) = get_api_client( + ($client, $entry_links, $logger_client) = get_api_client( api_key => $api_key, tries => undef, # forever interval => sub { sleep 60 }, @@ -5671,6 +5733,17 @@ These values can change if a different configuration is received. actions => $actions, # Agent.actions ); + # Give the logger its client so that it will also POST every log entry + # to /agent/{uuid}/log. This is done asynchronously by a thread so a + # simple info("Hello world!") to STDOUT won't block if the API isn't + # responding. -- Both client and log_link are required to enable this. + if ( $agent->links->{log} && $logger_client ) { + $logger->client($logger_client); + $logger->log_link($agent->links->{log}); + + $logger->info("Online logging enabled"); + } + save_agent( agent => $agent, lib_dir => $lib_dir, diff --git a/lib/Percona/Agent/Logger.pm b/lib/Percona/Agent/Logger.pm index 461998e3..9e8e0d97 100644 --- a/lib/Percona/Agent/Logger.pm +++ b/lib/Percona/Agent/Logger.pm @@ -31,6 +31,7 @@ use Thread::Queue; use Lmo; use Transformers; +use Percona::WebAPI::Resource::LogEntry; Transformers->import(qw(ts)); @@ -47,7 +48,7 @@ has 'client' => ( required => 0, ); -has 'status_link' => ( +has 'log_link' => ( is => 'rw', isa => 'Str', required => 0, @@ -74,29 +75,41 @@ has '_thread' => ( sub BUILD { my $self = shift; - if ( $self->client && $self->status_link ) { + if ( $self->client && $self->log_link ) { $self->_message_queue(Thread::Queue->new()); $self->_thread( threads::async { - EVENT: - while ( my $event = $self->_message_queue->dequeue() ) { - last unless defined $event; - # $event = [ level, "message" ] - my $status = { - log_level => $event->[0], - message => $event->[1], - }; - eval { - $self->client->post( - link => $self->status_link, - resources => encode_json($status), + my @log_entries; + my $oktorun = 1; + QUEUE: + while ( $oktorun ) { + my $max_log_entries = 1_000; # for each POST + backlog + while ( $self->message_queue->pending() + && $max_log_entries-- + && (my $entry = $self->message_queue->dequeue()) ) + { + $oktorun = 0 if !defined $entry; + # $event = [ level, "message" ] + push @log_entries, Percona::WebAPI::Resource::LogEntry->new( + log_level => $entry->[0], + message => $entry->[1], ); - }; - if ( my $e = $EVAL_ERROR ) { - warn "$e"; - # TODO: a queue for failed messages? } - } # EVENT + if ( scalar @log_entries ) { + eval { + $self->client->post( + link => $self->log_link, + resources => \@log_entries, + ); + }; + if ( my $e = $EVAL_ERROR ) { + warn "$e"; + } + else { + @log_entries = (); + } + } # have log entries + } # QUEUE } # threads::async ); } @@ -168,7 +181,7 @@ sub _log { else { print "$ts $level $msg\n"; } - if ( $self->client && $self->status_link ) { + if ( $self->client && $self->log_link ) { my @event :shared = ($level_number, $msg); $self->_message_queue->enqueue(\@event); }