diff --git a/bin/pt-query-digest b/bin/pt-query-digest index 7d4fa6b3..6e98a7d7 100755 --- a/bin/pt-query-digest +++ b/bin/pt-query-digest @@ -12039,9 +12039,7 @@ sub add { push @{$self->{procs}}, $process; push @{$self->{names}}, $name; - if ( my $n = $args{retry_on_error} ) { - $self->{retries}->{$name} = $n; - } + $self->{retries}->{$name} = $args{retry_on_error} || 100; if ( $self->{instrument} ) { $self->{instrumentation}->{$name} = { time => 0, calls => 0 }; } @@ -12110,7 +12108,11 @@ sub execute { my $msg = "Pipeline process " . ($procno + 1) . " ($name) caused an error: " . $EVAL_ERROR; - if ( defined $self->{retries}->{$name} ) { + if ( !$self->{continue_on_error} ) { + die $msg . "Terminating pipeline because --continue-on-error " + . "is false.\n"; + } + elsif ( defined $self->{retries}->{$name} ) { my $n = $self->{retries}->{$name}; if ( $n ) { warn $msg . "Will retry pipeline process $procno ($name) " @@ -12122,9 +12124,6 @@ sub execute { . "($name) caused too many errors.\n"; } } - elsif ( !$self->{continue_on_error} ) { - die $msg; - } else { warn $msg; } @@ -13483,22 +13482,37 @@ sub _d { # ########################################################################### package pt_query_digest; +use strict; +use warnings FATAL => 'all'; use English qw(-no_match_vars); -use Time::Local qw(timelocal); -use Time::HiRes qw(time usleep); -use List::Util qw(max); -use POSIX qw(signal_h); -use File::Spec; -use Data::Dumper; -$Data::Dumper::Indent = 1; -$OUTPUT_AUTOFLUSH = 1; +use constant PTDEBUG => $ENV{PTDEBUG} || 0; -Transformers->import(qw(shorten micro_t percentage_of ts make_checksum - any_unix_timestamp parse_timestamp unix_timestamp crc32)); +use Time::Local qw(timelocal); +use Time::HiRes qw(time usleep); +use List::Util qw(max); +use Scalar::Util qw(looks_like_number); +use POSIX qw(signal_h); +use Data::Dumper; use Percona::Toolkit; -use JSONReportFormatter; -use constant PTDEBUG => $ENV{PTDEBUG} || 0; + +$Data::Dumper::Indent = 1; 
+$Data::Dumper::Sortkeys = 1; +$Data::Dumper::Quotekeys = 0; + +$OUTPUT_AUTOFLUSH = 1; + +Transformers->import(qw( + shorten + micro_t + percentage_of + ts + make_checksum + any_unix_timestamp + parse_timestamp + unix_timestamp + crc32 +)); use sigtrap 'handler', \&sig_int, 'normal-signals'; @@ -13508,13 +13522,15 @@ my $ep_dbh; # For --explain my $ps_dbh; # For Processlist my $aux_dbh; # For --aux-dsn (--since/--until "MySQL expression") -my %resume; -my $save_resume = undef; -my $resume_file = File::Spec->catfile(File::Spec->tmpdir(), 'pt-query-digest-resume'); +my $resume_file; +my $offset; sub main { - local @ARGV = @_; # set global ARGV for this package - $oktorun = 1; # reset between tests else pipeline won't run + # Reset global vars, else tests will fail. + local @ARGV = @_; + $oktorun = 1; + $resume_file = undef; + $offset = undef; # ########################################################################## # Get configuration information. @@ -13597,8 +13613,6 @@ sub main { $o->usage_or_errors(); - $save_resume = $o->get('resume'); - # ######################################################################## # Common modules. 
# ####################################################################### @@ -13748,16 +13762,6 @@ sub main { ); } } - - if ( $o->get('resume') ) { - if (open my $resume_fh, q{<}, $resume_file) { - while (my $line = <$resume_fh>) { - chomp $line; - my ($file, $pos) = $line =~ m/\A(.+)\t([0-9]+)\z/; - $resume{$file} = $pos; - } - } - } # ######################################################################## # Create all the pipeline processes that do all the work: get input, @@ -13807,7 +13811,7 @@ sub main { } # prep { # input - my $fi = new FileIterator(resume => \%resume); + my $fi = FileIterator->new(); my $next_file = $fi->get_file_itr(@ARGV); my $input_fh; # the current input fh my $pr; # Progress obj for ^ @@ -13816,20 +13820,52 @@ sub main { name => 'input', process => sub { my ( $args ) = @_; + # Only get the next file when there's no fh or no more events in # the current fh. This allows us to do collect-and-report cycles # (i.e. iterations) on huge files. This doesn't apply to infinite # inputs because they don't set more_events false. if ( !$args->{input_fh} || !$args->{more_events} ) { + + # Close the current file. if ( $args->{input_fh} ) { close $args->{input_fh} or die "Cannot close input fh: $OS_ERROR"; } + + # Open the next file. my ($fh, $filename, $filesize) = $next_file->(); if ( $fh ) { PTDEBUG && _d('Reading', $filename); + PTDEBUG && _d('File size:', $filesize); push @read_files, $filename || "STDIN"; + # Read the file offset for --resume. + if ( $o->get('resume') && $filename ) { + $resume_file = $filename . '.resume'; + if ( -f $resume_file ) { + open my $resume_fh, '<', $resume_file + or die "Error opening $resume_file: $OS_ERROR"; + chomp(my $resume_offset = <$resume_fh>); + close $resume_fh + or die "Error close $resume_file: $OS_ERROR"; + if ( !looks_like_number($resume_offset) ) { + die "Offset $resume_offset in $resume_file " + . 
"does not look like a number.\n"; + } + PTDEBUG && _d('Resuming at offset', $resume_offset); + seek $fh, $resume_offset, 0 + or die "Error seeking to $resume_offset in " + . "$resume_file: $OS_ERROR"; + warn "Resuming $filename from offset $resume_offset " + . "(file size: $filesize)...\n"; + } + else { + PTDEBUG && _d('Not resuming', $filename, 'because', + $resume_file, 'does not exist'); + } + } + # Create callback to read next event. Some inputs, like # Processlist, may use something else but most next_event. if ( my $read_time = $o->get('read-timeout') ) { @@ -13842,11 +13878,11 @@ sub main { $args->{filename} = $filename; $args->{input_fh} = $fh; $args->{tell} = sub { - my $pos = tell $fh; + $offset = tell $fh; # update global $offset if ( $args->{filename} ) { - $args->{pos_for}->{$args->{filename}} = $pos; + $args->{pos_for}->{$args->{filename}} = $offset; } - return $pos; + return $offset; # legacy: return global $offset }; $args->{more_events} = 1; @@ -14352,10 +14388,6 @@ sub main { # we may just be between iters. $args->{Runtime}->reset(); $args->{time_left} = undef; - - if ( $args->{filename} ) { - $resume{$args->{filename}} = $args->{pos_for}->{$args->{filename}}; - } } # Continue the pipeline even if we reported and went to the next @@ -14728,6 +14760,8 @@ sub main { } PTDEBUG && _d("Pipeline data:", Dumper($pipeline_data)); + save_resume_offset(); + # Disconnect all open $dbh's map { $dp->disconnect($_); @@ -14736,8 +14770,6 @@ sub main { grep { $_ } ($qv_dbh, $qv_dbh2, $ps_dbh, $ep_dbh, $aux_dbh); - save_resume_data(); - return 0; } # End main() @@ -14880,29 +14912,16 @@ sub print_reports { return; } -sub save_resume_data { - return unless $save_resume; - return unless %resume; - if ( open my $resume_fh, q{>}, $resume_file ) { - while ( my ($k, $v) = each %resume ) { - print { $resume_fh } "$k\t$v\n"; - } - close $resume_fh; - } -} - # Catches signals so we can exit gracefully. 
sub sig_int { my ( $signal ) = @_; - - save_resume_data(); - if ( $oktorun ) { print STDERR "# Caught SIG$signal.\n"; $oktorun = 0; } else { print STDERR "# Exiting on SIG$signal.\n"; + save_resume_offset(); exit(1); } } @@ -15148,6 +15167,23 @@ sub verify_run_time { return $boundary; } +sub save_resume_offset { + if ( !$resume_file || !$offset ) { + PTDEBUG && _d('Not saving resume offset because there is no ' + . 'resume file or offset:', $resume_file, $offset); + return; + } + + PTDEBUG && _d('Saving resume at offset', $offset, 'to', $resume_file); + open my $resume_fh, '>', $resume_file + or die "Error opening $resume_file: $OS_ERROR"; + print { $resume_fh } $offset, "\n"; + close $resume_fh + or die "Error close $resume_file: $OS_ERROR"; + warn "\n# Saved resume file offset $offset to $resume_file\n"; + return; +} + sub _d { my ($package, undef, $line) = caller 0; @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; } @@ -15697,7 +15733,9 @@ first option on the command line. default: yes -Continue parsing even if there is an error. +Continue parsing even if there is an error. The tool will not continue +forever: it stops once any process causes 100 errors, in which case there +is probably a bug in the tool or the input is invalid. =item --create-review-history-table @@ -16195,8 +16233,11 @@ See L<"OUTPUT"> for more information. =item --resume -If enabled, the tool will save the furthest it got into the log before exiting; -Future runs on that log with --resume enabled will start from that position. +Resume parsing from the last file offset. When specified, the tool +writes the last file offset to C<FILE.resume> where C<FILE> is the original +file name given on the command line. When run again with the exact same +file name, the tool reads the last file offset from C<FILE.resume>, +seeks to that position in the file, and resumes parsing events. 
=item --review diff --git a/bin/pt-table-usage b/bin/pt-table-usage index 69b8c891..f972cbd9 100755 --- a/bin/pt-table-usage +++ b/bin/pt-table-usage @@ -5345,7 +5345,7 @@ sub new { } my $self = { - instrument => 0, + instrument => PTDEBUG, continue_on_error => 0, %args, @@ -5372,9 +5372,7 @@ sub add { push @{$self->{procs}}, $process; push @{$self->{names}}, $name; - if ( my $n = $args{retry_on_error} ) { - $self->{retries}->{$name} = $n; - } + $self->{retries}->{$name} = $args{retry_on_error} || 100; if ( $self->{instrument} ) { $self->{instrumentation}->{$name} = { time => 0, calls => 0 }; } @@ -5443,7 +5441,11 @@ sub execute { my $msg = "Pipeline process " . ($procno + 1) . " ($name) caused an error: " . $EVAL_ERROR; - if ( defined $self->{retries}->{$name} ) { + if ( !$self->{continue_on_error} ) { + die $msg . "Terminating pipeline because --continue-on-error " + . "is false.\n"; + } + elsif ( defined $self->{retries}->{$name} ) { my $n = $self->{retries}->{$name}; if ( $n ) { warn $msg . "Will retry pipeline process $procno ($name) " @@ -5455,9 +5457,6 @@ sub execute { . "($name) caused too many errors.\n"; } } - elsif ( !$self->{continue_on_error} ) { - die $msg; - } else { warn $msg; } diff --git a/lib/Pipeline.pm b/lib/Pipeline.pm index ecea20bf..fcb47226 100644 --- a/lib/Pipeline.pm +++ b/lib/Pipeline.pm @@ -71,9 +71,7 @@ sub add { push @{$self->{procs}}, $process; push @{$self->{names}}, $name; - if ( my $n = $args{retry_on_error} ) { - $self->{retries}->{$name} = $n; - } + $self->{retries}->{$name} = $args{retry_on_error} || 100; if ( $self->{instrument} ) { $self->{instrumentation}->{$name} = { time => 0, calls => 0 }; } @@ -163,7 +161,11 @@ sub execute { my $msg = "Pipeline process " . ($procno + 1) . " ($name) caused an error: " . $EVAL_ERROR; - if ( defined $self->{retries}->{$name} ) { + if ( !$self->{continue_on_error} ) { + die $msg . "Terminating pipeline because --continue-on-error " + . 
"is false.\n"; + } + elsif ( defined $self->{retries}->{$name} ) { my $n = $self->{retries}->{$name}; if ( $n ) { warn $msg . "Will retry pipeline process $procno ($name) " @@ -175,9 +177,6 @@ sub execute { . "($name) caused too many errors.\n"; } } - elsif ( !$self->{continue_on_error} ) { - die $msg; - } else { warn $msg; } diff --git a/t/lib/Pipeline.t b/t/lib/Pipeline.t index 6cb5f0c3..dc21af96 100644 --- a/t/lib/Pipeline.t +++ b/t/lib/Pipeline.t @@ -261,7 +261,7 @@ $pipeline->add( ); $output = output( - sub { $pipeline->execute(%args) }, + sub {$pipeline->execute(%args); }, stderr => 1, );