mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-19 02:05:23 +00:00
Rewrite ReplicaLagLimiter::wait().
This commit is contained in:
@@ -37,16 +37,19 @@ use English qw(-no_match_vars);
|
||||
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
|
||||
|
||||
use Time::HiRes qw(sleep time);
|
||||
use Data::Dumper;
|
||||
|
||||
# Sub: new
|
||||
#
|
||||
# Required Arguments:
|
||||
# spec - --replicat-lag spec (arrayref of option=value pairs)
|
||||
# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...]
|
||||
# get_lag - Callback passed slave dbh and returns slave's lag
|
||||
# initial_n - Initial n value for <update()>
|
||||
# initial_t - Initial t value for <update()>
|
||||
# target_t - Target time for t in <update()>
|
||||
# oktorun - Callback that returns true if it's ok to continue running
|
||||
# get_lag - Callback passed slave dbh and returns slave's lag
|
||||
# sleep - Callback to sleep between checking lag.
|
||||
# max_lag - Max lag
|
||||
# slaves - Arrayref of slave cxn, like [{dsn=>{...}, dbh=>...},...]
|
||||
# initial_n - Initial n value for <update()>
|
||||
# initial_t - Initial t value for <update()>
|
||||
# target_t - Target time for t in <update()>
|
||||
#
|
||||
# Optional Arguments:
|
||||
# weight - Weight of previous n/t values (default 0.75).
|
||||
@@ -55,65 +58,21 @@ use Time::HiRes qw(sleep time);
|
||||
# ReplicaLagLimiter object
|
||||
sub new {
   # Construct a ReplicaLagLimiter.
   #
   # Required arguments (each must be defined, otherwise new() dies with
   # "I need a <arg> argument"):
   #   oktorun, get_lag, sleep, max_lag, slaves, initial_n, initial_t, target_t
   #
   # Optional arguments:
   #   weight - stored as-is; a false or missing value becomes 0.75
   #
   # Returns the hashref of arguments blessed into $class.
   my ( $class, %args ) = @_;

   # Single, authoritative list of required arguments.  The obsolete "spec"
   # option=value list (and its max/timeout/check/continue defaults) is not
   # parsed here any more; wait() reads oktorun, sleep and max_lag directly
   # from the object.
   my @required_args = qw(oktorun get_lag sleep max_lag slaves initial_n initial_t target_t);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }

   my $self = {
      %args,                              # keep every caller-supplied arg
      avg_n  => $args{initial_n},         # seed value stored under avg_n
      avg_t  => $args{initial_t},         # seed value stored under avg_t
      weight => $args{weight} || 0.75,    # listed after %args so the default applies
   };

   return bless $self, $class;
}
|
||||
|
||||
sub validate_spec {
   # Validate a replica-lag spec: an arrayref of "option=value" strings.
   #
   # Recognized options (matched case-insensitively):
   #   max, timeout - value must be a non-negative integer (0 is allowed)
   #   continue     - value must be "yes" or "no"
   # "max" is mandatory.
   #
   # Returns 1 when the spec is valid; dies with a descriptive message
   # otherwise.

   # Permit calling as ReplicaLagLimiter-> or ReplicaLagLimiter::
   shift @_ if defined $_[0] && !ref $_[0] && $_[0] eq 'ReplicaLagLimiter';
   my ( $spec ) = @_;

   # A missing or non-array spec is reported like an empty one instead of
   # failing on the dereference.
   if ( ref $spec ne 'ARRAY' || @$spec == 0 ) {
      die "spec array requires at least a max value\n";
   }

   my $have_max;
   foreach my $op ( @$spec ) {
      $op = '' unless defined $op;

      # Limit the split to 2 fields so "max=1=2" is rejected by the value
      # check below rather than silently truncated to "max=1".
      my ($key, $val) = split /=/, $op, 2;

      # Test for definedness/emptiness, not truth: "0" is a valid value.
      if ( !defined $key || $key eq '' || !defined $val || $val eq '' ) {
         die "invalid spec format, should be option=value: $op\n";
      }

      # Normalize once so every later comparison is case-insensitive.
      $key = lc $key;

      # Anchored so that e.g. "maximum" or "xtimeout" are not accepted.
      if ( $key !~ m/\A(?:max|timeout|continue)\z/ ) {
         die "unknown option in spec: $op\n";
      }
      if ( $key ne 'continue' && $val !~ m/\A\d+\z/ ) {
         die "value must be an integer: $op\n";
      }
      # Anchored so that e.g. "nope" or "yesterday" are not accepted.
      if ( $key eq 'continue' && $val !~ m/\A(?:yes|no)\z/i ) {
         die "value for $key must be \"yes\" or \"no\"\n";
      }
      $have_max = 1 if $key eq 'max';
   }

   if ( !$have_max ) {
      die "max must be specified";
   }
   return 1;
}
|
||||
|
||||
# Sub: update
|
||||
# Update weighted decaying average of master operation time. Param n is
|
||||
# generic; it's how many of whatever the caller is doing (rows, checksums,
|
||||
@@ -139,7 +98,7 @@ sub update {
|
||||
return $new_n;
|
||||
}
|
||||
|
||||
# Sub: wait_for_slave
|
||||
# Sub: wait
|
||||
# Wait for Seconds_Behind_Master on all slaves to become < max.
|
||||
#
|
||||
# Optional Arguments:
|
||||
@@ -153,62 +112,76 @@ sub wait {
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my $pr = $args{Progres};
|
||||
my $get_lag = $self->{get_lag};
|
||||
my $slaves = $self->{slaves};
|
||||
my $n_slaves = @$slaves;
|
||||
my $pr = $args{Progress};
|
||||
|
||||
my $oktorun = $self->{oktorun};
|
||||
my $get_lag = $self->{get_lag};
|
||||
my $sleep = $self->{sleep};
|
||||
my $slaves = $self->{slaves};
|
||||
my $max_lag = $self->{max_lag};
|
||||
|
||||
my $worst; # most lagging slave
|
||||
my $pr_callback;
|
||||
if ( $pr ) {
|
||||
# If you use the default Progress report callback, you'll need to
|
||||
# add Transformers.pm to this tool.
|
||||
my $reported = 0;
|
||||
$pr_callback = sub {
|
||||
my ($fraction, $elapsed, $remaining, $eta, $slave_no) = @_;
|
||||
if ( !$reported ) {
|
||||
print STDERR "Waiting for replica "
|
||||
. ($slaves->[$slave_no]->{dsn}->{n} || '')
|
||||
. " to catch up...\n";
|
||||
$reported = 1;
|
||||
my ($fraction, $elapsed, $remaining, $eta, $completed) = @_;
|
||||
if ( defined $worst->{lag} ) {
|
||||
print STDERR "Replica lag is $worst->{lag} seconds on "
|
||||
. "$worst->{dsn}->{n}. Waiting.\n";
|
||||
}
|
||||
else {
|
||||
print STDERR "Still waiting ($elapsed seconds)...\n";
|
||||
print STDERR "Replica $worst->{dsn}->{n} is stopped. Waiting.\n";
|
||||
}
|
||||
return;
|
||||
};
|
||||
$pr->set_callback($pr_callback);
|
||||
}
|
||||
|
||||
my ($max, $check, $timeout) = @{$self}{qw(max check timeout)};
|
||||
my $slave_no = 0;
|
||||
my $slave = $slaves->[$slave_no];
|
||||
my $t_start = time;
|
||||
while ($slave && time - $t_start < $timeout) {
|
||||
MKDEBUG && _d('Checking slave lag on', $slave->{dsn}->{n});
|
||||
my $lag = $get_lag->($slave->{dbh});
|
||||
if ( !defined $lag || $lag > $max ) {
|
||||
MKDEBUG && _d('Replica lag', $lag, '>', $max, '; sleeping', $check);
|
||||
$pr->update(sub { return $slave_no; }) if $pr;
|
||||
sleep $check;
|
||||
my @lagged_slaves = @$slaves; # first check all slaves
|
||||
while ( $oktorun->() && @lagged_slaves ) {
|
||||
MKDEBUG && _d('Checking slave lag');
|
||||
for my $i ( 0..$#lagged_slaves ) {
|
||||
my $slave = $lagged_slaves[$i];
|
||||
my $lag = $get_lag->($slave->{dbh});
|
||||
MKDEBUG && _d($slave->{dsn}->{n}, 'slave lag:', $lag);
|
||||
if ( !defined $lag || $lag > $max_lag ) {
|
||||
$slave->{lag} = $lag;
|
||||
}
|
||||
else {
|
||||
delete $lagged_slaves[$i];
|
||||
}
|
||||
}
|
||||
else {
|
||||
MKDEBUG && _d('Replica ready, lag', $lag, '<=', $max);
|
||||
$slave = $slaves->[++$slave_no];
|
||||
}
|
||||
}
|
||||
if ( $slave_no < @$slaves ) {
|
||||
if ( $self->{continue} eq 'no' ) {
|
||||
die "Timeout waiting for replica " . $slaves->[$slave_no]->{dsn}->{n}
|
||||
. " to catch up\n";
|
||||
}
|
||||
else {
|
||||
MKDEBUG && _d('Some slave are not caught up');
|
||||
return 0; # not ready
|
||||
|
||||
# Remove slaves that aren't lagging.
|
||||
@lagged_slaves = grep { defined $_ } @lagged_slaves;
|
||||
if ( @lagged_slaves ) {
|
||||
# Sort lag, undef is highest because it means the slave is stopped.
|
||||
@lagged_slaves = reverse sort {
|
||||
defined $a && defined $b ? $a <=> $b
|
||||
: defined $a ? -1
|
||||
: 1;
|
||||
} @lagged_slaves;
|
||||
$worst = $lagged_slaves[0];
|
||||
MKDEBUG && _d(scalar @lagged_slaves, 'slaves are lagging, worst:',
|
||||
Dumper($worst));
|
||||
|
||||
if ( $pr ) {
|
||||
# There's no real progress because we can't estimate how long
|
||||
# it will take all slaves to catch up. The progress reports
|
||||
# are just to inform the user every 30s which slave is still
|
||||
# lagging the most.
|
||||
$pr->update(sub { return 0; });
|
||||
}
|
||||
|
||||
MKDEBUG && _d('Calling sleep callback');
|
||||
$sleep->();
|
||||
}
|
||||
}
|
||||
|
||||
MKDEBUG && _d('All slaves caught up');
|
||||
return 1; # ready
|
||||
return;
|
||||
}
|
||||
|
||||
sub _d {
|
||||
|
Reference in New Issue
Block a user