From a19dd0b16c72af64837cf31859a82684b3a3f279 Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Tue, 20 Sep 2011 08:59:33 -0600 Subject: [PATCH] Simplify ReplicaLagLimiter::update(). --- lib/ReplicaLagLimiter.pm | 92 +++++++++++++++------------------------ t/lib/ReplicaLagLimiter.t | 72 ++++++++++++++++++------------ 2 files changed, 79 insertions(+), 85 deletions(-) diff --git a/lib/ReplicaLagLimiter.pm b/lib/ReplicaLagLimiter.pm index 90589a62..58c77463 100644 --- a/lib/ReplicaLagLimiter.pm +++ b/lib/ReplicaLagLimiter.pm @@ -23,11 +23,9 @@ # There are two sides to this problem: operations on the master and # slave lag. Master ops that replicate can affect slave lag, so they # should be adjusted to prevent overloading slaves. returns -# and adjustment (-1=down/decrease, 0=none, 1=up/increase) based on -# an weighted decaying average of how long operations are taking on the -# master. The desired master op time range is specified by target_time. -# By default, the running avg is weight is 0.75; or, new times weight -# only 0.25 so temporary variations won't cause volatility. +# an adjusted "n" value (number of whatever the master is doing) based +# on a weighted decaying average of "t", how long operations are taking. +# The desired master op time range is specified by target_t. # # Regardless of all that, slaves may still lag, so waits for them # to catch up based on the spec passed to . @@ -43,20 +41,21 @@ use Time::HiRes qw(sleep time); # Sub: new # # Required Arguments: -# spec - --replicat-lag spec (arrayref of option=value pairs) -# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...] -# get_lag - Callback passed slave dbh and returns slave's lag -# target_time - Target time for master ops +# spec - --replicat-lag spec (arrayref of option=value pairs) +# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...] +# get_lag - Callback passed slave dbh and returns slave's lag +# initial_n - Initial n value for +# initial_t - Initial t value for +# target_t - Target time for t in # # Optional Arguments: -# sample_size - Number of master op samples to use for moving avg (default 5) -# weight - Weight of previous average (default 0.75). +# weight - Weight of previous n/t values (default 0.75). # # Returns: # ReplicaLagLimiter object sub new { my ( $class, %args ) = @_; - my @required_args = qw(spec slaves get_lag target_time); + my @required_args = qw(spec slaves get_lag initial_n initial_t target_t); foreach my $arg ( @required_args ) { die "I need a $arg argument" unless $args{$arg}; } @@ -69,18 +68,17 @@ sub new { } @$spec; my $self = { - max => 1, # max slave lag - timeout => 3600, # max time to wait for all slaves to catch up - check => 1, # sleep time between checking slave lag - continue => 'no', # return true even if timeout - %specs, # slave wait specs from caller - samples => [], # master op times - moving_avg => 0, # moving avgerge of samples - slaves => $args{slaves}, - get_lag => $args{get_lag}, - target_time => $args{target_time}, - sample_size => $args{sample_size} || 5, - weight => $args{weight} || 0.75, + max => 1, # max slave lag + timeout => 3600, # max time to wait for all slaves to catch up + check => 1, # sleep time between checking slave lag + continue => 'no', # return true even if timeout + %specs, # slave wait specs from caller + slaves => $args{slaves}, + get_lag => $args{get_lag}, + avg_n => $args{initial_n}, + avg_t => $args{initial_t}, + target_t => $args{target_t}, + weight => $args{weight} || 0.75, }; return bless $self, $class; @@ -123,44 +121,22 @@ sub validate_spec { # # Parameters: # n - Number of operations (rows, etc.) -# s - Amount of time in seconds that n took +# t - Amount of time in seconds that n took # # Returns: -# -1 master op is too slow, it should be reduced -# 0 master op is within target time range, no adjustment -# 1 master is too fast; it can be increased +# n adjust to meet target_t based on weighted decaying avg rate sub update { - my ($self, $n, $s) = @_; - MKDEBUG && _d('Master op time:', $n, 'n /', $s, 's'); - my $adjust = 0; - if ( $self->{avg_rate} ) { - # Calculated new weighted averages. - $self->{avg_n} = ($self->{avg_n} * ( $self->{weight})) - + ($n * (1 - $self->{weight})); - $self->{avg_s} = ($self->{avg_s} * ( $self->{weight})) - + ($s * (1 - $self->{weight})); - $self->{avg_rate} = int($self->{avg_n} / $self->{avg_s}); - MKDEBUG && _d('Weighted avg n:', $self->{avg_n}, 's:', $self->{avg_s}, - 'rate:', $self->{avg_rate}, 'n/s'); + my ($self, $n, $t) = @_; + MKDEBUG && _d('Master op time:', $n, 'n /', $t, 's'); - $adjust = $self->{avg_s} < $self->{target_time} ? 1 - : $self->{avg_s} > $self->{target_time} ? -1 - : 0; - } - else { - MKDEBUG && _d('Saved values; initializing averages'); - $self->{n_vals}++; - $self->{total_n} += $n; - $self->{total_s} += $s; - if ( $self->{n_vals} == $self->{sample_size} ) { - $self->{avg_n} = $self->{total_n} / $self->{n_vals}; - $self->{avg_s} = $self->{total_s} / $self->{n_vals}; - $self->{avg_rate} = int($self->{avg_n} / $self->{avg_s}); - MKDEBUG && _d('Initial avg n:', $self->{avg_n}, 's:', $self->{avg_s}, - 'rate:', $self->{avg_rate}, 'n/s'); - } - } - return $adjust; + $self->{avg_n} = ($self->{avg_n} * $self->{weight}) + $n; + $self->{avg_t} = ($self->{avg_t} * $self->{weight}) + $t; + $self->{avg_rate} = $self->{avg_n} / $self->{avg_t}; + MKDEBUG && _d('Weighted avg rate:', $self->{avg_rate}, 'n/s'); + + my $new_n = int($self->{avg_rate} * $self->{target_t}); + MKDEBUG && _d('Adjust n to', $new_n); + return $new_n; } # Sub: wait_for_slave diff --git a/t/lib/ReplicaLagLimiter.t b/t/lib/ReplicaLagLimiter.t index 9690bcca..9604d302 100644 --- a/t/lib/ReplicaLagLimiter.t +++ b/t/lib/ReplicaLagLimiter.t @@ -9,7 +9,7 @@ BEGIN { use strict; use warnings FATAL => 'all'; use English qw(-no_match_vars); -use Test::More tests => 16; +use Test::More tests => 17; use ReplicaLagLimiter; use PerconaTest; @@ -21,7 +21,9 @@ my $rll = new ReplicaLagLimiter( { dsn=>{n=>'slave2'}, dbh=>2 }, ], get_lag => \&get_lag, - target_time => 1, + initial_n => 1000, + initial_t => 1, + target_t => 1, ); # ############################################################################ @@ -68,41 +70,53 @@ throws_ok( # ############################################################################ # Update master op, see if we get correct adjustment result. # ############################################################################ -for (1..4) { + +# stay the same +for (1..5) { $rll->update(1000, 1); } is( $rll->update(1000, 1), - 0, - "5 time samples, no adjustmenet" + 1000, + "Same rate, same n" ); -for (1..4) { - $rll->update(1000, 1); +# slow down +for (1..5) { + $rll->update(1000, 2); } -is( - $rll->update(1000, 1), - 0, - "Avg hasn't changed" -); - -# Results in: Weighted avg n: 1000 s: 1.683593 rate: 593 n/s -$rll->update(1000, 2); -$rll->update(1000, 2); -$rll->update(1000, 2); is( $rll->update(1000, 2), - -1, - "Adjust down" + 542, + "Decrease rate, decrease n" ); -# Results in: Weighted avg n: 1000 s: 0.768078 rate: 1301 n/s -$rll->update(1000, 0.1); -$rll->update(1000, 0.1); +for (1..15) { + $rll->update(1000, 2); +} is( - $rll->update(1000, 0.1), - 1, - "Adjust up" + $rll->update(1000, 2), + 500, + "limit n=500 decreasing" +); + +# speed up +for (1..5) { + $rll->update(1000, 1); +} +is( + $rll->update(1000, 1), + 849, + "Increase rate, increase n" +); + +for (1..20) { + $rll->update(1000, 1); +} +is( + $rll->update(1000, 1), + 999, + "limit n=1000 increasing" ); # ############################################################################ @@ -153,7 +167,9 @@ $rll = new ReplicaLagLimiter( { dsn=>{n=>'slave2'}, dbh=>2 }, ], get_lag => \&get_lag, - target_time => [0.9,1.1], + initial_n => 1000, + initial_t => 1, + target_t => 1, ); @waited = (); @@ -172,7 +188,9 @@ $rll = new ReplicaLagLimiter( { dsn=>{n=>'slave2'}, dbh=>2 }, ], get_lag => \&get_lag, - target_time => [0.9,1.1], + initial_n => 1000, + initial_t => 1, + target_t => 1, ); @waited = ();