Rewrite Logger using pipes because Perl threads cause a small memory leak.

This commit is contained in:
Daniel Nichter
2013-06-09 15:53:35 -07:00
parent e26a8744ef
commit 34bc178cd8
2 changed files with 154 additions and 116 deletions

View File

@@ -25,9 +25,7 @@ use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use JSON;
use threads;
use Thread::Queue;
use POSIX qw(SIGALRM);
use Lmo;
use Transformers;
@@ -79,18 +77,6 @@ has '_local_q' => (
default => sub { return []; },
);
has '_message_queue' => (
is => 'rw',
isa => 'Object',
required => 0,
);
has '_thread' => (
is => 'rw',
isa => 'Object',
required => 0,
);
has 'online_logging' => (
is => 'rw',
isa => 'Bool',
@@ -98,39 +84,78 @@ has 'online_logging' => (
default => sub { return 0 },
);
has '_pipe_write' => (
is => 'rw',
isa => 'Maybe[FileHandle]',
required => 0,
);
sub read_timeout {
my ( $t ) = @_;
# Set the SIGALRM handler.
POSIX::sigaction(
SIGALRM,
POSIX::SigAction->new(sub { die 'read timeout'; }),
) or die "Error setting SIGALRM handler: $OS_ERROR";
my @lines;
eval {
alarm $t;
while(defined(my $line = <STDIN>)) {
chomp $line;
push @lines, $line;
}
push @lines, undef; # stop
alarm 0;
};
if ( $EVAL_ERROR ) {
PTDEBUG && _d('Read error:', $EVAL_ERROR);
die $EVAL_ERROR unless $EVAL_ERROR =~ m/read timeout/;
}
return \@lines;
}
sub enable_online_logging {
my ($self, %args) = @_;
my $client = $args{client};
my $log_link = $args{log_link};
$self->_message_queue(Thread::Queue->new());
my $pid = open(my $pipe_write, "|-");
my $thread = threads::async {
if ($pid) {
select $pipe_write;
$OUTPUT_AUTOFLUSH = 1;
$self->_pipe_write($pipe_write);
$self->online_logging(1);
}
else {
# child
my @log_entries;
my $oktorun = 1;
QUEUE:
while ( $oktorun ) {
my $max_log_entries = 1_000; # for each POST + backlog
while ( $self->_message_queue
&& $self->_message_queue->pending()
&& $max_log_entries--
&& (my $entry = $self->_message_queue->dequeue()) )
{
# $entry = [ ts, level, "message" ]
if ( defined $entry->[0] ) {
push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
entry_ts => $entry->[0],
log_level => $entry->[1],
message => $entry->[2],
($self->service ? (service => $self->service) : ()),
($self->data_ts ? (data_ts => $self->data_ts) : ()),
);
}
else {
# Got "stop" entry: [ undef, undef, undef ]
while ($oktorun) {
my $lines = read_timeout($self->queue_wait);
LINE:
foreach my $line ( @$lines ) {
if ( !defined $line ) {
$oktorun = 0;
last LINE;
}
} # read log entries from queue
# $line = ts,level,message
my ($ts, $level, $msg) = $line =~ m/^([^,]+),([^,]+),(.+)/s;
push @log_entries, Percona::WebAPI::Resource::LogEntry->new(
entry_ts => $ts,
log_level => $level,
message => $msg,
($self->service ? (service => $self->service) : ()),
($self->data_ts ? (data_ts => $self->data_ts) : ()),
);
} # LINE
if ( scalar @log_entries ) {
eval {
@@ -160,12 +185,8 @@ sub enable_online_logging {
}
}
return;
}; # threads::async
$self->_thread($thread);
$self->online_logging(1);
exit 0;
} # child
return;
}
@@ -233,18 +254,15 @@ sub _set_exit_status {
sub _log {
my ($self, $level, $msg) = @_;
chomp($msg);
my $ts = ts(time, 1); # 1=UTC
my $level_number = level_number($level);
if ( $self->online_logging ) {
foreach my $log_entry ( shift @{$self->_local_q} ) {
last unless defined $log_entry;
my @event :shared = (@$log_entry);
$self->_message_queue->enqueue(\@event);
$self->_queue_log_entry(@$log_entry);
}
my @event :shared = ($ts, $level_number, $msg);
$self->_message_queue->enqueue(\@event);
$self->_queue_log_entry($ts, $level_number, $msg);
}
else {
push @{$self->_local_q}, [$ts, $level_number, $msg];
@@ -254,29 +272,34 @@ sub _log {
print STDERR "$ts $level $msg\n";
}
else {
print "$ts $level $msg\n";
print STDOUT "$ts $level $msg\n";
}
}
return;
}
sub _queue_log_entry {
my ($self, $ts, $log_level, $msg) = @_;
$msg .= "\n" unless $msg =~ m/\n\Z/;
print "$ts,$log_level,$msg";
return;
}
sub stop_online_logging {
my $self = shift;
if ( $self->_thread && $self->_thread->is_running() ) {
my @stop :shared = (undef, undef);
$self->_message_queue->enqueue(\@stop); # stop the thread
$self->_thread->join();
if ( $self->_pipe_write ) {
close $self->_pipe_write;
}
$self->online_logging(0);
return;
}
sub DESTROY {
my $self = shift;
$self->stop_online_logging();
return;
}
#sub DESTROY {
# my $self = shift;
# $self->stop_online_logging();
# return;
#}
sub _d {
my ($package, undef, $line) = caller 0;