mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-25 13:46:22 +00:00
Catch and retry certain errors.
This commit is contained in:
@@ -1395,6 +1395,113 @@ if ( MKDEBUG ) {
|
||||
# End OptionParser package
|
||||
# ###########################################################################
|
||||
|
||||
# ###########################################################################
|
||||
# Cxn package
|
||||
# This package is a copy without comments from the original. The original
|
||||
# with comments and its test file can be found in the Bazaar repository at,
|
||||
# lib/Cxn.pm
|
||||
# t/lib/Cxn.t
|
||||
# See https://launchpad.net/percona-toolkit for more information.
|
||||
# ###########################################################################
|
||||
{
|
||||
package Cxn;
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => 'all';
|
||||
use English qw(-no_match_vars);
|
||||
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
|
||||
|
||||
sub new {
|
||||
my ( $class, %args ) = @_;
|
||||
my @required_args = qw(DSNParser OptionParser);
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
};
|
||||
die "I need a dsn or dsn_string argument"
|
||||
unless $args{dsn} || $args{dsn_string};
|
||||
my ($dp, $o) = @args{@required_args};
|
||||
|
||||
my $dsn = $args{dsn};
|
||||
if ( !$dsn ) {
|
||||
$dsn = $dp->parse(
|
||||
$args{dsn_string}, $args{prev_dsn}, $dp->parse_options($o));
|
||||
}
|
||||
|
||||
my $self = {
|
||||
dsn_string => $args{dsn_string},
|
||||
dsn => $dsn,
|
||||
dbh => $args{dbh},
|
||||
OptionParser => $o,
|
||||
DSNParser => $dp,
|
||||
};
|
||||
|
||||
MKDEBUG && _d('New connection to', $dsn->{n});
|
||||
return bless $self, $class;
|
||||
}
|
||||
|
||||
sub connect {
|
||||
my ( $self ) = @_;
|
||||
my $dsn = $self->{dsn};
|
||||
my $dp = $self->{DSNParser};
|
||||
my $o = $self->{OptionParser};
|
||||
|
||||
my $dbh = $self->{dbh};
|
||||
if ( !$dbh ) {
|
||||
if ( $o->get('ask-pass') ) {
|
||||
$dsn->{p} = OptionParser::prompt_noecho("Enter password: ");
|
||||
}
|
||||
|
||||
$dbh = $dp->get_dbh($dp->get_cxn_params($dsn), { AutoCommit => 1 });
|
||||
MKDEBUG && _d('Connected dbh', $dbh, $dsn->{n});
|
||||
}
|
||||
|
||||
return $self->set_dbh($dbh);
|
||||
}
|
||||
|
||||
sub set_dbh {
|
||||
my ($self, $dbh) = @_;
|
||||
|
||||
return $dbh if $self->{dbh} && $self->{dbh} == $dbh;
|
||||
|
||||
$dbh->{FetchHashKeyName} = 'NAME_lc';
|
||||
|
||||
$self->{dbh} = $dbh;
|
||||
return $dbh;
|
||||
}
|
||||
|
||||
sub dbh {
|
||||
my ($self) = @_;
|
||||
return $self->{dbh};
|
||||
}
|
||||
|
||||
sub dsn {
|
||||
my ($self) = @_;
|
||||
return $self->{dsn};
|
||||
}
|
||||
|
||||
sub DESTROY {
|
||||
my ($self) = @_;
|
||||
if ( $self->{dbh} ) {
|
||||
MKDEBUG && _d('Disconnecting dbh', $self->{dbh}, $self->{dsn}->{n});
|
||||
$self->{dbh}->disconnect();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
sub _d {
|
||||
my ($package, undef, $line) = caller 0;
|
||||
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
||||
map { defined $_ ? $_ : 'undef' }
|
||||
@_;
|
||||
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
|
||||
}
|
||||
|
||||
1;
|
||||
}
|
||||
# ###########################################################################
|
||||
# End Cxn package
|
||||
# ###########################################################################
|
||||
|
||||
# ###########################################################################
|
||||
# Quoter package
|
||||
# This package is a copy without comments from the original. The original
|
||||
@@ -2267,11 +2374,11 @@ sub new {
|
||||
|
||||
sub get_slaves {
|
||||
my ($self, %args) = @_;
|
||||
my @required_args = qw(OptionParser DSNParser Quoter);
|
||||
my @required_args = qw(make_cxn OptionParser DSNParser Quoter);
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my ($o, $dp) = @args{@required_args};
|
||||
my ($make_cxn, $o, $dp) = @args{@required_args};
|
||||
|
||||
my $slaves = [];
|
||||
my $method = $o->get('recursion-method');
|
||||
@@ -2292,9 +2399,7 @@ sub get_slaves {
|
||||
my ( $dsn, $dbh, $level, $parent ) = @_;
|
||||
return unless $level;
|
||||
MKDEBUG && _d('Found slave:', $dp->as_string($dsn));
|
||||
$dbh->{InactiveDestroy} = 1; # Prevent destroying on fork.
|
||||
$dbh->{FetchHashKeyName} = 'NAME_lc';
|
||||
push @$slaves, { dsn=>$dsn, dbh=>$dbh };
|
||||
push @$slaves, $make_cxn->(dsn => $dsn, dbh => $dbh);
|
||||
return;
|
||||
},
|
||||
}
|
||||
@@ -2893,11 +2998,11 @@ sub reset_known_replication_threads {
|
||||
|
||||
sub get_cxn_from_dsn_table {
|
||||
my ($self, %args) = @_;
|
||||
my @required_args = qw(dsn_table_dsn DSNParser Quoter);
|
||||
my @required_args = qw(dsn_table_dsn make_cxn DSNParser Quoter);
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my ($dsn_table_dsn, $dp, $q) = @args{@required_args};
|
||||
my ($dsn_table_dsn, $make_cxn, $dp, $q) = @args{@required_args};
|
||||
MKDEBUG && _d('DSN table DSN:', $dsn_table_dsn);
|
||||
|
||||
my $dsn = $dp->parse($dsn_table_dsn);
|
||||
@@ -2913,20 +3018,18 @@ sub get_cxn_from_dsn_table {
|
||||
. "or a database-qualified table (t)";
|
||||
}
|
||||
|
||||
my @cxn;
|
||||
my $dbh = $dp->get_dbh($dp->get_cxn_params($dsn));
|
||||
my $sql = "SELECT dsn FROM $dsn_table ORDER BY id";
|
||||
my $dsn_tbl_cxn = $make_cxn->(dsn => $dsn);
|
||||
my $dbh = $dsn_tbl_cxn->connect();
|
||||
my $sql = "SELECT dsn FROM $dsn_table ORDER BY id";
|
||||
MKDEBUG && _d($sql);
|
||||
my $dsns = $dbh->selectcol_arrayref($sql);
|
||||
if ( $dsns ) {
|
||||
foreach my $dsn ( @$dsns ) {
|
||||
MKDEBUG && _d('DSN from DSN table:', $dsn);
|
||||
my $dsn = $dp->parse($dsn);
|
||||
my $dbh = $dp->get_dbh($dp->get_cxn_params($dsn));
|
||||
push @cxn, {dsn=>$dsn, dbh=>$dbh};
|
||||
my $dsn_strings = $dbh->selectcol_arrayref($sql);
|
||||
my @cxn;
|
||||
if ( $dsn_strings ) {
|
||||
foreach my $dsn_string ( @$dsn_strings ) {
|
||||
MKDEBUG && _d('DSN from DSN table:', $dsn_string);
|
||||
push @cxn, $make_cxn->(dsn_string => $dsn_string);
|
||||
}
|
||||
}
|
||||
$dbh->disconnect();
|
||||
return \@cxn;
|
||||
}
|
||||
|
||||
@@ -3346,17 +3449,17 @@ $Data::Dumper::Quotekeys = 0;
|
||||
|
||||
sub new {
|
||||
my ( $class, %args ) = @_;
|
||||
my @required_args = qw(dbh tbl chunk_size OptionParser Quoter TableNibbler TableParser);
|
||||
my @required_args = qw(Cxn tbl chunk_size OptionParser Quoter TableNibbler TableParser);
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my ($dbh, $tbl, $chunk_size, $o, $q) = @args{@required_args};
|
||||
my ($cxn, $tbl, $chunk_size, $o, $q) = @args{@required_args};
|
||||
|
||||
my $one_nibble = !defined $args{one_nibble} || $args{one_nibble}
|
||||
? _can_nibble_once(%args)
|
||||
? _can_nibble_once(dbh => $cxn->dbh(), %args)
|
||||
: 0;
|
||||
|
||||
my $index = _find_best_index(%args);
|
||||
my $index = _find_best_index(dbh => $cxn->dbh(), %args);
|
||||
if ( !$index && !$one_nibble ) {
|
||||
die "There is no good index and the table is oversized.";
|
||||
}
|
||||
@@ -3498,7 +3601,7 @@ sub next {
|
||||
my ($self) = @_;
|
||||
|
||||
my %callback_args = (
|
||||
dbh => $self->{dbh},
|
||||
Cxn => $self->{Cxn},
|
||||
tbl => $self->{tbl},
|
||||
NibbleIterator => $self,
|
||||
);
|
||||
@@ -3721,14 +3824,17 @@ sub _can_nibble_once {
|
||||
sub _prepare_sths {
|
||||
my ($self) = @_;
|
||||
MKDEBUG && _d('Preparing statement handles');
|
||||
$self->{nibble_sth}
|
||||
= $self->{dbh}->prepare($self->{nibble_sql});
|
||||
$self->{explain_nibble_sth}
|
||||
= $self->{dbh}->prepare($self->{explain_nibble_sql});
|
||||
|
||||
my $dbh = $self->{Cxn}->dbh();
|
||||
|
||||
$self->{nibble_sth} = $dbh->prepare($self->{nibble_sql});
|
||||
$self->{explain_nibble_sth} = $dbh->prepare($self->{explain_nibble_sql});
|
||||
|
||||
if ( !$self->{one_nibble} ) {
|
||||
$self->{ub_sth} = $self->{dbh}->prepare($self->{ub_sql});
|
||||
$self->{explain_ub_sth} = $self->{dbh}->prepare($self->{explain_ub_sql});
|
||||
$self->{ub_sth} = $dbh->prepare($self->{ub_sql});
|
||||
$self->{explain_ub_sth} = $dbh->prepare($self->{explain_ub_sql});
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3736,10 +3842,12 @@ sub _get_bounds {
|
||||
my ($self) = @_;
|
||||
return if $self->{one_nibble};
|
||||
|
||||
$self->{next_lower} = $self->{dbh}->selectrow_arrayref($self->{first_lb_sql});
|
||||
my $dbh = $self->{Cxn}->dbh();
|
||||
|
||||
$self->{next_lower} = $dbh->selectrow_arrayref($self->{first_lb_sql});
|
||||
MKDEBUG && _d('First lower boundary:', Dumper($self->{next_lower}));
|
||||
|
||||
$self->{last_upper} = $self->{dbh}->selectrow_arrayref($self->{last_ub_sql});
|
||||
$self->{last_upper} = $dbh->selectrow_arrayref($self->{last_ub_sql});
|
||||
MKDEBUG && _d('Last upper boundary:', Dumper($self->{last_upper}));
|
||||
|
||||
return;
|
||||
@@ -3780,7 +3888,7 @@ sub _next_boundaries {
|
||||
|
||||
if ( my $callback = $self->{callbacks}->{next_boundaries} ) {
|
||||
my $oktonibble = $callback->(
|
||||
dbh => $self->{dbh},
|
||||
Cxn => $self->{Cxn},
|
||||
tbl => $self->{tbl},
|
||||
NibbleIterator => $self,
|
||||
);
|
||||
@@ -4501,48 +4609,42 @@ sub new {
|
||||
|
||||
sub retry {
|
||||
my ( $self, %args ) = @_;
|
||||
my @required_args = qw(try wait);
|
||||
my @required_args = qw(try fail final_fail);
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
};
|
||||
my ($try, $wait) = @args{@required_args};
|
||||
my ($try, $fail, $final_fail) = @args{@required_args};
|
||||
my $wait = $args{wait} || sub { sleep 1; };
|
||||
my $tries = $args{tries} || 3;
|
||||
|
||||
my $last_error;
|
||||
my $tryno = 0;
|
||||
TRY:
|
||||
while ( ++$tryno <= $tries ) {
|
||||
MKDEBUG && _d("Retry", $tryno, "of", $tries);
|
||||
MKDEBUG && _d("Try", $tryno, "of", $tries);
|
||||
my $result;
|
||||
eval {
|
||||
$result = $try->(tryno=>$tryno);
|
||||
};
|
||||
if ( $EVAL_ERROR ) {
|
||||
MKDEBUG && _d("Try code failed:", $EVAL_ERROR);
|
||||
$last_error = $EVAL_ERROR;
|
||||
|
||||
if ( defined $result ) {
|
||||
MKDEBUG && _d("Try code succeeded");
|
||||
if ( my $on_success = $args{on_success} ) {
|
||||
MKDEBUG && _d("Calling on_success code");
|
||||
$on_success->(tryno=>$tryno, result=>$result);
|
||||
if ( $tryno < $tries ) { # more retries
|
||||
my $retry = $fail->(tryno=>$tryno, error=>$last_error);
|
||||
last TRY unless $retry;
|
||||
MKDEBUG && _d("Calling wait code");
|
||||
$wait->(tryno=>$tryno);
|
||||
}
|
||||
}
|
||||
else {
|
||||
MKDEBUG && _d("Try code succeeded");
|
||||
return $result;
|
||||
}
|
||||
|
||||
if ( $EVAL_ERROR ) {
|
||||
MKDEBUG && _d("Try code died:", $EVAL_ERROR);
|
||||
die $EVAL_ERROR unless $args{retry_on_die};
|
||||
}
|
||||
|
||||
if ( $tryno < $tries ) {
|
||||
MKDEBUG && _d("Try code failed, calling wait code");
|
||||
$wait->(tryno=>$tryno);
|
||||
}
|
||||
}
|
||||
|
||||
MKDEBUG && _d("Try code did not succeed");
|
||||
if ( my $on_failure = $args{on_failure} ) {
|
||||
MKDEBUG && _d("Calling on_failure code");
|
||||
$on_failure->();
|
||||
}
|
||||
|
||||
return;
|
||||
MKDEBUG && _d('Try code did not succeed');
|
||||
return $final_fail->(error=>$last_error);
|
||||
}
|
||||
|
||||
sub _d {
|
||||
@@ -5025,7 +5127,7 @@ sub wait {
|
||||
MKDEBUG && _d('Checking slave lag');
|
||||
for my $i ( 0..$#lagged_slaves ) {
|
||||
my $slave = $lagged_slaves[$i];
|
||||
my $lag = $get_lag->($slave->{dbh});
|
||||
my $lag = $get_lag->($slave->dbh());
|
||||
MKDEBUG && _d($slave->{dsn}->{n}, 'slave lag:', $lag);
|
||||
if ( !defined $lag || $lag > $max_lag ) {
|
||||
$slave->{lag} = $lag;
|
||||
@@ -5142,55 +5244,6 @@ sub _d {
|
||||
# End WeightedAvgRate package
|
||||
# ###########################################################################
|
||||
|
||||
# ###########################################################################
|
||||
# CleanupTask package
|
||||
# This package is a copy without comments from the original. The original
|
||||
# with comments and its test file can be found in the Bazaar repository at,
|
||||
# lib/CleanupTask.pm
|
||||
# t/lib/CleanupTask.t
|
||||
# See https://launchpad.net/percona-toolkit for more information.
|
||||
# ###########################################################################
|
||||
{
|
||||
package CleanupTask;
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => 'all';
|
||||
use English qw(-no_match_vars);
|
||||
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
|
||||
|
||||
sub new {
|
||||
my ( $class, $task ) = @_;
|
||||
die "I need a task parameter" unless $task;
|
||||
die "The task parameter must be a coderef" unless ref $task eq 'CODE';
|
||||
my $self = {
|
||||
task => $task,
|
||||
};
|
||||
MKDEBUG && _d('Created cleanup task', $task);
|
||||
return bless $self, $class;
|
||||
}
|
||||
|
||||
sub DESTROY {
|
||||
my ($self) = @_;
|
||||
my $task = $self->{task};
|
||||
MKDEBUG && _d('Calling cleanup task', $task);
|
||||
$task->();
|
||||
return;
|
||||
}
|
||||
|
||||
sub _d {
|
||||
my ($package, undef, $line) = caller 0;
|
||||
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
||||
map { defined $_ ? $_ : 'undef' }
|
||||
@_;
|
||||
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
|
||||
}
|
||||
|
||||
1;
|
||||
}
|
||||
# ###########################################################################
|
||||
# End CleanupTask package
|
||||
# ###########################################################################
|
||||
|
||||
# ###########################################################################
|
||||
# This is a combination of modules and programs in one -- a runnable module.
|
||||
# http://www.perl.com/pub/a/2006/07/13/lightning-articles.html?page=last
|
||||
@@ -5287,13 +5340,13 @@ sub main {
|
||||
# ########################################################################
|
||||
# Connect to the master.
|
||||
# ########################################################################
|
||||
my $dsn_defaults = $dp->parse_options($o);
|
||||
my $dsn = $dp->parse(shift @ARGV, undef, $dsn_defaults);
|
||||
my $dbh = get_cxn(
|
||||
dsn => $dsn,
|
||||
my $master_cxn = new Cxn(
|
||||
dsn_string => shift @ARGV,
|
||||
DSNParser => $dp,
|
||||
OptionParser => $o,
|
||||
);
|
||||
my $dbh = $master_cxn->connect(); # connect or die trying
|
||||
my $dsn = $master_cxn->dsn();
|
||||
|
||||
# ########################################################################
|
||||
# Find and connect to slaves.
|
||||
@@ -5307,42 +5360,37 @@ sub main {
|
||||
OptionParser => $o,
|
||||
DSNParser => $dp,
|
||||
Quoter => $q,
|
||||
make_cxn => sub {
|
||||
my $cxn = new Cxn(
|
||||
@_,
|
||||
DSNParser => $dp,
|
||||
OptionParser => $o,
|
||||
);
|
||||
$cxn->connect();
|
||||
return $cxn;
|
||||
},
|
||||
);
|
||||
MKDEBUG && _d(scalar @$slaves, 'slaves found');
|
||||
|
||||
my $slave_lag_cxn;
|
||||
my $slave_lag_cxns;
|
||||
if ( $o->get('check-slave-lag') ) {
|
||||
MKDEBUG && _d('Will use --check-slave-lag to check for slave lag');
|
||||
# OptionParser can't auto-copy DSN vals from a cmd line DSN
|
||||
# to an opt DSN, so we copy them manually.
|
||||
my $dsn = $dp->copy($dsn, $o->get('check-slave-lag'));
|
||||
my $dbh = get_cxn(
|
||||
my $dsn = $dp->copy($master_cxn->dsn(), $o->get('check-slave-lag'));
|
||||
my $cxn = new Cxn(
|
||||
dsn => $dsn,
|
||||
DSNParser => $dp,
|
||||
OptionParser => $o,
|
||||
);
|
||||
$slave_lag_cxn = [ {dsn=>$dsn, dbh=>$dbh} ];
|
||||
$cxn->connect(); # connect or die trying
|
||||
$slave_lag_cxns = [ $cxn ];
|
||||
}
|
||||
else {
|
||||
MKDEBUG && _d('Will check slave lag on all slaves');
|
||||
$slave_lag_cxn = $slaves;
|
||||
$slave_lag_cxns = $slaves;
|
||||
}
|
||||
|
||||
# When we exit main(), either nicely or via death, this object will
|
||||
# be destoryed and its deconstructor will call this callback.
|
||||
my $close_all_dbh = new CleanupTask(
|
||||
sub {
|
||||
$dbh->disconnect() if $dbh;
|
||||
foreach my $slave ( @$slaves ) {
|
||||
$slave->{dbh}->disconnect() if $slave->{dbh};
|
||||
}
|
||||
if ( $o->get('check-slave-lag') ) {
|
||||
$slave_lag_cxn->[0]->{dbh}->disconnect()
|
||||
if $slave_lag_cxn->[0]->{dbh};
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
# ########################################################################
|
||||
# Check replication slaves and possibly exit.
|
||||
# ########################################################################
|
||||
@@ -5470,7 +5518,7 @@ sub main {
|
||||
get_lag => sub { return $ms->get_slave_lag(@_) },
|
||||
sleep => $sleep,
|
||||
max_lag => $o->get('max-lag'),
|
||||
slaves => $slave_lag_cxn,
|
||||
slaves => $slave_lag_cxns,
|
||||
);
|
||||
|
||||
# ########################################################################
|
||||
@@ -5521,7 +5569,7 @@ sub main {
|
||||
# of a table. So we just start with the next table.
|
||||
MKDEBUG && _d('Resuming from last chunk in table;',
|
||||
'getting next table');
|
||||
$oktonibble = 0;
|
||||
$oktonibble = 0; # stop nibbling table
|
||||
}
|
||||
else {
|
||||
$nibble_iter->set_nibble_number($last_chunk->{chunk});
|
||||
@@ -5538,7 +5586,7 @@ sub main {
|
||||
$last_chunk = undef;
|
||||
}
|
||||
|
||||
return $oktonibble; # continue nibbling?
|
||||
return $oktonibble; # continue nibbling table?
|
||||
},
|
||||
next_boundaries => sub {
|
||||
my (%args) = @_;
|
||||
@@ -5565,7 +5613,7 @@ sub main {
|
||||
. ($nibble_iter->nibble_number() + 1)
|
||||
. " cannot be nibbled safely.\n";
|
||||
}
|
||||
$tbl->{checksum_results}->{skipped}++;
|
||||
$tbl->{checksum_results}->{errors}++;
|
||||
return 0; # stop nibbling table
|
||||
}
|
||||
|
||||
@@ -5602,7 +5650,6 @@ sub main {
|
||||
if ( ($expl->{key} || '') ne $nibble_iter->nibble_index() ) {
|
||||
MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table',
|
||||
"$tbl->{db}.$tbl->{tbl} not using chunk index, skipping");
|
||||
$tbl->{checksum_results}->{skipped}++;
|
||||
$tbl->{nibble_time} = 0;
|
||||
return 0; # next boundary
|
||||
}
|
||||
@@ -5616,7 +5663,6 @@ sub main {
|
||||
&& $oversize_chunk ) {
|
||||
MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table',
|
||||
"$tbl->{db}.$tbl->{tbl} is too large, skipping");
|
||||
$tbl->{checksum_results}->{skipped}++;
|
||||
$tbl->{nibble_time} = 0;
|
||||
return 0; # next boundary
|
||||
}
|
||||
@@ -5641,12 +5687,16 @@ sub main {
|
||||
my (%args) = @_;
|
||||
my $tbl = $args{tbl};
|
||||
my $nibble_iter = $args{NibbleIterator};
|
||||
|
||||
# Chunk/nibble number that we just inserted or skipped.
|
||||
my $chunk = $nibble_iter->nibble_number();
|
||||
|
||||
# Nibble time will be zero if the chunk was skipped.
|
||||
return unless $tbl->{nibble_time};
|
||||
|
||||
# Chunk/nibble number that we just inserted.
|
||||
my $chunk = $nibble_iter->nibble_number();
|
||||
if ( $tbl->{nibble_time} == 0 ) {
|
||||
MKDEBUG && _d('Skipping chunk', $chunk);
|
||||
$tbl->{checksum_results}->{skipped}++;
|
||||
return;
|
||||
}
|
||||
|
||||
# Fetch the checksum that we just executed from the replicate table.
|
||||
$fetch_sth->execute(@{$tbl}{qw(db tbl)}, $chunk);
|
||||
@@ -5693,7 +5743,7 @@ sub main {
|
||||
. "has been automatically reduced to 1. Check that "
|
||||
. "the server is not being overloaded, or increase "
|
||||
. "--chunk-time. The last chunk, number "
|
||||
. "$args{nibbleno} of table $tbl->{db}.$tbl->{tbl}, "
|
||||
. "$chunk of table $tbl->{db}.$tbl->{tbl}, "
|
||||
. "selected $cnt rows and took "
|
||||
. sprintf('%.3f', $tbl->{nibble_time})
|
||||
. " seconds to execute.\n";
|
||||
@@ -5839,7 +5889,7 @@ sub main {
|
||||
my $nibble_iter;
|
||||
eval {
|
||||
$nibble_iter = new NibbleIterator(
|
||||
dbh => $dbh,
|
||||
Cxn => $master_cxn,
|
||||
tbl => $tbl,
|
||||
chunk_size => $tbl->{chunk_size},
|
||||
chunk_index => $o->get('chunk-index'),
|
||||
@@ -5859,36 +5909,36 @@ sub main {
|
||||
. "$EVAL_ERROR\n";
|
||||
}
|
||||
$exit_status |= 1;
|
||||
next TABLE;
|
||||
}
|
||||
else {
|
||||
# Init a new weighted avg rate calculator for the table.
|
||||
$tbl->{rate} = new WeightedAvgRate(target_t => $chunk_time);
|
||||
|
||||
# Init a new weighted avg rate calculator for the table.
|
||||
$tbl->{rate} = new WeightedAvgRate(target_t => $chunk_time);
|
||||
# Make a Progress obj for this table. It may not be used;
|
||||
# depends on how many rows, chunk size, how fast the server
|
||||
# is, etc. But just in case, all tables have a Progress obj.
|
||||
if ( $o->get('progress') ) {
|
||||
$tbl->{progress} = table_progress(
|
||||
dbh => $dbh,
|
||||
tbl => $tbl,
|
||||
OptionParser => $o,
|
||||
Quoter => $q,
|
||||
);
|
||||
}
|
||||
|
||||
# Make a Progress obj for this table. It may not be used;
|
||||
# depends on how many rows, chunk size, how fast the server
|
||||
# is, etc. But just in case, all tables have a Progress obj.
|
||||
if ( $o->get('progress') ) {
|
||||
$tbl->{progress} = table_progress(
|
||||
dbh => $dbh,
|
||||
tbl => $tbl,
|
||||
OptionParser => $o,
|
||||
Quoter => $q,
|
||||
);
|
||||
# Results, stats, and info related to checksuming this table can
|
||||
# be saved here. print_checksum_results() uses this info.
|
||||
$tbl->{checksum_results} = {};
|
||||
$tbl->{checksum_results}->{start_time} = time;
|
||||
|
||||
# Finally, checksum the table.
|
||||
# The "1 while" loop is necessary because we're executing REPLACE
|
||||
# statements which don't return rows and NibbleIterator only
|
||||
# returns if it has rows to return. So all the work is done via
|
||||
# the callbacks. -- print_checksum_results(), which is called
|
||||
# from the done callback, uses this start time.
|
||||
1 while $nibble_iter->next();
|
||||
}
|
||||
|
||||
# Results, stats, and info related to checksuming this table can
|
||||
# be saved here. print_checksum_results() uses this info.
|
||||
$tbl->{checksum_results} = {};
|
||||
$tbl->{checksum_results}->{start_time} = time;
|
||||
|
||||
# Finally, checksum the table.
|
||||
# The "1 while" loop is necessary because we're executing REPLACE
|
||||
# statements which don't return rows and NibbleIterator only
|
||||
# returns if it has rows to return. So all the work is done via
|
||||
# the callbacks. -- print_checksum_results(), which is called
|
||||
# from the done callback, uses this start time.
|
||||
1 while $nibble_iter->next();
|
||||
};
|
||||
if ( $EVAL_ERROR ) {
|
||||
# This should not happen. If it does, it's probably some bug
|
||||
@@ -5915,26 +5965,25 @@ sub main {
|
||||
# ############################################################################
|
||||
# Subroutines
|
||||
# ############################################################################
|
||||
sub get_cxn {
|
||||
my ( %args ) = @_;
|
||||
my ($dsn, $dp, $o) = @args{qw(dsn DSNParser OptionParser)};
|
||||
|
||||
if ( $o->get('ask-pass') ) {
|
||||
$dsn->{p} = OptionParser::prompt_noecho("Enter password: ");
|
||||
}
|
||||
my $dbh = $dp->get_dbh($dp->get_cxn_params($dsn), { AutoCommit => 1 });
|
||||
$dbh->{FetchHashKeyName} = 'NAME_lc';
|
||||
return $dbh;
|
||||
}
|
||||
{
|
||||
my %ignore_warning_code = (
|
||||
# Error: 1592 SQLSTATE: HY000 (ER_BINLOG_UNSAFE_STATEMENT)
|
||||
# Message: Statement may not be safe to log in statement format.
|
||||
# Ignore this warning because we have purposely set statement-based
|
||||
# replication.
|
||||
1592 => 1,
|
||||
);
|
||||
|
||||
sub exec_nibble {
|
||||
my (%args) = @_;
|
||||
my @required_args = qw(dbh tbl NibbleIterator Retry Quoter OptionParser);
|
||||
my @required_args = qw(Cxn tbl NibbleIterator Retry Quoter OptionParser);
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my ($dbh, $tbl, $nibble_iter, $retry, $q, $o)= @args{@required_args};
|
||||
|
||||
my ($cxn, $tbl, $nibble_iter, $retry, $q, $o)= @args{@required_args};
|
||||
|
||||
my $dbh = $cxn->dbh();
|
||||
my $sth = $nibble_iter->statements();
|
||||
my $boundary = $nibble_iter->boundaries();
|
||||
my $lb_quoted = join(',', @{$boundary->{lower}});
|
||||
@@ -5943,10 +5992,9 @@ sub exec_nibble {
|
||||
my $chunk_index = $nibble_iter->nibble_index();
|
||||
|
||||
return $retry->retry(
|
||||
tries => 2,
|
||||
wait => sub { return; },
|
||||
retry_on_die => 1,
|
||||
try => sub {
|
||||
tries => 2,
|
||||
wait => sub { return; },
|
||||
try => sub {
|
||||
# ###################################################################
|
||||
# Start timing the checksum query.
|
||||
# ###################################################################
|
||||
@@ -5985,17 +6033,10 @@ sub exec_nibble {
|
||||
MKDEBUG && _d($sql_warn);
|
||||
my $warnings = $dbh->selectall_arrayref($sql_warn, { Slice => {} } );
|
||||
foreach my $warning ( @$warnings ) {
|
||||
if ( $warning->{message}
|
||||
=~ m/Data truncated for column 'boundaries'/ ) {
|
||||
_d('Warning: WHERE clause too large for boundaries column;',
|
||||
'pt-table-sync may fail');
|
||||
}
|
||||
elsif ( ($warning->{code} || 0) == 1592 ) {
|
||||
# Error: 1592 SQLSTATE: HY000 (ER_BINLOG_UNSAFE_STATEMENT)
|
||||
# Message: Statement may not be safe to log in statement format.
|
||||
# Ignore this warning because we have purposely set
|
||||
# statement-based replication.
|
||||
MKDEBUG && _d('Ignoring warning:', $warning->{message});
|
||||
if ( $ignore_warning_code{ ($warning->{code} || 0) } ) {
|
||||
MKDEBUG && _d('Ignoring warning:', $warning->{code},
|
||||
$warning->{message});
|
||||
next;
|
||||
}
|
||||
else {
|
||||
die "Checksum query caused a warning:\n"
|
||||
@@ -6006,15 +6047,63 @@ sub exec_nibble {
|
||||
}
|
||||
}
|
||||
|
||||
return $t_end - $t_start; # success, return nibble time
|
||||
# Success: no warnings, no errors. Return nibble time.
|
||||
return $t_end - $t_start;
|
||||
},
|
||||
on_failure => sub {
|
||||
# Checksum query caused an error, or something in the try sub died.
|
||||
warn "Error executing checksum query: $EVAL_ERROR\n";
|
||||
$tbl->{checksum_results}->{errors}++;
|
||||
fail => sub {
|
||||
my (%args) = @_;
|
||||
my $error = $args{error};
|
||||
|
||||
if ( $error =~ m/Lock wait timeout exceeded/
|
||||
|| $error =~ m/Query execution was interrupted/
|
||||
) {
|
||||
# These errors/warnings can be retried, so don't print
|
||||
# a warning yet; do that in final_fail.
|
||||
return 1;
|
||||
}
|
||||
elsif ( $error =~ m/MySQL server has gone away/
|
||||
|| $error =~ m/Lost connection to MySQL server/
|
||||
) {
|
||||
# The 2nd pattern means that MySQL itself died or was stopped.
|
||||
# The 3rd pattern means that our cxn was killed (KILL <id>).
|
||||
eval { $dbh = $cxn->connect(); };
|
||||
if ( !$EVAL_ERROR ) {
|
||||
# Reconnected, retry checksum query.
|
||||
return 1;
|
||||
}
|
||||
|
||||
# Failed to reconnect, exit tool.
|
||||
$oktorun = 0;
|
||||
}
|
||||
|
||||
# At this point, either the error/warning cannot be retried,
|
||||
# or we failed to reconnect. So stop trying and call final_fail.
|
||||
return 0;
|
||||
},
|
||||
final_fail => sub {
|
||||
my (%args) = @_;
|
||||
my $error = $args{error};
|
||||
|
||||
if ( $error =~ /Lock wait timeout exceeded/
|
||||
|| $error =~ /Query execution was interrupted/
|
||||
) {
|
||||
# These errors/warnings are not fatal but only cause this
|
||||
# nibble to be skipped.
|
||||
warn "$error\n";
|
||||
return 0; # zero nibble time, skip this nibble
|
||||
}
|
||||
|
||||
# This die will be caught by the eval inside the TABLE loop.
|
||||
# Checksumming for this table will stop, which is probably
|
||||
# good because by this point the error or warning indicates
|
||||
# that something fundamental is broken or wrong. Checksumming
|
||||
# will continue with the next table, unless the fail code set
|
||||
# oktorun=0, in which case the error/warning is fatal.
|
||||
die "Error executing checksum query: $args{error}\n";
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
my $line_fmt = "%14s %6s %6s %7s %7s %7s %7s %-s\n";
|
||||
@@ -6343,7 +6432,7 @@ sub next_lower_boundary {
|
||||
. "$last_chunk->{chunk} because the chunk index are different: "
|
||||
. "$last_chunk->{chunk_index} was used originally but "
|
||||
. $nibble_iter->nibble_index() . " is used now.\n";
|
||||
$tbl->{checksum_results}->{skipped}++;
|
||||
$tbl->{checksum_results}->{errors}++;
|
||||
return;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user