diff --git a/bin/pt-table-checksum b/bin/pt-table-checksum index dbc68551..d14f840e 100755 --- a/bin/pt-table-checksum +++ b/bin/pt-table-checksum @@ -4479,67 +4479,25 @@ use English qw(-no_match_vars); use constant MKDEBUG => $ENV{MKDEBUG} || 0; use Time::HiRes qw(sleep time); +use Data::Dumper; sub new { my ( $class, %args ) = @_; - my @required_args = qw(spec slaves get_lag initial_n initial_t target_t); + my @required_args = qw(oktorun get_lag sleep max_lag slaves initial_n initial_t target_t); foreach my $arg ( @required_args ) { die "I need a $arg argument" unless defined $args{$arg}; } - my ($spec) = @args{@required_args}; - - my %specs = map { - my ($key, $val) = split '=', $_; - MKDEBUG && _d($key, '=', $val); - lc($key) => $val; - } @$spec; my $self = { - max => 1, # max slave lag - timeout => 3600, # max time to wait for all slaves to catch up - check => 1, # sleep time between checking slave lag - continue => 'no', # return true even if timeout - %specs, # slave wait specs from caller - slaves => $args{slaves}, - get_lag => $args{get_lag}, - avg_n => $args{initial_n}, - avg_t => $args{initial_t}, - target_t => $args{target_t}, - weight => $args{weight} || 0.75, + %args, + avg_n => $args{initial_n}, + avg_t => $args{initial_t}, + weight => $args{weight} || 0.75, }; return bless $self, $class; } -sub validate_spec { - shift @_ if $_[0] eq 'ReplicaLagLimiter'; - my ( $spec ) = @_; - if ( @$spec == 0 ) { - die "spec array requires at least a max value\n"; - } - my $have_max; - foreach my $op ( @$spec ) { - my ($key, $val) = split '=', $op; - if ( !$key || !$val ) { - die "invalid spec format, should be option=value: $op\n"; - } - if ( $key !~ m/(?:max|timeout|continue)/i ) { - die "unknown option in spec: $op\n"; - } - if ( $key ne 'continue' && $val !~ m/^\d+$/ ) { - die "value must be an integer: $op\n"; - } - if ( $key eq 'continue' && $val !~ m/(?:yes|no)/i ) { - die "value for $key must be \"yes\" or \"no\"\n"; - } - $have_max = 1 if $key eq 'max'; - } 
- if ( !$have_max ) { - die "max must be specified" - } - return 1; -} - sub update { my ($self, $n, $t) = @_; MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's'); @@ -4560,60 +4518,68 @@ sub wait { foreach my $arg ( @required_args ) { die "I need a $arg argument" unless $args{$arg}; } - my $pr = $args{Progres}; - my $get_lag = $self->{get_lag}; - my $slaves = $self->{slaves}; - my $n_slaves = @$slaves; + my $pr = $args{Progress}; + my $oktorun = $self->{oktorun}; + my $get_lag = $self->{get_lag}; + my $sleep = $self->{sleep}; + my $slaves = $self->{slaves}; + my $max_lag = $self->{max_lag}; + + my $worst; # most lagging slave my $pr_callback; if ( $pr ) { - my $reported = 0; $pr_callback = sub { - my ($fraction, $elapsed, $remaining, $eta, $slave_no) = @_; - if ( !$reported ) { - print STDERR "Waiting for replica " - . ($slaves->[$slave_no]->{dsn}->{n} || '') - . " to catch up...\n"; - $reported = 1; + my ($fraction, $elapsed, $remaining, $eta, $completed) = @_; + if ( defined $worst->{lag} ) { + print STDERR "Replica lag is $worst->{lag} seconds on " + . "$worst->{dsn}->{n}. Waiting.\n"; } else { - print STDERR "Still waiting ($elapsed seconds)...\n"; + print STDERR "Replica $worst->{dsn}->{n} is stopped. 
Waiting.\n"; } return; }; $pr->set_callback($pr_callback); } - my ($max, $check, $timeout) = @{$self}{qw(max check timeout)}; - my $slave_no = 0; - my $slave = $slaves->[$slave_no]; - my $t_start = time; - while ($slave && time - $t_start < $timeout) { - MKDEBUG && _d('Checking slave lag on', $slave->{dsn}->{n}); - my $lag = $get_lag->($slave->{dbh}); - if ( !defined $lag || $lag > $max ) { - MKDEBUG && _d('Replica lag', $lag, '>', $max, '; sleeping', $check); - $pr->update(sub { return $slave_no; }) if $pr; - sleep $check; + my @lagged_slaves = @$slaves; # first check all slaves + while ( $oktorun->() && @lagged_slaves ) { + MKDEBUG && _d('Checking slave lag'); + for my $i ( 0..$#lagged_slaves ) { + my $slave = $lagged_slaves[$i]; + my $lag = $get_lag->($slave->{dbh}); + MKDEBUG && _d($slave->{dsn}->{n}, 'slave lag:', $lag); + if ( !defined $lag || $lag > $max_lag ) { + $slave->{lag} = $lag; + } + else { + delete $lagged_slaves[$i]; + } } - else { - MKDEBUG && _d('Replica ready, lag', $lag, '<=', $max); - $slave = $slaves->[++$slave_no]; - } - } - if ( $slave_no < @$slaves ) { - if ( $self->{continue} eq 'no' ) { - die "Timeout waiting for replica " . $slaves->[$slave_no]->{dsn}->{n} - . " to catch up\n"; - } - else { - MKDEBUG && _d('Some slave are not caught up'); - return 0; # not ready + + @lagged_slaves = grep { defined $_ } @lagged_slaves; + if ( @lagged_slaves ) { + @lagged_slaves = reverse sort { + defined $a->{lag} && defined $b->{lag} ? $a->{lag} <=> $b->{lag} + : defined $a->{lag} ? 
-1 + : defined $b->{lag} ? 1 : 0; + } @lagged_slaves; + $worst = $lagged_slaves[0]; + MKDEBUG && _d(scalar @lagged_slaves, 'slaves are lagging, worst:', + Dumper($worst)); + + if ( $pr ) { + $pr->update(sub { return 0; }); + } + + MKDEBUG && _d('Calling sleep callback'); + $sleep->(); } } MKDEBUG && _d('All slaves caught up'); - return 1; # ready + return; } sub _d { @@ -4693,12 +4659,6 @@ sub main { $o->save_error("--progress $EVAL_ERROR"); } } - - eval { ReplicaLagLimiter::validate_spec($o->get('replica-lag')) }; - if ($EVAL_ERROR) { - chomp $EVAL_ERROR; - $o->save_error("--replica-lag: $EVAL_ERROR"); - } } $o->usage_or_errors(); @@ -4762,11 +4722,11 @@ sub main { MKDEBUG && _d(scalar @$slaves, 'slaves found'); my $slave_lag_cxn; - if ( $o->get('replica-lag-dsn') ) { - MKDEBUG && _d('Will use --replica-lag-dsn to check for slave lag'); + if ( $o->get('check-slave-lag') ) { + MKDEBUG && _d('Will use --check-slave-lag to check for slave lag'); # OptionParser can't auto-copy DSN vals from a cmd line DSN # to an opt DSN, so we copy them manually. - my $dsn = $dp->copy($dsn, $o->get('replica-lag-dsn')); + my $dsn = $dp->copy($dsn, $o->get('check-slave-lag')); my $dbh = get_cxn( dsn => $dsn, DSNParser => $dp, @@ -4783,12 +4743,14 @@ sub main { # Make a lag limiter to help adjust chunk size and wait for slaves. 
# ######################################################################## my $lag_limiter = new ReplicaLagLimiter( + oktorun => sub { return $oktorun }, + get_lag => sub { return $ms->get_slave_lag(@_) }, + sleep => sub { sleep $o->get('check-interval') }, + max_lag => $o->get('max-lag'), initial_n => $o->get('chunk-size'), initial_t => $o->get('chunk-time'), target_t => $o->get('chunk-time'), - spec => $o->get('replica-lag'), slaves => $slave_lag_cxn, - get_lag => sub { return $ms->get_slave_lag(@_) }, ); # ######################################################################## @@ -4976,23 +4938,10 @@ sub main { $pr = new Progress( jobsize => scalar @$slaves, spec => $o->get('progress'), - name => "Waiting for " . (@$slaves > 1 ? "slaves" : "slave") - . " to catch up", + name => "Waiting for replicas to catch up", ); } - my $caught_up; - eval { - $caught_up = $lag_limiter->wait(); - }; - if ( $EVAL_ERROR ) { # slaves didn't catch up and continue=no. - $tbl->{checksum_results}->{errors}++; - warn $EVAL_ERROR; - $oktorun = 0; - } - elsif ( !$caught_up ) { - warn "Some replicas are lagging, but checksumming will " - . "continue because --replica-lag continue=yes.\n"; - } + $lag_limiter->wait(Progress => $pr); return; }, @@ -5736,6 +5685,12 @@ group: Connection Prompt for a password when connecting to MySQL. +=item --check-interval + +type: time; default: 1; group: Throttle + +Sleep time between checks for L<"--max-lag">. + =item --[no]check-replication-filters default: yes; group: Safety @@ -5749,31 +5704,23 @@ queries won't break replication or simply fail to replicate. If you are sure that it's OK to run the checksum queries, you can negate this option to disable the checks. See also L<"--replicate-database">. -=item --chunk-column +=item --check-slave-lag -type: string +type: DSN; group: Throttle -Prefer this column for dividing tables into chunks. 
By default, -pt-table-checksum chooses the first suitable column for each table, preferring -to use the primary key. This option lets you specify a preferred column, which -pt-table-checksum uses if it exists in the table and is chunkable. If not, then -pt-table-checksum will revert to its default behavior. Be careful when using -this option; a poor choice could cause bad performance. This is probably best -to use when you are checksumming only a single table, not an entire server. See -also L<"--chunk-index">. +Pause checksumming until the specified slave's lag is less than L<"--max-lag">. =item --chunk-index type: string -Prefer this index for chunking tables. By default, pt-table-checksum chooses an -appropriate index for the L<"--chunk-column"> (even if it chooses the chunk -column automatically). This option lets you specify the index you prefer. If -the index doesn't exist, then pt-table-checksum will fall back to its default -behavior. pt-table-checksum adds the index to the checksum SQL statements in a -C<FORCE INDEX> clause. Be careful when using this option; a poor choice of -index could cause bad performance. This is probably best to use when you are -checksumming only a single table, not an entire server. +Prefer this index for chunking tables. By default, pt-table-checksum chooses +an appropriate index for chunking. This option lets you specify the index +that you prefer. If the index doesn't exist, then pt-table-checksum will fall +back to its default behavior. pt-table-checksum adds the index to the checksum +SQL statements in a C<FORCE INDEX> clause. Be careful when using this option; +a poor choice of index could cause bad performance. This is probably best to +use when you are checksumming only a single table, not an entire server. =item --chunk-size @@ -5974,6 +5921,22 @@ type: string; group: Filter Ignore tables whose names match the Perl regex. 
+=item --max-lag + +type: time; default: 1s; group: Throttle + +Suspend checksumming if the slave given by L<"--check-slave-lag"> lags. + +This option causes pt-table-checksum to look at slave lag after each checksum. +If any slave's lag is greater than the option's value, or if the slave +isn't running (so its lag is NULL), pt-table-checksum sleeps for +L<"--check-interval"> seconds and then looks at the lag again. It repeats +until all slaves are caught up, then continues checksumming. + +This option is useful to let you checksum data as fast as the slaves can handle +it, assuming the slave you directed pt-table-checksum to monitor is +representative of all the slaves that may be replicating from this server. + =item --[no]optimize-xor default: yes @@ -6087,24 +6050,6 @@ t. The DSN table should have the following structure: One row specifies one DSN in the C column. Currently, the DSNs are ordered by C, but C and C are otherwise ignored. -=item --replica-lag - -type: array; default: max=1,timeout=3600,continue=no - -Limit lag on replicas to C<max> seconds. After each checksum, the tool -checks all replica servers, or just the L<"--replica-lag-dsn"> if -specified, and waits until the lag on all replicas is <= C<max>. -The tool waits up to C<timeout> seconds and if the lag is still too high, -it will exit if C<continue> is "no", or it will continue and check replica -lag again after the next checksum. - -=item --replica-lag-dsn - -type: DSN - -Check L<"--replica-lag"> only on this replica. If not specified, all replicas -will be checked. 
- =item --replicate type: string; default: percona.checksums diff --git a/lib/ReplicaLagLimiter.pm b/lib/ReplicaLagLimiter.pm index 2bf91474..3d2a7761 100644 --- a/lib/ReplicaLagLimiter.pm +++ b/lib/ReplicaLagLimiter.pm @@ -37,16 +37,19 @@ use English qw(-no_match_vars); use constant MKDEBUG => $ENV{MKDEBUG} || 0; use Time::HiRes qw(sleep time); +use Data::Dumper; # Sub: new # # Required Arguments: -# spec - --replicat-lag spec (arrayref of option=value pairs) -# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...] -# get_lag - Callback passed slave dbh and returns slave's lag -# initial_n - Initial n value for <update()> -# initial_t - Initial t value for <update()> -# target_t - Target time for t in <update()> +# oktorun - Callback that returns true if it's ok to continue running +# get_lag - Callback passed slave dbh and returns slave's lag +# sleep - Callback to sleep between checking lag. +# max_lag - Max lag in seconds; <wait()> waits for any slave lagging more than this +# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...] +# initial_n - Initial n value for <update()> +# initial_t - Initial t value for <update()> +# target_t - Target time for t in <update()> # # Optional Arguments: # weight - Weight of previous n/t values (default 0.75). 
@@ -55,65 +58,21 @@ use Time::HiRes qw(sleep time); # ReplicaLagLimiter object sub new { my ( $class, %args ) = @_; - my @required_args = qw(spec slaves get_lag initial_n initial_t target_t); + my @required_args = qw(oktorun get_lag sleep max_lag slaves initial_n initial_t target_t); foreach my $arg ( @required_args ) { die "I need a $arg argument" unless defined $args{$arg}; } - my ($spec) = @args{@required_args}; - - my %specs = map { - my ($key, $val) = split '=', $_; - MKDEBUG && _d($key, '=', $val); - lc($key) => $val; - } @$spec; my $self = { - max => 1, # max slave lag - timeout => 3600, # max time to wait for all slaves to catch up - check => 1, # sleep time between checking slave lag - continue => 'no', # return true even if timeout - %specs, # slave wait specs from caller - slaves => $args{slaves}, - get_lag => $args{get_lag}, - avg_n => $args{initial_n}, - avg_t => $args{initial_t}, - target_t => $args{target_t}, - weight => $args{weight} || 0.75, + %args, + avg_n => $args{initial_n}, + avg_t => $args{initial_t}, + weight => $args{weight} || 0.75, }; return bless $self, $class; } -sub validate_spec { - # Permit calling as ReplicaLagLimiter-> or ReplicaLagLimiter:: - shift @_ if $_[0] eq 'ReplicaLagLimiter'; - my ( $spec ) = @_; - if ( @$spec == 0 ) { - die "spec array requires at least a max value\n"; - } - my $have_max; - foreach my $op ( @$spec ) { - my ($key, $val) = split '=', $op; - if ( !$key || !$val ) { - die "invalid spec format, should be option=value: $op\n"; - } - if ( $key !~ m/(?:max|timeout|continue)/i ) { - die "unknown option in spec: $op\n"; - } - if ( $key ne 'continue' && $val !~ m/^\d+$/ ) { - die "value must be an integer: $op\n"; - } - if ( $key eq 'continue' && $val !~ m/(?:yes|no)/i ) { - die "value for $key must be \"yes\" or \"no\"\n"; - } - $have_max = 1 if $key eq 'max'; - } - if ( !$have_max ) { - die "max must be specified" - } - return 1; -} - # Sub: update # Update weighted decaying average of master operation time. 
Param n is # generic; it's how many of whatever the caller is doing (rows, checksums, @@ -139,7 +98,7 @@ sub update { return $new_n; } -# Sub: wait_for_slave +# Sub: wait # Wait for Seconds_Behind_Master on all slaves to become < max. # # Optional Arguments: @@ -153,62 +112,76 @@ sub wait { foreach my $arg ( @required_args ) { die "I need a $arg argument" unless $args{$arg}; } - my $pr = $args{Progres}; - my $get_lag = $self->{get_lag}; - my $slaves = $self->{slaves}; - my $n_slaves = @$slaves; + my $pr = $args{Progress}; + my $oktorun = $self->{oktorun}; + my $get_lag = $self->{get_lag}; + my $sleep = $self->{sleep}; + my $slaves = $self->{slaves}; + my $max_lag = $self->{max_lag}; + + my $worst; # most lagging slave my $pr_callback; if ( $pr ) { # If you use the default Progress report callback, you'll need to # to add Transformers.pm to this tool. - my $reported = 0; $pr_callback = sub { - my ($fraction, $elapsed, $remaining, $eta, $slave_no) = @_; - if ( !$reported ) { - print STDERR "Waiting for replica " - . ($slaves->[$slave_no]->{dsn}->{n} || '') - . " to catch up...\n"; - $reported = 1; + my ($fraction, $elapsed, $remaining, $eta, $completed) = @_; + if ( defined $worst->{lag} ) { + print STDERR "Replica lag is $worst->{lag} seconds on " + . "$worst->{dsn}->{n}. Waiting.\n"; } else { - print STDERR "Still waiting ($elapsed seconds)...\n"; + print STDERR "Replica $worst->{dsn}->{n} is stopped. 
Waiting.\n"; } return; }; $pr->set_callback($pr_callback); } - my ($max, $check, $timeout) = @{$self}{qw(max check timeout)}; - my $slave_no = 0; - my $slave = $slaves->[$slave_no]; - my $t_start = time; - while ($slave && time - $t_start < $timeout) { - MKDEBUG && _d('Checking slave lag on', $slave->{dsn}->{n}); - my $lag = $get_lag->($slave->{dbh}); - if ( !defined $lag || $lag > $max ) { - MKDEBUG && _d('Replica lag', $lag, '>', $max, '; sleeping', $check); - $pr->update(sub { return $slave_no; }) if $pr; - sleep $check; + my @lagged_slaves = @$slaves; # first check all slaves + while ( $oktorun->() && @lagged_slaves ) { + MKDEBUG && _d('Checking slave lag'); + for my $i ( 0..$#lagged_slaves ) { + my $slave = $lagged_slaves[$i]; + my $lag = $get_lag->($slave->{dbh}); + MKDEBUG && _d($slave->{dsn}->{n}, 'slave lag:', $lag); + if ( !defined $lag || $lag > $max_lag ) { + $slave->{lag} = $lag; + } + else { + delete $lagged_slaves[$i]; + } } - else { - MKDEBUG && _d('Replica ready, lag', $lag, '<=', $max); - $slave = $slaves->[++$slave_no]; - } - } - if ( $slave_no < @$slaves ) { - if ( $self->{continue} eq 'no' ) { - die "Timeout waiting for replica " . $slaves->[$slave_no]->{dsn}->{n} - . " to catch up\n"; - } - else { - MKDEBUG && _d('Some slave are not caught up'); - return 0; # not ready + + # Remove slaves that aren't lagging. + @lagged_slaves = grep { defined $_ } @lagged_slaves; + if ( @lagged_slaves ) { + # Sort by lag, worst first; an undef lag sorts highest because it means the slave is stopped. + @lagged_slaves = reverse sort { + defined $a->{lag} && defined $b->{lag} ? $a->{lag} <=> $b->{lag} + : defined $a->{lag} ? -1 + : defined $b->{lag} ? 1 : 0; + } @lagged_slaves; + $worst = $lagged_slaves[0]; + MKDEBUG && _d(scalar @lagged_slaves, 'slaves are lagging, worst:', + Dumper($worst)); + + if ( $pr ) { + # There's no real progress because we can't estimate how long + # it will take all slaves to catch up. The progress reports + # are just to inform the user every 30s which slave is still + # lagging the most. 
+ $pr->update(sub { return 0; }); + } + + MKDEBUG && _d('Calling sleep callback'); + $sleep->(); } } MKDEBUG && _d('All slaves caught up'); - return 1; # ready + return; } sub _d { diff --git a/t/lib/ReplicaLagLimiter.t b/t/lib/ReplicaLagLimiter.t index 64cd5f93..d4dd12db 100644 --- a/t/lib/ReplicaLagLimiter.t +++ b/t/lib/ReplicaLagLimiter.t @@ -9,62 +9,44 @@ BEGIN { use strict; use warnings FATAL => 'all'; use English qw(-no_match_vars); -use Test::More tests => 17; +use Test::More tests => 10; use ReplicaLagLimiter; use PerconaTest; +my $oktorun = 1; +my @waited = (); +my @lag = (); +my @sleep = (); + +sub oktorun { + return $oktorun; +} + +sub get_lag { + my ($dbh) = @_; + push @waited, $dbh; + my $lag = shift @lag || 0; + return $lag; +} + +sub sleep { + my $t = shift @sleep || 0; + sleep $t; +} + my $rll = new ReplicaLagLimiter( - spec => [qw(max=1 timeout=3600 continue=no)], - slaves => [ + oktorun => \&oktorun, + get_lag => \&get_lag, + sleep => \&sleep, + max_lag => 1, + initial_n => 1000, + initial_t => 1, + target_t => 1, + slaves => [ { dsn=>{n=>'slave1'}, dbh=>1 }, { dsn=>{n=>'slave2'}, dbh=>2 }, ], - get_lag => \&get_lag, - initial_n => 1000, - initial_t => 1, - target_t => 1, -); - -# ############################################################################ -# Validate spec. 
-# ############################################################################ -is( - ReplicaLagLimiter::validate_spec(['max=1','timeout=3600','continue=no']), - 1, - "Valid spec" -); - -throws_ok( - sub { - ReplicaLagLimiter::validate_spec(['max=1','timeout=3600','foo,bar']) - }, - qr/invalid spec format, should be option=value: foo,bar/, - "Invalid spec format" -); - -throws_ok( - sub { - ReplicaLagLimiter::validate_spec(['max=1','timeout=3600','foo=bar']) - }, - qr/unknown option in spec: foo=bar/, - "Unknown spec option" -); - -throws_ok( - sub { - ReplicaLagLimiter::validate_spec(['max=1','timeout=yes']) - }, - qr/value must be an integer: timeout=yes/, - "Value must be int" -); - -throws_ok( - sub { - ReplicaLagLimiter::validate_spec(['max=1','continue=1']) - }, - qr/value for continue must be "yes" or "no"/, - "Value must be yes or no" ); # ############################################################################ @@ -122,20 +104,12 @@ is( # ############################################################################ # Fake waiting for slaves. # ############################################################################ -my @waited = (); -my @lag = (); -sub get_lag { - my ($dbh) = @_; - push @waited, $dbh; - my $lag = shift @lag || 0; - return $lag; -} - @lag = (0, 0); -is( - $rll->wait(), - 1, - "wait() returns 1 if all slaves catch up" +my $t = time; +$rll->wait(); +ok( + time - $t < 0.5, + "wait() returns immediately if all slaves are ready" ); is_deeply( @@ -146,8 +120,9 @@ is_deeply( @waited = (); @lag = (5, 0, 0); -my $t = time; -my $ret = $rll->wait(), +@sleep = (1, 1, 1); +$t = time; +$rll->wait(), ok( time - $t >= 0.9, "wait() waited a second" @@ -155,52 +130,10 @@ ok( is_deeply( \@waited, - [1, 1, 2], + [1, 2, 1], "wait() waited for first slave" ); -# Lower timeout to check if wait() will die. 
-$rll = new ReplicaLagLimiter( - spec => [qw(max=1 timeout=0.75 continue=no)], - slaves => [ - { dsn=>{n=>'slave1'}, dbh=>1 }, - { dsn=>{n=>'slave2'}, dbh=>2 }, - ], - get_lag => \&get_lag, - initial_n => 1000, - initial_t => 1, - target_t => 1, -); - -@waited = (); -@lag = (5, 0, 0); -throws_ok( - sub { $rll->wait() }, - qr/Timeout waiting for replica slave1 to catch up/, - "wait() dies on timeout" -); - -# Continue despite not catching up. -$rll = new ReplicaLagLimiter( - spec => [qw(max=1 timeout=0.75 continue=yes)], - slaves => [ - { dsn=>{n=>'slave1'}, dbh=>1 }, - { dsn=>{n=>'slave2'}, dbh=>2 }, - ], - get_lag => \&get_lag, - initial_n => 1000, - initial_t => 1, - target_t => 1, -); - -@waited = (); -@lag = (5, 0, 0); -is( - $rll->wait(), - 0, - "wait() returns 0 if timeout and continue=yes" -); - # ############################################################################# # Done. # #############################################################################