mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-18 01:33:14 +00:00
Use weighted exp avg instead of moving avg for ReplicaLagLimiter::update(). Use "LIMIT ?, 2" for ub_sql/sth in NibbleIterator.
This commit is contained in:
@@ -93,15 +93,16 @@ sub new {
|
|||||||
# does (col > a AND col <= e). Hence the fancy LIMIT 2 which returns
|
# does (col > a AND col <= e). Hence the fancy LIMIT 2 which returns
|
||||||
# the upper boundary for the current nibble *and* the lower boundary
|
# the upper boundary for the current nibble *and* the lower boundary
|
||||||
# for the next nibble. See _next_boundaries().
|
# for the next nibble. See _next_boundaries().
|
||||||
my $ub_sql = _make_ub_sql(
|
my $ub_sql
|
||||||
cols => $asc->{scols},
|
= "SELECT /*!40001 SQL_NO_CACHE */ "
|
||||||
from => $from,
|
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
|
||||||
where => $asc->{boundaries}->{'>='}
|
. " FROM $from"
|
||||||
. ($args{where} ? " AND ($args{where})" : ''),
|
. " WHERE " . $asc->{boundaries}->{'>='}
|
||||||
order_by => $order_by,
|
. ($args{where} ? " AND ($args{where})" : '')
|
||||||
limit => $o->get('chunk-size'),
|
. " ORDER BY $order_by"
|
||||||
Quoter => $q,
|
. " LIMIT ?, 2"
|
||||||
);
|
. " /*upper boundary*/";
|
||||||
|
MKDEBUG && _d('Upper boundary statement:', $ub_sql);
|
||||||
|
|
||||||
# This statement does the actual nibbling work; its rows are returned
|
# This statement does the actual nibbling work; its rows are returned
|
||||||
# to the caller via next().
|
# to the caller via next().
|
||||||
@@ -157,6 +158,7 @@ sub new {
|
|||||||
index => $index,
|
index => $index,
|
||||||
from => $from,
|
from => $from,
|
||||||
order_by => $order_by,
|
order_by => $order_by,
|
||||||
|
limit => $o->get('chunk-size') - 1,
|
||||||
first_lb_sql => $first_lb_sql,
|
first_lb_sql => $first_lb_sql,
|
||||||
last_ub_sql => $last_ub_sql,
|
last_ub_sql => $last_ub_sql,
|
||||||
ub_sql => $ub_sql,
|
ub_sql => $ub_sql,
|
||||||
@@ -258,47 +260,10 @@ sub nibble_number {
|
|||||||
sub set_chunk_size {
|
sub set_chunk_size {
|
||||||
my ($self, $limit) = @_;
|
my ($self, $limit) = @_;
|
||||||
MKDEBUG && _d('Setting new chunk size (LIMIT):', $limit);
|
MKDEBUG && _d('Setting new chunk size (LIMIT):', $limit);
|
||||||
|
$self->{limit} = $limit - 1;
|
||||||
$self->{ub_sql} = _make_ub_sql(
|
|
||||||
cols => $self->{asc}->{scols},
|
|
||||||
from => $self->{from},
|
|
||||||
where => $self->{asc}->{boundaries}->{'>='}
|
|
||||||
. ($self->{where} ? " AND ($self->{where})" : ''),
|
|
||||||
order_by => $self->{order_by},
|
|
||||||
limit => $limit,
|
|
||||||
Quoter => $self->{Quoter},
|
|
||||||
);
|
|
||||||
|
|
||||||
# ub_sth won't exist if user calls this sub before calling next() once.
|
|
||||||
if ($self->{ub_sth}) {
|
|
||||||
$self->{ub_sth}->finish();
|
|
||||||
$self->{ub_sth} = undef;
|
|
||||||
}
|
|
||||||
|
|
||||||
$self->_prepare_sths();
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub _make_ub_sql {
|
|
||||||
my (%args) = @_;
|
|
||||||
my @required_args = qw(cols from where order_by limit Quoter);
|
|
||||||
foreach my $arg ( @required_args ) {
|
|
||||||
die "I need a $arg argument" unless $args{$arg};
|
|
||||||
}
|
|
||||||
my ($cols, $from, $where, $order_by, $limit, $q) = @args{@required_args};
|
|
||||||
my $ub_sql
|
|
||||||
= "SELECT /*!40001 SQL_NO_CACHE */ "
|
|
||||||
. join(', ', map { $q->quote($_) } @{$cols})
|
|
||||||
. " FROM $from"
|
|
||||||
. " WHERE $where"
|
|
||||||
. " ORDER BY $order_by"
|
|
||||||
. " LIMIT 2 OFFSET " . ((int($limit) || 1) - 1)
|
|
||||||
. " /*upper boundary*/";
|
|
||||||
MKDEBUG && _d('Upper boundary statement:', $ub_sql);
|
|
||||||
return $ub_sql;
|
|
||||||
}
|
|
||||||
|
|
||||||
sub _can_nibble_once {
|
sub _can_nibble_once {
|
||||||
my ($self) = @_;
|
my ($self) = @_;
|
||||||
my ($dbh, $tbl, $tp) = @{$self}{qw(dbh tbl TableParser)};
|
my ($dbh, $tbl, $tp) = @{$self}{qw(dbh tbl TableParser)};
|
||||||
@@ -382,8 +347,8 @@ sub _next_boundaries {
|
|||||||
$self->{lb} = $self->{next_lb};
|
$self->{lb} = $self->{next_lb};
|
||||||
|
|
||||||
MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:',
|
MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:',
|
||||||
join(', ', @{$self->{lb}}));
|
join(', ', @{$self->{lb}}), $self->{limit});
|
||||||
$self->{ub_sth}->execute(@{$self->{lb}});
|
$self->{ub_sth}->execute(@{$self->{lb}}, $self->{limit});
|
||||||
my $boundary = $self->{ub_sth}->fetchall_arrayref();
|
my $boundary = $self->{ub_sth}->fetchall_arrayref();
|
||||||
MKDEBUG && _d('Next boundary:', Dumper($boundary));
|
MKDEBUG && _d('Next boundary:', Dumper($boundary));
|
||||||
if ( $boundary && @$boundary ) {
|
if ( $boundary && @$boundary ) {
|
||||||
|
@@ -24,13 +24,13 @@
|
|||||||
# slave lag. Master ops that replicate can affect slave lag, so they
|
# slave lag. Master ops that replicate can affect slave lag, so they
|
||||||
# should be adjusted to prevent overloading slaves. <update()> returns
|
# should be adjusted to prevent overloading slaves. <update()> returns
|
||||||
# and adjustment (-1=down/decrease, 0=none, 1=up/increase) based on
|
# and adjustment (-1=down/decrease, 0=none, 1=up/increase) based on
|
||||||
# a moving average of how long operations are taking on the master.
|
# an weighted decaying average of how long operations are taking on the
|
||||||
# The desired master op time range is specified by target_time. By
|
# master. The desired master op time range is specified by target_time.
|
||||||
# default, the moving average sample_size is 5, so the last 5 master op
|
# By default, the running avg is weight is 0.75; or, new times weight
|
||||||
# times are used. Increasing sample_size will smooth out variations
|
# only 0.25 so temporary variations won't cause volatility.
|
||||||
# and make adjustments less volatile. Regardless of all that, slaves may
|
#
|
||||||
# still lag, so <wait()> waits for them to catch up based on the spec
|
# Regardless of all that, slaves may still lag, so <wait()> waits for them
|
||||||
# passed to <new()>.
|
# to catch up based on the spec passed to <new()>.
|
||||||
package ReplicaLagLimiter;
|
package ReplicaLagLimiter;
|
||||||
|
|
||||||
use strict;
|
use strict;
|
||||||
@@ -46,10 +46,11 @@ use Time::HiRes qw(sleep time);
|
|||||||
# spec - --replicat-lag spec (arrayref of option=value pairs)
|
# spec - --replicat-lag spec (arrayref of option=value pairs)
|
||||||
# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...]
|
# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...]
|
||||||
# get_lag - Callback passed slave dbh and returns slave's lag
|
# get_lag - Callback passed slave dbh and returns slave's lag
|
||||||
# target_time - Arrayref with lower and upper range for master op time
|
# target_time - Target time for master ops
|
||||||
#
|
#
|
||||||
# Optional Arguments:
|
# Optional Arguments:
|
||||||
# sample_size - Number of master op samples to use for moving avg (default 5)
|
# sample_size - Number of master op samples to use for moving avg (default 5)
|
||||||
|
# weight - Weight of previous average (default 0.75).
|
||||||
#
|
#
|
||||||
# Returns:
|
# Returns:
|
||||||
# ReplicaLagLimiter object
|
# ReplicaLagLimiter object
|
||||||
@@ -79,6 +80,7 @@ sub new {
|
|||||||
get_lag => $args{get_lag},
|
get_lag => $args{get_lag},
|
||||||
target_time => $args{target_time},
|
target_time => $args{target_time},
|
||||||
sample_size => $args{sample_size} || 5,
|
sample_size => $args{sample_size} || 5,
|
||||||
|
weight => $args{weight} || 0.75,
|
||||||
};
|
};
|
||||||
|
|
||||||
return bless $self, $class;
|
return bless $self, $class;
|
||||||
@@ -115,39 +117,49 @@ sub validate_spec {
|
|||||||
}
|
}
|
||||||
|
|
||||||
# Sub: update
|
# Sub: update
|
||||||
# Update moving average of master operation time.
|
# Update weighted decaying average of master operation time. Param n is
|
||||||
|
# generic; it's how many of whatever the caller is doing (rows, checksums,
|
||||||
|
# etc.). Param s is how long this n took, in seconds (hi-res or not).
|
||||||
#
|
#
|
||||||
# Parameters:
|
# Parameters:
|
||||||
# t - Master operation time (int or float).
|
# n - Number of operations (rows, etc.)
|
||||||
|
# s - Amount of time in seconds that n took
|
||||||
#
|
#
|
||||||
# Returns:
|
# Returns:
|
||||||
# -1 master op is too slow, it should be reduced
|
# -1 master op is too slow, it should be reduced
|
||||||
# 0 master op is within target time range, no adjustment
|
# 0 master op is within target time range, no adjustment
|
||||||
# 1 master is too fast; it can be increased
|
# 1 master is too fast; it can be increased
|
||||||
sub update {
|
sub update {
|
||||||
my ($self, $t) = @_;
|
my ($self, $n, $s) = @_;
|
||||||
MKDEBUG && _d('Sample time:', $t);
|
MKDEBUG && _d('Master op time:', $n, 'n /', $s, 's');
|
||||||
my $sample_size = $self->{sample_size};
|
|
||||||
my $samples = $self->{samples};
|
|
||||||
|
|
||||||
my $adjust = 0;
|
my $adjust = 0;
|
||||||
if ( @$samples == $sample_size ) {
|
if ( $self->{avg_rate} ) {
|
||||||
shift @$samples;
|
# Calculated new weighted averages.
|
||||||
push @$samples, $t;
|
$self->{avg_n} = ($self->{avg_n} * ( $self->{weight}))
|
||||||
my $sum = 0;
|
+ ($n * (1 - $self->{weight}));
|
||||||
map { $sum += $_ } @$samples;
|
$self->{avg_s} = ($self->{avg_s} * ( $self->{weight}))
|
||||||
$self->{moving_avg} = $sum / $sample_size;
|
+ ($s * (1 - $self->{weight}));
|
||||||
|
$self->{avg_rate} = int($self->{avg_n} / $self->{avg_s});
|
||||||
|
MKDEBUG && _d('Weighted avg n:', $self->{avg_n}, 's:', $self->{avg_s},
|
||||||
|
'rate:', $self->{avg_rate}, 'n/s');
|
||||||
|
|
||||||
MKDEBUG && _d('Moving average:', $self->{moving_avg});
|
$adjust = $self->{avg_s} < $self->{target_time} ? 1
|
||||||
$adjust = $self->{moving_avg} < $self->{target_time}->[0] ? 1
|
: $self->{avg_s} > $self->{target_time} ? -1
|
||||||
: $self->{moving_avg} > $self->{target_time}->[1] ? -1
|
: 0;
|
||||||
: 0;
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
MKDEBUG && _d('Saving sample', @$samples + 1, 'of', $sample_size);
|
MKDEBUG && _d('Saved values; initializing averages');
|
||||||
push @$samples, $t;
|
$self->{n_vals}++;
|
||||||
|
$self->{total_n} += $n;
|
||||||
|
$self->{total_s} += $s;
|
||||||
|
if ( $self->{n_vals} == $self->{sample_size} ) {
|
||||||
|
$self->{avg_n} = $self->{total_n} / $self->{n_vals};
|
||||||
|
$self->{avg_s} = $self->{total_s} / $self->{n_vals};
|
||||||
|
$self->{avg_rate} = int($self->{avg_n} / $self->{avg_s});
|
||||||
|
MKDEBUG && _d('Initial avg n:', $self->{avg_n}, 's:', $self->{avg_s},
|
||||||
|
'rate:', $self->{avg_rate}, 'n/s');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return $adjust;
|
return $adjust;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -21,7 +21,7 @@ my $rll = new ReplicaLagLimiter(
|
|||||||
{ dsn=>{n=>'slave2'}, dbh=>2 },
|
{ dsn=>{n=>'slave2'}, dbh=>2 },
|
||||||
],
|
],
|
||||||
get_lag => \&get_lag,
|
get_lag => \&get_lag,
|
||||||
target_time => [0.9,1.1],
|
target_time => 1,
|
||||||
);
|
);
|
||||||
|
|
||||||
# ############################################################################
|
# ############################################################################
|
||||||
@@ -69,38 +69,40 @@ throws_ok(
|
|||||||
# Update master op, see if we get correct adjustment result.
|
# Update master op, see if we get correct adjustment result.
|
||||||
# ############################################################################
|
# ############################################################################
|
||||||
for (1..4) {
|
for (1..4) {
|
||||||
$rll->update(1);
|
$rll->update(1000, 1);
|
||||||
}
|
}
|
||||||
is(
|
is(
|
||||||
$rll->update(1),
|
$rll->update(1000, 1),
|
||||||
0,
|
0,
|
||||||
"5 time samples, no adjustmenet"
|
"5 time samples, no adjustmenet"
|
||||||
);
|
);
|
||||||
|
|
||||||
for (1..4) {
|
for (1..4) {
|
||||||
$rll->update(1);
|
$rll->update(1000, 1);
|
||||||
}
|
}
|
||||||
is(
|
is(
|
||||||
$rll->update(1),
|
$rll->update(1000, 1),
|
||||||
0,
|
0,
|
||||||
"Moving avg hasn't changed"
|
"Avg hasn't changed"
|
||||||
);
|
);
|
||||||
|
|
||||||
$rll->update(2);
|
# Results in: Weighted avg n: 1000 s: 1.683593 rate: 593 n/s
|
||||||
$rll->update(2);
|
$rll->update(1000, 2);
|
||||||
$rll->update(2);
|
$rll->update(1000, 2);
|
||||||
|
$rll->update(1000, 2);
|
||||||
is(
|
is(
|
||||||
$rll->update(2),
|
$rll->update(1000, 2),
|
||||||
-1,
|
-1,
|
||||||
"Adjust down (moving avg = 1.8)"
|
"Adjust down"
|
||||||
);
|
);
|
||||||
|
|
||||||
$rll->update(0.1);
|
# Results in: Weighted avg n: 1000 s: 0.768078 rate: 1301 n/s
|
||||||
$rll->update(0.1);
|
$rll->update(1000, 0.1);
|
||||||
|
$rll->update(1000, 0.1);
|
||||||
is(
|
is(
|
||||||
$rll->update(0.1),
|
$rll->update(1000, 0.1),
|
||||||
1,
|
1,
|
||||||
"Adjust up (moving avg = 0.86)"
|
"Adjust up"
|
||||||
);
|
);
|
||||||
|
|
||||||
# ############################################################################
|
# ############################################################################
|
||||||
|
Reference in New Issue
Block a user