PT-1869: Enable slave list reloading (#456)

* PT-1869: Enable slave list reloading

* PT-1869: Fix pt-osc/slave_lag sample sizes for more consistent testing results

* PT-1869: Move slaves_to_skip to get_slaves_cb
This commit is contained in:
Mateus Dubiela Oliveira
2020-08-12 11:30:56 -03:00
committed by GitHub
parent f9b510e22f
commit d6ada6a7bf
4 changed files with 213 additions and 84 deletions

View File

@@ -29,22 +29,22 @@ use constant PTDEBUG => $ENV{PTDEBUG} || 0;
# Sub: check_recursion_method
# Check that the arrayref of recursion methods passed in is valid
sub check_recursion_method {
sub check_recursion_method {
my ($methods) = @_;
if ( @$methods != 1 ) {
if ( grep({ !m/processlist|hosts/i } @$methods)
&& $methods->[0] !~ /^dsn=/i )
{
die "Invalid combination of recursion methods: "
. join(", ", map { defined($_) ? $_ : 'undef' } @$methods) . ". "
. "Only hosts and processlist may be combined.\n"
}
}
else {
if ( @$methods != 1 ) {
if ( grep({ !m/processlist|hosts/i } @$methods)
&& $methods->[0] !~ /^dsn=/i )
{
die "Invalid combination of recursion methods: "
. join(", ", map { defined($_) ? $_ : 'undef' } @$methods) . ". "
. "Only hosts and processlist may be combined.\n"
}
}
else {
my ($method) = @$methods;
die "Invalid recursion method: " . ( $method || 'undef' )
unless $method && $method =~ m/^(?:processlist$|hosts$|none$|cluster$|dsn=)/i;
}
die "Invalid recursion method: " . ( $method || 'undef' )
unless $method && $method =~ m/^(?:processlist$|hosts$|none$|cluster$|dsn=)/i;
}
}
sub new {
@@ -73,7 +73,7 @@ sub get_slaves {
my $methods = $self->_resolve_recursion_methods($args{dsn});
return $slaves unless @$methods;
if ( grep { m/processlist|hosts/i } @$methods ) {
my @required_args = qw(dbh dsn);
foreach my $arg ( @required_args ) {
@@ -86,7 +86,7 @@ sub get_slaves {
{ dbh => $dbh,
dsn => $dsn,
slave_user => $o->got('slave-user') ? $o->get('slave-user') : '',
slave_password => $o->got('slave-password') ? $o->get('slave-password') : '',
slave_password => $o->got('slave-password') ? $o->get('slave-password') : '',
callback => sub {
my ( $dsn, $dbh, $level, $parent ) = @_;
return unless $level;
@@ -118,7 +118,7 @@ sub get_slaves {
else {
die "Unexpected recursion methods: @$methods";
}
return $slaves;
}
@@ -798,7 +798,7 @@ sub short_host {
# Returns:
# True if the proclist item is the given type of replication thread.
sub is_replication_thread {
my ( $self, $query, %args ) = @_;
my ( $self, $query, %args ) = @_;
return unless $query;
my $type = lc($args{type} || 'all');
@@ -814,7 +814,7 @@ sub is_replication_thread {
# On a slave, there are two threads. Both have user="system user".
if ( ($query->{User} || $query->{user} || '') eq "system user" ) {
PTDEBUG && _d("Slave replication thread");
if ( $type ne 'all' ) {
if ( $type ne 'all' ) {
# Match a particular slave thread.
my $state = $query->{State} || $query->{state} || '';
@@ -831,7 +831,7 @@ sub is_replication_thread {
|Reading\sevent\sfrom\sthe\srelay\slog
|Has\sread\sall\srelay\slog;\swaiting
|Making\stemp\sfile
|Waiting\sfor\sslave\smutex\son\sexit)/xi;
|Waiting\sfor\sslave\smutex\son\sexit)/xi;
# Type is either "slave_sql" or "slave_io". The second line
# implies that if this isn't the sql thread then it must be
@@ -919,7 +919,7 @@ sub get_replication_filters {
replicate_do_db
replicate_ignore_db
replicate_do_table
replicate_ignore_table
replicate_ignore_table
replicate_wild_do_table
replicate_wild_ignore_table
);
@@ -931,7 +931,7 @@ sub get_replication_filters {
$filters{slave_skip_errors} = $row->[1] if $row->[1] && $row->[1] ne 'OFF';
}
return \%filters;
return \%filters;
}

View File

@@ -40,7 +40,7 @@ use Data::Dumper;
# slaves - Arrayref of <Cxn> objects
#
# Returns:
# ReplicaLagWaiter object
# ReplicaLagWaiter object
sub new {
my ( $class, %args ) = @_;
my @required_args = qw(oktorun get_lag sleep max_lag slaves);
@@ -80,6 +80,26 @@ sub wait {
my $worst; # most lagging slave
my $pr_callback;
my $pr_first_report;
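### Refresh the list of slaves using the get_slaves_cb callback, if one was given.
### In: self passed to wait()
### Returns: the slave list to watch (replaced only when the set of slave names changed)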
my $pr_refresh_slave_list = sub {
my ($self) = @_;
my ($slaves, $refresher) = ($self->{slaves}, $self->{get_slaves_cb});
return $slaves if ( not defined $refresher );
my $before = join ' ', sort map {$_->name()} @$slaves;
$slaves = $refresher->();
my $after = join ' ', sort map {$_->name()} @$slaves;
if ($before ne $after) {
$self->{slaves} = $slaves;
printf STDERR "Slave set to watch has changed\n Was: %s\n Now: %s\n",
$before, $after;
}
return($self->{slaves});
};
$slaves = $pr_refresh_slave_list->($self);
if ( $pr ) {
# If you use the default Progress report callback, you'll need
# to add Transformers.pm to this tool.
@@ -116,11 +136,26 @@ sub wait {
}
# First check all slaves.
my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves;
my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves;
while ( $oktorun->() && @lagged_slaves ) {
PTDEBUG && _d('Checking slave lag');
### while we were waiting our list of slaves may have changed
$slaves = $pr_refresh_slave_list->($self);
my $watched = 0;
@lagged_slaves = grep {
my $slave_name = $_->{cxn}->name();
grep {$slave_name eq $_->name()} @{$slaves // []}
} @lagged_slaves;
for my $i ( 0..$#lagged_slaves ) {
my $lag = $get_lag->($lagged_slaves[$i]->{cxn});
my $lag;
eval {
$lag = $get_lag->($lagged_slaves[$i]->{cxn});
};
if ($EVAL_ERROR) {
die $EVAL_ERROR;
}
PTDEBUG && _d($lagged_slaves[$i]->{cxn}->name(),
'slave lag:', $lag);
if ( !defined $lag || $lag > $max_lag ) {