From 380cf4b0bd02f7d3caa2b3425dfbc43a72f1a41e Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Thu, 22 Sep 2011 10:13:44 -0600 Subject: [PATCH] Init table 2+ chunk size based on total server rate. --- bin/pt-table-checksum | 164 +++++++++++++++++++++++++----------------- 1 file changed, 97 insertions(+), 67 deletions(-) diff --git a/bin/pt-table-checksum b/bin/pt-table-checksum index b468f4c1..89f89997 100755 --- a/bin/pt-table-checksum +++ b/bin/pt-table-checksum @@ -4597,15 +4597,15 @@ use constant MKDEBUG => $ENV{MKDEBUG} || 0; sub new { my ( $class, %args ) = @_; - my @required_args = qw(initial_n initial_t target_t); + my @required_args = qw(target_t); foreach my $arg ( @required_args ) { die "I need a $arg argument" unless defined $args{$arg}; } my $self = { %args, - avg_n => $args{initial_n}, - avg_t => $args{initial_t}, + avg_n => 0, + avg_t => 0, weight => $args{weight} || 0.75, }; @@ -4616,10 +4616,18 @@ sub update { my ($self, $n, $t) = @_; MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's'); - $self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n; - $self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t; - $self->{avg_rate} = $self->{avg_n} / $self->{avg_t}; - MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s'); + if ( $self->{avg_n} && $self->{avg_t} ) { + $self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n; + $self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t; + $self->{avg_rate} = $self->{avg_n} / $self->{avg_t}; + MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s'); + } + else { + $self->{avg_n} = $n; + $self->{avg_t} = $t; + $self->{avg_rate} = $self->{avg_n} / $self->{avg_t}; + MKDEBUG && _d('Initial avg rate:', $self->{avg_rate}, 'n/s'); + } my $new_n = int($self->{avg_rate} * $self->{target_t}); MKDEBUG && _d('Adjust n to', $new_n); @@ -4901,6 +4909,8 @@ sub main { # Make a ReplicaLagWaiter to help wait for slaves after each chunk. 
# ######################################################################## my $sleep = sub { + # Don't let the master dbh die while waiting for slaves because we + # may wait a very long time for slaves. $dbh->do("SELECT 'pt-table-checksum keepalive'"); sleep $o->get('check-interval'); return; @@ -4915,18 +4925,32 @@ sub main { ); # ######################################################################## - # Callbacks for the nibble iterator. + # Variables for adjusting chunk size for each table and chunk. + # ######################################################################## + my $total_rows = 0; + my $total_time = 0; + my $total_rate = 0; + my $chunk_size = $o->get('chunk-size'); + my $chunk_time = $o->get('chunk-time'); + + # ######################################################################## + # Callbacks for each table's nibble iterator. All checksum work is done + # in these callbacks and the subs that they call. # ######################################################################## my $callbacks = { exec_nibble => sub { my (%args) = @_; my $tbl = $args{tbl}; $tbl->{checksum_results}->{n_chunks}++; - + # Check if the chunk is too large. If yes, then return 0 to # skip this chunk and get fetch the next boundary. - if ( $o->get('chunk-size-limit') - && is_oversize_chunk(%args, %common_modules) ) { + my $is_oversize = is_oversize_chunk( + %args, + chunk_size => $chunk_size, + chunk_size_limit => $o->get('chunk-size-limit'), + ); + if ( $is_oversize ) { MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table', "$tbl->{db}.$tbl->{tbl}", 'is too large'); $tbl->{checksum_results}->{skipped}++; @@ -4957,12 +4981,20 @@ sub main { $tbl->{checksum_results}->{n_rows} += $cnt || 0; $update_sth->execute($crc, $cnt, @{$tbl}{qw(db tbl)}, $args{nibbleno}); - # Adjust chunk size. Nibble time will be 0 if this chunk was skipped. + # Update the rate of rows per second for the entire server. + # This is used for the initial chunk size of the next table. 
+ $total_rows += $cnt; + $total_time += $tbl->{nibble_time}; + $total_rate = int($total_rows / $total_time); + MKDEBUG && _d('Total avg rate:', $total_rate); + + # Adjust chunk size. This affects the next chunk. Nibble time + # will be 0 if this chunk was skipped. if ( $o->get('chunk-time') && $tbl->{nibble_time} ) { - $tbl->{chunk_size} = $tbl->{rate}->update($cnt, $tbl->{nibble_time}); - if ( $tbl->{chunk_size} < 1 ) { - # This shouldn't happen, but we must know if it does. And - # chunk size can't be set less than 1. + $chunk_size = $tbl->{rate}->update($cnt, $tbl->{nibble_time}); + if ( $chunk_size < 1 ) { + # This shouldn't happen. WeightedAvgRate::update() may return + # a value < 1, but minimum chunk size is 1. warn "Checksums are executing very slowly. --chunk-size has " . "been automatically reduced to 1. Check that the server " . "is not being overloaded, or increase --chunk-time. " @@ -4970,12 +5002,12 @@ sub main { . "$tbl->{db}.$tbl->{tbl}, selected $cnt rows and took " . sprintf('%.3f', $tbl->{nibble_time}) . " seconds to " . "execute.\n"; - $tbl->{chunk_size} = 1; + $chunk_size = 1; } - $args{NibbleIterator}->set_chunk_size($tbl->{chunk_size}); + $args{NibbleIterator}->set_chunk_size($chunk_size); } - # Wait for slaves to catch up. + # Wait forever for slaves to catch up. my $pr; if ( $o->get('progress') ) { $pr = new Progress( @@ -4988,7 +5020,7 @@ sub main { return; }, - done => sub { + done => sub { # done nibbling table my (%args) = @_; return print_checksum_results(%args); }, @@ -5006,50 +5038,46 @@ sub main { TABLE: while ( $oktorun && (my $tbl = $schema_iter->next_schema_object()) ) { eval { - use_repl_db( - dbh => $dbh, - tbl => $tbl, - repl_table => $repl_table, - %common_modules - ); - # Results, stats, and info related to checksuming this table can # be saved here. print_checksum_results() uses this info. 
$tbl->{checksum_results} = {}; + use_repl_db( + dbh => $dbh, + tbl => $tbl, + repl_table => $repl_table, + OptionParser => $o, + Quoter => $q, + ); + my $checksum_cols = $rc->make_chunk_checksum( dbh => $dbh, tbl => $tbl, %crc_args ); my $nibble_iter = new NibbleIterator( - dbh => $dbh, - tbl => $tbl, - chunk_size => $o->get('chunk-size'), - chunk_index => $o->get('chunk-index'), - dms => $checksum_dms, - select => $checksum_cols, - callbacks => $callbacks, - %common_modules, + dbh => $dbh, + tbl => $tbl, + chunk_size => $total_rate ? int($total_rate * $chunk_time) + : $o->get('chunk-size'), + chunk_index => $o->get('chunk-index'), + dms => $checksum_dms, + select => $checksum_cols, + callbacks => $callbacks, + OptionParser => $o, + Quoter => $q, + TableNibbler => $tn, + TableParser => $tp, ); - # Init a new weighted avg rate calculator for the table. This - # table may be really different from the previous. E.g., the - # prev may have been all INT cols--really fast--so chunk size - # was increased dramatically, but this table may have lots of - # BLOB cols--potentially really slow--so we want to start - # cautiously. - $tbl->{chunk_size} = $o->get('chunk-size'); - $tbl->{rate} = new WeightedAvgRate( - initial_n => $o->get('chunk-size'), - initial_t => $o->get('chunk-time'), - target_t => $o->get('chunk-time'), - ); + # Init a new weighted avg rate calculator for the table. + $tbl->{rate} = new WeightedAvgRate(target_t => $o->get('chunk-time')); # The "1 while" loop is necessary because we're executing REPLACE # statements which don't return rows and NibbleIterator only # returns if it has rows to return. So all the work is done via - # the callbacks. + # the callbacks. -- print_checksum_results(), which is called + # from the done callback, uses this start time. $tbl->{checksum_results}->{start_time} = time; 1 while $oktorun && $nibble_iter->next(); }; @@ -5059,6 +5087,7 @@ sub main { print_checksum_results(tbl => $tbl); } + # Update the tool's exit status. 
$exit_status |= 1 if $tbl->{checksum_results}->{errors}; } @@ -5288,6 +5317,9 @@ sub check_repl_table { # OptionParser - # Quoter - # +# Optional Arguments: +# tbl - Standard tbl hashref of table being checksummed +# # Returns: # Nothing or dies on error { @@ -5367,26 +5399,25 @@ sub create_repl_table { # Sub: is_oversize_chunk # Determine if the chunk is oversize. # -# Parameters: -# %args - Arguments -# # Required Arguments: -# * dbh - dbh -# * tbl - Tbl ref -# * sth - sth -# * lb - Lower boundary arrayref -# * ub - Upper boundary arrayref -# * OptionParser - +# * tbl - Standard tbl hashref +# * explain_sth - Sth to EXPLAIN the chunking query +# * lb - Arrayref with lower boundary values for explain_sth +# * ub - Arrayref with upper boundary values for explain_sth +# * chunk_size - Chunk size +# * chunk_size_limit - Chunk size limit # # Returns: -# True if EXPLAIN rows is >= chunk_size * limit, else false +# True if EXPLAIN rows is >= chunk-size * chunk-size-limit, else false sub is_oversize_chunk { my ( %args ) = @_; - my @required_args = qw(tbl explain_sth lb ub OptionParser); + my @required_args = qw(tbl explain_sth lb ub chunk_size chunk_size_limit); foreach my $arg ( @required_args ) { - die "I need a $arg argument" unless $args{$arg}; + die "I need a $arg argument" unless defined $args{$arg}; } - my ($tbl, $expl_sth, $lb, $ub, $o) = @args{@required_args}; + my ($tbl, $expl_sth, $lb, $ub, $chunk_size, $limit) = @args{@required_args}; + + return 0 if $limit == 0; # no limit, all chunk sizes allowed my $expl_res; eval { @@ -5396,15 +5427,14 @@ sub is_oversize_chunk { $expl_sth->finish(); }; if ( $EVAL_ERROR ) { - # This shouldn't happen in production but happens in testing because - # we chunk tables that don't actually exist. - warn "Failed to " . $expl_sth->{Statement} . ": $EVAL_ERROR"; - return 0; + # This shouldn't happen. + warn "Failed to " . $expl_sth->{Statement} . 
": $EVAL_ERROR\n"; + $tbl->{checksum_results}->{errors}++; + return 0; # assume chunk size is ok } MKDEBUG && _d('EXPLAIN result:', Dumper($expl_res)); - return ($expl_res->{rows} || 0) - >= $tbl->{chunk_size} * $o->get('chunk-size-limit') ? 1 : 0; + return ($expl_res->{rows} || 0) >= $chunk_size * $limit ? 1 : 0; } sub print_inconsistent_tbls {