Decompose ReplicaLagLimiter into ReplicaLagWaiter and WeightedAvgRate. Don't use OptionParser in NibbleIterator; use chunk_size arg instead.

This commit is contained in:
Daniel Nichter
2011-09-21 11:19:49 -06:00
parent 31d6171355
commit f6fb8b44bd
7 changed files with 256 additions and 150 deletions

View File

@@ -4743,26 +4743,6 @@ sub main {
$slave_lag_cxn = $slaves; $slave_lag_cxn = $slaves;
} }
# ########################################################################
# Make a lag limiter to help adjust chunk size and wait for slaves.
# ########################################################################
my $sleep = sub {
$dbh->do("SELECT 'pt-table-checksum keepalive'");
sleep $o->get('check-interval');
return;
};
my $lag_limiter = new ReplicaLagLimiter(
oktorun => sub { return $oktorun },
get_lag => sub { return $ms->get_slave_lag(@_) },
sleep => $sleep,
max_lag => $o->get('max-lag'),
initial_n => $o->get('chunk-size'),
initial_t => $o->get('chunk-time'),
target_t => $o->get('chunk-time'),
slaves => $slave_lag_cxn,
);
# ######################################################################## # ########################################################################
# Check replication slaves if desired. If only --replicate-check is given, # Check replication slaves if desired. If only --replicate-check is given,
# then we will exit here. If --recheck is also given, then we'll continue # then we will exit here. If --recheck is also given, then we'll continue
@@ -4873,6 +4853,22 @@ sub main {
"UPDATE $repl_table SET master_crc = ?, master_cnt = ? " "UPDATE $repl_table SET master_crc = ?, master_cnt = ? "
. "WHERE db = ? AND tbl = ? AND chunk = ?"); . "WHERE db = ? AND tbl = ? AND chunk = ?");
# ########################################################################
# Make a ReplicaLagWaiter to help wait for slaves after each chunk.
# ########################################################################
my $sleep = sub {
$dbh->do("SELECT 'pt-table-checksum keepalive'");
sleep $o->get('check-interval');
return;
};
my $replica_lag = new ReplicaLagWaiter(
oktorun => sub { return $oktorun },
get_lag => sub { return $ms->get_slave_lag(@_) },
sleep => $sleep,
max_lag => $o->get('max-lag'),
slaves => $slave_lag_cxn,
);
# ######################################################################## # ########################################################################
# Callbacks for the nibble iterator. # Callbacks for the nibble iterator.
@@ -4891,7 +4887,7 @@ sub main {
MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table', MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table',
"$tbl->{db}.$tbl->{tbl}", 'is too large'); "$tbl->{db}.$tbl->{tbl}", 'is too large');
$tbl->{checksum_results}->{skipped}++; $tbl->{checksum_results}->{skipped}++;
$nibble_time = 0; $tbl->{nibble_time} = 0;
return 0; # next boundary return 0; # next boundary
} }
@@ -4902,7 +4898,7 @@ sub main {
%args, %args,
%common_modules, %common_modules,
); );
$nibble_time = time - $t_start; $tbl->{nibble_time} = time - $t_start;
return $rows; return $rows;
}, },
after_nibble => sub { after_nibble => sub {
@@ -4918,9 +4914,9 @@ sub main {
$tbl->{checksum_results}->{n_rows} += $cnt || 0; $tbl->{checksum_results}->{n_rows} += $cnt || 0;
$update_sth->execute($crc, $cnt, @{$tbl}{qw(db tbl)}, $args{nibbleno}); $update_sth->execute($crc, $cnt, @{$tbl}{qw(db tbl)}, $args{nibbleno});
# Adjust chunk size. $nibble_time will be 0 if this chunk was skipped. # Adjust chunk size. Nibble time will be 0 if this chunk was skipped.
if ( $o->get('chunk-time') && $nibble_time ) { if ( $o->get('chunk-time') && $tbl->{nibble_time} ) {
my $new_chunk_size = $lag_limiter->update($cnt, $nibble_time); my $new_chunk_size = $tbl->{rate}->update($cnt, $tbl->{nibble_time});
if ( $new_chunk_size < 1 ) { if ( $new_chunk_size < 1 ) {
# This shouldn't happen, but we must know if it does. And # This shouldn't happen, but we must know if it does. And
# chunk size can't be set less than 1. # chunk size can't be set less than 1.
@@ -4929,7 +4925,8 @@ sub main {
. "is not being overloaded, or increase --chunk-time. " . "is not being overloaded, or increase --chunk-time. "
. "The last chunk, number $args{nibbleno} of table " . "The last chunk, number $args{nibbleno} of table "
. "$tbl->{db}.$tbl->{tbl}, selected $cnt rows and took " . "$tbl->{db}.$tbl->{tbl}, selected $cnt rows and took "
. sprintf('%.3f', $nibble_time) . " seconds to execute.\n"; . sprintf('%.3f', $tbl->{nibble_time}) . " seconds to "
. "execute.\n";
$new_chunk_size = 1; $new_chunk_size = 1;
} }
$args{NibbleIterator}->set_chunk_size($new_chunk_size); $args{NibbleIterator}->set_chunk_size($new_chunk_size);
@@ -4951,7 +4948,7 @@ sub main {
name => "Waiting for replicas to catch up", name => "Waiting for replicas to catch up",
); );
} }
$lag_limiter->wait(Progress => $pr); $replica_lag->wait(Progress => $pr);
return; return;
}, },
@@ -4998,6 +4995,18 @@ sub main {
%common_modules, %common_modules,
); );
# Init a new weighted avg rate calculator for the table. This
# table may be really different from the previous. E.g., the
# prev may have been all INT cols--really fast--so chunk size
# was increased dramatically, but this table may have lots of
# BLOB cols--potentially really slow--so we want to start
# cautiously.
# The keys must be spelled exactly initial_n, initial_t and target_t:
# WeightedAvgRate::new dies if any of them is missing.
$tbl->{rate} = new WeightedAvgRate(
   initial_n => $o->get('chunk-size'),
   initial_t => $o->get('chunk-time'),
   target_t  => $o->get('chunk-time'),
);
# The "1 while" loop is necessary because we're executing REPLACE # The "1 while" loop is necessary because we're executing REPLACE
# statements which don't return rows and NibbleIterator only # statements which don't return rows and NibbleIterator only
# returns if it has rows to return. So all the work is done via # returns if it has rows to return. So all the work is done via

