mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-10 21:19:59 +00:00
Add pid to LogEntry. Use online logging for --send-data.
This commit is contained in:
94
bin/pt-agent
94
bin/pt-agent
@@ -4822,29 +4822,16 @@ use Percona::WebAPI::Resource::LogEntry;
|
|||||||
|
|
||||||
Transformers->import(qw(ts));
|
Transformers->import(qw(ts));
|
||||||
|
|
||||||
has 'client' => (
|
|
||||||
is => 'rw',
|
|
||||||
isa => 'Object',
|
|
||||||
required => 0,
|
|
||||||
);
|
|
||||||
|
|
||||||
has 'log_link' => (
|
|
||||||
is => 'rw',
|
|
||||||
isa => 'Str',
|
|
||||||
required => 0,
|
|
||||||
);
|
|
||||||
|
|
||||||
has 'exit_status' => (
|
has 'exit_status' => (
|
||||||
is => 'rw',
|
is => 'rw',
|
||||||
isa => 'ScalarRef',
|
isa => 'ScalarRef',
|
||||||
required => 1,
|
required => 1,
|
||||||
);
|
);
|
||||||
|
|
||||||
has 'queue_wait' => (
|
has 'pid' => (
|
||||||
is => 'rw',
|
is => 'ro',
|
||||||
isa => 'Int',
|
isa => 'Int',
|
||||||
required => 0,
|
required => 1,
|
||||||
default => sub { return 3; },
|
|
||||||
);
|
);
|
||||||
|
|
||||||
has 'service' => (
|
has 'service' => (
|
||||||
@@ -4859,13 +4846,6 @@ has 'data_ts' => (
|
|||||||
required => 0,
|
required => 0,
|
||||||
);
|
);
|
||||||
|
|
||||||
has '_local_q' => (
|
|
||||||
is => 'rw',
|
|
||||||
isa => 'ArrayRef',
|
|
||||||
required => 0,
|
|
||||||
default => sub { return []; },
|
|
||||||
);
|
|
||||||
|
|
||||||
has 'online_logging' => (
|
has 'online_logging' => (
|
||||||
is => 'rw',
|
is => 'rw',
|
||||||
isa => 'Bool',
|
isa => 'Bool',
|
||||||
@@ -4873,13 +4853,20 @@ has 'online_logging' => (
|
|||||||
default => sub { return 0 },
|
default => sub { return 0 },
|
||||||
);
|
);
|
||||||
|
|
||||||
|
has '_buffer' => (
|
||||||
|
is => 'rw',
|
||||||
|
isa => 'ArrayRef',
|
||||||
|
required => 0,
|
||||||
|
default => sub { return []; },
|
||||||
|
);
|
||||||
|
|
||||||
has '_pipe_write' => (
|
has '_pipe_write' => (
|
||||||
is => 'rw',
|
is => 'rw',
|
||||||
isa => 'Maybe[FileHandle]',
|
isa => 'Maybe[FileHandle]',
|
||||||
required => 0,
|
required => 0,
|
||||||
);
|
);
|
||||||
|
|
||||||
sub read_timeout {
|
sub read_stdin {
|
||||||
my ( $t ) = @_;
|
my ( $t ) = @_;
|
||||||
|
|
||||||
POSIX::sigaction(
|
POSIX::sigaction(
|
||||||
@@ -4891,7 +4878,6 @@ sub read_timeout {
|
|||||||
eval {
|
eval {
|
||||||
alarm $t;
|
alarm $t;
|
||||||
while(defined(my $line = <STDIN>)) {
|
while(defined(my $line = <STDIN>)) {
|
||||||
chomp $line;
|
|
||||||
push @lines, $line;
|
push @lines, $line;
|
||||||
}
|
}
|
||||||
push @lines, undef; # stop
|
push @lines, undef; # stop
|
||||||
@@ -4905,11 +4891,11 @@ sub read_timeout {
|
|||||||
return \@lines;
|
return \@lines;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub start_online_logging {
|
||||||
sub enable_online_logging {
|
|
||||||
my ($self, %args) = @_;
|
my ($self, %args) = @_;
|
||||||
my $client = $args{client};
|
my $client = $args{client};
|
||||||
my $log_link = $args{log_link};
|
my $log_link = $args{log_link};
|
||||||
|
my $read_timeout = $args{read_timeout} || 3;
|
||||||
|
|
||||||
my $pid = open(my $pipe_write, "|-");
|
my $pid = open(my $pipe_write, "|-");
|
||||||
|
|
||||||
@@ -4924,7 +4910,7 @@ sub enable_online_logging {
|
|||||||
my $oktorun = 1;
|
my $oktorun = 1;
|
||||||
QUEUE:
|
QUEUE:
|
||||||
while ($oktorun) {
|
while ($oktorun) {
|
||||||
my $lines = read_timeout($self->queue_wait);
|
my $lines = read_stdin($read_timeout);
|
||||||
LINE:
|
LINE:
|
||||||
foreach my $line ( @$lines ) {
|
foreach my $line ( @$lines ) {
|
||||||
if ( !defined $line ) {
|
if ( !defined $line ) {
|
||||||
@@ -4933,8 +4919,10 @@ sub enable_online_logging {
|
|||||||
}
|
}
|
||||||
|
|
||||||
my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s;
|
my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s;
|
||||||
|
chomp $msg;
|
||||||
|
|
||||||
push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
|
push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
|
||||||
|
pid => $self->pid,
|
||||||
entry_ts => $ts,
|
entry_ts => $ts,
|
||||||
log_level => $level,
|
log_level => $level,
|
||||||
message => $msg,
|
message => $msg,
|
||||||
@@ -4957,10 +4945,6 @@ sub enable_online_logging {
|
|||||||
@log_entries = ();
|
@log_entries = ();
|
||||||
}
|
}
|
||||||
} # have log entries
|
} # have log entries
|
||||||
|
|
||||||
if ( $oktorun ) {
|
|
||||||
sleep $self->queue_wait;
|
|
||||||
}
|
|
||||||
} # QUEUE oktorun
|
} # QUEUE oktorun
|
||||||
|
|
||||||
if ( scalar @log_entries ) {
|
if ( scalar @log_entries ) {
|
||||||
@@ -5043,14 +5027,14 @@ sub _log {
|
|||||||
my $level_number = level_number($level);
|
my $level_number = level_number($level);
|
||||||
|
|
||||||
if ( $self->online_logging ) {
|
if ( $self->online_logging ) {
|
||||||
foreach my $log_entry ( shift @{$self->_local_q} ) {
|
foreach my $log_entry ( shift @{$self->_buffer} ) {
|
||||||
last unless defined $log_entry;
|
last unless defined $log_entry;
|
||||||
$self->_queue_log_entry(@$log_entry);
|
$self->_queue_log_entry(@$log_entry);
|
||||||
}
|
}
|
||||||
$self->_queue_log_entry($ts, $level_number, $msg);
|
$self->_queue_log_entry($ts, $level_number, $msg);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
push @{$self->_local_q}, [$ts, $level_number, $msg];
|
push @{$self->_buffer}, [$ts, $level_number, $msg];
|
||||||
|
|
||||||
my $ts = ts(time, 0); # 0=local time
|
my $ts = ts(time, 0); # 0=local time
|
||||||
if ( $level_number >= 3 ) { # warning
|
if ( $level_number >= 3 ) { # warning
|
||||||
@@ -5080,6 +5064,11 @@ sub stop_online_logging {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub DESTROY {
|
||||||
|
my $self = shift;
|
||||||
|
$self->stop_online_logging();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
sub _d {
|
sub _d {
|
||||||
my ($package, undef, $line) = caller 0;
|
my ($package, undef, $line) = caller 0;
|
||||||
@@ -5281,6 +5270,7 @@ sub main {
|
|||||||
);
|
);
|
||||||
$logger = Percona::Agent::Logger->new(
|
$logger = Percona::Agent::Logger->new(
|
||||||
exit_status => \$exit_status,
|
exit_status => \$exit_status,
|
||||||
|
pid => $PID,
|
||||||
);
|
);
|
||||||
|
|
||||||
# ########################################################################
|
# ########################################################################
|
||||||
@@ -5460,9 +5450,6 @@ sub main {
|
|||||||
|
|
||||||
$logger->info("pt-agent exit $exit_status, oktorun $oktorun");
|
$logger->info("pt-agent exit $exit_status, oktorun $oktorun");
|
||||||
|
|
||||||
$logger->info('Waiting for logging thread to stop...');
|
|
||||||
$logger->stop_online_logging();
|
|
||||||
|
|
||||||
return $exit_status;
|
return $exit_status;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5855,7 +5842,9 @@ These values can change if a different configuration is received.
|
|||||||
# simple info("Hello world!") to STDOUT won't block if the API isn't
|
# simple info("Hello world!") to STDOUT won't block if the API isn't
|
||||||
# responding. -- Both client and log_link are required to enable this.
|
# responding. -- Both client and log_link are required to enable this.
|
||||||
if ( $agent->links->{log} && $logger_client ) {
|
if ( $agent->links->{log} && $logger_client ) {
|
||||||
$logger->enable_online_logging(
|
$logger->info("Starting online logging. No more log entries will be printed here. "
|
||||||
|
. "Agent logs are accessible through the web interface.");
|
||||||
|
$logger->start_online_logging(
|
||||||
client => $logger_client,
|
client => $logger_client,
|
||||||
log_link => $agent->links->{log},
|
log_link => $agent->links->{log},
|
||||||
);
|
);
|
||||||
@@ -7111,8 +7100,10 @@ sub send_data {
|
|||||||
$logger->info("Sending $service service data");
|
$logger->info("Sending $service service data");
|
||||||
|
|
||||||
# Connect to Percona, get entry links.
|
# Connect to Percona, get entry links.
|
||||||
|
my $entry_links;
|
||||||
|
my $logger_client;
|
||||||
if ( !$client ) {
|
if ( !$client ) {
|
||||||
($client) = get_api_client(
|
($client, $entry_links, $logger_client) = get_api_client(
|
||||||
api_key => $api_key,
|
api_key => $api_key,
|
||||||
tries => 3,
|
tries => 3,
|
||||||
interval => sub { sleep 10 },
|
interval => sub { sleep 10 },
|
||||||
@@ -7139,6 +7130,31 @@ sub send_data {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$agent = eval {
|
||||||
|
$client->get(
|
||||||
|
link => $entry_links->{agents} . '/' . $agent->uuid,
|
||||||
|
);
|
||||||
|
};
|
||||||
|
if ( $EVAL_ERROR ) {
|
||||||
|
$logger->fatal("Failed to get the agent: $EVAL_ERROR");
|
||||||
|
}
|
||||||
|
my $log_link = eval {
|
||||||
|
$client->get(
|
||||||
|
link => $agent->links->{log},
|
||||||
|
);
|
||||||
|
};
|
||||||
|
if ( $EVAL_ERROR ) {
|
||||||
|
$logger->warning("Failed to get the agent log link: $EVAL_ERROR");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
$logger->info("Starting online logging. No more log entries will be printed here. "
|
||||||
|
. "Agent logs are accessible through the web interface.");
|
||||||
|
$logger->start_online_logging(
|
||||||
|
client => $logger_client,
|
||||||
|
log_link => $log_link,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
# XXX
|
# XXX
|
||||||
# Load the Service object from local service JSON file.
|
# Load the Service object from local service JSON file.
|
||||||
# $service changes from a string scalar to a Service object.
|
# $service changes from a string scalar to a Service object.
|
||||||
|
@@ -33,29 +33,16 @@ use Percona::WebAPI::Resource::LogEntry;
|
|||||||
|
|
||||||
Transformers->import(qw(ts));
|
Transformers->import(qw(ts));
|
||||||
|
|
||||||
has 'client' => (
|
|
||||||
is => 'rw',
|
|
||||||
isa => 'Object',
|
|
||||||
required => 0,
|
|
||||||
);
|
|
||||||
|
|
||||||
has 'log_link' => (
|
|
||||||
is => 'rw',
|
|
||||||
isa => 'Str',
|
|
||||||
required => 0,
|
|
||||||
);
|
|
||||||
|
|
||||||
has 'exit_status' => (
|
has 'exit_status' => (
|
||||||
is => 'rw',
|
is => 'rw',
|
||||||
isa => 'ScalarRef',
|
isa => 'ScalarRef',
|
||||||
required => 1,
|
required => 1,
|
||||||
);
|
);
|
||||||
|
|
||||||
has 'queue_wait' => (
|
has 'pid' => (
|
||||||
is => 'rw',
|
is => 'ro',
|
||||||
isa => 'Int',
|
isa => 'Int',
|
||||||
required => 0,
|
required => 1,
|
||||||
default => sub { return 3; },
|
|
||||||
);
|
);
|
||||||
|
|
||||||
has 'service' => (
|
has 'service' => (
|
||||||
@@ -70,13 +57,6 @@ has 'data_ts' => (
|
|||||||
required => 0,
|
required => 0,
|
||||||
);
|
);
|
||||||
|
|
||||||
has '_local_q' => (
|
|
||||||
is => 'rw',
|
|
||||||
isa => 'ArrayRef',
|
|
||||||
required => 0,
|
|
||||||
default => sub { return []; },
|
|
||||||
);
|
|
||||||
|
|
||||||
has 'online_logging' => (
|
has 'online_logging' => (
|
||||||
is => 'rw',
|
is => 'rw',
|
||||||
isa => 'Bool',
|
isa => 'Bool',
|
||||||
@@ -84,13 +64,20 @@ has 'online_logging' => (
|
|||||||
default => sub { return 0 },
|
default => sub { return 0 },
|
||||||
);
|
);
|
||||||
|
|
||||||
|
has '_buffer' => (
|
||||||
|
is => 'rw',
|
||||||
|
isa => 'ArrayRef',
|
||||||
|
required => 0,
|
||||||
|
default => sub { return []; },
|
||||||
|
);
|
||||||
|
|
||||||
has '_pipe_write' => (
|
has '_pipe_write' => (
|
||||||
is => 'rw',
|
is => 'rw',
|
||||||
isa => 'Maybe[FileHandle]',
|
isa => 'Maybe[FileHandle]',
|
||||||
required => 0,
|
required => 0,
|
||||||
);
|
);
|
||||||
|
|
||||||
sub read_timeout {
|
sub read_stdin {
|
||||||
my ( $t ) = @_;
|
my ( $t ) = @_;
|
||||||
|
|
||||||
# Set the SIGALRM handler.
|
# Set the SIGALRM handler.
|
||||||
@@ -103,7 +90,6 @@ sub read_timeout {
|
|||||||
eval {
|
eval {
|
||||||
alarm $t;
|
alarm $t;
|
||||||
while(defined(my $line = <STDIN>)) {
|
while(defined(my $line = <STDIN>)) {
|
||||||
chomp $line;
|
|
||||||
push @lines, $line;
|
push @lines, $line;
|
||||||
}
|
}
|
||||||
push @lines, undef; # stop
|
push @lines, undef; # stop
|
||||||
@@ -117,15 +103,16 @@ sub read_timeout {
|
|||||||
return \@lines;
|
return \@lines;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub start_online_logging {
|
||||||
sub enable_online_logging {
|
|
||||||
my ($self, %args) = @_;
|
my ($self, %args) = @_;
|
||||||
my $client = $args{client};
|
my $client = $args{client};
|
||||||
my $log_link = $args{log_link};
|
my $log_link = $args{log_link};
|
||||||
|
my $read_timeout = $args{read_timeout} || 3;
|
||||||
|
|
||||||
my $pid = open(my $pipe_write, "|-");
|
my $pid = open(my $pipe_write, "|-");
|
||||||
|
|
||||||
if ($pid) {
|
if ($pid) {
|
||||||
|
# parent
|
||||||
select $pipe_write;
|
select $pipe_write;
|
||||||
$OUTPUT_AUTOFLUSH = 1;
|
$OUTPUT_AUTOFLUSH = 1;
|
||||||
$self->_pipe_write($pipe_write);
|
$self->_pipe_write($pipe_write);
|
||||||
@@ -137,7 +124,7 @@ sub enable_online_logging {
|
|||||||
my $oktorun = 1;
|
my $oktorun = 1;
|
||||||
QUEUE:
|
QUEUE:
|
||||||
while ($oktorun) {
|
while ($oktorun) {
|
||||||
my $lines = read_timeout($self->queue_wait);
|
my $lines = read_stdin($read_timeout);
|
||||||
LINE:
|
LINE:
|
||||||
foreach my $line ( @$lines ) {
|
foreach my $line ( @$lines ) {
|
||||||
if ( !defined $line ) {
|
if ( !defined $line ) {
|
||||||
@@ -147,8 +134,10 @@ sub enable_online_logging {
|
|||||||
|
|
||||||
# $line = ts,level,message
|
# $line = ts,level,message
|
||||||
my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s;
|
my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s;
|
||||||
|
chomp $msg;
|
||||||
|
|
||||||
push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
|
push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
|
||||||
|
pid => $self->pid,
|
||||||
entry_ts => $ts,
|
entry_ts => $ts,
|
||||||
log_level => $level,
|
log_level => $level,
|
||||||
message => $msg,
|
message => $msg,
|
||||||
@@ -171,10 +160,6 @@ sub enable_online_logging {
|
|||||||
@log_entries = ();
|
@log_entries = ();
|
||||||
}
|
}
|
||||||
} # have log entries
|
} # have log entries
|
||||||
|
|
||||||
if ( $oktorun ) {
|
|
||||||
sleep $self->queue_wait;
|
|
||||||
}
|
|
||||||
} # QUEUE oktorun
|
} # QUEUE oktorun
|
||||||
|
|
||||||
if ( scalar @log_entries ) {
|
if ( scalar @log_entries ) {
|
||||||
@@ -258,14 +243,14 @@ sub _log {
|
|||||||
my $level_number = level_number($level);
|
my $level_number = level_number($level);
|
||||||
|
|
||||||
if ( $self->online_logging ) {
|
if ( $self->online_logging ) {
|
||||||
foreach my $log_entry ( shift @{$self->_local_q} ) {
|
foreach my $log_entry ( shift @{$self->_buffer} ) {
|
||||||
last unless defined $log_entry;
|
last unless defined $log_entry;
|
||||||
$self->_queue_log_entry(@$log_entry);
|
$self->_queue_log_entry(@$log_entry);
|
||||||
}
|
}
|
||||||
$self->_queue_log_entry($ts, $level_number, $msg);
|
$self->_queue_log_entry($ts, $level_number, $msg);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
push @{$self->_local_q}, [$ts, $level_number, $msg];
|
push @{$self->_buffer}, [$ts, $level_number, $msg];
|
||||||
|
|
||||||
my $ts = ts(time, 0); # 0=local time
|
my $ts = ts(time, 0); # 0=local time
|
||||||
if ( $level_number >= 3 ) { # warning
|
if ( $level_number >= 3 ) { # warning
|
||||||
@@ -295,11 +280,11 @@ sub stop_online_logging {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
#sub DESTROY {
|
sub DESTROY {
|
||||||
# my $self = shift;
|
my $self = shift;
|
||||||
# $self->stop_online_logging();
|
$self->stop_online_logging();
|
||||||
# return;
|
return;
|
||||||
#}
|
}
|
||||||
|
|
||||||
sub _d {
|
sub _d {
|
||||||
my ($package, undef, $line) = caller 0;
|
my ($package, undef, $line) = caller 0;
|
||||||
|
@@ -22,6 +22,12 @@ package Percona::WebAPI::Resource::LogEntry;
|
|||||||
|
|
||||||
use Lmo;
|
use Lmo;
|
||||||
|
|
||||||
|
has 'pid' => (
|
||||||
|
is => 'ro',
|
||||||
|
isa => 'Int',
|
||||||
|
required => 1,
|
||||||
|
);
|
||||||
|
|
||||||
has 'service' => (
|
has 'service' => (
|
||||||
is => 'ro',
|
is => 'ro',
|
||||||
isa => 'Str',
|
isa => 'Str',
|
||||||
|
Reference in New Issue
Block a user