Merge simplify-pqd r542.

This commit is contained in:
Daniel Nichter
2013-01-18 18:29:38 -07:00
4 changed files with 118 additions and 79 deletions

View File

@@ -12039,9 +12039,7 @@ sub add {
push @{$self->{procs}}, $process;
push @{$self->{names}}, $name;
if ( my $n = $args{retry_on_error} ) {
$self->{retries}->{$name} = $n;
}
$self->{retries}->{$name} = $args{retry_on_error} || 100;
if ( $self->{instrument} ) {
$self->{instrumentation}->{$name} = { time => 0, calls => 0 };
}
@@ -12110,7 +12108,11 @@ sub execute {
my $msg = "Pipeline process " . ($procno + 1)
. " ($name) caused an error: "
. $EVAL_ERROR;
if ( defined $self->{retries}->{$name} ) {
if ( !$self->{continue_on_error} ) {
die $msg . "Terminating pipeline because --continue-on-error "
. "is false.\n";
}
elsif ( defined $self->{retries}->{$name} ) {
my $n = $self->{retries}->{$name};
if ( $n ) {
warn $msg . "Will retry pipeline process $procno ($name) "
@@ -12122,9 +12124,6 @@ sub execute {
. "($name) caused too many errors.\n";
}
}
elsif ( !$self->{continue_on_error} ) {
die $msg;
}
else {
warn $msg;
}
@@ -13483,22 +13482,37 @@ sub _d {
# ###########################################################################
package pt_query_digest;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Time::Local qw(timelocal);
use Time::HiRes qw(time usleep);
use List::Util qw(max);
use POSIX qw(signal_h);
use File::Spec;
use Data::Dumper;
$Data::Dumper::Indent = 1;
$OUTPUT_AUTOFLUSH = 1;
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
Transformers->import(qw(shorten micro_t percentage_of ts make_checksum
any_unix_timestamp parse_timestamp unix_timestamp crc32));
use Time::Local qw(timelocal);
use Time::HiRes qw(time usleep);
use List::Util qw(max);
use Scalar::Util qw(looks_like_number);
use POSIX qw(signal_h);
use Data::Dumper;
use Percona::Toolkit;
use JSONReportFormatter;
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
$Data::Dumper::Indent = 1;
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Quotekeys = 0;
$OUTPUT_AUTOFLUSH = 1;
Transformers->import(qw(
shorten
micro_t
percentage_of
ts
make_checksum
any_unix_timestamp
parse_timestamp
unix_timestamp
crc32
));
use sigtrap 'handler', \&sig_int, 'normal-signals';
@@ -13508,13 +13522,15 @@ my $ep_dbh; # For --explain
my $ps_dbh; # For Processlist
my $aux_dbh; # For --aux-dsn (--since/--until "MySQL expression")
my %resume;
my $save_resume = undef;
my $resume_file = File::Spec->catfile(File::Spec->tmpdir(), 'pt-query-digest-resume');
my $resume_file;
my $offset;
sub main {
local @ARGV = @_; # set global ARGV for this package
$oktorun = 1; # reset between tests else pipeline won't run
# Reset global vars, else tests will fail.
local @ARGV = @_;
$oktorun = 1;
$resume_file = undef;
$offset = undef;
# ##########################################################################
# Get configuration information.
@@ -13597,8 +13613,6 @@ sub main {
$o->usage_or_errors();
$save_resume = $o->get('resume');
# ########################################################################
# Common modules.
# #######################################################################
@@ -13749,16 +13763,6 @@ sub main {
}
}
if ( $o->get('resume') ) {
if (open my $resume_fh, q{<}, $resume_file) {
while (my $line = <$resume_fh>) {
chomp $line;
my ($file, $pos) = $line =~ m/\A(.+)\t([0-9]+)\z/;
$resume{$file} = $pos;
}
}
}
# ########################################################################
# Create all the pipeline processes that do all the work: get input,
# parse events, manage runtime, switch iterations, aggregate, etc.
@@ -13807,7 +13811,7 @@ sub main {
} # prep
{ # input
my $fi = new FileIterator(resume => \%resume);
my $fi = FileIterator->new();
my $next_file = $fi->get_file_itr(@ARGV);
my $input_fh; # the current input fh
my $pr; # Progress obj for ^
@@ -13816,20 +13820,52 @@ sub main {
name => 'input',
process => sub {
my ( $args ) = @_;
# Only get the next file when there's no fh or no more events in
# the current fh. This allows us to do collect-and-report cycles
# (i.e. iterations) on huge files. This doesn't apply to infinite
# inputs because they don't set more_events false.
if ( !$args->{input_fh} || !$args->{more_events} ) {
# Close the current file.
if ( $args->{input_fh} ) {
close $args->{input_fh}
or die "Cannot close input fh: $OS_ERROR";
}
# Open the next file.
my ($fh, $filename, $filesize) = $next_file->();
if ( $fh ) {
PTDEBUG && _d('Reading', $filename);
PTDEBUG && _d('File size:', $filesize);
push @read_files, $filename || "STDIN";
# Read the file offset for --resume.
if ( $o->get('resume') && $filename ) {
$resume_file = $filename . '.resume';
if ( -f $resume_file ) {
open my $resume_fh, '<', $resume_file
or die "Error opening $resume_file: $OS_ERROR";
chomp(my $resume_offset = <$resume_fh>);
close $resume_fh
or die "Error close $resume_file: $OS_ERROR";
if ( !looks_like_number($resume_offset) ) {
die "Offset $resume_offset in $resume_file "
. "does not look like a number.\n";
}
PTDEBUG && _d('Resuming at offset', $resume_offset);
seek $fh, $resume_offset, 0
or die "Error seeking to $resume_offset in "
. "$resume_file: $OS_ERROR";
warn "Resuming $filename from offset $resume_offset "
. "(file size: $filesize)...\n";
}
else {
PTDEBUG && _d('Not resuming', $filename, 'because',
$resume_file, 'does not exist');
}
}
# Create callback to read next event. Some inputs, like
# Processlist, may use something else but most next_event.
if ( my $read_time = $o->get('read-timeout') ) {
@@ -13842,11 +13878,11 @@ sub main {
$args->{filename} = $filename;
$args->{input_fh} = $fh;
$args->{tell} = sub {
my $pos = tell $fh;
$offset = tell $fh; # update global $offset
if ( $args->{filename} ) {
$args->{pos_for}->{$args->{filename}} = $pos;
$args->{pos_for}->{$args->{filename}} = $offset;
}
return $pos;
return $offset; # legacy: return global $offset
};
$args->{more_events} = 1;
@@ -14352,10 +14388,6 @@ sub main {
# we may just be between iters.
$args->{Runtime}->reset();
$args->{time_left} = undef;
if ( $args->{filename} ) {
$resume{$args->{filename}} = $args->{pos_for}->{$args->{filename}};
}
}
# Continue the pipeline even if we reported and went to the next
@@ -14728,6 +14760,8 @@ sub main {
}
PTDEBUG && _d("Pipeline data:", Dumper($pipeline_data));
save_resume_offset();
# Disconnect all open $dbh's
map {
$dp->disconnect($_);
@@ -14736,8 +14770,6 @@ sub main {
grep { $_ }
($qv_dbh, $qv_dbh2, $ps_dbh, $ep_dbh, $aux_dbh);
save_resume_data();
return 0;
} # End main()
@@ -14880,29 +14912,16 @@ sub print_reports {
return;
}
sub save_resume_data {
return unless $save_resume;
return unless %resume;
if ( open my $resume_fh, q{>}, $resume_file ) {
while ( my ($k, $v) = each %resume ) {
print { $resume_fh } "$k\t$v\n";
}
close $resume_fh;
}
}
# Catches signals so we can exit gracefully.
sub sig_int {
my ( $signal ) = @_;
save_resume_data();
if ( $oktorun ) {
print STDERR "# Caught SIG$signal.\n";
$oktorun = 0;
}
else {
print STDERR "# Exiting on SIG$signal.\n";
save_resume_offset();
exit(1);
}
}
@@ -15148,6 +15167,23 @@ sub verify_run_time {
return $boundary;
}
sub save_resume_offset {
if ( !$resume_file || !$offset ) {
PTDEBUG && _d('Not saving resume offset because there is no '
. 'resume file or offset:', $resume_file, $offset);
return;
}
PTDEBUG && _d('Saving resume at offset', $offset, 'to', $resume_file);
open my $resume_fh, '>', $resume_file
or die "Error opening $resume_file: $OS_ERROR";
print { $resume_fh } $offset, "\n";
close $resume_fh
or die "Error close $resume_file: $OS_ERROR";
warn "\n# Saved resume file offset $offset to $resume_file\n";
return;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
@@ -15697,7 +15733,9 @@ first option on the command line.
default: yes
Continue parsing even if there is an error.
Continue parsing even if there is an error. The tool will not continue
forever: it stops once any process causes 100 errors, in which case there
is probably a bug in the tool or the input is invalid.
=item --create-review-history-table
@@ -16195,8 +16233,11 @@ See L<"OUTPUT"> for more information.
=item --resume
If enabled, the tool will save the furthest it got into the log before exiting;
Future runs on that log with --resume enabled will start from that position.
Resume parsing from the last file offset. When specified, the tool
writes the last file offset to C<FILE.resume> where C<FILE> is the original
file name given on the command line. When ran again with the exact same
file name, the tool reads the last file offset from C<FILE.resume>,
seeks to that position in the file, and resuming parsing events.
=item --review

View File

@@ -5345,7 +5345,7 @@ sub new {
}
my $self = {
instrument => 0,
instrument => PTDEBUG,
continue_on_error => 0,
%args,
@@ -5372,9 +5372,7 @@ sub add {
push @{$self->{procs}}, $process;
push @{$self->{names}}, $name;
if ( my $n = $args{retry_on_error} ) {
$self->{retries}->{$name} = $n;
}
$self->{retries}->{$name} = $args{retry_on_error} || 100;
if ( $self->{instrument} ) {
$self->{instrumentation}->{$name} = { time => 0, calls => 0 };
}
@@ -5443,7 +5441,11 @@ sub execute {
my $msg = "Pipeline process " . ($procno + 1)
. " ($name) caused an error: "
. $EVAL_ERROR;
if ( defined $self->{retries}->{$name} ) {
if ( !$self->{continue_on_error} ) {
die $msg . "Terminating pipeline because --continue-on-error "
. "is false.\n";
}
elsif ( defined $self->{retries}->{$name} ) {
my $n = $self->{retries}->{$name};
if ( $n ) {
warn $msg . "Will retry pipeline process $procno ($name) "
@@ -5455,9 +5457,6 @@ sub execute {
. "($name) caused too many errors.\n";
}
}
elsif ( !$self->{continue_on_error} ) {
die $msg;
}
else {
warn $msg;
}

View File

@@ -71,9 +71,7 @@ sub add {
push @{$self->{procs}}, $process;
push @{$self->{names}}, $name;
if ( my $n = $args{retry_on_error} ) {
$self->{retries}->{$name} = $n;
}
$self->{retries}->{$name} = $args{retry_on_error} || 100;
if ( $self->{instrument} ) {
$self->{instrumentation}->{$name} = { time => 0, calls => 0 };
}
@@ -163,7 +161,11 @@ sub execute {
my $msg = "Pipeline process " . ($procno + 1)
. " ($name) caused an error: "
. $EVAL_ERROR;
if ( defined $self->{retries}->{$name} ) {
if ( !$self->{continue_on_error} ) {
die $msg . "Terminating pipeline because --continue-on-error "
. "is false.\n";
}
elsif ( defined $self->{retries}->{$name} ) {
my $n = $self->{retries}->{$name};
if ( $n ) {
warn $msg . "Will retry pipeline process $procno ($name) "
@@ -175,9 +177,6 @@ sub execute {
. "($name) caused too many errors.\n";
}
}
elsif ( !$self->{continue_on_error} ) {
die $msg;
}
else {
warn $msg;
}

View File

@@ -261,7 +261,7 @@ $pipeline->add(
);
$output = output(
sub { $pipeline->execute(%args) },
sub {$pipeline->execute(%args); },
stderr => 1,
);