Simplify ReplicaLagLimiter::update().

This commit is contained in:
Daniel Nichter
2011-09-20 08:59:33 -06:00
parent 26b99b20ad
commit a19dd0b16c
2 changed files with 79 additions and 85 deletions

View File

@@ -23,11 +23,9 @@
# There are two sides to this problem: operations on the master and # There are two sides to this problem: operations on the master and
# slave lag. Master ops that replicate can affect slave lag, so they # slave lag. Master ops that replicate can affect slave lag, so they
# should be adjusted to prevent overloading slaves. <update()> returns # should be adjusted to prevent overloading slaves. <update()> returns
# and adjustment (-1=down/decrease, 0=none, 1=up/increase) based on # an adjusted "n" value (number of whatever the master is doing) based
# an weighted decaying average of how long operations are taking on the # on a weighted decaying average of "t", how long operations are taking.
# master. The desired master op time range is specified by target_time. # The desired master op time range is specified by target_t.
# By default, the running avg is weight is 0.75; or, new times weight
# only 0.25 so temporary variations won't cause volatility.
# #
# Regardless of all that, slaves may still lag, so <wait()> waits for them # Regardless of all that, slaves may still lag, so <wait()> waits for them
# to catch up based on the spec passed to <new()>. # to catch up based on the spec passed to <new()>.
@@ -46,17 +44,18 @@ use Time::HiRes qw(sleep time);
# spec - --replicat-lag spec (arrayref of option=value pairs) # spec - --replicat-lag spec (arrayref of option=value pairs)
# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...] # slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...]
# get_lag - Callback passed slave dbh and returns slave's lag # get_lag - Callback passed slave dbh and returns slave's lag
# target_time - Target time for master ops # initial_n - Initial n value for <update()>
# initial_t - Initial t value for <update()>
# target_t - Target time for t in <update()>
# #
# Optional Arguments: # Optional Arguments:
# sample_size - Number of master op samples to use for moving avg (default 5) # weight - Weight of previous n/t values (default 0.75).
# weight - Weight of previous average (default 0.75).
# #
# Returns: # Returns:
# ReplicaLagLimiter object # ReplicaLagLimiter object
sub new { sub new {
my ( $class, %args ) = @_; my ( $class, %args ) = @_;
my @required_args = qw(spec slaves get_lag target_time); my @required_args = qw(spec slaves get_lag initial_n initial_t target_t);
foreach my $arg ( @required_args ) { foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg}; die "I need a $arg argument" unless $args{$arg};
} }
@@ -74,12 +73,11 @@ sub new {
check => 1, # sleep time between checking slave lag check => 1, # sleep time between checking slave lag
continue => 'no', # return true even if timeout continue => 'no', # return true even if timeout
%specs, # slave wait specs from caller %specs, # slave wait specs from caller
samples => [], # master op times
moving_avg => 0, # moving avgerge of samples
slaves => $args{slaves}, slaves => $args{slaves},
get_lag => $args{get_lag}, get_lag => $args{get_lag},
target_time => $args{target_time}, avg_n => $args{initial_n},
sample_size => $args{sample_size} || 5, avg_t => $args{initial_t},
target_t => $args{target_t},
weight => $args{weight} || 0.75, weight => $args{weight} || 0.75,
}; };
@@ -123,44 +121,22 @@ sub validate_spec {
# #
# Parameters: # Parameters:
# n - Number of operations (rows, etc.) # n - Number of operations (rows, etc.)
# s - Amount of time in seconds that n took # t - Amount of time in seconds that n took
# #
# Returns: # Returns:
# -1 master op is too slow, it should be reduced # n adjust to meet target_t based on weighted decaying avg rate
# 0 master op is within target time range, no adjustment
# 1 master is too fast; it can be increased
sub update { sub update {
my ($self, $n, $s) = @_; my ($self, $n, $t) = @_;
MKDEBUG && _d('Master op time:', $n, 'n /', $s, 's'); MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's');
my $adjust = 0;
if ( $self->{avg_rate} ) {
# Calculated new weighted averages.
$self->{avg_n} = ($self->{avg_n} * ( $self->{weight}))
+ ($n * (1 - $self->{weight}));
$self->{avg_s} = ($self->{avg_s} * ( $self->{weight}))
+ ($s * (1 - $self->{weight}));
$self->{avg_rate} = int($self->{avg_n} / $self->{avg_s});
MKDEBUG && _d('Weighted avg n:', $self->{avg_n}, 's:', $self->{avg_s},
'rate:', $self->{avg_rate}, 'n/s');
$adjust = $self->{avg_s} < $self->{target_time} ? 1 $self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n;
: $self->{avg_s} > $self->{target_time} ? -1 $self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t;
: 0; $self->{avg_rate} = $self->{avg_n} / $self->{avg_t};
} MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s');
else {
MKDEBUG && _d('Saved values; initializing averages'); my $new_n = int($self->{avg_rate} * $self->{target_t});
$self->{n_vals}++; MKDEBUG && _d('Adjust n to', $new_n);
$self->{total_n} += $n; return $new_n;
$self->{total_s} += $s;
if ( $self->{n_vals} == $self->{sample_size} ) {
$self->{avg_n} = $self->{total_n} / $self->{n_vals};
$self->{avg_s} = $self->{total_s} / $self->{n_vals};
$self->{avg_rate} = int($self->{avg_n} / $self->{avg_s});
MKDEBUG && _d('Initial avg n:', $self->{avg_n}, 's:', $self->{avg_s},
'rate:', $self->{avg_rate}, 'n/s');
}
}
return $adjust;
} }
# Sub: wait_for_slave # Sub: wait_for_slave

