Init table 2+ chunk size based on total server rate.

This commit is contained in:
Daniel Nichter
2011-09-22 10:13:44 -06:00
parent bbae1ba9cf
commit 380cf4b0bd

View File

@@ -4597,15 +4597,15 @@ use constant MKDEBUG => $ENV{MKDEBUG} || 0;
sub new { sub new {
my ( $class, %args ) = @_; my ( $class, %args ) = @_;
my @required_args = qw(initial_n initial_t target_t); my @required_args = qw(target_t);
foreach my $arg ( @required_args ) { foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless defined $args{$arg}; die "I need a $arg argument" unless defined $args{$arg};
} }
my $self = { my $self = {
%args, %args,
avg_n => $args{initial_n}, avg_n => 0,
avg_t => $args{initial_t}, avg_t => 0,
weight => $args{weight} || 0.75, weight => $args{weight} || 0.75,
}; };
@@ -4616,10 +4616,18 @@ sub update {
my ($self, $n, $t) = @_; my ($self, $n, $t) = @_;
MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's'); MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's');
$self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n; if ( $self->{avg_n} && $self->{avg_t} ) {
$self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t; $self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n;
$self->{avg_rate} = $self->{avg_n} / $self->{avg_t}; $self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t;
MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s'); $self->{avg_rate} = $self->{avg_n} / $self->{avg_t};
MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s');
}
else {
$self->{avg_n} = $n;
$self->{avg_t} = $t;
$self->{avg_rate} = $self->{avg_n} / $self->{avg_t};
MKDEBUG && _d('Initial avg rate:', $self->{avg_rate}, 'n/s');
}
my $new_n = int($self->{avg_rate} * $self->{target_t}); my $new_n = int($self->{avg_rate} * $self->{target_t});
MKDEBUG && _d('Adjust n to', $new_n); MKDEBUG && _d('Adjust n to', $new_n);
@@ -4901,6 +4909,8 @@ sub main {
# Make a ReplicaLagWaiter to help wait for slaves after each chunk. # Make a ReplicaLagWaiter to help wait for slaves after each chunk.
# ######################################################################## # ########################################################################
my $sleep = sub { my $sleep = sub {
# Don't let the master dbh die while waiting for slaves because we
# may wait a very long time for slaves.
$dbh->do("SELECT 'pt-table-checksum keepalive'"); $dbh->do("SELECT 'pt-table-checksum keepalive'");
sleep $o->get('check-interval'); sleep $o->get('check-interval');
return; return;
@@ -4915,7 +4925,17 @@ sub main {
); );
# ######################################################################## # ########################################################################
# Callbacks for the nibble iterator. # Variables for adjusting chunk size for each table and chunk.
# ########################################################################
my $total_rows = 0;
my $total_time = 0;
my $total_rate = 0;
my $chunk_size = $o->get('chunk-size');
my $chunk_time = $o->get('chunk-time');
# ########################################################################
# Callbacks for each table's nibble iterator. All checksum work is done
# in these callbacks and the subs that they call.
# ######################################################################## # ########################################################################
my $callbacks = { my $callbacks = {
exec_nibble => sub { exec_nibble => sub {
@@ -4925,8 +4945,12 @@ sub main {
# Check if the chunk is too large. If yes, then return 0 to # Check if the chunk is too large. If yes, then return 0 to
# skip this chunk and fetch the next boundary. # skip this chunk and fetch the next boundary.
if ( $o->get('chunk-size-limit') my $is_oversize = is_oversize_chunk(
&& is_oversize_chunk(%args, %common_modules) ) { %args,
chunk_size => $chunk_size,
chunk_size_limit => $o->get('chunk-size-limit'),
);
if ( $is_oversize ) {
MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table', MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table',
"$tbl->{db}.$tbl->{tbl}", 'is too large'); "$tbl->{db}.$tbl->{tbl}", 'is too large');
$tbl->{checksum_results}->{skipped}++; $tbl->{checksum_results}->{skipped}++;
@@ -4957,12 +4981,20 @@ sub main {
$tbl->{checksum_results}->{n_rows} += $cnt || 0; $tbl->{checksum_results}->{n_rows} += $cnt || 0;
$update_sth->execute($crc, $cnt, @{$tbl}{qw(db tbl)}, $args{nibbleno}); $update_sth->execute($crc, $cnt, @{$tbl}{qw(db tbl)}, $args{nibbleno});
# Adjust chunk size. Nibble time will be 0 if this chunk was skipped. # Update the rate of rows per second for the entire server.
# This is used for the initial chunk size of the next table.
$total_rows += $cnt;
$total_time += $tbl->{nibble_time};
$total_rate = int($total_rows / $total_time);
MKDEBUG && _d('Total avg rate:', $total_rate);
# Adjust chunk size. This affects the next chunk. Nibble time
# will be 0 if this chunk was skipped.
if ( $o->get('chunk-time') && $tbl->{nibble_time} ) { if ( $o->get('chunk-time') && $tbl->{nibble_time} ) {
$tbl->{chunk_size} = $tbl->{rate}->update($cnt, $tbl->{nibble_time}); $chunk_size = $tbl->{rate}->update($cnt, $tbl->{nibble_time});
if ( $tbl->{chunk_size} < 1 ) { if ( $chunk_size < 1 ) {
# This shouldn't happen, but we must know if it does. And # This shouldn't happen. WeightedAvgRate::update() may return
# chunk size can't be set less than 1. # a value < 1, but minimum chunk size is 1.
warn "Checksums are executing very slowly. --chunk-size has " warn "Checksums are executing very slowly. --chunk-size has "
. "been automatically reduced to 1. Check that the server " . "been automatically reduced to 1. Check that the server "
. "is not being overloaded, or increase --chunk-time. " . "is not being overloaded, or increase --chunk-time. "
@@ -4970,12 +5002,12 @@ sub main {
. "$tbl->{db}.$tbl->{tbl}, selected $cnt rows and took " . "$tbl->{db}.$tbl->{tbl}, selected $cnt rows and took "
. sprintf('%.3f', $tbl->{nibble_time}) . " seconds to " . sprintf('%.3f', $tbl->{nibble_time}) . " seconds to "
. "execute.\n"; . "execute.\n";
$tbl->{chunk_size} = 1; $chunk_size = 1;
} }
$args{NibbleIterator}->set_chunk_size($tbl->{chunk_size}); $args{NibbleIterator}->set_chunk_size($chunk_size);
} }
# Wait for slaves to catch up. # Wait forever for slaves to catch up.
my $pr; my $pr;
if ( $o->get('progress') ) { if ( $o->get('progress') ) {
$pr = new Progress( $pr = new Progress(
@@ -4988,7 +5020,7 @@ sub main {
return; return;
}, },
done => sub { done => sub { # done nibbling table
my (%args) = @_; my (%args) = @_;
return print_checksum_results(%args); return print_checksum_results(%args);
}, },
@@ -5006,50 +5038,46 @@ sub main {
TABLE: TABLE:
while ( $oktorun && (my $tbl = $schema_iter->next_schema_object()) ) { while ( $oktorun && (my $tbl = $schema_iter->next_schema_object()) ) {
eval { eval {
use_repl_db(
dbh => $dbh,
tbl => $tbl,
repl_table => $repl_table,
%common_modules
);
# Results, stats, and info related to checksumming this table can # Results, stats, and info related to checksumming this table can
# be saved here. print_checksum_results() uses this info. # be saved here. print_checksum_results() uses this info.
$tbl->{checksum_results} = {}; $tbl->{checksum_results} = {};
use_repl_db(
dbh => $dbh,
tbl => $tbl,
repl_table => $repl_table,
OptionParser => $o,
Quoter => $q,
);
my $checksum_cols = $rc->make_chunk_checksum( my $checksum_cols = $rc->make_chunk_checksum(
dbh => $dbh, dbh => $dbh,
tbl => $tbl, tbl => $tbl,
%crc_args %crc_args
); );
my $nibble_iter = new NibbleIterator( my $nibble_iter = new NibbleIterator(
dbh => $dbh, dbh => $dbh,
tbl => $tbl, tbl => $tbl,
chunk_size => $o->get('chunk-size'), chunk_size => $total_rate ? int($total_rate * $chunk_time)
chunk_index => $o->get('chunk-index'), : $o->get('chunk-size'),
dms => $checksum_dms, chunk_index => $o->get('chunk-index'),
select => $checksum_cols, dms => $checksum_dms,
callbacks => $callbacks, select => $checksum_cols,
%common_modules, callbacks => $callbacks,
OptionParser => $o,
Quoter => $q,
TableNibbler => $tn,
TableParser => $tp,
); );
# Init a new weighted avg rate calculator for the table. This # Init a new weighted avg rate calculator for the table.
# table may be really different from the previous. E.g., the $tbl->{rate} = new WeightedAvgRate(target_t => $o->get('chunk-time'));
# prev may have been all INT cols--really fast--so chunk size
# was increased dramatically, but this table may have lots of
# BLOB cols--potentially really slow--so we want to start
# cautiously.
$tbl->{chunk_size} = $o->get('chunk-size');
$tbl->{rate} = new WeightedAvgRate(
initial_n => $o->get('chunk-size'),
initial_t => $o->get('chunk-time'),
target_t => $o->get('chunk-time'),
);
# The "1 while" loop is necessary because we're executing REPLACE # The "1 while" loop is necessary because we're executing REPLACE
# statements which don't return rows and NibbleIterator only # statements which don't return rows and NibbleIterator only
# returns if it has rows to return. So all the work is done via # returns if it has rows to return. So all the work is done via
# the callbacks. # the callbacks. -- print_checksum_results(), which is called
# from the done callback, uses this start time.
$tbl->{checksum_results}->{start_time} = time; $tbl->{checksum_results}->{start_time} = time;
1 while $oktorun && $nibble_iter->next(); 1 while $oktorun && $nibble_iter->next();
}; };
@@ -5059,6 +5087,7 @@ sub main {
print_checksum_results(tbl => $tbl); print_checksum_results(tbl => $tbl);
} }
# Update the tool's exit status.
$exit_status |= 1 if $tbl->{checksum_results}->{errors}; $exit_status |= 1 if $tbl->{checksum_results}->{errors};
} }
@@ -5288,6 +5317,9 @@ sub check_repl_table {
# OptionParser - <OptionParser> # OptionParser - <OptionParser>
# Quoter - <Quoter> # Quoter - <Quoter>
# #
# Optional Arguments:
# tbl - Standard tbl hashref of table being checksummed
#
# Returns: # Returns:
# Nothing or dies on error # Nothing or dies on error
{ {
@@ -5367,26 +5399,25 @@ sub create_repl_table {
# Sub: is_oversize_chunk # Sub: is_oversize_chunk
# Determine if the chunk is oversize. # Determine if the chunk is oversize.
# #
# Parameters:
# %args - Arguments
#
# Required Arguments: # Required Arguments:
# * dbh - dbh # * tbl - Standard tbl hashref
# * tbl - Tbl ref # * explain_sth - Sth to EXPLAIN the chunking query
# * sth - sth # * lb - Arrayref with lower boundary values for explain_sth
# * lb - Lower boundary arrayref # * ub - Arrayref with upper boundary values for explain_sth
# * ub - Upper boundary arrayref # * chunk_size - Chunk size
# * OptionParser - <OptionParser> # * chunk_size_limit - Chunk size limit
# #
# Returns: # Returns:
# True if EXPLAIN rows is >= chunk_size * limit, else false # True if EXPLAIN rows is >= chunk-size * chunk-size-limit, else false
sub is_oversize_chunk { sub is_oversize_chunk {
my ( %args ) = @_; my ( %args ) = @_;
my @required_args = qw(tbl explain_sth lb ub OptionParser); my @required_args = qw(tbl explain_sth lb ub chunk_size chunk_size_limit);
foreach my $arg ( @required_args ) { foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg}; die "I need a $arg argument" unless defined $args{$arg};
} }
my ($tbl, $expl_sth, $lb, $ub, $o) = @args{@required_args}; my ($tbl, $expl_sth, $lb, $ub, $chunk_size, $limit) = @args{@required_args};
return 0 if $limit == 0; # no limit, all chunk sizes allowed
my $expl_res; my $expl_res;
eval { eval {
@@ -5396,15 +5427,14 @@ sub is_oversize_chunk {
$expl_sth->finish(); $expl_sth->finish();
}; };
if ( $EVAL_ERROR ) { if ( $EVAL_ERROR ) {
# This shouldn't happen in production but happens in testing because # This shouldn't happen.
# we chunk tables that don't actually exist. warn "Failed to " . $expl_sth->{Statement} . ": $EVAL_ERROR\n";
warn "Failed to " . $expl_sth->{Statement} . ": $EVAL_ERROR"; $tbl->{checksum_results}->{errors}++;
return 0; return 0; # assume chunk size is ok
} }
MKDEBUG && _d('EXPLAIN result:', Dumper($expl_res)); MKDEBUG && _d('EXPLAIN result:', Dumper($expl_res));
return ($expl_res->{rows} || 0) return ($expl_res->{rows} || 0) >= $chunk_size * $limit ? 1 : 0;
>= $tbl->{chunk_size} * $o->get('chunk-size-limit') ? 1 : 0;
} }
sub print_inconsistent_tbls { sub print_inconsistent_tbls {