mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-10 13:11:32 +00:00
Fatal error if slave dies. Make Cxn::connect() if dbh id dead. Sort slave lag properly and use Cxn instead of dbh in ReplicaLagWaiter. Check master cxn before keepalive. Sleep N+0.25 waiting for chunks.
This commit is contained in:
@@ -1447,7 +1447,7 @@ sub connect {
|
||||
my $o = $self->{OptionParser};
|
||||
|
||||
my $dbh = $self->{dbh};
|
||||
if ( !$dbh ) {
|
||||
if ( !$dbh || !$dbh->ping() ) {
|
||||
if ( $o->get('ask-pass') && !$self->{asked_for_pass} ) {
|
||||
$dsn->{p} = OptionParser::prompt_noecho("Enter MySQL password: ");
|
||||
$self->{asked_for_pass} = 1;
|
||||
@@ -5116,27 +5116,28 @@ sub wait {
|
||||
if ( $pr ) {
|
||||
$pr_callback = sub {
|
||||
my ($fraction, $elapsed, $remaining, $eta, $completed) = @_;
|
||||
my $dsn_name = $worst->{cxn}->dsn()->{n} || '?';
|
||||
if ( defined $worst->{lag} ) {
|
||||
print STDERR "Replica lag is $worst->{lag} seconds on "
|
||||
. "$worst->{dsn}->{n}. Waiting.\n";
|
||||
print STDERR "Replica lag is " . ($worst->{lag} || '?')
|
||||
. " seconds on $dsn_name. Waiting.\n";
|
||||
}
|
||||
else {
|
||||
print STDERR "Replica $worst->{dsn}->{n} is stopped. Waiting.\n";
|
||||
print STDERR "Replica $dsn_name is stopped. Waiting.\n";
|
||||
}
|
||||
return;
|
||||
};
|
||||
$pr->set_callback($pr_callback);
|
||||
}
|
||||
|
||||
my @lagged_slaves = @$slaves; # first check all slaves
|
||||
my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves;
|
||||
while ( $oktorun->() && @lagged_slaves ) {
|
||||
MKDEBUG && _d('Checking slave lag');
|
||||
for my $i ( 0..$#lagged_slaves ) {
|
||||
my $slave = $lagged_slaves[$i];
|
||||
my $lag = $get_lag->($slave->dbh());
|
||||
MKDEBUG && _d($slave->{dsn}->{n}, 'slave lag:', $lag);
|
||||
my $lag = $get_lag->($lagged_slaves[$i]->{cxn});
|
||||
MKDEBUG && _d($lagged_slaves[$i]->{cxn}->dsn()->{n},
|
||||
'slave lag:', $lag);
|
||||
if ( !defined $lag || $lag > $max_lag ) {
|
||||
$slave->{lag} = $lag;
|
||||
$lagged_slaves[$i]->{lag} = $lag;
|
||||
}
|
||||
else {
|
||||
delete $lagged_slaves[$i];
|
||||
@@ -5146,20 +5147,20 @@ sub wait {
|
||||
@lagged_slaves = grep { defined $_ } @lagged_slaves;
|
||||
if ( @lagged_slaves ) {
|
||||
@lagged_slaves = reverse sort {
|
||||
defined $a && defined $b ? $a <=> $b
|
||||
: defined $a ? -1
|
||||
defined $a->{lag} && defined $b->{lag} ? $a->{lag} <=> $b->{lag}
|
||||
: defined $a->{lag} ? -1
|
||||
: 1;
|
||||
} @lagged_slaves;
|
||||
$worst = $lagged_slaves[0];
|
||||
MKDEBUG && _d(scalar @lagged_slaves, 'slaves are lagging, worst:',
|
||||
Dumper($worst));
|
||||
$worst->{lag}, 'on', Dumper($worst->{cxn}->dsn()));
|
||||
|
||||
if ( $pr ) {
|
||||
$pr->update(sub { return 0; });
|
||||
}
|
||||
|
||||
MKDEBUG && _d('Calling sleep callback');
|
||||
$sleep->();
|
||||
$sleep->($worst->{cxn}, $worst->{lag});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5543,17 +5544,45 @@ sub main {
|
||||
my $sleep = sub {
|
||||
# Don't let the master dbh die while waiting for slaves because we
|
||||
# may wait a very long time for slaves.
|
||||
my $dbh = $master_cxn->dbh();
|
||||
if ( !$dbh || !$dbh->ping() ) {
|
||||
MKDEBUG && _d('Lost connection to master while waiting for slave lag');
|
||||
eval { $dbh = $master_cxn->connect() }; # connect or die trying
|
||||
if ( $EVAL_ERROR ) {
|
||||
$oktorun = 0; # Fatal error
|
||||
chomp $EVAL_ERROR;
|
||||
die "Lost connection to master while waiting for replica lag "
|
||||
. "($EVAL_ERROR)";
|
||||
}
|
||||
}
|
||||
$dbh->do("SELECT 'pt-table-checksum keepalive'");
|
||||
sleep $o->get('check-interval');
|
||||
return;
|
||||
};
|
||||
|
||||
my $get_lag = sub {
|
||||
my ($cxn) = @_;
|
||||
my $dbh = $cxn->dbh();
|
||||
if ( !$dbh || !$dbh->ping() ) {
|
||||
MKDEBUG && _d('Lost connection to slave', $cxn->dsn()->{n},
|
||||
'while waiting for slave lag');
|
||||
eval { $dbh = $cxn->connect() }; # connect or die trying
|
||||
if ( $EVAL_ERROR ) {
|
||||
$oktorun = 0; # Fatal error
|
||||
chomp $EVAL_ERROR;
|
||||
die "Lost connection to replica " . $cxn->dsn()->{n}
|
||||
. " while attempting to get its lag ($EVAL_ERROR)";
|
||||
}
|
||||
}
|
||||
return $ms->get_slave_lag($dbh);
|
||||
};
|
||||
|
||||
my $replica_lag = new ReplicaLagWaiter(
|
||||
oktorun => sub { return $oktorun },
|
||||
get_lag => sub { return $ms->get_slave_lag(@_) },
|
||||
sleep => $sleep,
|
||||
max_lag => $o->get('max-lag'),
|
||||
slaves => $slave_lag_cxns,
|
||||
max_lag => $o->get('max-lag'),
|
||||
oktorun => sub { return $oktorun },
|
||||
get_lag => $get_lag,
|
||||
sleep => $sleep,
|
||||
);
|
||||
|
||||
# ########################################################################
|
||||
@@ -5836,6 +5865,7 @@ sub main {
|
||||
. "AND master_crc IS NOT NULL";
|
||||
MKDEBUG && _d($sql);
|
||||
|
||||
my $sleep_time = 0;
|
||||
my $n_slaves = scalar @$slaves - 1;
|
||||
my @chunks = (0);
|
||||
while ( $chunks[0] < $max_chunk ) {
|
||||
@@ -5848,7 +5878,12 @@ sub main {
|
||||
@chunks = sort { $a <=> $b } @chunks;
|
||||
if ( $chunks[0] < $max_chunk ) {
|
||||
$check_pr->update(sub { return $chunks[0]; });
|
||||
sleep 1;
|
||||
|
||||
# We shouldn't have to wait long here because we already
|
||||
# waited for all slaves to catchup at least until --max-lag.
|
||||
$sleep_time += 0.25 if $sleep_time <= $o->get('max-lag');
|
||||
MKDEBUG && _d('Sleeping', $sleep_time, 'to wait for chunks');
|
||||
sleep $sleep_time;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5978,7 +6013,8 @@ sub main {
|
||||
if ( $EVAL_ERROR ) {
|
||||
# This should not happen. If it does, it's probably some bug
|
||||
# or error that we're not catching.
|
||||
warn ts("Error checksumming table $tbl->{db}.$tbl->{tbl}: "
|
||||
warn ts(($oktorun ? "Error " : "Fatal error ")
|
||||
. "checksumming table $tbl->{db}.$tbl->{tbl}: "
|
||||
. "$EVAL_ERROR\n");
|
||||
$tbl->{checksum_results}->{errors}++;
|
||||
|
||||
|
@@ -87,7 +87,7 @@ sub connect {
|
||||
my $o = $self->{OptionParser};
|
||||
|
||||
my $dbh = $self->{dbh};
|
||||
if ( !$dbh ) {
|
||||
if ( !$dbh || !$dbh->ping() ) {
|
||||
# Ask for password once.
|
||||
if ( $o->get('ask-pass') && !$self->{asked_for_pass} ) {
|
||||
$dsn->{p} = OptionParser::prompt_noecho("Enter MySQL password: ");
|
||||
|
@@ -84,27 +84,29 @@ sub wait {
|
||||
# to add Transformers.pm to this tool.
|
||||
$pr_callback = sub {
|
||||
my ($fraction, $elapsed, $remaining, $eta, $completed) = @_;
|
||||
my $dsn_name = $worst->{cxn}->dsn()->{n} || '?';
|
||||
if ( defined $worst->{lag} ) {
|
||||
print STDERR "Replica lag is $worst->{lag} seconds on "
|
||||
. "$worst->{dsn}->{n}. Waiting.\n";
|
||||
print STDERR "Replica lag is " . ($worst->{lag} || '?')
|
||||
. " seconds on $dsn_name. Waiting.\n";
|
||||
}
|
||||
else {
|
||||
print STDERR "Replica $worst->{dsn}->{n} is stopped. Waiting.\n";
|
||||
print STDERR "Replica $dsn_name is stopped. Waiting.\n";
|
||||
}
|
||||
return;
|
||||
};
|
||||
$pr->set_callback($pr_callback);
|
||||
}
|
||||
|
||||
my @lagged_slaves = @$slaves; # first check all slaves
|
||||
# First check all slaves.
|
||||
my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves;
|
||||
while ( $oktorun->() && @lagged_slaves ) {
|
||||
MKDEBUG && _d('Checking slave lag');
|
||||
for my $i ( 0..$#lagged_slaves ) {
|
||||
my $slave = $lagged_slaves[$i];
|
||||
my $lag = $get_lag->($slave->dbh());
|
||||
MKDEBUG && _d($slave->{dsn}->{n}, 'slave lag:', $lag);
|
||||
my $lag = $get_lag->($lagged_slaves[$i]->{cxn});
|
||||
MKDEBUG && _d($lagged_slaves[$i]->{cxn}->dsn()->{n},
|
||||
'slave lag:', $lag);
|
||||
if ( !defined $lag || $lag > $max_lag ) {
|
||||
$slave->{lag} = $lag;
|
||||
$lagged_slaves[$i]->{lag} = $lag;
|
||||
}
|
||||
else {
|
||||
delete $lagged_slaves[$i];
|
||||
@@ -116,13 +118,13 @@ sub wait {
|
||||
if ( @lagged_slaves ) {
|
||||
# Sort lag, undef is highest because it means the slave is stopped.
|
||||
@lagged_slaves = reverse sort {
|
||||
defined $a && defined $b ? $a <=> $b
|
||||
: defined $a ? -1
|
||||
defined $a->{lag} && defined $b->{lag} ? $a->{lag} <=> $b->{lag}
|
||||
: defined $a->{lag} ? -1
|
||||
: 1;
|
||||
} @lagged_slaves;
|
||||
$worst = $lagged_slaves[0];
|
||||
MKDEBUG && _d(scalar @lagged_slaves, 'slaves are lagging, worst:',
|
||||
Dumper($worst));
|
||||
$worst->{lag}, 'on', Dumper($worst->{cxn}->dsn()));
|
||||
|
||||
if ( $pr ) {
|
||||
# There's no real progress because we can't estimate how long
|
||||
@@ -133,7 +135,7 @@ sub wait {
|
||||
}
|
||||
|
||||
MKDEBUG && _d('Calling sleep callback');
|
||||
$sleep->();
|
||||
$sleep->($worst->{cxn}, $worst->{lag});
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -9,7 +9,7 @@ BEGIN {
|
||||
use strict;
|
||||
use warnings FATAL => 'all';
|
||||
use English qw(-no_match_vars);
|
||||
use Test::More tests => 5;
|
||||
use Test::More tests => 7;
|
||||
|
||||
use ReplicaLagWaiter;
|
||||
use Cxn;
|
||||
@@ -19,32 +19,34 @@ my $oktorun = 1;
|
||||
my @waited = ();
|
||||
my @lag = ();
|
||||
my @sleep = ();
|
||||
my @worst = ();
|
||||
|
||||
sub oktorun {
|
||||
return $oktorun;
|
||||
}
|
||||
|
||||
sub get_lag {
|
||||
my ($dbh) = @_;
|
||||
push @waited, $dbh;
|
||||
my ($cxn) = @_;
|
||||
push @waited, $cxn->dbh();
|
||||
my $lag = shift @lag || 0;
|
||||
return $lag;
|
||||
}
|
||||
|
||||
sub sleep {
|
||||
push @worst, [@_];
|
||||
my $t = shift @sleep || 0;
|
||||
sleep $t;
|
||||
}
|
||||
|
||||
my $r1 = new Cxn(dsn=>{n=>'slave1'}, dbh=>1, DSNParser=>1, OptionParser=>1);
|
||||
my $r2 = new Cxn(dsn=>{n=>'slave2'}, dbh=>2, DSNParser=>1, OptionParser=>1);
|
||||
|
||||
my $rll = new ReplicaLagWaiter(
|
||||
oktorun => \&oktorun,
|
||||
get_lag => \&get_lag,
|
||||
sleep => \&sleep,
|
||||
max_lag => 1,
|
||||
slaves => [
|
||||
new Cxn(dsn=>{n=>'slave1'}, dbh=>1, DSNParser=>1, OptionParser=>1),
|
||||
new Cxn(dsn=>{n=>'slave2'}, dbh=>2, DSNParser=>1, OptionParser=>1),
|
||||
],
|
||||
slaves => [$r1, $r2],
|
||||
);
|
||||
|
||||
@lag = (0, 0);
|
||||
@@ -61,9 +63,16 @@ is_deeply(
|
||||
"Waited for all slaves"
|
||||
);
|
||||
|
||||
is_deeply(
|
||||
\@worst,
|
||||
[],
|
||||
"Did not call sleep callback"
|
||||
);
|
||||
|
||||
@waited = ();
|
||||
@lag = (5, 0, 0);
|
||||
@sleep = (1, 1, 1);
|
||||
@lag = (3, 5, 0, 0);
|
||||
@sleep = (1, 1, 1, 1);
|
||||
@worst = ();
|
||||
$t = time;
|
||||
$rll->wait(),
|
||||
ok(
|
||||
@@ -73,10 +82,20 @@ ok(
|
||||
|
||||
is_deeply(
|
||||
\@waited,
|
||||
[1, 2, 1],
|
||||
[1, 2, 2, 1],
|
||||
"wait() waited for first slave"
|
||||
);
|
||||
|
||||
# This tests that the code sorts the lag correctly. r2 is lagging
|
||||
# the worst (5 > 3) so it should be passed to the sleep callback.
|
||||
is_deeply(
|
||||
\@worst,
|
||||
[
|
||||
[ $r2, 5 ],
|
||||
],
|
||||
"Called sleep callback with worst lagger"
|
||||
);
|
||||
|
||||
# #############################################################################
|
||||
# Done.
|
||||
# #############################################################################
|
||||
|
@@ -89,7 +89,8 @@ diag(`rm $outfile >/dev/null 2>&1`);
|
||||
# the tool runs without errors or warnings and checksums all tables.
|
||||
ok(
|
||||
no_diff(
|
||||
sub { pt_table_checksum::main($dsn, '--create-replicate-table') },
|
||||
sub { pt_table_checksum::main($dsn, '--create-replicate-table',
|
||||
qw(--lock-wait-timeout 3)) },
|
||||
"$sample/default-results-5.1.txt",
|
||||
post_pipe => 'awk \'{print $2 " " $3 " " $4 " " $6 " " $8}\'',
|
||||
),
|
||||
@@ -112,7 +113,8 @@ cmp_ok(
|
||||
|
||||
ok(
|
||||
no_diff(
|
||||
sub { pt_table_checksum::main($dsn, qw(--chunk-time 0)) },
|
||||
sub { pt_table_checksum::main($dsn, qw(--chunk-time 0),
|
||||
qw(--lock-wait-timeout 3)) },
|
||||
"$sample/static-chunk-size-results-5.1.txt",
|
||||
post_pipe => 'awk \'{print $2 " " $3 " " $4 " " $5 " " $6 " " $8}\'',
|
||||
),
|
||||
|
Reference in New Issue
Block a user