diff --git a/bin/pt-online-schema-change b/bin/pt-online-schema-change index a822f14b..8476204f 100755 --- a/bin/pt-online-schema-change +++ b/bin/pt-online-schema-change @@ -4255,7 +4255,7 @@ sub get_slaves { else { die "Unexpected recursion methods: @$methods"; } - + return $slaves; } @@ -5015,10 +5015,32 @@ sub wait { my $worst; # most lagging slave my $pr_callback; my $pr_first_report; + + ### refresh list of slaves. In: self passed to wait() + ### Returns: new slave list + my $pr_refresh_slave_list = sub { + my ($self) = @_; + my ($slaves, $refresher) = ($self->{slaves}, $self->{get_slaves_cb}); + return $slaves if ( not defined $refresher ); + my $before = join ' ', sort map {$_->name()} @$slaves; + $slaves = $refresher->(); + my $after = join ' ', sort map {$_->name()} @$slaves; + if ($before ne $after) { + $self->{slaves} = $slaves; + printf STDERR "Slave set to watch has changed\n Was: %s\n Now: %s\n", + $before, $after; + } + return($self->{slaves}); + }; + + $slaves = $pr_refresh_slave_list->($self); + if ( $pr ) { + # If you use the default Progress report callback, you'll need to + # to add Transformers.pm to this tool. $pr_callback = sub { my ($fraction, $elapsed, $remaining, $eta, $completed) = @_; - my $dsn_name = $worst->{cxn}->{dsn_name}; + my $dsn_name = $worst->{cxn}->name(); if ( defined $worst->{lag} ) { print STDERR "Replica lag is " . ($worst->{lag} || '?') . " seconds on $dsn_name. Waiting.\n"; @@ -5033,21 +5055,34 @@ sub wait { }; $pr->set_callback($pr_callback); + # If a replic is stopped, don't wait 30s (or whatever interval) + # to report this. Instead, report it once, immediately, then + # keep reporting it every interval. $pr_first_report = sub { - my $dsn_name = $worst->{cxn}->{dsn_name}; + my $dsn_name = $worst->{cxn}->name(); if ( !defined $worst->{lag} ) { if ($self->{fail_on_stopped_replication}) { die 'replication is stopped'; } - print STDERR "(2) Replica $dsn_name is stopped. 
Waiting.\n"; + print STDERR "(2) Replica '$dsn_name' is stopped. Waiting.\n"; } return; }; } - my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves; + # First check all slaves. + my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves; while ( $oktorun->() && @lagged_slaves ) { PTDEBUG && _d('Checking slave lag'); + + ### while we were waiting our list of slaves may have changed + $slaves = $pr_refresh_slave_list->($self); + my $watched = 0; + @lagged_slaves = grep { + my $slave_name = $_->{cxn}->name(); + grep {$slave_name eq $_->name()} @{$slaves // []} + } @lagged_slaves; + for my $i ( 0..$#lagged_slaves ) { my $lag; eval { @@ -5066,8 +5101,10 @@ sub wait { } } + # Remove slaves that aren't lagging. @lagged_slaves = grep { defined $_ } @lagged_slaves; if ( @lagged_slaves ) { + # Sort lag, undef is highest because it means the slave is stopped. @lagged_slaves = reverse sort { defined $a->{lag} && defined $b->{lag} ? $a->{lag} <=> $b->{lag} : defined $a->{lag} ? -1 @@ -5078,6 +5115,10 @@ sub wait { $worst->{lag}, 'on', Dumper($worst->{cxn}->dsn())); if ( $pr ) { + # There's no real progress because we can't estimate how long + # it will take all slaves to catch up. The progress reports + # are just to inform the user every 30s which slave is still + # lagging this most. 
$pr->update( sub { return 0; }, first_report => $pr_first_report, @@ -8759,13 +8800,42 @@ sub main { channel => $o->get('channel'), ); - $slaves = $ms->get_slaves( - dbh => $cxn->dbh(), - dsn => $cxn->dsn(), - make_cxn => sub { - return $make_cxn->(@_, prev_dsn => $cxn->dsn()); - }, - ); + my $slaves_to_skip = $o->get('skip-check-slave-lag'); + + my $get_slaves_cb = sub { + my ($intolerant) = @_; + my $slaves =$ms->get_slaves( + dbh => $cxn->dbh(), + dsn => $cxn->dsn(), + make_cxn => sub { + return $make_cxn->( + @_, + prev_dsn => $cxn->dsn(), + errok => (not $intolerant) + ); + }, + ); + + if ($slaves_to_skip) { + my $filtered_slaves = []; + for my $slave (@$slaves) { + for my $slave_to_skip (@$slaves_to_skip) { + if ($slave->{dsn}->{h} eq $slave_to_skip->{h} && $slave->{dsn}->{P} eq $slave_to_skip->{P}) { + print "Skipping slave " . $slave->description() . "\n"; + } else { + push @$filtered_slaves, $slave; + } + } + } + $slaves = $filtered_slaves; + } + + return $slaves; + }; + + ### first ever call only: do not tolerate connection errors + $slaves = $get_slaves_cb->('intolerant'); + PTDEBUG && _d(scalar @$slaves, 'slaves found'); if ( scalar @$slaves ) { print "Found " . scalar(@$slaves) . " slaves:\n"; @@ -8789,6 +8859,7 @@ sub main { #prev_dsn => $cxn->dsn(), ); $slave_lag_cxns = [ $cxn ]; + $get_slaves_cb = undef; } else { PTDEBUG && _d('Will check slave lag on all slaves'); @@ -8796,31 +8867,9 @@ sub main { } if ( $slave_lag_cxns && scalar @$slave_lag_cxns ) { - if ($o->get('skip-check-slave-lag')) { - my $slaves_to_skip = $o->get('skip-check-slave-lag'); - my $filtered_slaves = []; - for my $slave (@$slave_lag_cxns) { - my $found=0; - for my $slave_to_skip (@$slaves_to_skip) { - if ($slave->{dsn}->{h} eq $slave_to_skip->{h} && $slave->{dsn}->{P} eq $slave_to_skip->{P}) { - $found=1; - } - } - if ($found) { - print "Skipping slave ". 
$slave->description()."\n"; - } else { - push @$filtered_slaves, $slave; - } - } - $slave_lag_cxns = $filtered_slaves; - } - if (!scalar @$slave_lag_cxns) { - print "Not checking slave lag because all slaves were skipped\n"; - } else{ - print "Will check slave lag on:\n"; - foreach my $cxn ( @$slave_lag_cxns ) { - print $cxn->description()."\n"; - } + print "Will check slave lag on:\n"; + foreach my $cxn ( @$slave_lag_cxns ) { + print $cxn->description()."\n"; } } else { @@ -8931,11 +8980,12 @@ sub main { } $replica_lag = new ReplicaLagWaiter( - slaves => $slave_lag_cxns, - max_lag => $o->get('max-lag'), - oktorun => sub { return $oktorun }, - get_lag => $get_lag, - sleep => $sleep, + slaves => $slave_lag_cxns, + get_slaves_cb => $get_slaves_cb, + max_lag => $o->get('max-lag'), + oktorun => sub { return $oktorun }, + get_lag => $get_lag, + sleep => $sleep, ); my $get_status; diff --git a/lib/MasterSlave.pm b/lib/MasterSlave.pm index 59abd93d..2c503821 100644 --- a/lib/MasterSlave.pm +++ b/lib/MasterSlave.pm @@ -29,22 +29,22 @@ use constant PTDEBUG => $ENV{PTDEBUG} || 0; # Sub: check_recursion_method # Check that the arrayref of recursion methods passed in is valid -sub check_recursion_method { +sub check_recursion_method { my ($methods) = @_; - if ( @$methods != 1 ) { - if ( grep({ !m/processlist|hosts/i } @$methods) - && $methods->[0] !~ /^dsn=/i ) - { - die "Invalid combination of recursion methods: " - . join(", ", map { defined($_) ? $_ : 'undef' } @$methods) . ". " - . "Only hosts and processlist may be combined.\n" - } - } - else { + if ( @$methods != 1 ) { + if ( grep({ !m/processlist|hosts/i } @$methods) + && $methods->[0] !~ /^dsn=/i ) + { + die "Invalid combination of recursion methods: " + . join(", ", map { defined($_) ? $_ : 'undef' } @$methods) . ". " + . "Only hosts and processlist may be combined.\n" + } + } + else { my ($method) = @$methods; - die "Invalid recursion method: " . 
( $method || 'undef' ) - unless $method && $method =~ m/^(?:processlist$|hosts$|none$|cluster$|dsn=)/i; - } + die "Invalid recursion method: " . ( $method || 'undef' ) + unless $method && $method =~ m/^(?:processlist$|hosts$|none$|cluster$|dsn=)/i; + } } sub new { @@ -73,7 +73,7 @@ sub get_slaves { my $methods = $self->_resolve_recursion_methods($args{dsn}); return $slaves unless @$methods; - + if ( grep { m/processlist|hosts/i } @$methods ) { my @required_args = qw(dbh dsn); foreach my $arg ( @required_args ) { @@ -86,7 +86,7 @@ sub get_slaves { { dbh => $dbh, dsn => $dsn, slave_user => $o->got('slave-user') ? $o->get('slave-user') : '', - slave_password => $o->got('slave-password') ? $o->get('slave-password') : '', + slave_password => $o->got('slave-password') ? $o->get('slave-password') : '', callback => sub { my ( $dsn, $dbh, $level, $parent ) = @_; return unless $level; @@ -118,7 +118,7 @@ sub get_slaves { else { die "Unexpected recursion methods: @$methods"; } - + return $slaves; } @@ -798,7 +798,7 @@ sub short_host { # Returns: # True if the proclist item is the given type of replication thread. sub is_replication_thread { - my ( $self, $query, %args ) = @_; + my ( $self, $query, %args ) = @_; return unless $query; my $type = lc($args{type} || 'all'); @@ -814,7 +814,7 @@ sub is_replication_thread { # On a slave, there are two threads. Both have user="system user". if ( ($query->{User} || $query->{user} || '') eq "system user" ) { PTDEBUG && _d("Slave replication thread"); - if ( $type ne 'all' ) { + if ( $type ne 'all' ) { # Match a particular slave thread. my $state = $query->{State} || $query->{state} || ''; @@ -831,7 +831,7 @@ sub is_replication_thread { |Reading\sevent\sfrom\sthe\srelay\slog |Has\sread\sall\srelay\slog;\swaiting |Making\stemp\sfile - |Waiting\sfor\sslave\smutex\son\sexit)/xi; + |Waiting\sfor\sslave\smutex\son\sexit)/xi; # Type is either "slave_sql" or "slave_io". 
The second line # implies that if this isn't the sql thread then it must be @@ -919,7 +919,7 @@ sub get_replication_filters { replicate_do_db replicate_ignore_db replicate_do_table - replicate_ignore_table + replicate_ignore_table replicate_wild_do_table replicate_wild_ignore_table ); @@ -931,7 +931,7 @@ sub get_replication_filters { $filters{slave_skip_errors} = $row->[1] if $row->[1] && $row->[1] ne 'OFF'; } - return \%filters; + return \%filters; } diff --git a/lib/ReplicaLagWaiter.pm b/lib/ReplicaLagWaiter.pm index 9ab4789f..0e4c7097 100644 --- a/lib/ReplicaLagWaiter.pm +++ b/lib/ReplicaLagWaiter.pm @@ -40,7 +40,7 @@ use Data::Dumper; # slaves - Arrayref of objects # # Returns: -# ReplicaLagWaiter object +# ReplicaLagWaiter object sub new { my ( $class, %args ) = @_; my @required_args = qw(oktorun get_lag sleep max_lag slaves); @@ -80,6 +80,26 @@ sub wait { my $worst; # most lagging slave my $pr_callback; my $pr_first_report; + + ### refresh list of slaves. In: self passed to wait() + ### Returns: new slave list + my $pr_refresh_slave_list = sub { + my ($self) = @_; + my ($slaves, $refresher) = ($self->{slaves}, $self->{get_slaves_cb}); + return $slaves if ( not defined $refresher ); + my $before = join ' ', sort map {$_->name()} @$slaves; + $slaves = $refresher->(); + my $after = join ' ', sort map {$_->name()} @$slaves; + if ($before ne $after) { + $self->{slaves} = $slaves; + printf STDERR "Slave set to watch has changed\n Was: %s\n Now: %s\n", + $before, $after; + } + return($self->{slaves}); + }; + + $slaves = $pr_refresh_slave_list->($self); + if ( $pr ) { # If you use the default Progress report callback, you'll need to # to add Transformers.pm to this tool. @@ -116,11 +136,26 @@ sub wait { } # First check all slaves. 
- my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves; + my @lagged_slaves = map { {cxn=>$_, lag=>undef} } @$slaves; while ( $oktorun->() && @lagged_slaves ) { PTDEBUG && _d('Checking slave lag'); + + ### while we were waiting our list of slaves may have changed + $slaves = $pr_refresh_slave_list->($self); + my $watched = 0; + @lagged_slaves = grep { + my $slave_name = $_->{cxn}->name(); + grep {$slave_name eq $_->name()} @{$slaves // []} + } @lagged_slaves; + for my $i ( 0..$#lagged_slaves ) { - my $lag = $get_lag->($lagged_slaves[$i]->{cxn}); + my $lag; + eval { + $lag = $get_lag->($lagged_slaves[$i]->{cxn}); + }; + if ($EVAL_ERROR) { + die $EVAL_ERROR; + } PTDEBUG && _d($lagged_slaves[$i]->{cxn}->name(), 'slave lag:', $lag); if ( !defined $lag || $lag > $max_lag ) { diff --git a/t/pt-online-schema-change/slave_lag.t_ b/t/pt-online-schema-change/slave_lag.t_ index d5da5dbb..0fe97812 100644 --- a/t/pt-online-schema-change/slave_lag.t_ +++ b/t/pt-online-schema-change/slave_lag.t_ @@ -17,12 +17,12 @@ use Data::Dumper; use PerconaTest; use Sandbox; use SqlModes; -use File::Temp qw/ tempdir /; +use File::Temp qw/ tempdir tempfile /; if ($ENV{PERCONA_SLOW_BOX}) { plan skip_all => 'This test needs a fast machine'; } else { - plan tests => 4; + plan tests => 6; } our $delay = 30; @@ -37,6 +37,7 @@ my $sb = new Sandbox(basedir => '/tmp', DSNParser => $dp); my $master_dbh = $sb->get_dbh_for('master'); my $slave_dbh = $sb->get_dbh_for('slave1'); my $master_dsn = 'h=127.0.0.1,P=12345,u=msandbox,p=msandbox'; +my $slave_dsn = 'h=127.0.0.1,P=12346,u=msandbox,p=msandbox'; if ( !$master_dbh ) { plan skip_all => 'Cannot connect to sandbox master'; @@ -58,29 +59,29 @@ $slave_dbh->do('STOP SLAVE'); $slave_dbh->do('RESET SLAVE'); $slave_dbh->do('START SLAVE'); +diag('Loading test data'); +$sb->load_file('master', "t/pt-online-schema-change/samples/slave_lag.sql"); + +my $num_rows = 5000; +diag("Loading $num_rows into the table. 
This might take some time."); +diag(`util/mysql_random_data_load --host=127.0.0.1 --port=12345 --user=msandbox --password=msandbox test pt178 --bulk-size=1 --max-threads=1 $num_rows`); + diag("Setting slave delay to $delay seconds"); $slave_dbh->do('STOP SLAVE'); $slave_dbh->do("CHANGE MASTER TO MASTER_DELAY=$delay"); $slave_dbh->do('START SLAVE'); -diag('Loading test data'); -$sb->load_file('master', "t/pt-online-schema-change/samples/slave_lag.sql"); - -my $num_rows = 10000; -diag("Loading $num_rows into the table. This might take some time."); -diag(`util/mysql_random_data_load --host=127.0.0.1 --port=12345 --user=msandbox --password=msandbox test pt178 $num_rows`); - # Run a full table scan query to ensure the slave is behind the master # There is no query cache in MySQL 8.0+ reset_query_cache($master_dbh, $master_dbh); -$master_dbh->do('UPDATE `test`.`pt178` SET f2 = f2 + 1 WHERE f1 = ""'); +$master_dbh->do('UPDATE `test`.`pt178` SET f2 = f2 + 1 WHERE f1 = ""'); # This is the base test, ust to ensure that without using --check-slave-lag nor --skip-check-slave-lag # pt-online-schema-change will wait on the slave at port 12346 my $max_lag = $delay / 2; -my $args = "$master_dsn,D=test,t=pt178 --execute --chunk-size 1 --max-lag 5 --alter 'ENGINE=InnoDB' --pid $tmp_file_name"; +my $args = "$master_dsn,D=test,t=pt178 --execute --chunk-size 10 --max-lag $max_lag --alter 'ENGINE=InnoDB' --pid $tmp_file_name"; diag("Starting base test. This is going to take some time due to the delay in the slave"); diag("pid: $tmp_file_name"); my $output = `$trunk/bin/pt-online-schema-change $args 2>&1`; @@ -92,12 +93,12 @@ like( ); # Repeat the test now using --check-slave-lag -$args = "$master_dsn,D=test,t=pt178 --execute --chunk-size 1 --max-lag 5 --alter 'ENGINE=InnoDB' " - . "--check-slave-lag h=127.0.0.1,P=12346,u=msandbox,p=msandbox,D=test,t=sbtest"; +$args = "$master_dsn,D=test,t=pt178 --execute --chunk-size 1 --max-lag $max_lag --alter 'ENGINE=InnoDB' " + . 
"--check-slave-lag h=127.0.0.1,P=12346,u=msandbox,p=msandbox,D=test,t=sbtest --pid $tmp_file_name"; # Run a full table scan query to ensure the slave is behind the master reset_query_cache($master_dbh, $master_dbh); -$master_dbh->do('UPDATE `test`.`pt178` SET f2 = f2 + 1 WHERE f1 = ""'); +$master_dbh->do('UPDATE `test`.`pt178` SET f2 = f2 + 1 WHERE f1 = ""'); diag("Starting --check-slave-lag test. This is going to take some time due to the delay in the slave"); $output = `$trunk/bin/pt-online-schema-change $args 2>&1`; @@ -108,13 +109,56 @@ like( "--check-slave-lag waits on the correct slave", ); +# Repeat the test now adding and removing a slave during the process +$args = "$master_dsn,D=test,t=pt178 --execute --chunk-size 1 --max-lag $max_lag --alter 'ENGINE=InnoDB' " + . "--recursion-method=dsn=D=test,t=dynamic_replicas --recurse 0 --pid $tmp_file_name"; + +$master_dbh->do('CREATE TABLE `test`.`dynamic_replicas` (id INTEGER PRIMARY KEY, dsn VARCHAR(255) )'); +$master_dbh->do("INSERT INTO `test`.`dynamic_replicas` (id, dsn) VALUES (1, '$slave_dsn')"); + +# Run a full table scan query to ensure the slave is behind the master +reset_query_cache($master_dbh, $master_dbh); +$master_dbh->do('UPDATE `test`.`pt178` SET f2 = f2 + 1 WHERE f1 = ""'); + +diag("Starting --recursion-method with changes during the process"); +my ($fh, $filename) = tempfile(); +my $pid = fork(); + +if (!$pid) { + open(STDERR, '>', $filename); + open(STDOUT, '>', $filename); + exec("$trunk/bin/pt-online-schema-change $args"); +} + +sleep(60); +$master_dbh->do("DELETE FROM `test`.`dynamic_replicas` WHERE id = 1;"); +waitpid($pid, 0); +$output = do { + local $/ = undef; + <$fh>; +}; + +unlink $filename; + +like( + $output, + qr/Slave set to watch has changed/s, + "--recursion-method=dsn updates the slave list", +); + +like( + $output, + qr/Replica lag is \d+ seconds on .* Waiting/s, + "--recursion-method waits on a replica", +); + # Repeat the test now using --skip-check-slave-lag # Run a full 
table scan query to ensure the slave is behind the master reset_query_cache($master_dbh, $master_dbh); -$master_dbh->do('UPDATE `test`.`pt178` SET f2 = f2 + 1 WHERE f1 = ""'); +$master_dbh->do('UPDATE `test`.`pt178` SET f2 = f2 + 1 WHERE f1 = ""'); -$args = "$master_dsn,D=test,t=pt178 --execute --chunk-size 1 --max-lag 5 --alter 'ENGINE=InnoDB' " - . "--skip-check-slave-lag h=127.0.0.1,P=12346,u=msandbox,p=msandbox,D=test,t=sbtest"; +$args = "$master_dsn,D=test,t=pt178 --execute --chunk-size 1 --max-lag $max_lag --alter 'ENGINE=InnoDB' " + . "--skip-check-slave-lag h=127.0.0.1,P=12346,u=msandbox,p=msandbox,D=test,t=sbtest --pid $tmp_file_name"; diag("Starting --skip-check-slave-lag test. This is going to take some time due to the delay in the slave"); $output = `$trunk/bin/pt-online-schema-change $args 2>&1`;