View File

@@ -32,18 +32,34 @@ $Data::Dumper::Indent = 1;
$Data::Dumper::Sortkeys = 1; $Data::Dumper::Sortkeys = 1;
$Data::Dumper::Quotekeys = 0; $Data::Dumper::Quotekeys = 0;
# Sub: new
#
# Required Arguments:
# dbh - dbh
# tbl - Standard tbl ref
# chunk_size - Number of rows to nibble per chunk
# OptionParser - <OptionParser> object
# TableNibbler - <TableNibbler> object
# TableParser - <TableParser> object
# Quoter - <Quoter> object
#
# Optional Arguments:
# chunk_index - Index to use for nibbling
#
# Returns:
# NibbleIterator object
sub new { sub new {
my ( $class, %args ) = @_; my ( $class, %args ) = @_;
my @required_args = qw(dbh tbl OptionParser Quoter TableNibbler TableParser); my @required_args = qw(dbh tbl chunk_size OptionParser Quoter TableNibbler TableParser);
foreach my $arg ( @required_args ) { foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg}; die "I need a $arg argument" unless $args{$arg};
} }
my ($dbh, $tbl, $o, $q) = @args{@required_args}; my ($dbh, $tbl, $chunk_size, $o, $q) = @args{@required_args};
# Get an index to nibble by. We'll order rows by the index's columns. # Get an index to nibble by. We'll order rows by the index's columns.
my $index = $args{TableParser}->find_best_index( my $index = $args{TableParser}->find_best_index(
$tbl->{tbl_struct}, $tbl->{tbl_struct},
$o->get('chunk-index'), $args{chunk_index},
); );
die "No index to nibble table $tbl->{db}.$tbl->{tbl}" unless $index; die "No index to nibble table $tbl->{db}.$tbl->{tbl}" unless $index;
my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols}; my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols};
@@ -152,7 +168,7 @@ sub new {
. " /*explain one nibble*/"; . " /*explain one nibble*/";
MKDEBUG && _d('Explain one nibble statement:', $explain_one_nibble_sql); MKDEBUG && _d('Explain one nibble statement:', $explain_one_nibble_sql);
my $limit = $o->get('chunk-size') - 1; my $limit = $chunk_size - 1;
MKDEBUG && _d('Initial chunk size (LIMIT):', $limit); MKDEBUG && _d('Initial chunk size (LIMIT):', $limit);
my $self = { my $self = {

View File

@@ -15,21 +15,12 @@
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple # this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA. # Place, Suite 330, Boston, MA 02111-1307 USA.
# ########################################################################### # ###########################################################################
# ReplicaLagLimiter package # ReplicaLagWaiter package
# ########################################################################### # ###########################################################################
{ {
# Package: ReplicaLagLimiter # Package: ReplicaLagWaiter
# ReplicaLagLimiter helps limit slave lag when working on the master. # ReplicaLagWaiter helps limit slave lag when working on the master.
# There are two sides to this problem: operations on the master and package ReplicaLagWaiter;
# slave lag. Master ops that replicate can affect slave lag, so they
# should be adjusted to prevent overloading slaves. <update()> returns
# an adjusted "n" value (number of whatever the master is doing) based
# on a weighted decaying average of "t", how long operations are taking.
# The desired master op time range is specified by target_t.
#
# Regardless of all that, slaves may still lag, so <wait()> waits for them
# to catch up based on the spec passed to <new()>.
package ReplicaLagLimiter;
use strict; use strict;
use warnings FATAL => 'all'; use warnings FATAL => 'all';
@@ -42,62 +33,28 @@ use Data::Dumper;
# Sub: new # Sub: new
# #
# Required Arguments: # Required Arguments:
# oktorun - Callback that returns true if it's ok to continue running # oktorun - Callback that returns true if it's ok to continue running
# get_lag - Callback passed slave dbh and returns slave's lag # get_lag - Callback passed slave dbh and returns slave's lag
# sleep - Callback to sleep between checking lag. # sleep - Callback to sleep between checking lag.
# max_lag - Max lag # max_lag - Max lag
# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...] # slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...]
# initial_n - Initial n value for <update()>
# initial_t - Initial t value for <update()>
# target_t - Target time for t in <update()>
#
# Optional Arguments:
# weight - Weight of previous n/t values (default 0.75).
# #
# Returns: # Returns:
# ReplicaLagLimiter object # ReplicaLagWaiter object
sub new { sub new {
my ( $class, %args ) = @_; my ( $class, %args ) = @_;
my @required_args = qw(oktorun get_lag sleep max_lag slaves initial_n initial_t target_t); my @required_args = qw(oktorun get_lag sleep max_lag slaves);
foreach my $arg ( @required_args ) { foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless defined $args{$arg}; die "I need a $arg argument" unless defined $args{$arg};
} }
my $self = { my $self = {
%args, %args,
avg_n => $args{initial_n},
avg_t => $args{initial_t},
weight => $args{weight} || 0.75,
}; };
return bless $self, $class; return bless $self, $class;
} }
# Sub: update
# Update weighted decaying average of master operation time. Param n is
# generic; it's how many of whatever the caller is doing (rows, checksums,
# etc.). Param s is how long this n took, in seconds (hi-res or not).
#
# Parameters:
# n - Number of operations (rows, etc.)
# t - Amount of time in seconds that n took
#
# Returns:
# n adjust to meet target_t based on weighted decaying avg rate
sub update {
my ($self, $n, $t) = @_;
MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's');
$self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n;
$self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t;
$self->{avg_rate} = $self->{avg_n} / $self->{avg_t};
MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s');
my $new_n = int($self->{avg_rate} * $self->{target_t});
MKDEBUG && _d('Adjust n to', $new_n);
return $new_n;
}
# Sub: wait # Sub: wait
# Wait for Seconds_Behind_Master on all slaves to become < max. # Wait for Seconds_Behind_Master on all slaves to become < max.
# #

96
lib/WeightedAvgRate.pm Normal file
View File

@@ -0,0 +1,96 @@
# This program is copyright 2011 Percona Inc.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# WeightedAvgRate package
# ###########################################################################
{
# Package: WeightedAvgRate
# WeightedAvgRate keeps a weighted, decaying average of a rate (n per
# second) and uses it to suggest the n that should take target_t seconds.
package WeightedAvgRate;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant MKDEBUG => $ENV{MKDEBUG} || 0;

# Sub: new
#
# Required Arguments:
#   initial_n - Initial n value for <update()>
#   initial_t - Initial t value for <update()>
#   target_t  - Target time for t in <update()>
#
# Optional Arguments:
#   weight - Weight of previous n/t values (default 0.75).  An explicit
#            weight of 0 is honored (history is ignored); an undefined or
#            empty-string weight selects the default.
#
# Returns:
#   WeightedAvgRate
sub new {
   my ( $class, %args ) = @_;
   my @required_args = qw(initial_n initial_t target_t);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }

   # Do not use "|| 0.75" here: it would silently replace a deliberate
   # weight of 0 with the default.
   my $weight = ( defined $args{weight} && $args{weight} ne '' )
              ? $args{weight}
              : 0.75;

   my $self = {
      %args,
      avg_n  => $args{initial_n},
      avg_t  => $args{initial_t},
      weight => $weight,
   };

   return bless $self, $class;
}

# Sub: update
#   Update weighted average rate.  Param n is generic; it's how many of
#   whatever the caller is doing (rows, checksums, etc.).  Param t is how
#   long this n took, in seconds (hi-res or not).
#
# Parameters:
#   n - Number of operations (rows, etc.)
#   t - Amount of time in seconds that n took
#
# Returns:
#   n adjusted to meet target_t based on weighted decaying avg rate
sub update {
   my ($self, $n, $t) = @_;
   MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's');

   # Decay the previous sums by the weight, then add the new sample.
   $self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n;
   $self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t;

   # NOTE: Perl raises a division-by-zero error here if the weighted
   # time sum is 0 (e.g. initial_t and t are both 0).
   $self->{avg_rate} = $self->{avg_n} / $self->{avg_t};
   MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s');

   # int() truncates, so the result can be 0 for very slow rates.
   my $new_n = int($self->{avg_rate} * $self->{target_t});
   MKDEBUG && _d('Adjust n to', $new_n);
   return $new_n;
}

# Print a debug line to STDERR, prefixed with the calling package, the
# line number and the PID.  Undefined args are printed as 'undef' and
# embedded newlines are continued with "# ".
sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;
}
# ###########################################################################
# End WeightedAvgRate package
# ###########################################################################

View File

@@ -77,10 +77,11 @@ sub make_nibble_iter {
1 while $si->next_schema_object(); 1 while $si->next_schema_object();
my $ni = new NibbleIterator( my $ni = new NibbleIterator(
dbh => $dbh, dbh => $dbh,
tbl => $schema->get_table($args{db}, $args{tbl}), tbl => $schema->get_table($args{db}, $args{tbl}),
callbacks => $args{callbacks}, chunk_size => $o->get('chunk-size'),
select => $args{select}, callbacks => $args{callbacks},
select => $args{select},
%common_modules, %common_modules,
); );

View File

@@ -9,9 +9,9 @@ BEGIN {
use strict; use strict;
use warnings FATAL => 'all'; use warnings FATAL => 'all';
use English qw(-no_match_vars); use English qw(-no_match_vars);
use Test::More tests => 10; use Test::More tests => 5;
use ReplicaLagLimiter; use ReplicaLagWaiter;
use PerconaTest; use PerconaTest;
my $oktorun = 1; my $oktorun = 1;
@@ -35,75 +35,17 @@ sub sleep {
sleep $t; sleep $t;
} }
my $rll = new ReplicaLagLimiter( my $rll = new ReplicaLagWaiter(
oktorun => \&oktorun, oktorun => \&oktorun,
get_lag => \&get_lag, get_lag => \&get_lag,
sleep => \&sleep, sleep => \&sleep,
max_lag => 1, max_lag => 1,
initial_n => 1000,
initial_t => 1,
target_t => 1,
slaves => [ slaves => [
{ dsn=>{n=>'slave1'}, dbh=>1 }, { dsn=>{n=>'slave1'}, dbh=>1 },
{ dsn=>{n=>'slave2'}, dbh=>2 }, { dsn=>{n=>'slave2'}, dbh=>2 },
], ],
); );
# ############################################################################
# Update master op, see if we get correct adjustment result.
# ############################################################################
# stay the same
for (1..5) {
$rll->update(1000, 1);
}
is(
$rll->update(1000, 1),
1000,
"Same rate, same n"
);
# slow down
for (1..5) {
$rll->update(1000, 2);
}
is(
$rll->update(1000, 2),
542,
"Decrease rate, decrease n"
);
for (1..15) {
$rll->update(1000, 2);
}
is(
$rll->update(1000, 2),
500,
"limit n=500 decreasing"
);
# speed up
for (1..5) {
$rll->update(1000, 1);
}
is(
$rll->update(1000, 1),
849,
"Increase rate, increase n"
);
for (1..20) {
$rll->update(1000, 1);
}
is(
$rll->update(1000, 1),
999,
"limit n=1000 increasing"
);
# ############################################################################
# Fake waiting for slaves.
# ############################################################################
@lag = (0, 0); @lag = (0, 0);
my $t = time; my $t = time;
$rll->wait(); $rll->wait();

85
t/lib/WeightedAvgRate.t Normal file
View File

@@ -0,0 +1,85 @@
#!/usr/bin/perl

# Unit test for WeightedAvgRate: feeds update() a series of (n, t) samples
# and checks the adjusted n it returns, then exercises _d().

BEGIN {
   die "The PERCONA_TOOLKIT_BRANCH environment variable is not set.\n"
      unless $ENV{PERCONA_TOOLKIT_BRANCH} && -d $ENV{PERCONA_TOOLKIT_BRANCH};
   unshift @INC, "$ENV{PERCONA_TOOLKIT_BRANCH}/lib";
};

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Test::More tests => 6;

use WeightedAvgRate;
use PerconaTest;

my $rate = WeightedAvgRate->new(
   initial_n => 1000,
   initial_t => 1,
   target_t  => 1,
);

# Feed the same (n, t) sample to the calculator a number of times,
# discarding the results, so the next update() reflects that history.
sub warm_up {
   my ( $n, $t, $times ) = @_;
   $rate->update($n, $t) for 1..$times;
   return;
}

# stay the same
warm_up(1000, 1, 5);
is(
   $rate->update(1000, 1),
   1000,
   "Same rate, same n"
);

# slow down
warm_up(1000, 2, 5);
is(
   $rate->update(1000, 2),
   542,
   "Decrease rate, decrease n"
);

# Keep slowing down until the average settles at the new rate.
warm_up(1000, 2, 15);
is(
   $rate->update(1000, 2),
   500,
   "limit n=500 decreasing"
);

# speed up
warm_up(1000, 1, 5);
is(
   $rate->update(1000, 1),
   849,
   "Increase rate, increase n"
);

# Keep speeding up; the expected value is 999 rather than 1000,
# presumably because update() truncates with int() -- TODO confirm.
warm_up(1000, 1, 20);
is(
   $rate->update(1000, 1),
   999,
   "limit n=1000 increasing"
);

# #############################################################################
# Done.
# #############################################################################

# Capture STDERR in a scalar so the debug line printed by _d() can be checked.
my $output = '';
{
   local *STDERR;
   open STDERR, '>', \$output;
   $rate->_d('Complete test coverage');
}
like(
   $output,
   qr/Complete test coverage/,
   '_d() works'
);
exit;