mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-10 21:19:59 +00:00
Fix pipe handling so all log entries are written.
This commit is contained in:
33
bin/pt-agent
33
bin/pt-agent
@@ -4874,20 +4874,21 @@ sub read_stdin {
|
|||||||
POSIX::SigAction->new(sub { die 'read timeout'; }),
|
POSIX::SigAction->new(sub { die 'read timeout'; }),
|
||||||
) or die "Error setting SIGALRM handler: $OS_ERROR";
|
) or die "Error setting SIGALRM handler: $OS_ERROR";
|
||||||
|
|
||||||
|
my $timeout = 0;
|
||||||
my @lines;
|
my @lines;
|
||||||
eval {
|
eval {
|
||||||
alarm $t;
|
alarm $t;
|
||||||
while(defined(my $line = <STDIN>)) {
|
while(defined(my $line = <STDIN>)) {
|
||||||
push @lines, $line;
|
push @lines, $line;
|
||||||
}
|
}
|
||||||
push @lines, undef; # stop
|
|
||||||
alarm 0;
|
alarm 0;
|
||||||
};
|
};
|
||||||
if ( $EVAL_ERROR ) {
|
if ( $EVAL_ERROR ) {
|
||||||
PTDEBUG && _d('Read error:', $EVAL_ERROR);
|
PTDEBUG && _d('Read error:', $EVAL_ERROR);
|
||||||
die $EVAL_ERROR unless $EVAL_ERROR =~ m/read timeout/;
|
die $EVAL_ERROR unless $EVAL_ERROR =~ m/read timeout/;
|
||||||
|
$timeout = 1;
|
||||||
}
|
}
|
||||||
|
return unless scalar @lines || $timeout;
|
||||||
return \@lines;
|
return \@lines;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -4911,13 +4912,9 @@ sub start_online_logging {
|
|||||||
QUEUE:
|
QUEUE:
|
||||||
while ($oktorun) {
|
while ($oktorun) {
|
||||||
my $lines = read_stdin($read_timeout);
|
my $lines = read_stdin($read_timeout);
|
||||||
|
last QUEUE unless $lines;
|
||||||
LINE:
|
LINE:
|
||||||
foreach my $line ( @$lines ) {
|
foreach my $line ( @$lines ) {
|
||||||
if ( !defined $line ) {
|
|
||||||
$oktorun = 0;
|
|
||||||
last LINE;
|
|
||||||
}
|
|
||||||
|
|
||||||
my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s;
|
my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s;
|
||||||
chomp $msg;
|
chomp $msg;
|
||||||
|
|
||||||
@@ -5027,8 +5024,7 @@ sub _log {
|
|||||||
my $level_number = level_number($level);
|
my $level_number = level_number($level);
|
||||||
|
|
||||||
if ( $self->online_logging ) {
|
if ( $self->online_logging ) {
|
||||||
foreach my $log_entry ( shift @{$self->_buffer} ) {
|
while ( defined(my $log_entry = shift @{$self->_buffer}) ) {
|
||||||
last unless defined $log_entry;
|
|
||||||
$self->_queue_log_entry(@$log_entry);
|
$self->_queue_log_entry(@$log_entry);
|
||||||
}
|
}
|
||||||
$self->_queue_log_entry($ts, $level_number, $msg);
|
$self->_queue_log_entry($ts, $level_number, $msg);
|
||||||
@@ -5055,25 +5051,6 @@ sub _queue_log_entry {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub stop_online_logging {
|
|
||||||
my $self = shift;
|
|
||||||
if ( $self->_pipe_write ) {
|
|
||||||
close $self->_pipe_write;
|
|
||||||
}
|
|
||||||
$self->online_logging(0);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
sub DESTROY {
|
|
||||||
my $self = shift;
|
|
||||||
foreach my $log_entry ( shift @{$self->_buffer} ) {
|
|
||||||
last unless defined $log_entry;
|
|
||||||
$self->_queue_log_entry(@$log_entry);
|
|
||||||
}
|
|
||||||
$self->stop_online_logging();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
sub _d {
|
sub _d {
|
||||||
my ($package, undef, $line) = caller 0;
|
my ($package, undef, $line) = caller 0;
|
||||||
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
||||||
|
@@ -86,20 +86,21 @@ sub read_stdin {
|
|||||||
POSIX::SigAction->new(sub { die 'read timeout'; }),
|
POSIX::SigAction->new(sub { die 'read timeout'; }),
|
||||||
) or die "Error setting SIGALRM handler: $OS_ERROR";
|
) or die "Error setting SIGALRM handler: $OS_ERROR";
|
||||||
|
|
||||||
|
my $timeout = 0;
|
||||||
my @lines;
|
my @lines;
|
||||||
eval {
|
eval {
|
||||||
alarm $t;
|
alarm $t;
|
||||||
while(defined(my $line = <STDIN>)) {
|
while(defined(my $line = <STDIN>)) {
|
||||||
push @lines, $line;
|
push @lines, $line;
|
||||||
}
|
}
|
||||||
push @lines, undef; # stop
|
|
||||||
alarm 0;
|
alarm 0;
|
||||||
};
|
};
|
||||||
if ( $EVAL_ERROR ) {
|
if ( $EVAL_ERROR ) {
|
||||||
PTDEBUG && _d('Read error:', $EVAL_ERROR);
|
PTDEBUG && _d('Read error:', $EVAL_ERROR);
|
||||||
die $EVAL_ERROR unless $EVAL_ERROR =~ m/read timeout/;
|
die $EVAL_ERROR unless $EVAL_ERROR =~ m/read timeout/;
|
||||||
|
$timeout = 1;
|
||||||
}
|
}
|
||||||
|
return unless scalar @lines || $timeout;
|
||||||
return \@lines;
|
return \@lines;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,13 +126,9 @@ sub start_online_logging {
|
|||||||
QUEUE:
|
QUEUE:
|
||||||
while ($oktorun) {
|
while ($oktorun) {
|
||||||
my $lines = read_stdin($read_timeout);
|
my $lines = read_stdin($read_timeout);
|
||||||
|
last QUEUE unless $lines;
|
||||||
LINE:
|
LINE:
|
||||||
foreach my $line ( @$lines ) {
|
foreach my $line ( @$lines ) {
|
||||||
if ( !defined $line ) {
|
|
||||||
$oktorun = 0;
|
|
||||||
last LINE;
|
|
||||||
}
|
|
||||||
|
|
||||||
# $line = ts,level,message
|
# $line = ts,level,message
|
||||||
my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s;
|
my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s;
|
||||||
chomp $msg;
|
chomp $msg;
|
||||||
@@ -243,8 +240,7 @@ sub _log {
|
|||||||
my $level_number = level_number($level);
|
my $level_number = level_number($level);
|
||||||
|
|
||||||
if ( $self->online_logging ) {
|
if ( $self->online_logging ) {
|
||||||
foreach my $log_entry ( shift @{$self->_buffer} ) {
|
while ( defined(my $log_entry = shift @{$self->_buffer}) ) {
|
||||||
last unless defined $log_entry;
|
|
||||||
$self->_queue_log_entry(@$log_entry);
|
$self->_queue_log_entry(@$log_entry);
|
||||||
}
|
}
|
||||||
$self->_queue_log_entry($ts, $level_number, $msg);
|
$self->_queue_log_entry($ts, $level_number, $msg);
|
||||||
@@ -271,25 +267,6 @@ sub _queue_log_entry {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub stop_online_logging {
|
|
||||||
my $self = shift;
|
|
||||||
if ( $self->_pipe_write ) {
|
|
||||||
close $self->_pipe_write;
|
|
||||||
}
|
|
||||||
$self->online_logging(0);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
sub DESTROY {
|
|
||||||
my $self = shift;
|
|
||||||
foreach my $log_entry ( shift @{$self->_buffer} ) {
|
|
||||||
last unless defined $log_entry;
|
|
||||||
$self->_queue_log_entry(@$log_entry);
|
|
||||||
}
|
|
||||||
$self->stop_online_logging();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
sub _d {
|
sub _d {
|
||||||
my ($package, undef, $line) = caller 0;
|
my ($package, undef, $line) = caller 0;
|
||||||
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
||||||
|
Reference in New Issue
Block a user