Finish, test, and docu ReplicaLagLimiter.

This commit is contained in:
Daniel Nichter
2011-09-19 09:06:30 -06:00
parent 006b93ddf9
commit b1e01be2c2
2 changed files with 203 additions and 53 deletions

View File

@@ -25,8 +25,12 @@
# should be adjusted to prevent overloading slaves. <update()> returns
# and adjustment (-1=down/decrease, 0=none, 1=up/increase) based on
# a moving average of how long operations are taking on the master.
# Regardless of that, slaves may still lag, so <wait()> waits for them
# to catchup based on the spec passed to <new()>.
# The desired master op time range is specified by target_time. By
# default, the moving average sample_size is 5, so the last 5 master op
# times are used. Increasing sample_size will smooth out variations
# and make adjustments less volatile. Regardless of all that, slaves may
# still lag, so <wait()> waits for them to catch up based on the spec
# passed to <new()>.
package ReplicaLagLimiter;
use strict;
@@ -34,9 +38,24 @@ use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
use Time::HiRes qw(sleep time);
# Sub: new
#
# Required Arguments:
# spec - --replicat-lag spec (arrayref of option=value pairs)
# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...]
# get_lag - Callback passed slave dbh and returns slave's lag
# target_time - Arrayref with lower and upper range for master op time
#
# Optional Arguments:
# sample_size - Number of master op samples to use for moving avg (default 5)
#
# Returns:
# ReplicaLagLimiter object
sub new {
my ( $class, %args ) = @_;
my @required_args = qw(spec slaves get_lag);
my @required_args = qw(spec slaves get_lag target_time);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
@@ -49,16 +68,17 @@ sub new {
} @$spec;
my $self = {
target_time => 1, # optimal time for master ops
sample_size => 5, # number of master ops to use for moving average
max => 1, # max slave lag
timeout => 3600, # max time to wait for all slaves to catchup
check => 1, # sleep time between checking slave lag
continue => 'no', # return true even if timeout
%specs, # slave wait specs from caller
samples => [], # master op times
moving_avg => 0, # moving avgerge of samples
max => 1, # max slave lag
timeout => 3600, # max time to wait for all slaves to catch up
check => 1, # sleep time between checking slave lag
continue => 'no', # return true even if timeout
%specs, # slave wait specs from caller
samples => [], # master op times
moving_avg => 0, # moving avgerge of samples
slaves => $args{slaves},
get_lag => $args{get_lag},
target_time => $args{target_time},
sample_size => $args{sample_size} || 5,
};
return bless $self, $class;
@@ -74,17 +94,14 @@ sub validate_spec {
my $have_max;
foreach my $op ( @$spec ) {
my ($key, $val) = split '=', $op;
if ( !$key ) {
die "invalid spec format, should be key=value: $spec\n";
if ( !$key || !$val ) {
die "invalid spec format, should be option=value: $op\n";
}
if ( $key !~ m/(?:max|timeout|continue)/i ) {
die "invalid spec: $spec\n";
}
if ( !$val ) {
die "spec has no value: $spec\n";
die "unknown option in spec: $op\n";
}
if ( $key ne 'continue' && $val !~ m/^\d+$/ ) {
die "value must be an integer: $spec\n";
die "value must be an integer: $op\n";
}
if ( $key eq 'continue' && $val !~ m/(?:yes|no)/i ) {
die "value for $key must be \"yes\" or \"no\"\n";
@@ -94,8 +111,19 @@ sub validate_spec {
if ( !$have_max ) {
die "max must be specified"
}
return 1;
}
# Sub: update
# Update moving average of master operation time.
#
# Parameters:
# t - Master operation time (int or float).
#
# Returns:
# -1 master op is too slow, it should be reduced
# 0 master op is within target time range, no adjustment
# 1 master is too fast; it can be increased
sub update {
my ($self, $t) = @_;
MKDEBUG && _d('Sample time:', $t);
@@ -109,9 +137,11 @@ sub update {
my $sum = 0;
map { $sum += $_ } @$samples;
$self->{moving_avg} = $sum / $sample_size;
MKDEBUG && _d('Moving average:', $self->{moving_avg});
$adjust = $self->{target_time} <=> $self->{moving_avg};
$adjust = $self->{moving_avg} < $self->{target_time}->[0] ? 1
: $self->{moving_avg} > $self->{target_time}->[1] ? -1
: 0;
}
else {
MKDEBUG && _d('Saving sample', @$samples + 1, 'of', $sample_size);
@@ -125,10 +155,10 @@ sub update {
# Wait for Seconds_Behind_Master on all slaves to become < max.
#
# Optional Arguments:
# Progress - <Progress> object.
# Progress - <Progress> object to report waiting
#
# Returns:
# True if all slaves caught up, else 0 (timeout)
# True if all slaves catch up before timeout, else die unless continue is true
sub wait {
my ( $self, %args ) = @_;
my @required_args = qw();
@@ -150,7 +180,7 @@ sub wait {
if ( !$reported ) {
print STDERR "Waiting for replica "
. ($slaves->[$slave_no]->{dsn}->{n} || '')
. " to catchup...\n";
. " to catch up...\n";
$reported = 1;
}
else {
@@ -166,7 +196,7 @@ sub wait {
my $slave = $slaves->[$slave_no];
my $t_start = time;
while ($slave && time - $t_start < $timeout) {
MKDEBUG && _d('Checking slave lag on', $slave->{n});
MKDEBUG && _d('Checking slave lag on', $slave->{dsn}->{n});
my $lag = $get_lag->($slave->{dbh});
if ( !defined $lag || $lag > $max ) {
MKDEBUG && _d('Replica lag', $lag, '>', $max, '; sleeping', $check);
@@ -178,10 +208,11 @@ sub wait {
$slave = $slaves->[++$slave_no];
}
}
if ( $slave_no >= @$slave ) {
MKDEBUG && _d('Timeout waiting for', $slaves->[$slave_no]->{dsn}->{n});
return 0 unless $self->{continue};
if ( $slave_no < @$slaves && $self->{continue} eq 'no' ) {
die "Timeout waiting for replica " . $slaves->[$slave_no]->{dsn}->{n}
. " to catch up\n";
}
MKDEBUG && _d('All slaves caught up');
return 1;
}