Create ReplicaLagLimiter. Replace --max-lag, --check-interval, and --check-slave-lag with --replica-lag and --replica-lag-dsn. Use TableParser::get_table_status() in NibbleItertor. Eval SHOW TABLE STATUS. Auto-add "n" (name) part to parsed DSNs.

This commit is contained in:
Daniel Nichter
2011-09-16 17:35:40 -06:00
parent 32ade00663
commit 006b93ddf9
6 changed files with 352 additions and 131 deletions

View File

@@ -4618,18 +4618,32 @@ sub main {
MKDEBUG && _d(scalar @$slaves, 'slaves found');
my $slave_lag_cxn;
if ( $o->get('check-slave-lag') ) {
MKDEBUG && _d('Will use --check-slave-lag DSN to check for slave lag');
if ( $o->get('replicat-lag-dsn') ) {
MKDEBUG && _d('Will use --replica-lag-dsn to check for slave lag');
# OptionParser can't auto-copy DSN vals from a cmd line DSN
# to an opt DSN, so we copy them manually.
my $dsn = $dp->copy($dsn, $o->get('check-slave-lag'));
my $dsn = $dp->copy($dsn, $o->get('replica-lag-dsn'));
my $dbh = get_cxn(
dsn => $dsn,
DSNParser => $dp,
OptionParser => $o,
);
$slave_lag_cxn = {dsn=>$dsn, dbh=>$dbh};
$slave_lag_cxn = [ {dsn=>$dsn, dbh=>$dbh} ];
}
else {
MKDEBUG && _d('Will check slave lag on all slaves');
$slave_lag_cxn = $slaves;
}
# ########################################################################
# Make a lag limiter to help adjust chunk size and wait for slaves.
# ########################################################################
my $lag_limiter = new SlaveLagLimiter(
target_time => 0.5,
spec => $o->get('replica-lag'),
slaves => $slave_lag_cxn,
get_lag => sub { return $ms->get_slave_lag(@_) },
);
# ########################################################################
# Check replication slaves if desired. If only --replicate-check is given,
@@ -4759,10 +4773,20 @@ sub main {
return 0; # next boundary
}
# Exec and time the chunk checksum query. If it fails, retry.
return exec_nibble(
my $t_start = time;
my $rows = exec_nibble(
%args,
%common_modules,
);
my $t_total = time - $t_start;
my $adjust = $lag_limiter->update($t_total);
MKDEBUG && _d('Checksum time:', $t_total, 'adjust:', $adjust);
if ( $adjust == -1 ) {
# Checksum took longer than target time; decrease chunk size.
}
elsif ( $adjust == 1 ) {
# Checksum took less than target time; increase chunk size.
}
},
after_nibble => sub {
my (%args) = @_;
@@ -4781,12 +4805,9 @@ sub main {
. " to catch up",
);
}
wait_for_slaves(
slaves => $slaves,
slave_lag_cxn => $slave_lag_cxn,
Progress => $pr,
%common_modules,
);
if (!$lag_limiter->wait() ) {
warn "Slaves did not catchup";
}
return;
},
@@ -5144,73 +5165,6 @@ sub create_repl_table {
return;
}
# Returns when Seconds_Behind_Master on all the given slaves
# is < max_lag, waits check_interval seconds between checks
# if a slave is lagging too much.
sub wait_for_slaves {
my ( %args ) = @_;
my @required_args = qw(Progress OptionParser DSNParser MasterSlave);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($pr, $o, $dp, $ms) = @args{@required_args};
my $slaves;
my $n_slaves;
if ( $args{slave_lag_cxn} ) {
push @$slaves, $args{slave_lag_cxn};
$n_slaves = 1;
}
elsif ( $args{slaves} ) {
$slaves = $args{slaves};
$n_slaves = scalar @$slaves;
}
else {
die "I need a slaves or slave_lag_cxn argument";
}
my $max_lag = $o->get('max-lag'),
my $check_interval = $o->get('check-interval'),
my $pr_callback;
if ( $pr ) {
# If you use the default Progress report callback, you'll need to
# to add Transformers.pm to this tool.
my $reported = 0;
$pr_callback = sub {
my ($fraction, $elapsed, $remaining, $eta, $slave_no) = @_;
if ( !$reported ) {
print STDERR "Waiting for " . ($n_slaves > 1 ? "slave" : "slave")
. " to catchup...\n";
$reported = 1;
}
else {
print STDERR "Still waiting ($elapsed seconds)...\n";
}
return;
};
$pr->set_callback($pr_callback);
}
for my $slave_no ( 0..($n_slaves-1) ) {
my $slave = $slaves->[$slave_no];
MKDEBUG && _d('Checking slave lag on', $dp->as_string($slave->{dsn}));
my $lag = $ms->get_slave_lag($slave->{dbh});
while ( !defined $lag || $lag > $max_lag ) {
MKDEBUG && _d('Slave lag', $lag, '>', $max_lag,
'; sleeping', $check_interval);
# Report what we're waiting for before we wait.
$pr->update(sub { return $slave_no; }) if $pr;
sleep $check_interval;
$lag = $ms->get_slave_lag($slave->{dbh});
}
MKDEBUG && _d('Slave ready, lag', $lag, '<=', $max_lag);
}
return;
}
# Sub: is_oversize_chunk
# Determine if the chunk is oversize.
#
@@ -5593,12 +5547,6 @@ group: Connection
Prompt for a password when connecting to MySQL.
=item --check-interval
type: time; group: Throttle; default: 1s
How often to check for slave lag if L<"--check-slave-lag"> is given.
=item --[no]check-replication-filters
default: yes; group: Safety
@@ -5612,12 +5560,6 @@ queries won't break replication or simply fail to replicate. If you are sure
that it's OK to run the checksum queries, you can negate this option to
disable the checks. See also L<"--replicate-database">.
=item --check-slave-lag
type: DSN; group: Throttle
Pause checksumming until the specified slave's lag is less than L<"--max-lag">.
=item --chunk-column
type: string
@@ -5837,22 +5779,6 @@ type: string
Ignore tables whose names match the Perl regex.
=item --max-lag
type: time; group: Throttle; default: 1s
Suspend checksumming if the slave given by L<"--check-slave-lag"> lags.
This option causes pt-table-checksum to look at the slave every time it's about
to checksum a chunk. If the slave's lag is greater than the option's value, or
if the slave isn't running (so its lag is NULL), pt-table-checksum sleeps for
L<"--check-interval"> seconds and then looks at the lag again. It repeats until
the slave is caught up, then proceeds to checksum the chunk.
This option is useful to let you checksum data as fast as the slaves can handle
it, assuming the slave you directed pt-table-checksum to monitor is
representative of all the slaves that may be replicating from this server.
=item --[no]optimize-xor
default: yes
@@ -5966,6 +5892,24 @@ t. The DSN table should have the following structure:
One row specifies one DSN in the C<dsn> column. Currently, the DSNs are
ordered by C<id>, but C<id> and C<parent_id> are otherwise ignored.
=item --replica-lag
type: string; default: max=1,timeout=3600,continue=no; group: Throttle
Limit lag on replicas to C<max> seconds. After each checksum, the tool
checks all replica servers, or just the L<"--replica-lag-dsn"> if
specified, and waits until the lag on all replicas is <= C<max>.
The tool waits up to C<timeout> seconds and if the lag is still too high,
it will exit if C<continue> is "no", or it will continue and check replica
lag again after the next checksum.
=item --replica-lag-dsn
type: DSN; group: Throttle
Check L<"--replica-lag"> only on this replica. If not specified, all replicas
will be checked.
=item --replicate
type: string; default: percona.checksums

View File

@@ -170,6 +170,10 @@ sub parse {
}
}
if ( !$final_props{n} ) { # name
$final_props{n} = $self->as_string(\%final_props, [qw(h P S F)]);
}
return \%final_props;
}
@@ -194,12 +198,14 @@ sub parse_options {
sub as_string {
my ( $self, $dsn, $props ) = @_;
return $dsn unless ref $dsn;
my %allowed = $props ? map { $_=>1 } @$props : ();
my @keys = $props ? @$props : sort keys %$dsn;
return join(',',
map { "$_=" . ($_ eq 'p' ? '...' : $dsn->{$_}) }
grep { defined $dsn->{$_} && $self->{opts}->{$_} }
grep { !$props || $allowed{$_} }
sort keys %$dsn );
map { "$_=" . ($_ eq 'p' ? '...' : $dsn->{$_}) }
grep {
exists $self->{opts}->{$_}
&& exists $dsn->{$_}
&& defined $dsn->{$_}
} @keys);
}
sub usage {

View File

@@ -301,24 +301,11 @@ sub _make_ub_sql {
sub _can_nibble_once {
my ($self) = @_;
my ($dbh, $tbl, $q) = @{$self}{qw(dbh tbl Quoter)};
my $table_status;
eval {
my $sql = "SHOW TABLE STATUS FROM " . $q->quote($tbl->{db})
. " LIKE " . $q->literal_like($tbl->{tbl});
MKDEBUG && _d($sql);
$table_status = $dbh->selectrow_hashref($sql);
MKDEBUG && _d('Table status:', Dumper($table_status));
};
if ( $EVAL_ERROR ) {
warn $EVAL_ERROR;
return 0;
}
my $n_rows = defined $table_status->{Rows} ? $table_status->{Rows}
: defined $table_status->{rows} ? $table_status->{rows}
: 0;
my $chunk_size = $self->{OptionParser}->get('chunk-size') || 1;
$self->{one_nibble} = $n_rows <= $chunk_size ? 1 : 0;
my ($dbh, $tbl, $tp) = @{$self}{qw(dbh tbl TableParser)};
my ($table_status) = $tp->get_table_status($dbh, $tbl->{db}, $tbl->{tbl});
my $n_rows = $table_status->{rows} || 0;
my $chunk_size = $self->{OptionParser}->get('chunk-size') || 1;
$self->{one_nibble} = $n_rows <= $chunk_size ? 1 : 0;
MKDEBUG && _d('One nibble:', $self->{one_nibble} ? 'yes' : 'no');
return $self->{one_nibble};
}

201
lib/ReplicaLagLimiter.pm Normal file
View File

@@ -0,0 +1,201 @@
# This program is copyright 2011 Percona Inc.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# ReplicaLagLimiter package
# ###########################################################################
{
# Package: ReplicaLagLimiter
# ReplicaLagLimiter helps limit slave lag when working on the master.
# There are two sides to this problem: operations on the master and
# slave lag. Master ops that replicate can affect slave lag, so they
# should be adjusted to prevent overloading slaves. <update()> returns
# and adjustment (-1=down/decrease, 0=none, 1=up/increase) based on
# a moving average of how long operations are taking on the master.
# Regardless of that, slaves may still lag, so <wait()> waits for them
# to catchup based on the spec passed to <new()>.
package ReplicaLagLimiter;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
sub new {
my ( $class, %args ) = @_;
my @required_args = qw(spec slaves get_lag);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($spec) = @args{@required_args};
my %specs = map {
my ($key, $val) = split '=', $_;
MKDEBUG && _d($key, '=', $val);
lc($key) => $val;
} @$spec;
my $self = {
target_time => 1, # optimal time for master ops
sample_size => 5, # number of master ops to use for moving average
max => 1, # max slave lag
timeout => 3600, # max time to wait for all slaves to catchup
check => 1, # sleep time between checking slave lag
continue => 'no', # return true even if timeout
%specs, # slave wait specs from caller
samples => [], # master op times
moving_avg => 0, # moving avgerge of samples
get_lag => $args{get_lag},
};
return bless $self, $class;
}
sub validate_spec {
# Permit calling as ReplicaLagLimiter-> or ReplicaLagLimiter::
shift @_ if $_[0] eq 'ReplicaLagLimiter';
my ( $spec ) = @_;
if ( @$spec == 0 ) {
die "spec array requires at least a max value\n";
}
my $have_max;
foreach my $op ( @$spec ) {
my ($key, $val) = split '=', $op;
if ( !$key ) {
die "invalid spec format, should be key=value: $spec\n";
}
if ( $key !~ m/(?:max|timeout|continue)/i ) {
die "invalid spec: $spec\n";
}
if ( !$val ) {
die "spec has no value: $spec\n";
}
if ( $key ne 'continue' && $val !~ m/^\d+$/ ) {
die "value must be an integer: $spec\n";
}
if ( $key eq 'continue' && $val !~ m/(?:yes|no)/i ) {
die "value for $key must be \"yes\" or \"no\"\n";
}
$have_max = 1 if $key eq 'max';
}
if ( !$have_max ) {
die "max must be specified"
}
}
sub update {
my ($self, $t) = @_;
MKDEBUG && _d('Sample time:', $t);
my $sample_size = $self->{sample_size};
my $samples = $self->{samples};
my $adjust = 0;
if ( @$samples == $sample_size ) {
shift @$samples;
push @$samples, $t;
my $sum = 0;
map { $sum += $_ } @$samples;
$self->{moving_avg} = $sum / $sample_size;
MKDEBUG && _d('Moving average:', $self->{moving_avg});
$adjust = $self->{target_time} <=> $self->{moving_avg};
}
else {
MKDEBUG && _d('Saving sample', @$samples + 1, 'of', $sample_size);
push @$samples, $t;
}
return $adjust;
}
# Sub: wait_for_slave
# Wait for Seconds_Behind_Master on all slaves to become < max.
#
# Optional Arguments:
# Progress - <Progress> object.
#
# Returns:
# True if all slaves caught up, else 0 (timeout)
sub wait {
my ( $self, %args ) = @_;
my @required_args = qw();
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my $pr = $args{Progres};
my $get_lag = $self->{get_lag};
my $slaves = $self->{slaves};
my $n_slaves = @$slaves;
my $pr_callback;
if ( $pr ) {
# If you use the default Progress report callback, you'll need to
# to add Transformers.pm to this tool.
my $reported = 0;
$pr_callback = sub {
my ($fraction, $elapsed, $remaining, $eta, $slave_no) = @_;
if ( !$reported ) {
print STDERR "Waiting for replica "
. ($slaves->[$slave_no]->{dsn}->{n} || '')
. " to catchup...\n";
$reported = 1;
}
else {
print STDERR "Still waiting ($elapsed seconds)...\n";
}
return;
};
$pr->set_callback($pr_callback);
}
my ($max, $check, $timeout) = @{$self}{qw(max check timeout)};
my $slave_no = 0;
my $slave = $slaves->[$slave_no];
my $t_start = time;
while ($slave && time - $t_start < $timeout) {
MKDEBUG && _d('Checking slave lag on', $slave->{n});
my $lag = $get_lag->($slave->{dbh});
if ( !defined $lag || $lag > $max ) {
MKDEBUG && _d('Replica lag', $lag, '>', $max, '; sleeping', $check);
$pr->update(sub { return $slave_no; }) if $pr;
sleep $check;
}
else {
MKDEBUG && _d('Replica ready, lag', $lag, '<=', $max);
$slave = $slaves->[++$slave_no];
}
}
if ( $slave_no >= @$slave ) {
MKDEBUG && _d('Timeout waiting for', $slaves->[$slave_no]->{dsn}->{n});
return 0 unless $self->{continue};
}
MKDEBUG && _d('All slaves caught up');
return 1;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End ReplicaLagLimiter package
# ###########################################################################

View File

@@ -499,7 +499,11 @@ sub get_table_status {
}
MKDEBUG && _d($sql, @params);
my $sth = $dbh->prepare($sql);
$sth->execute(@params);
eval { $sth->execute(@params); };
if ($EVAL_ERROR) {
MKDEBUG && _d($EVAL_ERROR);
return;
}
my @tables = @{$sth->fetchall_arrayref({})};
@tables = map {
my %tbl; # Make a copy with lowercased keys

79
t/lib/ReplicaLagLimiter.t Normal file
View File

@@ -0,0 +1,79 @@
#!/usr/bin/perl
BEGIN {
die "The PERCONA_TOOLKIT_BRANCH environment variable is not set.\n"
unless $ENV{PERCONA_TOOLKIT_BRANCH} && -d $ENV{PERCONA_TOOLKIT_BRANCH};
unshift @INC, "$ENV{PERCONA_TOOLKIT_BRANCH}/lib";
};
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Test::More tests => 5;
use ReplicaLagLimiter;
use PerconaTest;
my $lag = 0;
sub get_lag {
my ($dbh) = @_;
return $lag;
}
my $sll = new ReplicaLagLimiter(
spec => [qw(max=1 timeout=3600 continue=no)],
slaves => [[]],
get_lag => \&get_lag,
);
for (1..4) {
$sll->update(1);
}
is(
$sll->update(1),
0,
"5 time samples, no adjustmenet"
);
for (1..4) {
$sll->update(1);
}
is(
$sll->update(1),
0,
"Moving avg hasn't changed"
);
$sll->update(2);
$sll->update(2);
$sll->update(2);
is(
$sll->update(2),
-1,
"Adjust down (moving avg = 1.8)"
);
$sll->update(0.3);
$sll->update(0.3);
is(
$sll->update(0.3),
1,
"Adjust up (moving avg = 0.98)"
);
# #############################################################################
# Done.
# #############################################################################
my $output = '';
{
local *STDERR;
open STDERR, '>', \$output;
$sll->_d('Complete test coverage');
}
like(
$output,
qr/Complete test coverage/,
'_d() works'
);
exit;