From b2f92391a7945199af91e9802f96313301a8c6c6 Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Mon, 10 Oct 2011 10:56:08 -0600 Subject: [PATCH] Fatal error if slave dies. Make Cxn::connect() reconnect if dbh is dead. Sort slave lag properly and use Cxn instead of dbh in ReplicaLagWaiter. Check master cxn before keepalive. Sleep N+0.25 waiting for chunks. --- bin/pt-table-checksum | 80 ++++++++++++++++++++++++--------- lib/Cxn.pm | 2 +- lib/ReplicaLagWaiter.pm | 28 ++++++------ t/lib/ReplicaLagWaiter.t | 39 +++++++++++----- t/pt-table-checksum/replicate.t | 6 ++- 5 files changed, 107 insertions(+), 48 deletions(-) diff --git a/bin/pt-table-checksum b/bin/pt-table-checksum index 2c7ced9c..e762ced5 100755 --- a/bin/pt-table-checksum +++ b/bin/pt-table-checksum @@ -1447,7 +1447,7 @@ sub connect { my $o = $self->{OptionParser}; my $dbh = $self->{dbh}; - if ( !$dbh ) { + if ( !$dbh || !$dbh->ping() ) { if ( $o->get('ask-pass') && !$self->{asked_for_pass} ) { $dsn->{p} = OptionParser::prompt_noecho("Enter MySQL password: "); $self->{asked_for_pass} = 1; @@ -5116,27 +5116,28 @@ sub wait { if ( $pr ) { $pr_callback = sub { my ($fraction, $elapsed, $remaining, $eta, $completed) = @_; + my $dsn_name = $worst->{cxn}->dsn()->{n} || '?'; if ( defined $worst->{lag} ) { - print STDERR "Replica lag is $worst->{lag} seconds on " - . "$worst->{dsn}->{n}. Waiting.\n"; + print STDERR "Replica lag is " . ($worst->{lag} || '?') + . " seconds on $dsn_name. Waiting.\n"; } else { - print STDERR "Replica $worst->{dsn}->{n} is stopped. Waiting.\n"; + print STDERR "Replica $dsn_name is stopped. 
Waiting.\n"; } return; }; $pr->set_callback($pr_callback); } - my @lagged_slaves = @$slaves; # first check all slaves + my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves; while ( $oktorun->() && @lagged_slaves ) { MKDEBUG && _d('Checking slave lag'); for my $i ( 0..$#lagged_slaves ) { - my $slave = $lagged_slaves[$i]; - my $lag = $get_lag->($slave->dbh()); - MKDEBUG && _d($slave->{dsn}->{n}, 'slave lag:', $lag); + my $lag = $get_lag->($lagged_slaves[$i]->{cxn}); + MKDEBUG && _d($lagged_slaves[$i]->{cxn}->dsn()->{n}, + 'slave lag:', $lag); if ( !defined $lag || $lag > $max_lag ) { - $slave->{lag} = $lag; + $lagged_slaves[$i]->{lag} = $lag; } else { delete $lagged_slaves[$i]; @@ -5146,20 +5147,20 @@ sub wait { @lagged_slaves = grep { defined $_ } @lagged_slaves; if ( @lagged_slaves ) { @lagged_slaves = reverse sort { - defined $a && defined $b ? $a <=> $b - : defined $a ? -1 - : 1; + defined $a->{lag} && defined $b->{lag} ? $a->{lag} <=> $b->{lag} + : defined $a->{lag} ? -1 + : 1; } @lagged_slaves; $worst = $lagged_slaves[0]; MKDEBUG && _d(scalar @lagged_slaves, 'slaves are lagging, worst:', - Dumper($worst)); + $worst->{lag}, 'on', Dumper($worst->{cxn}->dsn())); if ( $pr ) { $pr->update(sub { return 0; }); } MKDEBUG && _d('Calling sleep callback'); - $sleep->(); + $sleep->($worst->{cxn}, $worst->{lag}); } } @@ -5543,17 +5544,45 @@ sub main { my $sleep = sub { # Don't let the master dbh die while waiting for slaves because we # may wait a very long time for slaves. + my $dbh = $master_cxn->dbh(); + if ( !$dbh || !$dbh->ping() ) { + MKDEBUG && _d('Lost connection to master while waiting for slave lag'); + eval { $dbh = $master_cxn->connect() }; # connect or die trying + if ( $EVAL_ERROR ) { + $oktorun = 0; # Fatal error + chomp $EVAL_ERROR; + die "Lost connection to master while waiting for replica lag " + . 
"($EVAL_ERROR)"; + } + } $dbh->do("SELECT 'pt-table-checksum keepalive'"); sleep $o->get('check-interval'); return; }; + my $get_lag = sub { + my ($cxn) = @_; + my $dbh = $cxn->dbh(); + if ( !$dbh || !$dbh->ping() ) { + MKDEBUG && _d('Lost connection to slave', $cxn->dsn()->{n}, + 'while waiting for slave lag'); + eval { $dbh = $cxn->connect() }; # connect or die trying + if ( $EVAL_ERROR ) { + $oktorun = 0; # Fatal error + chomp $EVAL_ERROR; + die "Lost connection to replica " . $cxn->dsn()->{n} + . " while attempting to get its lag ($EVAL_ERROR)"; + } + } + return $ms->get_slave_lag($dbh); + }; + my $replica_lag = new ReplicaLagWaiter( - oktorun => sub { return $oktorun }, - get_lag => sub { return $ms->get_slave_lag(@_) }, - sleep => $sleep, - max_lag => $o->get('max-lag'), slaves => $slave_lag_cxns, + max_lag => $o->get('max-lag'), + oktorun => sub { return $oktorun }, + get_lag => $get_lag, + sleep => $sleep, ); # ######################################################################## @@ -5836,8 +5865,9 @@ sub main { . "AND master_crc IS NOT NULL"; MKDEBUG && _d($sql); - my $n_slaves = scalar @$slaves - 1; - my @chunks = (0); + my $sleep_time = 0; + my $n_slaves = scalar @$slaves - 1; + my @chunks = (0); while ( $chunks[0] < $max_chunk ) { for my $i ( 0..$n_slaves ) { my $slave = $slaves->[$i]; @@ -5848,7 +5878,12 @@ sub main { @chunks = sort { $a <=> $b } @chunks; if ( $chunks[0] < $max_chunk ) { $check_pr->update(sub { return $chunks[0]; }); - sleep 1; + + # We shouldn't have to wait long here because we already + # waited for all slaves to catchup at least until --max-lag. + $sleep_time += 0.25 if $sleep_time <= $o->get('max-lag'); + MKDEBUG && _d('Sleeping', $sleep_time, 'to wait for chunks'); + sleep $sleep_time; } } @@ -5978,7 +6013,8 @@ sub main { if ( $EVAL_ERROR ) { # This should not happen. If it does, it's probably some bug # or error that we're not catching. - warn ts("Error checksumming table $tbl->{db}.$tbl->{tbl}: " + warn ts(($oktorun ? 
"Error " : "Fatal error ") + . "checksumming table $tbl->{db}.$tbl->{tbl}: " . "$EVAL_ERROR\n"); $tbl->{checksum_results}->{errors}++; diff --git a/lib/Cxn.pm b/lib/Cxn.pm index 480ed258..7161465d 100644 --- a/lib/Cxn.pm +++ b/lib/Cxn.pm @@ -87,7 +87,7 @@ sub connect { my $o = $self->{OptionParser}; my $dbh = $self->{dbh}; - if ( !$dbh ) { + if ( !$dbh || !$dbh->ping() ) { # Ask for password once. if ( $o->get('ask-pass') && !$self->{asked_for_pass} ) { $dsn->{p} = OptionParser::prompt_noecho("Enter MySQL password: "); diff --git a/lib/ReplicaLagWaiter.pm b/lib/ReplicaLagWaiter.pm index 356aaee3..32858f88 100644 --- a/lib/ReplicaLagWaiter.pm +++ b/lib/ReplicaLagWaiter.pm @@ -84,27 +84,29 @@ sub wait { # to add Transformers.pm to this tool. $pr_callback = sub { my ($fraction, $elapsed, $remaining, $eta, $completed) = @_; + my $dsn_name = $worst->{cxn}->dsn()->{n} || '?'; if ( defined $worst->{lag} ) { - print STDERR "Replica lag is $worst->{lag} seconds on " - . "$worst->{dsn}->{n}. Waiting.\n"; + print STDERR "Replica lag is " . ($worst->{lag} || '?') + . " seconds on $dsn_name. Waiting.\n"; } else { - print STDERR "Replica $worst->{dsn}->{n} is stopped. Waiting.\n"; + print STDERR "Replica $dsn_name is stopped. Waiting.\n"; } return; }; $pr->set_callback($pr_callback); } - my @lagged_slaves = @$slaves; # first check all slaves + # First check all slaves. 
+ my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves; while ( $oktorun->() && @lagged_slaves ) { MKDEBUG && _d('Checking slave lag'); for my $i ( 0..$#lagged_slaves ) { - my $slave = $lagged_slaves[$i]; - my $lag = $get_lag->($slave->dbh()); - MKDEBUG && _d($slave->{dsn}->{n}, 'slave lag:', $lag); + my $lag = $get_lag->($lagged_slaves[$i]->{cxn}); + MKDEBUG && _d($lagged_slaves[$i]->{cxn}->dsn()->{n}, + 'slave lag:', $lag); if ( !defined $lag || $lag > $max_lag ) { - $slave->{lag} = $lag; + $lagged_slaves[$i]->{lag} = $lag; } else { delete $lagged_slaves[$i]; @@ -116,13 +118,13 @@ sub wait { if ( @lagged_slaves ) { # Sort lag, undef is highest because it means the slave is stopped. @lagged_slaves = reverse sort { - defined $a && defined $b ? $a <=> $b - : defined $a ? -1 - : 1; + defined $a->{lag} && defined $b->{lag} ? $a->{lag} <=> $b->{lag} + : defined $a->{lag} ? -1 + : 1; } @lagged_slaves; $worst = $lagged_slaves[0]; MKDEBUG && _d(scalar @lagged_slaves, 'slaves are lagging, worst:', - Dumper($worst)); + $worst->{lag}, 'on', Dumper($worst->{cxn}->dsn())); if ( $pr ) { # There's no real progress because we can't estimate how long @@ -133,7 +135,7 @@ sub wait { } MKDEBUG && _d('Calling sleep callback'); - $sleep->(); + $sleep->($worst->{cxn}, $worst->{lag}); } } diff --git a/t/lib/ReplicaLagWaiter.t b/t/lib/ReplicaLagWaiter.t index e386efb7..81e0ee21 100644 --- a/t/lib/ReplicaLagWaiter.t +++ b/t/lib/ReplicaLagWaiter.t @@ -9,7 +9,7 @@ BEGIN { use strict; use warnings FATAL => 'all'; use English qw(-no_match_vars); -use Test::More tests => 5; +use Test::More tests => 7; use ReplicaLagWaiter; use Cxn; @@ -19,32 +19,34 @@ my $oktorun = 1; my @waited = (); my @lag = (); my @sleep = (); +my @worst = (); sub oktorun { return $oktorun; } sub get_lag { - my ($dbh) = @_; - push @waited, $dbh; + my ($cxn) = @_; + push @waited, $cxn->dbh(); my $lag = shift @lag || 0; return $lag; } sub sleep { + push @worst, [@_]; my $t = shift @sleep || 0; sleep $t; } +my $r1 = new 
Cxn(dsn=>{n=>'slave1'}, dbh=>1, DSNParser=>1, OptionParser=>1); +my $r2 = new Cxn(dsn=>{n=>'slave2'}, dbh=>2, DSNParser=>1, OptionParser=>1); + my $rll = new ReplicaLagWaiter( oktorun => \&oktorun, get_lag => \&get_lag, sleep => \&sleep, max_lag => 1, - slaves => [ - new Cxn(dsn=>{n=>'slave1'}, dbh=>1, DSNParser=>1, OptionParser=>1), - new Cxn(dsn=>{n=>'slave2'}, dbh=>2, DSNParser=>1, OptionParser=>1), - ], + slaves => [$r1, $r2], ); @lag = (0, 0); @@ -61,9 +63,16 @@ is_deeply( "Waited for all slaves" ); +is_deeply( + \@worst, + [], + "Did not call sleep callback" +); + @waited = (); -@lag = (5, 0, 0); -@sleep = (1, 1, 1); +@lag = (3, 5, 0, 0); +@sleep = (1, 1, 1, 1); +@worst = (); $t = time; $rll->wait(), ok( @@ -73,10 +82,20 @@ ok( is_deeply( \@waited, - [1, 2, 1], + [1, 2, 2, 1], "wait() waited for first slave" ); +# This tests that the code sorts the lag correctly. r2 is lagging +# the worst (5 > 3) so it should be passed to the sleep callback. +is_deeply( + \@worst, + [ + [ $r2, 5 ], + ], + "Called sleep callback with worst lagger" +); + # ############################################################################# # Done. # ############################################################################# diff --git a/t/pt-table-checksum/replicate.t b/t/pt-table-checksum/replicate.t index 282420c4..f46e3630 100644 --- a/t/pt-table-checksum/replicate.t +++ b/t/pt-table-checksum/replicate.t @@ -89,7 +89,8 @@ diag(`rm $outfile >/dev/null 2>&1`); # the tool runs without errors or warnings and checksums all tables. 
ok( no_diff( - sub { pt_table_checksum::main($dsn, '--create-replicate-table') }, + sub { pt_table_checksum::main($dsn, '--create-replicate-table', + qw(--lock-wait-timeout 3)) }, "$sample/default-results-5.1.txt", post_pipe => 'awk \'{print $2 " " $3 " " $4 " " $6 " " $8}\'', ), @@ -112,7 +113,8 @@ cmp_ok( ok( no_diff( - sub { pt_table_checksum::main($dsn, qw(--chunk-time 0)) }, + sub { pt_table_checksum::main($dsn, qw(--chunk-time 0), + qw(--lock-wait-timeout 3)) }, "$sample/static-chunk-size-results-5.1.txt", post_pipe => 'awk \'{print $2 " " $3 " " $4 " " $5 " " $6 " " $8}\'', ),