First implementation of online logging.

This commit is contained in:
Daniel Nichter
2013-05-17 09:32:32 -07:00
parent 883590d448
commit df6ba9f3a5
2 changed files with 138 additions and 52 deletions

View File

@@ -27,6 +27,7 @@ BEGIN {
Percona::WebAPI::Resource::Config Percona::WebAPI::Resource::Config
Percona::WebAPI::Resource::Service Percona::WebAPI::Resource::Service
Percona::WebAPI::Resource::Task Percona::WebAPI::Resource::Task
Percona::WebAPI::Resource::LogEntry
VersionCheck VersionCheck
DSNParser DSNParser
OptionParser OptionParser
@@ -1479,6 +1480,38 @@ no Lmo;
# End Percona::WebAPI::Resource::Task package # End Percona::WebAPI::Resource::Task package
# ########################################################################### # ###########################################################################
# ###########################################################################
# Percona::WebAPI::Resource::LogEntry package
# This package is a copy without comments from the original. The original
# with comments and its test file can be found in the Bazaar repository at,
# lib/Percona/WebAPI/Resource/LogEntry.pm
# t/lib/Percona/WebAPI/Resource/LogEntry.t
# See https://launchpad.net/percona-toolkit for more information.
# ###########################################################################
{
package Percona::WebAPI::Resource::LogEntry;
use Lmo;
has 'log_level' => (
is => 'ro',
isa => 'Int',
required => 1,
);
has 'message' => (
is => 'ro',
isa => 'Str',
required => 1,
);
no Lmo;
1;
}
# ###########################################################################
# End Percona::WebAPI::Resource::LogEntry package
# ###########################################################################
# ########################################################################### # ###########################################################################
# VersionCheck package # VersionCheck package
# This package is a copy without comments from the original. The original # This package is a copy without comments from the original. The original
@@ -4769,6 +4802,7 @@ use Thread::Queue;
use Lmo; use Lmo;
use Transformers; use Transformers;
use Percona::WebAPI::Resource::LogEntry;
Transformers->import(qw(ts)); Transformers->import(qw(ts));
@@ -4785,7 +4819,7 @@ has 'client' => (
required => 0, required => 0,
); );
has 'status_link' => ( has 'log_link' => (
is => 'rw', is => 'rw',
isa => 'Str', isa => 'Str',
required => 0, required => 0,
@@ -4812,27 +4846,40 @@ has '_thread' => (
sub BUILD { sub BUILD {
my $self = shift; my $self = shift;
if ( $self->client && $self->status_link ) { if ( $self->client && $self->log_link ) {
$self->_message_queue(Thread::Queue->new()); $self->_message_queue(Thread::Queue->new());
$self->_thread( $self->_thread(
threads::async { threads::async {
EVENT: my @log_entries;
while ( my $event = $self->_message_queue->dequeue() ) { my $oktorun = 1;
last unless defined $event; QUEUE:
my $status = { while ( $oktorun ) {
log_level => $event->[0], my $max_log_entries = 1_000; # for each POST + backlog
message => $event->[1], while ( $self->message_queue->pending()
}; && $max_log_entries--
&& (my $entry = $self->message_queue->dequeue()) )
{
$oktorun = 0 if !defined $entry;
push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
log_level => $entry->[0],
message => $entry->[1],
);
}
if ( scalar @log_entries ) {
eval { eval {
$self->client->post( $self->client->post(
link => $self->status_link, link => $self->log_link,
resources => encode_json($status), resources => \@log_entries,
); );
}; };
if ( my $e = $EVAL_ERROR ) { if ( my $e = $EVAL_ERROR ) {
warn "$e"; warn "$e";
} }
} # EVENT else {
@log_entries = ();
}
} # have log entries
} # QUEUE
} # threads::async } # threads::async
); );
} }
@@ -4903,7 +4950,7 @@ sub _log {
else { else {
print "$ts $level $msg\n"; print "$ts $level $msg\n";
} }
if ( $self->client && $self->status_link ) { if ( $self->client && $self->log_link ) {
my @event :shared = ($level_number, $msg); my @event :shared = ($level_number, $msg);
$self->_message_queue->enqueue(\@event); $self->_message_queue->enqueue(\@event);
} }
@@ -5364,7 +5411,21 @@ sub get_api_client {
$interval->(); # failure, try again $interval->(); # failure, try again
} }
return $client, $entry_links; # Create another client for Percona::Agent::Logger. If the primary
# client was created, then the API key and entry link worked, so
# just duplicate them for the new logger client. We don't need to
# connect the logger client because clients are stateless so knowing
# the primary client connected ensures that the logger client can/will
# connect to with the same API and entry link.
my $logger_client;
if ( $client && $entry_links ) {
$logger_client = Percona::WebAPI::Client->new(
api_key => $api_key,
($entry_link ? (entry_link => $entry_link) : ()),
);
}
return $client, $entry_links, $logger_client;
} }
sub load_local_agent { sub load_local_agent {
@@ -5554,6 +5615,7 @@ sub start_agent {
my $versions = $args{versions}; # for testing my $versions = $args{versions}; # for testing
my $client = $args{client}; # for testing my $client = $args{client}; # for testing
my $entry_links = $args{entry_links}; # for testing my $entry_links = $args{entry_links}; # for testing
my $logger_client = $args{logger_client}; # for testing
# Daemonize first so all output goes to the --log. # Daemonize first so all output goes to the --log.
my $daemon; my $daemon;
@@ -5600,10 +5662,10 @@ These values can change if a different configuration is received.
. "to use a writeable --lib directory."); . "to use a writeable --lib directory.");
} }
# Connect to https://api.pws.percona.com and get entry links. # Connect to the Percona Web API and get entry links.
# Don't return until successful. # Don't return until successful.
if ( !$client || !$entry_links ) { if ( !$client || !$entry_links ) {
($client, $entry_links) = get_api_client( ($client, $entry_links, $logger_client) = get_api_client(
api_key => $api_key, api_key => $api_key,
tries => undef, # forever tries => undef, # forever
interval => sub { sleep 60 }, interval => sub { sleep 60 },
@@ -5671,6 +5733,17 @@ These values can change if a different configuration is received.
actions => $actions, # Agent.actions actions => $actions, # Agent.actions
); );
# Give the logger its client so that it will also POST every log entry
# to /agent/{uuid}/log. This is done asynchronously by a thread so a
# simple info("Hello world!") to STDOUT won't block if the API isn't
# responding. -- Both client and log_link are required to enable this.
if ( $agent->links->{log} && $logger_client ) {
$logger->client($logger_client);
$logger->log_link($agent->links->{log});
$logger->info("Online logging enabled");
}
save_agent( save_agent(
agent => $agent, agent => $agent,
lib_dir => $lib_dir, lib_dir => $lib_dir,

View File

@@ -31,6 +31,7 @@ use Thread::Queue;
use Lmo; use Lmo;
use Transformers; use Transformers;
use Percona::WebAPI::Resource::LogEntry;
Transformers->import(qw(ts)); Transformers->import(qw(ts));
@@ -47,7 +48,7 @@ has 'client' => (
required => 0, required => 0,
); );
has 'status_link' => ( has 'log_link' => (
is => 'rw', is => 'rw',
isa => 'Str', isa => 'Str',
required => 0, required => 0,
@@ -74,29 +75,41 @@ has '_thread' => (
sub BUILD { sub BUILD {
my $self = shift; my $self = shift;
if ( $self->client && $self->status_link ) { if ( $self->client && $self->log_link ) {
$self->_message_queue(Thread::Queue->new()); $self->_message_queue(Thread::Queue->new());
$self->_thread( $self->_thread(
threads::async { threads::async {
EVENT: my @log_entries;
while ( my $event = $self->_message_queue->dequeue() ) { my $oktorun = 1;
last unless defined $event; QUEUE:
while ( $oktorun ) {
my $max_log_entries = 1_000; # for each POST + backlog
while ( $self->message_queue->pending()
&& $max_log_entries--
&& (my $entry = $self->message_queue->dequeue()) )
{
$oktorun = 0 if !defined $entry;
# $event = [ level, "message" ] # $event = [ level, "message" ]
my $status = { push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
log_level => $event->[0], log_level => $entry->[0],
message => $event->[1], message => $entry->[1],
}; );
}
if ( scalar @log_entries ) {
eval { eval {
$self->client->post( $self->client->post(
link => $self->status_link, link => $self->log_link,
resources => encode_json($status), resources => \@log_entries,
); );
}; };
if ( my $e = $EVAL_ERROR ) { if ( my $e = $EVAL_ERROR ) {
warn "$e"; warn "$e";
# TODO: a queue for failed messages?
} }
} # EVENT else {
@log_entries = ();
}
} # have log entries
} # QUEUE
} # threads::async } # threads::async
); );
} }
@@ -168,7 +181,7 @@ sub _log {
else { else {
print "$ts $level $msg\n"; print "$ts $level $msg\n";
} }
if ( $self->client && $self->status_link ) { if ( $self->client && $self->log_link ) {
my @event :shared = ($level_number, $msg); my @event :shared = ($level_number, $msg);
$self->_message_queue->enqueue(\@event); $self->_message_queue->enqueue(\@event);
} }