Implement ReplicaLagLimiter. Add --chunk-time.

This commit is contained in:
Daniel Nichter
2011-09-20 11:52:35 -06:00
parent 30dc37e3a5
commit bfb5b7ae6a

View File

@@ -122,6 +122,10 @@ sub parse {
}
}
if ( !$final_props{n} ) { # name
$final_props{n} = $self->as_string(\%final_props, [qw(h P S F)]);
}
return \%final_props;
}
@@ -141,12 +145,14 @@ sub parse_options {
sub as_string {
my ( $self, $dsn, $props ) = @_;
return $dsn unless ref $dsn;
my %allowed = $props ? map { $_=>1 } @$props : ();
my @keys = $props ? @$props : sort keys %$dsn;
return join(',',
map { "$_=" . ($_ eq 'p' ? '...' : $dsn->{$_}) }
grep { defined $dsn->{$_} && $self->{opts}->{$_} }
grep { !$props || $allowed{$_} }
sort keys %$dsn );
map { "$_=" . ($_ eq 'p' ? '...' : $dsn->{$_}) }
grep {
exists $self->{opts}->{$_}
&& exists $dsn->{$_}
&& defined $dsn->{$_}
} @keys);
}
sub usage {
@@ -1947,7 +1953,11 @@ sub get_table_status {
}
MKDEBUG && _d($sql, @params);
my $sth = $dbh->prepare($sql);
$sth->execute(@params);
eval { $sth->execute(@params); };
if ($EVAL_ERROR) {
MKDEBUG && _d($EVAL_ERROR);
return;
}
my @tables = @{$sth->fetchall_arrayref({})};
@tables = map {
my %tbl; # Make a copy with lowercased keys
@@ -3361,15 +3371,16 @@ sub new {
. " /*last upper boundary*/";
MKDEBUG && _d('Last upper boundary statement:', $last_ub_sql);
my $ub_sql = _make_ub_sql(
cols => $asc->{scols},
from => $from,
where => $asc->{boundaries}->{'>='}
. ($args{where} ? " AND ($args{where})" : ''),
order_by => $order_by,
limit => $o->get('chunk-size'),
Quoter => $q,
);
my $ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='}
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " LIMIT ?, 2"
. " /*upper boundary*/";
MKDEBUG && _d('Upper boundary statement:', $ub_sql);
my $nibble_sql
= ($args{dms} ? "$args{dms} " : "SELECT ")
@@ -3415,12 +3426,16 @@ sub new {
. " /*explain one nibble*/";
MKDEBUG && _d('Explain one nibble statement:', $explain_one_nibble_sql);
my $limit = $o->get('chunk-size') - 1;
MKDEBUG && _d('Initial chunk size (LIMIT):', $limit);
my $self = {
%args,
asc => $asc,
index => $index,
from => $from,
order_by => $order_by,
limit => $limit,
first_lb_sql => $first_lb_sql,
last_ub_sql => $last_ub_sql,
ub_sql => $ub_sql,
@@ -3456,13 +3471,14 @@ sub next {
join(', ', (@{$self->{lb}}, @{$self->{ub}})));
if ( my $callback = $self->{callbacks}->{exec_nibble} ) {
$self->{have_rows} = $callback->(
dbh => $self->{dbh},
tbl => $self->{tbl},
sth => $self->{nibble_sth},
lb => $self->{lb},
ub => $self->{ub},
nibbleno => $self->{nibbleno},
explain_sth => $self->{explain_sth},
dbh => $self->{dbh},
tbl => $self->{tbl},
sth => $self->{nibble_sth},
lb => $self->{lb},
ub => $self->{ub},
nibbleno => $self->{nibbleno},
explain_sth => $self->{explain_sth},
NibbleIterator => $self,
);
}
else {
@@ -3484,10 +3500,11 @@ sub next {
MKDEBUG && _d('No rows in nibble or nibble skipped');
if ( my $callback = $self->{callbacks}->{after_nibble} ) {
$callback->(
dbh => $self->{dbh},
tbl => $self->{tbl},
nibbleno => $self->{nibbleno},
explain_sth => $self->{explain_sth},
dbh => $self->{dbh},
tbl => $self->{tbl},
nibbleno => $self->{nibbleno},
explain_sth => $self->{explain_sth},
NibbleIterator => $self,
);
}
$self->{rowno} = 0;
@@ -3512,66 +3529,17 @@ sub nibble_number {
sub set_chunk_size {
my ($self, $limit) = @_;
MKDEBUG && _d('Setting new chunk size (LIMIT):', $limit);
$self->{ub_sql} = _make_ub_sql(
cols => $self->{asc}->{scols},
from => $self->{from},
where => $self->{asc}->{boundaries}->{'>='}
. ($self->{where} ? " AND ($self->{where})" : ''),
order_by => $self->{order_by},
limit => $limit,
Quoter => $self->{Quoter},
);
if ($self->{ub_sth}) {
$self->{ub_sth}->finish();
$self->{ub_sth} = undef;
}
$self->_prepare_sths();
$self->{limit} = $limit - 1;
return;
}
sub _make_ub_sql {
my (%args) = @_;
my @required_args = qw(cols from where order_by limit Quoter);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($cols, $from, $where, $order_by, $limit, $q) = @args{@required_args};
my $ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$cols})
. " FROM $from"
. " WHERE $where"
. " ORDER BY $order_by"
. " LIMIT 2 OFFSET " . ((int($limit) || 1) - 1)
. " /*upper boundary*/";
MKDEBUG && _d('Upper boundary statement:', $ub_sql);
return $ub_sql;
}
sub _can_nibble_once {
my ($self) = @_;
my ($dbh, $tbl, $q) = @{$self}{qw(dbh tbl Quoter)};
my $table_status;
eval {
my $sql = "SHOW TABLE STATUS FROM " . $q->quote($tbl->{db})
. " LIKE " . $q->literal_like($tbl->{tbl});
MKDEBUG && _d($sql);
$table_status = $dbh->selectrow_hashref($sql);
MKDEBUG && _d('Table status:', Dumper($table_status));
};
if ( $EVAL_ERROR ) {
warn $EVAL_ERROR;
return 0;
}
my $n_rows = defined $table_status->{Rows} ? $table_status->{Rows}
: defined $table_status->{rows} ? $table_status->{rows}
: 0;
my $chunk_size = $self->{OptionParser}->get('chunk-size') || 1;
$self->{one_nibble} = $n_rows <= $chunk_size ? 1 : 0;
my ($dbh, $tbl, $tp) = @{$self}{qw(dbh tbl TableParser)};
my ($table_status) = $tp->get_table_status($dbh, $tbl->{db}, $tbl->{tbl});
my $n_rows = $table_status->{rows} || 0;
my $chunk_size = $self->{OptionParser}->get('chunk-size') || 1;
$self->{one_nibble} = $n_rows <= $chunk_size ? 1 : 0;
MKDEBUG && _d('One nibble:', $self->{one_nibble} ? 'yes' : 'no');
return $self->{one_nibble};
}
@@ -3648,8 +3616,8 @@ sub _next_boundaries {
$self->{lb} = $self->{next_lb};
MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:',
join(', ', @{$self->{lb}}));
$self->{ub_sth}->execute(@{$self->{lb}});
join(', ', @{$self->{lb}}), $self->{limit});
$self->{ub_sth}->execute(@{$self->{lb}}, $self->{limit});
my $boundary = $self->{ub_sth}->fetchall_arrayref();
MKDEBUG && _d('Next boundary:', Dumper($boundary));
if ( $boundary && @$boundary ) {
@@ -4494,6 +4462,174 @@ sub _d {
# End Progress package
# ###########################################################################
# ###########################################################################
# ReplicaLagLimiter package
# This package is a copy without comments from the original. The original
# with comments and its test file can be found in the Bazaar repository at,
# lib/ReplicaLagLimiter.pm
# t/lib/ReplicaLagLimiter.t
# See https://launchpad.net/percona-toolkit for more information.
# ###########################################################################
{
package ReplicaLagLimiter;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
use Time::HiRes qw(sleep time);
sub new {
my ( $class, %args ) = @_;
my @required_args = qw(spec slaves get_lag initial_n initial_t target_t);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless defined $args{$arg};
}
my ($spec) = @args{@required_args};
my %specs = map {
my ($key, $val) = split '=', $_;
MKDEBUG && _d($key, '=', $val);
lc($key) => $val;
} @$spec;
my $self = {
max => 1, # max slave lag
timeout => 3600, # max time to wait for all slaves to catch up
check => 1, # sleep time between checking slave lag
continue => 'no', # return true even if timeout
%specs, # slave wait specs from caller
slaves => $args{slaves},
get_lag => $args{get_lag},
avg_n => $args{initial_n},
avg_t => $args{initial_t},
target_t => $args{target_t},
weight => $args{weight} || 0.75,
};
return bless $self, $class;
}
sub validate_spec {
shift @_ if $_[0] eq 'ReplicaLagLimiter';
my ( $spec ) = @_;
if ( @$spec == 0 ) {
die "spec array requires at least a max value\n";
}
my $have_max;
foreach my $op ( @$spec ) {
my ($key, $val) = split '=', $op;
if ( !$key || !$val ) {
die "invalid spec format, should be option=value: $op\n";
}
if ( $key !~ m/(?:max|timeout|continue)/i ) {
die "unknown option in spec: $op\n";
}
if ( $key ne 'continue' && $val !~ m/^\d+$/ ) {
die "value must be an integer: $op\n";
}
if ( $key eq 'continue' && $val !~ m/(?:yes|no)/i ) {
die "value for $key must be \"yes\" or \"no\"\n";
}
$have_max = 1 if $key eq 'max';
}
if ( !$have_max ) {
die "max must be specified"
}
return 1;
}
sub update {
my ($self, $n, $t) = @_;
MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's');
$self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n;
$self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t;
$self->{avg_rate} = $self->{avg_n} / $self->{avg_t};
MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s');
my $new_n = int($self->{avg_rate} * $self->{target_t});
MKDEBUG && _d('Adjust n to', $new_n);
return $new_n;
}
sub wait {
my ( $self, %args ) = @_;
my @required_args = qw();
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my $pr = $args{Progres};
my $get_lag = $self->{get_lag};
my $slaves = $self->{slaves};
my $n_slaves = @$slaves;
my $pr_callback;
if ( $pr ) {
my $reported = 0;
$pr_callback = sub {
my ($fraction, $elapsed, $remaining, $eta, $slave_no) = @_;
if ( !$reported ) {
print STDERR "Waiting for replica "
. ($slaves->[$slave_no]->{dsn}->{n} || '')
. " to catch up...\n";
$reported = 1;
}
else {
print STDERR "Still waiting ($elapsed seconds)...\n";
}
return;
};
$pr->set_callback($pr_callback);
}
my ($max, $check, $timeout) = @{$self}{qw(max check timeout)};
my $slave_no = 0;
my $slave = $slaves->[$slave_no];
my $t_start = time;
while ($slave && time - $t_start < $timeout) {
MKDEBUG && _d('Checking slave lag on', $slave->{dsn}->{n});
my $lag = $get_lag->($slave->{dbh});
if ( !defined $lag || $lag > $max ) {
MKDEBUG && _d('Replica lag', $lag, '>', $max, '; sleeping', $check);
$pr->update(sub { return $slave_no; }) if $pr;
sleep $check;
}
else {
MKDEBUG && _d('Replica ready, lag', $lag, '<=', $max);
$slave = $slaves->[++$slave_no];
}
}
if ( $slave_no < @$slaves ) {
if ( $self->{continue} eq 'no' ) {
die "Timeout waiting for replica " . $slaves->[$slave_no]->{dsn}->{n}
. " to catch up\n";
}
else {
MKDEBUG && _d('Some slave are not caught up');
return 0; # not ready
}
}
MKDEBUG && _d('All slaves caught up');
return 1; # ready
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End ReplicaLagLimiter package
# ###########################################################################
# ###########################################################################
# This is a combination of modules and programs in one -- a runnable module.
# http://www.perl.com/pub/a/2006/07/13/lightning-articles.html?page=last
@@ -4520,6 +4656,7 @@ sub main {
@ARGV = @_; # set global ARGV for this package
my $exit_status = 0;
my $oktorun = 1;
# ########################################################################
# Get configuration information.
@@ -4533,6 +4670,7 @@ sub main {
my $dp = $o->DSNParser();
$dp->prop('set-vars', $o->get('set-vars'));
if ( !$o->get('help') ) {
if ( !@ARGV ) {
$o->save_error("No host specified");
@@ -4555,6 +4693,12 @@ sub main {
$o->save_error("--progress $EVAL_ERROR");
}
}
eval { ReplicaLagLimiter::validate_spec($o->get('replica-lag')) };
if ($EVAL_ERROR) {
chomp $EVAL_ERROR;
$o->save_error("--replica-lag: $EVAL_ERROR");
}
}
$o->usage_or_errors();
@@ -4618,7 +4762,7 @@ sub main {
MKDEBUG && _d(scalar @$slaves, 'slaves found');
my $slave_lag_cxn;
if ( $o->get('replicat-lag-dsn') ) {
if ( $o->get('replica-lag-dsn') ) {
MKDEBUG && _d('Will use --replica-lag-dsn to check for slave lag');
# OptionParser can't auto-copy DSN vals from a cmd line DSN
# to an opt DSN, so we copy them manually.
@@ -4638,11 +4782,13 @@ sub main {
# ########################################################################
# Make a lag limiter to help adjust chunk size and wait for slaves.
# ########################################################################
my $lag_limiter = new SlaveLagLimiter(
target_time => 0.5,
spec => $o->get('replica-lag'),
slaves => $slave_lag_cxn,
get_lag => sub { return $ms->get_slave_lag(@_) },
my $lag_limiter = new ReplicaLagLimiter(
initial_n => $o->get('chunk-size'),
initial_t => $o->get('chunk-time'),
target_t => $o->get('chunk-time'),
spec => $o->get('replica-lag'),
slaves => $slave_lag_cxn,
get_lag => sub { return $ms->get_slave_lag(@_) },
);
# ########################################################################
@@ -4759,43 +4905,72 @@ sub main {
# ########################################################################
# Callbacks for the nibble iterator.
# ########################################################################
my $callbacks = {
my $nibble_time = 0;
my $callbacks = {
exec_nibble => sub {
my (%args) = @_;
my $tbl = $args{tbl};
$tbl->{checksum_results}->{n_chunks}++;
# First, check if the chunk is too large.
# Check if the chunk is too large. If yes, then return 0 to
# skip this chunk and get fetch the next boundary.
if ( $o->get('chunk-size-limit')
&& is_oversize_chunk(%args, %common_modules) ) {
MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table',
"$tbl->{db}.$tbl->{tbl}", 'is too large');
$tbl->{checksum_results}->{skipped}++;
$nibble_time = 0;
return 0; # next boundary
}
# Exec and time the chunk checksum query. If it fails, retry.
# Should return 0 rows which will fetch the next boundary.
my $t_start = time;
my $rows = exec_nibble(
my $rows = exec_nibble(
%args,
%common_modules,
);
my $t_total = time - $t_start;
my $adjust = $lag_limiter->update($t_total);
MKDEBUG && _d('Checksum time:', $t_total, 'adjust:', $adjust);
if ( $adjust == -1 ) {
# Checksum took longer than target time; decrease chunk size.
}
elsif ( $adjust == 1 ) {
# Checksum took less than target time; increase chunk size.
}
$nibble_time = time - $t_start;
return $rows;
},
after_nibble => sub {
my (%args) = @_;
my $tbl = $args{tbl};
# Fetch the checksum that we just executed from the replicate table.
$fetch_sth->execute(@{$tbl}{qw(db tbl)}, $args{nibbleno});
my ($crc, $cnt) = $fetch_sth->fetchrow_array();
# We're working on the master, so update the checksum's master_cnt
# and master_crc.
$tbl->{checksum_results}->{n_rows} += $cnt || 0;
$update_sth->execute($crc, $cnt, @{$tbl}{qw(db tbl)}, $args{nibbleno});
# Adjust chunk size. $nibble_time will be 0 if this chunk was skipped.
if ( $o->get('chunk-time') && $nibble_time ) {
my $new_chunk_size = $lag_limiter->update($cnt, $nibble_time);
if ( $new_chunk_size < 1 ) {
# This shouldn't happen, but we must know if it does. And
# chunk size can't be set less than 1.
warn "Checksums are executing very slowly. --chunk-size has "
. "been automatically reduced to 1. Check that the server "
. "is not being overloaded, or increase --chunk-time.\n\n"
. "The last chunk, number $args{nibbleno} of table "
. "$tbl->{db}.$tbl->{tbl}, selected $cnt rows and took "
. sprintf('%.3f', $nibble_time) . " seconds to execute.\n";
$new_chunk_size = 1;
}
$args{NibbleIterator}->set_chunk_size($new_chunk_size);
# Don't use a var like $nibble_time because various modules and
# subs access $o->get('chunk-size') directly. This also means
# that new NibbleIterators for new tables inherit the adjusted
# chunk size from all previous tables. I.e., in effect, all
# tables are single stream of rows.
$o->set('chunk-size', $new_chunk_size);
}
# Wait for slaves to catch up.
my $pr;
if ( $o->get('progress') ) {
$pr = new Progress(
@@ -4805,8 +4980,18 @@ sub main {
. " to catch up",
);
}
if (!$lag_limiter->wait() ) {
warn "Slaves did not catchup";
my $caught_up;
eval {
$caught_up = $lag_limiter->wait();
};
if ( $EVAL_ERROR ) { # slaves didn't catch up and continue=no.
$tbl->{checksum_results}->{errors}++;
warn $EVAL_ERROR;
$oktorun = 0;
}
elsif ( !$caught_up ) {
warn "Some replicas are lagging, but checksumming will "
. "continue because --replica-lag continue=yes.\n";
}
return;
@@ -4827,7 +5012,7 @@ sub main {
);
TABLE:
while ( my $tbl = $schema_iter->next_schema_object() ) {
while ( $oktorun && (my $tbl = $schema_iter->next_schema_object()) ) {
eval {
use_repl_db(
dbh => $dbh,
@@ -4854,8 +5039,12 @@ sub main {
%common_modules,
);
# The "1 while" loop is necessary because we're executing REPLACE
# statements which don't return rows and NibbleIterator only
# returns if it has rows to return. So all the work is done via
# the callbacks.
$tbl->{checksum_results}->{start_time} = time;
1 while $nibble_iter->next();
1 while $oktorun && $nibble_iter->next();
};
if ($EVAL_ERROR) {
warn "Error checksumming $tbl->{db}.$tbl->{tbl}: $EVAL_ERROR\n";
@@ -5639,6 +5828,12 @@ they are oversize, you might want to specify a value larger than 2.
You can disable oversize chunk checking by specifying L<"--chunk-size-limit"> 0.
=item --chunk-time
type: float; default: 0.5
Taget time for each chunk. Set to 0 to disable.
=item --columns
short form: -c; type: array; group: Filter
@@ -5674,7 +5869,7 @@ Only checksum this comma-separated list of databases.
=item --databases-regex
type: string
type: string; group: Filter
Only checksum databases whose names match this Perl regex.
@@ -5755,7 +5950,7 @@ Ignore this comma-separated list of databases.
=item --ignore-databases-regex
type: string
type: string; group: Filter
Ignore databases whose names match this Perl regex.
@@ -5775,7 +5970,7 @@ Table names may be qualified with the database name.
=item --ignore-tables-regex
type: string
type: string; group: Filter
Ignore tables whose names match the Perl regex.
@@ -5852,7 +6047,7 @@ Re-checksum chunks that L<"--replicate-check"> found to be different.
=item --recurse
type: int; group: Throttle
type: int
Number of levels to recurse in the hierarchy when discovering slaves.
Default is infinite.
@@ -5894,7 +6089,7 @@ ordered by C<id>, but C<id> and C<parent_id> are otherwise ignored.
=item --replica-lag
type: string; default: max=1,timeout=3600,continue=no; group: Throttle
type: array; default: max=1,timeout=3600,continue=no
Limit lag on replicas to C<max> seconds. After each checksum, the tool
checks all replica servers, or just the L<"--replica-lag-dsn"> if
@@ -5905,7 +6100,7 @@ lag again after the next checksum.
=item --replica-lag-dsn
type: DSN; group: Throttle
type: DSN
Check L<"--replica-lag"> only on this replica. If not specified, all replicas
will be checked.
@@ -6073,7 +6268,7 @@ Table names may be qualified with the database name.
=item --tables-regex
type: string
type: string; group: Filter
Only checksum tables whose names match this Perl regex.