diff --git a/bin/pt-table-checksum b/bin/pt-table-checksum index 03585ecc..a01c6e0c 100755 --- a/bin/pt-table-checksum +++ b/bin/pt-table-checksum @@ -4743,26 +4743,6 @@ sub main { $slave_lag_cxn = $slaves; } - # ######################################################################## - # Make a lag limiter to help adjust chunk size and wait for slaves. - # ######################################################################## - my $sleep = sub { - $dbh->do("SELECT 'pt-table-checksum keepalive'"); - sleep $o->get('check-interval'); - return; - }; - - my $lag_limiter = new ReplicaLagLimiter( - oktorun => sub { return $oktorun }, - get_lag => sub { return $ms->get_slave_lag(@_) }, - sleep => $sleep, - max_lag => $o->get('max-lag'), - initial_n => $o->get('chunk-size'), - initial_t => $o->get('chunk-time'), - target_t => $o->get('chunk-time'), - slaves => $slave_lag_cxn, - ); - # ######################################################################## # Check replication slaves if desired. If only --replicate-check is given, # then we will exit here. If --recheck is also given, then we'll continue @@ -4873,6 +4853,22 @@ sub main { "UPDATE $repl_table SET master_crc = ?, master_cnt = ? " . "WHERE db = ? AND tbl = ? AND chunk = ?"); + # ######################################################################## + # Make a ReplicaLagWaiter to help wait for slaves after each chunk. + # ######################################################################## + my $sleep = sub { + $dbh->do("SELECT 'pt-table-checksum keepalive'"); + sleep $o->get('check-interval'); + return; + }; + + my $replica_lag = new ReplicaLagWaiter( + oktorun => sub { return $oktorun }, + get_lag => sub { return $ms->get_slave_lag(@_) }, + sleep => $sleep, + max_lag => $o->get('max-lag'), + slaves => $slave_lag_cxn, + ); # ######################################################################## # Callbacks for the nibble iterator. 
@@ -4891,7 +4887,7 @@ sub main { MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table', "$tbl->{db}.$tbl->{tbl}", 'is too large'); $tbl->{checksum_results}->{skipped}++; - $nibble_time = 0; + $tbl->{nibble_time} = 0; return 0; # next boundary } @@ -4902,7 +4898,7 @@ sub main { %args, %common_modules, ); - $nibble_time = time - $t_start; + $tbl->{nibble_time} = time - $t_start; return $rows; }, after_nibble => sub { @@ -4918,9 +4914,9 @@ sub main { $tbl->{checksum_results}->{n_rows} += $cnt || 0; $update_sth->execute($crc, $cnt, @{$tbl}{qw(db tbl)}, $args{nibbleno}); - # Adjust chunk size. $nibble_time will be 0 if this chunk was skipped. - if ( $o->get('chunk-time') && $nibble_time ) { - my $new_chunk_size = $lag_limiter->update($cnt, $nibble_time); + # Adjust chunk size. Nibble time will be 0 if this chunk was skipped. + if ( $o->get('chunk-time') && $tbl->{nibble_time} ) { + my $new_chunk_size = $tbl->{rate}->update($cnt, $tbl->{nibble_time}); if ( $new_chunk_size < 1 ) { # This shouldn't happen, but we must know if it does. And # chunk size can't be set less than 1. @@ -4929,7 +4925,8 @@ sub main { . "is not being overloaded, or increase --chunk-time. " . "The last chunk, number $args{nibbleno} of table " . "$tbl->{db}.$tbl->{tbl}, selected $cnt rows and took " - . sprintf('%.3f', $nibble_time) . " seconds to execute.\n"; + . sprintf('%.3f', $tbl->{nibble_time}) . " seconds to " + . "execute.\n"; $new_chunk_size = 1; } $args{NibbleIterator}->set_chunk_size($new_chunk_size); @@ -4951,7 +4948,7 @@ sub main { name => "Waiting for replicas to catch up", ); } - $lag_limiter->wait(Progress => $pr); + $replica_lag->wait(Progress => $pr); return; }, @@ -4998,6 +4995,18 @@ sub main { %common_modules, ); + # Init a new weighted avg rate calculator for the table. This + # table may be really different from the previous. 
E.g., the + # prev may have been all INT cols--really fast--so chunk size + # was increased dramatically, but this table may have lots of + # BLOB cols--potentially really slow--so we want to start + # cautiously. + $tbl->{rate} = new WeightedAvgRate( + initial_n => $o->get('chunk-size'), + initial_t => $o->get('chunk-time'), + target_t => $o->get('chunk-time'), + ); + # The "1 while" loop is necessary because we're executing REPLACE # statements which don't return rows and NibbleIterator only # returns if it has rows to return. So all the work is done via diff --git a/lib/NibbleIterator.pm b/lib/NibbleIterator.pm index 7f30736c..4012a260 100644 --- a/lib/NibbleIterator.pm +++ b/lib/NibbleIterator.pm @@ -32,18 +32,34 @@ $Data::Dumper::Indent = 1; $Data::Dumper::Sortkeys = 1; $Data::Dumper::Quotekeys = 0; +# Sub: new +# +# Required Arguments: +# dbh - dbh +# tbl - Standard tbl ref +# chunk_size - Number of rows to nibble per chunk +# OptionParser - object +# TableNibbler - object +# TableParser - object +# Quoter - object +# +# Optional Arguments: +# chunk_index - Index to use for nibbling +# +# Returns: +# NibbleIterator object sub new { my ( $class, %args ) = @_; - my @required_args = qw(dbh tbl OptionParser Quoter TableNibbler TableParser); + my @required_args = qw(dbh tbl chunk_size OptionParser Quoter TableNibbler TableParser); foreach my $arg ( @required_args ) { die "I need a $arg argument" unless $args{$arg}; } - my ($dbh, $tbl, $o, $q) = @args{@required_args}; + my ($dbh, $tbl, $chunk_size, $o, $q) = @args{@required_args}; # Get an index to nibble by. We'll order rows by the index's columns. my $index = $args{TableParser}->find_best_index( $tbl->{tbl_struct}, - $o->get('chunk-index'), + $args{chunk_index}, ); die "No index to nibble table $tbl->{db}.$tbl->{tbl}" unless $index; my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols}; @@ -152,7 +168,7 @@ sub new { . 
" /*explain one nibble*/"; MKDEBUG && _d('Explain one nibble statement:', $explain_one_nibble_sql); - my $limit = $o->get('chunk-size') - 1; + my $limit = $chunk_size - 1; MKDEBUG && _d('Initial chunk size (LIMIT):', $limit); my $self = { diff --git a/lib/ReplicaLagLimiter.pm b/lib/ReplicaLagWaiter.pm similarity index 66% rename from lib/ReplicaLagLimiter.pm rename to lib/ReplicaLagWaiter.pm index 3d2a7761..226619c3 100644 --- a/lib/ReplicaLagLimiter.pm +++ b/lib/ReplicaLagWaiter.pm @@ -15,21 +15,12 @@ # this program; if not, write to the Free Software Foundation, Inc., 59 Temple # Place, Suite 330, Boston, MA 02111-1307 USA. # ########################################################################### -# ReplicaLagLimiter package +# ReplicaLagWaiter package # ########################################################################### { -# Package: ReplicaLagLimiter -# ReplicaLagLimiter helps limit slave lag when working on the master. -# There are two sides to this problem: operations on the master and -# slave lag. Master ops that replicate can affect slave lag, so they -# should be adjusted to prevent overloading slaves. returns -# an adjusted "n" value (number of whatever the master is doing) based -# on a weighted decaying average of "t", how long operations are taking. -# The desired master op time range is specified by target_t. -# -# Regardless of all that, slaves may still lag, so waits for them -# to catch up based on the spec passed to . -package ReplicaLagLimiter; +# Package: ReplicaLagWaiter +# ReplicaLagWaiter helps limit slave lag when working on the master. +package ReplicaLagWaiter; use strict; use warnings FATAL => 'all'; @@ -42,62 +33,28 @@ use Data::Dumper; # Sub: new # # Required Arguments: -# oktorun - Callback that returns true if it's ok to continue running -# get_lag - Callback passed slave dbh and returns slave's lag -# sleep - Callback to sleep between checking lag. 
-# max_lag - Max lag -# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...] -# initial_n - Initial n value for -# initial_t - Initial t value for -# target_t - Target time for t in -# -# Optional Arguments: -# weight - Weight of previous n/t values (default 0.75). +# oktorun - Callback that returns true if it's ok to continue running +# get_lag - Callback passed slave dbh and returns slave's lag +# sleep - Callback to sleep between checking lag. +# max_lag - Max lag +# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...] # # Returns: -# ReplicaLagLimiter object +# ReplicaLagWaiter object sub new { my ( $class, %args ) = @_; - my @required_args = qw(oktorun get_lag sleep max_lag slaves initial_n initial_t target_t); + my @required_args = qw(oktorun get_lag sleep max_lag slaves); foreach my $arg ( @required_args ) { die "I need a $arg argument" unless defined $args{$arg}; } my $self = { %args, - avg_n => $args{initial_n}, - avg_t => $args{initial_t}, - weight => $args{weight} || 0.75, }; return bless $self, $class; } -# Sub: update -# Update weighted decaying average of master operation time. Param n is -# generic; it's how many of whatever the caller is doing (rows, checksums, -# etc.). Param s is how long this n took, in seconds (hi-res or not). -# -# Parameters: -# n - Number of operations (rows, etc.) 
-# t - Amount of time in seconds that n took -# -# Returns: -# n adjust to meet target_t based on weighted decaying avg rate -sub update { - my ($self, $n, $t) = @_; - MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's'); - - $self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n; - $self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t; - $self->{avg_rate} = $self->{avg_n} / $self->{avg_t}; - MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s'); - - my $new_n = int($self->{avg_rate} * $self->{target_t}); - MKDEBUG && _d('Adjust n to', $new_n); - return $new_n; -} - # Sub: wait # Wait for Seconds_Behind_Master on all slaves to become < max. # diff --git a/lib/WeightedAvgRate.pm b/lib/WeightedAvgRate.pm new file mode 100644 index 00000000..2676fd84 --- /dev/null +++ b/lib/WeightedAvgRate.pm @@ -0,0 +1,96 @@ +# This program is copyright 2011 Percona Inc. +# Feedback and improvements are welcome. +# +# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED +# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF +# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar +# systems, you can issue `man perlgpl' or `man perlartistic' to read these +# licenses. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 59 Temple +# Place, Suite 330, Boston, MA 02111-1307 USA. +# ########################################################################### +# WeightedAvgRate package +# ########################################################################### +{ +# Package: WeightedAvgRate +# WeightedAvgRate calculates and returns a weighted average rate. 
+package WeightedAvgRate; + +use strict; +use warnings FATAL => 'all'; +use English qw(-no_match_vars); +use constant MKDEBUG => $ENV{MKDEBUG} || 0; + +# Sub: new +# +# Required Arguments: +# initial_n - Initial n value for update() +# initial_t - Initial t value for update() +# target_t - Target time for t in update() +# +# Optional Arguments: +# weight - Weight of previous n/t values (default 0.75; a false value such as 0 also selects the default). +# +# Returns: +# WeightedAvgRate +sub new { + my ( $class, %args ) = @_; + my @required_args = qw(initial_n initial_t target_t); + foreach my $arg ( @required_args ) { + die "I need a $arg argument" unless defined $args{$arg}; + } + + my $self = { + %args, + avg_n => $args{initial_n}, + avg_t => $args{initial_t}, + weight => $args{weight} || 0.75, + }; + + return bless $self, $class; +} + +# Sub: update +# Update weighted average rate. Param n is generic; it's how many of +# whatever the caller is doing (rows, checksums, etc.). Param t is how +# long this n took, in seconds (hi-res or not). +# +# Parameters: +# n - Number of operations (rows, etc.) +# t - Amount of time in seconds that n took +# +# Returns: +# n adjusted to meet target_t based on weighted decaying avg rate +sub update { + my ($self, $n, $t) = @_; + MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's'); + + $self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n; + $self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t; + $self->{avg_rate} = $self->{avg_n} / $self->{avg_t}; + MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s'); + + my $new_n = int($self->{avg_rate} * $self->{target_t}); + MKDEBUG && _d('Adjust n to', $new_n); + return $new_n; +} + +sub _d { + my ($package, undef, $line) = caller 0; + @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; } + map { defined $_ ? 
$_ : 'undef' } + @_; + print STDERR "# $package:$line $PID ", join(' ', @_), "\n"; +} + +1; +} +# ########################################################################### +# End WeightedAvgRate package +# ########################################################################### diff --git a/t/lib/NibbleIterator.t b/t/lib/NibbleIterator.t index 82804273..c20cb8bd 100644 --- a/t/lib/NibbleIterator.t +++ b/t/lib/NibbleIterator.t @@ -77,10 +77,11 @@ sub make_nibble_iter { 1 while $si->next_schema_object(); my $ni = new NibbleIterator( - dbh => $dbh, - tbl => $schema->get_table($args{db}, $args{tbl}), - callbacks => $args{callbacks}, - select => $args{select}, + dbh => $dbh, + tbl => $schema->get_table($args{db}, $args{tbl}), + chunk_size => $o->get('chunk-size'), + callbacks => $args{callbacks}, + select => $args{select}, %common_modules, ); diff --git a/t/lib/ReplicaLagLimiter.t b/t/lib/ReplicaLagWaiter.t similarity index 57% rename from t/lib/ReplicaLagLimiter.t rename to t/lib/ReplicaLagWaiter.t index d4dd12db..06c742f3 100644 --- a/t/lib/ReplicaLagLimiter.t +++ b/t/lib/ReplicaLagWaiter.t @@ -9,9 +9,9 @@ BEGIN { use strict; use warnings FATAL => 'all'; use English qw(-no_match_vars); -use Test::More tests => 10; +use Test::More tests => 5; -use ReplicaLagLimiter; +use ReplicaLagWaiter; use PerconaTest; my $oktorun = 1; @@ -35,75 +35,17 @@ sub sleep { sleep $t; } -my $rll = new ReplicaLagLimiter( +my $rll = new ReplicaLagWaiter( oktorun => \&oktorun, get_lag => \&get_lag, sleep => \&sleep, max_lag => 1, - initial_n => 1000, - initial_t => 1, - target_t => 1, slaves => [ { dsn=>{n=>'slave1'}, dbh=>1 }, { dsn=>{n=>'slave2'}, dbh=>2 }, ], ); -# ############################################################################ -# Update master op, see if we get correct adjustment result. 
-# ############################################################################ - -# stay the same -for (1..5) { - $rll->update(1000, 1); -} -is( - $rll->update(1000, 1), - 1000, - "Same rate, same n" -); - -# slow down -for (1..5) { - $rll->update(1000, 2); -} -is( - $rll->update(1000, 2), - 542, - "Decrease rate, decrease n" -); - -for (1..15) { - $rll->update(1000, 2); -} -is( - $rll->update(1000, 2), - 500, - "limit n=500 decreasing" -); - -# speed up -for (1..5) { - $rll->update(1000, 1); -} -is( - $rll->update(1000, 1), - 849, - "Increase rate, increase n" -); - -for (1..20) { - $rll->update(1000, 1); -} -is( - $rll->update(1000, 1), - 999, - "limit n=1000 increasing" -); - -# ############################################################################ -# Fake waiting for slaves. -# ############################################################################ @lag = (0, 0); my $t = time; $rll->wait(); diff --git a/t/lib/WeightedAvgRate.t b/t/lib/WeightedAvgRate.t new file mode 100644 index 00000000..7a1730a2 --- /dev/null +++ b/t/lib/WeightedAvgRate.t @@ -0,0 +1,85 @@ +#!/usr/bin/perl + +BEGIN { + die "The PERCONA_TOOLKIT_BRANCH environment variable is not set.\n" + unless $ENV{PERCONA_TOOLKIT_BRANCH} && -d $ENV{PERCONA_TOOLKIT_BRANCH}; + unshift @INC, "$ENV{PERCONA_TOOLKIT_BRANCH}/lib"; +}; + +use strict; +use warnings FATAL => 'all'; +use English qw(-no_match_vars); +use Test::More tests => 6; + +use WeightedAvgRate; +use PerconaTest; + +my $rll = new WeightedAvgRate( + initial_n => 1000, + initial_t => 1, + target_t => 1, +); + +# stay the same +for (1..5) { + $rll->update(1000, 1); +} +is( + $rll->update(1000, 1), + 1000, + "Same rate, same n" +); + +# slow down +for (1..5) { + $rll->update(1000, 2); +} +is( + $rll->update(1000, 2), + 542, + "Decrease rate, decrease n" +); + +for (1..15) { + $rll->update(1000, 2); +} +is( + $rll->update(1000, 2), + 500, + "limit n=500 decreasing" +); + +# speed up +for (1..5) { + $rll->update(1000, 1); +} +is( + 
$rll->update(1000, 1), + 849, + "Increase rate, increase n" +); + +for (1..20) { + $rll->update(1000, 1); +} +is( + $rll->update(1000, 1), + 999, + "limit n=1000 increasing" +); + +# ############################################################################# +# Done. +# ############################################################################# +my $output = ''; +{ + local *STDERR; + open STDERR, '>', \$output; + $rll->_d('Complete test coverage'); +} +like( + $output, + qr/Complete test coverage/, + '_d() works' +); +exit;