Merge lp:~percona-toolkit-dev/percona-toolkit/pqd-enhanced-resume-file r582.

This commit is contained in:
Daniel Nichter
2013-05-07 20:19:37 -07:00
parent a5f04f27e4
commit e8eca8dde1
3 changed files with 148 additions and 53 deletions

View File

@@ -4877,6 +4877,7 @@ sub new {
my ( $class ) = @_;
my $self = {
pending => [],
last_event_offset => undef,
};
return bless $self, $class;
}
@@ -4913,6 +4914,7 @@ sub parse_event {
or defined($stmt = $next_event->())
) {
my @properties = ('cmd', 'Query', 'pos_in_log', $pos_in_log);
$self->{last_event_offset} = $pos_in_log;
$pos_in_log = $tell->();
if ( $stmt =~ s/$slow_log_hd_line//go ){ # Throw away header lines in log
@@ -5038,9 +5040,15 @@ sub parse_event {
PTDEBUG && _d('Properties of event:', Dumper(\@properties));
my $event = { @properties };
if ( $args{stats} ) {
$args{stats}->{events_read}++;
$args{stats}->{events_parsed}++;
if ( !$event->{arg} ) {
PTDEBUG && _d('Partial event, no arg');
}
else {
$self->{last_event_offset} = undef;
if ( $args{stats} ) {
$args{stats}->{events_read}++;
$args{stats}->{events_parsed}++;
}
}
return $event;
} # EVENT
@@ -12464,6 +12472,7 @@ my $ps_dbh; # For Processlist
my $aux_dbh; # For --aux-dsn (--since/--until "MySQL expression")
my $resume_file;
my $resume = {};
my $offset;
(my $tool = __PACKAGE__) =~ tr/_/-/;
@@ -12472,7 +12481,7 @@ sub main {
# Reset global vars, else tests will fail.
local @ARGV = @_;
$oktorun = 1;
$resume_file = undef;
$resume = {};
$offset = undef;
# ##########################################################################
@@ -12762,25 +12771,63 @@ sub main {
# Read the file offset for --resume.
if ( ($resume_file = $o->get('resume')) && $filename ) {
if ( -s $resume_file ) {
open my $resume_fh, '<', $resume_file
or die "Error opening $resume_file: $OS_ERROR";
chomp(my $resume_offset = <$resume_fh>);
open my $resume_fh, "<", $resume_file
or die "Cannot open $resume_file: $OS_ERROR";
my $resume_offset = do { local $/; <$resume_fh> };
close $resume_fh
or die "Error close $resume_file: $OS_ERROR";
if ( !looks_like_number($resume_offset) ) {
die "Offset $resume_offset in $resume_file "
. "does not look like a number.\n";
chomp($resume_offset) if $resume_offset;
if ( looks_like_number($resume_offset) ) {
PTDEBUG && _d('Resuming at offset', $resume_offset);
$resume->{simple} = 1;
seek $fh, $resume_offset, 0
or die "Error seeking to $resume_offset in "
. "$resume_file: $OS_ERROR";
warn "Resuming $filename from offset $resume_offset "
. "(file size: $filesize)...\n";
}
else {
$resume->{simple} = 0; # enhanced resume file
map {
my $line = $_;
chomp $line;
my ($key, $value) = split('=', $line);
$resume->{$key} = $value;
} split("\n", $resume_offset);
if ( $resume->{end_offset} &&
$resume->{end_offset} <=
($resume->{stop_offset} || 0) )
{
close $args->{input_fh} if $args->{input_fh};
$args->{input_fh} = undef;
$args->{more_events} = 0;
$oktorun = 0;
$resume_file = '';
warn "# Not resuming $filename because "
. "end_offset $resume->{end_offset} is "
. "less than or equal to stop_offset "
. ($resume->{stop_offset} || 0) . "\n";
}
else {
$resume_offset = $resume->{stop_offset}
|| $resume->{start_offset}
|| 0;
seek $fh, $resume_offset, 0
or die "Error seeking to $resume_offset in "
. "$resume_file: $OS_ERROR";
warn "Resuming $filename from offset "
. "$resume_offset to "
. ($resume->{end_offset} ? $resume->{end_offset}
: "end of file")
. " (file size: $filesize)...\n";
}
}
PTDEBUG && _d('Resuming at offset', $resume_offset);
seek $fh, $resume_offset, 0
or die "Error seeking to $resume_offset in "
. "$resume_file: $OS_ERROR";
warn "Resuming $filename from offset $resume_offset "
. "(file size: $filesize)...\n";
}
else {
PTDEBUG && _d('Not resuming', $filename, 'because',
$resume_file, 'does not exist');
$resume->{simple} = 0;
$resume->{start_offset} = 0;
}
}
@@ -12824,13 +12871,24 @@ sub main {
$args->{more_events} = 0;
}
}
$pr->update($args->{tell}) if $pr;
elsif ( $resume->{end_offset}
&& $offset >= $resume->{end_offset} ) {
PTDEBUG && _d('Offset', $offset, 'at end_offset',
$resume->{end_offset});
close $args->{input_fh} if $args->{input_fh};
$args->{input_fh} = undef;
$args->{more_events} = 0;
}
else {
$pr->update($args->{tell}) if $pr;
}
return $args;
},
);
} # input
my $ps_dsn;
my @parsers;
{ # event
my $misc;
if ( $ps_dsn = $o->get('processlist') ) {
@@ -12932,7 +12990,8 @@ sub main {
}
die "Failed to load $module module: $EVAL_ERROR";
}
push @parsers, $parser;
$pipeline->add(
name => ref $parser,
process => sub {
@@ -13634,7 +13693,9 @@ open my $events_fh, '>', $events_file or die "Cannot open $events_file: $OS_ERRO
}
PTDEBUG && _d("Pipeline data:", Dumper($pipeline_data));
save_resume_offset();
save_resume_offset(
last_event_offset => $parsers[0]->{last_event_offset},
);
# Disconnect all open $dbh's
map {
@@ -14158,6 +14219,9 @@ sub verify_run_time {
}
sub save_resume_offset {
my (%args) = @_;
my $last_event_offset = $args{last_event_offset};
if ( !$resume_file || !$offset ) {
PTDEBUG && _d('Not saving resume offset because there is no '
. 'resume file or offset:', $resume_file, $offset);
@@ -14167,10 +14231,26 @@ sub save_resume_offset {
PTDEBUG && _d('Saving resume at offset', $offset, 'to', $resume_file);
open my $resume_fh, '>', $resume_file
or die "Error opening $resume_file: $OS_ERROR";
print { $resume_fh } $offset, "\n";
if ( $resume->{simple} ) {
print { $resume_fh } $offset, "\n";
warn "\n# Saved resume file offset $offset to $resume_file\n";
}
else {
# 2.2.3+ enhanced resume file
$resume->{stop_offset} = defined $last_event_offset ? $last_event_offset
: $offset;
foreach my $key ( sort keys %$resume ) {
next if $key eq 'simple';
print { $resume_fh } "$key=$resume->{$key}\n";
}
warn "\n# Saved resume file stop_offset $resume->{stop_offset} to "
. "$resume_file\n";
}
close $resume_fh
or die "Error close $resume_file: $OS_ERROR";
warn "\n# Saved resume file offset $offset to $resume_file\n";
return;
}