View File

@@ -9,7 +9,7 @@ BEGIN {
use strict; use strict;
use warnings FATAL => 'all'; use warnings FATAL => 'all';
use English qw(-no_match_vars); use English qw(-no_match_vars);
use Test::More tests => 16; use Test::More tests => 17;
use ReplicaLagLimiter; use ReplicaLagLimiter;
use PerconaTest; use PerconaTest;
@@ -21,7 +21,9 @@ my $rll = new ReplicaLagLimiter(
{ dsn=>{n=>'slave2'}, dbh=>2 }, { dsn=>{n=>'slave2'}, dbh=>2 },
], ],
get_lag => \&get_lag, get_lag => \&get_lag,
target_time => 1, initial_n => 1000,
initial_t => 1,
target_t => 1,
); );
# ############################################################################ # ############################################################################
@@ -68,41 +70,53 @@ throws_ok(
# ############################################################################ # ############################################################################
# Update master op, see if we get correct adjustment result. # Update master op, see if we get correct adjustment result.
# ############################################################################ # ############################################################################
for (1..4) {
# stay the same
for (1..5) {
$rll->update(1000, 1); $rll->update(1000, 1);
} }
is( is(
$rll->update(1000, 1), $rll->update(1000, 1),
0, 1000,
"5 time samples, no adjustmenet" "Same rate, same n"
); );
for (1..4) { # slow down
$rll->update(1000, 1); for (1..5) {
$rll->update(1000, 2);
} }
is(
$rll->update(1000, 1),
0,
"Avg hasn't changed"
);
# Results in: Weighted avg n: 1000 s: 1.683593 rate: 593 n/s
$rll->update(1000, 2);
$rll->update(1000, 2);
$rll->update(1000, 2);
is( is(
$rll->update(1000, 2), $rll->update(1000, 2),
-1, 542,
"Adjust down" "Decrease rate, decrease n"
); );
# Results in: Weighted avg n: 1000 s: 0.768078 rate: 1301 n/s for (1..15) {
$rll->update(1000, 0.1); $rll->update(1000, 2);
$rll->update(1000, 0.1); }
is( is(
$rll->update(1000, 0.1), $rll->update(1000, 2),
1, 500,
"Adjust up" "limit n=500 decreasing"
);
# speed up
for (1..5) {
$rll->update(1000, 1);
}
is(
$rll->update(1000, 1),
849,
"Increase rate, increase n"
);
for (1..20) {
$rll->update(1000, 1);
}
is(
$rll->update(1000, 1),
999,
"limit n=1000 increasing"
); );
# ############################################################################ # ############################################################################
@@ -153,7 +167,9 @@ $rll = new ReplicaLagLimiter(
{ dsn=>{n=>'slave2'}, dbh=>2 }, { dsn=>{n=>'slave2'}, dbh=>2 },
], ],
get_lag => \&get_lag, get_lag => \&get_lag,
target_time => [0.9,1.1], initial_n => 1000,
initial_t => 1,
target_t => 1,
); );
@waited = (); @waited = ();
@@ -172,7 +188,9 @@ $rll = new ReplicaLagLimiter(
{ dsn=>{n=>'slave2'}, dbh=>2 }, { dsn=>{n=>'slave2'}, dbh=>2 },
], ],
get_lag => \&get_lag, get_lag => \&get_lag,
target_time => [0.9,1.1], initial_n => 1000,
initial_t => 1,
target_t => 1,
); );
@waited = (); @waited = ();