diff --git a/lib/NibbleIterator.pm b/lib/NibbleIterator.pm index 006cda1d..ed7d9170 100644 --- a/lib/NibbleIterator.pm +++ b/lib/NibbleIterator.pm @@ -93,15 +93,16 @@ sub new { # does (col > a AND col <= e). Hence the fancy LIMIT 2 which returns # the upper boundary for the current nibble *and* the lower boundary # for the next nibble. See _next_boundaries(). - my $ub_sql = _make_ub_sql( - cols => $asc->{scols}, - from => $from, - where => $asc->{boundaries}->{'>='} - . ($args{where} ? " AND ($args{where})" : ''), - order_by => $order_by, - limit => $o->get('chunk-size'), - Quoter => $q, - ); + my $ub_sql + = "SELECT /*!40001 SQL_NO_CACHE */ " + . join(', ', map { $q->quote($_) } @{$asc->{scols}}) + . " FROM $from" + . " WHERE " . $asc->{boundaries}->{'>='} + . ($args{where} ? " AND ($args{where})" : '') + . " ORDER BY $order_by" + . " LIMIT ?, 2" + . " /*upper boundary*/"; + MKDEBUG && _d('Upper boundary statement:', $ub_sql); # This statement does the actual nibbling work; its rows are returned # to the caller via next(). @@ -157,6 +158,7 @@ sub new { index => $index, from => $from, order_by => $order_by, + limit => $o->get('chunk-size') - 1, first_lb_sql => $first_lb_sql, last_ub_sql => $last_ub_sql, ub_sql => $ub_sql, @@ -258,47 +260,10 @@ sub nibble_number { sub set_chunk_size { my ($self, $limit) = @_; MKDEBUG && _d('Setting new chunk size (LIMIT):', $limit); - - $self->{ub_sql} = _make_ub_sql( - cols => $self->{asc}->{scols}, - from => $self->{from}, - where => $self->{asc}->{boundaries}->{'>='} - . ($self->{where} ? " AND ($self->{where})" : ''), - order_by => $self->{order_by}, - limit => $limit, - Quoter => $self->{Quoter}, - ); - - # ub_sth won't exist if user calls this sub before calling next() once. 
- if ($self->{ub_sth}) { - $self->{ub_sth}->finish(); - $self->{ub_sth} = undef; - } - - $self->_prepare_sths(); - + $self->{limit} = $limit - 1; return; } -sub _make_ub_sql { - my (%args) = @_; - my @required_args = qw(cols from where order_by limit Quoter); - foreach my $arg ( @required_args ) { - die "I need a $arg argument" unless $args{$arg}; - } - my ($cols, $from, $where, $order_by, $limit, $q) = @args{@required_args}; - my $ub_sql - = "SELECT /*!40001 SQL_NO_CACHE */ " - . join(', ', map { $q->quote($_) } @{$cols}) - . " FROM $from" - . " WHERE $where" - . " ORDER BY $order_by" - . " LIMIT 2 OFFSET " . ((int($limit) || 1) - 1) - . " /*upper boundary*/"; - MKDEBUG && _d('Upper boundary statement:', $ub_sql); - return $ub_sql; -} - sub _can_nibble_once { my ($self) = @_; my ($dbh, $tbl, $tp) = @{$self}{qw(dbh tbl TableParser)}; @@ -382,8 +347,8 @@ sub _next_boundaries { $self->{lb} = $self->{next_lb}; MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:', - join(', ', @{$self->{lb}})); - $self->{ub_sth}->execute(@{$self->{lb}}); + join(', ', @{$self->{lb}}), $self->{limit}); + $self->{ub_sth}->execute(@{$self->{lb}}, $self->{limit}); my $boundary = $self->{ub_sth}->fetchall_arrayref(); MKDEBUG && _d('Next boundary:', Dumper($boundary)); if ( $boundary && @$boundary ) { diff --git a/lib/ReplicaLagLimiter.pm b/lib/ReplicaLagLimiter.pm index ef8b8975..90589a62 100644 --- a/lib/ReplicaLagLimiter.pm +++ b/lib/ReplicaLagLimiter.pm @@ -24,13 +24,13 @@ # slave lag. Master ops that replicate can affect slave lag, so they # should be adjusted to prevent overloading slaves. returns # and adjustment (-1=down/decrease, 0=none, 1=up/increase) based on -# a moving average of how long operations are taking on the master. -# The desired master op time range is specified by target_time. By -# default, the moving average sample_size is 5, so the last 5 master op -# times are used. Increasing sample_size will smooth out variations -# and make adjustments less volatile. 
Regardless of all that, slaves may -# still lag, so waits for them to catch up based on the spec -# passed to . +# a weighted decaying average of how long operations are taking on the +# master. The desired master op time range is specified by target_time. +# By default, the running avg weight is 0.75; or, new times weigh +# only 0.25 so temporary variations won't cause volatility. +# +# Regardless of all that, slaves may still lag, so waits for them +# to catch up based on the spec passed to . package ReplicaLagLimiter; use strict; @@ -46,10 +46,11 @@ use Time::HiRes qw(sleep time); # spec - --replicat-lag spec (arrayref of option=value pairs) # slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...] # get_lag - Callback passed slave dbh and returns slave's lag -# target_time - Arrayref with lower and upper range for master op time +# target_time - Target time for master ops # # Optional Arguments: # sample_size - Number of master op samples to use for moving avg (default 5) +# weight - Weight of previous average (default 0.75). # # Returns: # ReplicaLagLimiter object @@ -79,6 +80,7 @@ sub new { get_lag => $args{get_lag}, target_time => $args{target_time}, sample_size => $args{sample_size} || 5, + weight => $args{weight} || 0.75, }; return bless $self, $class; @@ -115,39 +117,49 @@ sub validate_spec { } # Sub: update -# Update moving average of master operation time. +# Update weighted decaying average of master operation time. Param n is +# generic; it's how many of whatever the caller is doing (rows, checksums, +# etc.). Param s is how long this n took, in seconds (hi-res or not). # # Parameters: -# t - Master operation time (int or float).
+# s - Amount of time in seconds that n took # # Returns: # -1 master op is too slow, it should be reduced # 0 master op is within target time range, no adjustment # 1 master is too fast; it can be increased sub update { - my ($self, $t) = @_; - MKDEBUG && _d('Sample time:', $t); - my $sample_size = $self->{sample_size}; - my $samples = $self->{samples}; - + my ($self, $n, $s) = @_; + MKDEBUG && _d('Master op time:', $n, 'n /', $s, 's'); my $adjust = 0; - if ( @$samples == $sample_size ) { - shift @$samples; - push @$samples, $t; - my $sum = 0; - map { $sum += $_ } @$samples; - $self->{moving_avg} = $sum / $sample_size; + if ( $self->{avg_rate} ) { + # Calculate new weighted averages. + $self->{avg_n} = ($self->{avg_n} * ( $self->{weight})) + + ($n * (1 - $self->{weight})); + $self->{avg_s} = ($self->{avg_s} * ( $self->{weight})) + + ($s * (1 - $self->{weight})); + $self->{avg_rate} = int($self->{avg_n} / $self->{avg_s}); + MKDEBUG && _d('Weighted avg n:', $self->{avg_n}, 's:', $self->{avg_s}, + 'rate:', $self->{avg_rate}, 'n/s'); - MKDEBUG && _d('Moving average:', $self->{moving_avg}); - $adjust = $self->{moving_avg} < $self->{target_time}->[0] ? 1 - : $self->{moving_avg} > $self->{target_time}->[1] ? -1 - : 0; + $adjust = $self->{avg_s} < $self->{target_time} ? 1 + : $self->{avg_s} > $self->{target_time} ?
-1 + : 0; } else { - MKDEBUG && _d('Saving sample', @$samples + 1, 'of', $sample_size); - push @$samples, $t; + MKDEBUG && _d('Saved values; initializing averages'); + $self->{n_vals}++; + $self->{total_n} += $n; + $self->{total_s} += $s; + if ( $self->{n_vals} == $self->{sample_size} ) { + $self->{avg_n} = $self->{total_n} / $self->{n_vals}; + $self->{avg_s} = $self->{total_s} / $self->{n_vals}; + $self->{avg_rate} = int($self->{avg_n} / $self->{avg_s}); + MKDEBUG && _d('Initial avg n:', $self->{avg_n}, 's:', $self->{avg_s}, + 'rate:', $self->{avg_rate}, 'n/s'); + } } - return $adjust; } diff --git a/t/lib/ReplicaLagLimiter.t b/t/lib/ReplicaLagLimiter.t index 68bd65ca..9690bcca 100644 --- a/t/lib/ReplicaLagLimiter.t +++ b/t/lib/ReplicaLagLimiter.t @@ -21,7 +21,7 @@ my $rll = new ReplicaLagLimiter( { dsn=>{n=>'slave2'}, dbh=>2 }, ], get_lag => \&get_lag, - target_time => [0.9,1.1], + target_time => 1, ); # ############################################################################ @@ -69,38 +69,40 @@ throws_ok( # Update master op, see if we get correct adjustment result. 
# ############################################################################ for (1..4) { - $rll->update(1); + $rll->update(1000, 1); } is( - $rll->update(1), + $rll->update(1000, 1), 0, "5 time samples, no adjustmenet" ); for (1..4) { - $rll->update(1); + $rll->update(1000, 1); } is( - $rll->update(1), + $rll->update(1000, 1), 0, - "Moving avg hasn't changed" + "Avg hasn't changed" ); -$rll->update(2); -$rll->update(2); -$rll->update(2); +# Results in: Weighted avg n: 1000 s: 1.683593 rate: 593 n/s +$rll->update(1000, 2); +$rll->update(1000, 2); +$rll->update(1000, 2); is( - $rll->update(2), + $rll->update(1000, 2), -1, - "Adjust down (moving avg = 1.8)" + "Adjust down" ); -$rll->update(0.1); -$rll->update(0.1); +# Results in: Weighted avg n: 1000 s: 0.768078 rate: 1301 n/s +$rll->update(1000, 0.1); +$rll->update(1000, 0.1); is( - $rll->update(0.1), + $rll->update(1000, 0.1), 1, - "Adjust up (moving avg = 0.86)" + "Adjust up" ); # ############################################################################