Files
percona-toolkit/lib/MasterSlave.pm
Sveta Smirnova f8c43118bd PT-2168 pt-osc shouldnt fail while unable to monitor a replica node (#676)
* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Proof of concept
- Fixed regular expression in lib/TableParser.pm mistakenly chaged in the tool's code

* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Added basic test case for PT-2168
- Added more details for replica lag information
- Disconnecting replica if lag is not checked. This prevents "Too many
  connections" error

* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Implemented option --wait-lost-replicas for pt-osc, added test case

* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Added more tests for situations where connection to the replica can
  fail

* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Removed extra checks for wait_no_die variable
- Added test cases for SQL queries that pt-osc sends to replicas

* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Allow to reload dsns table while waiting for missed replica if
  --recursion-method is dsn
- Fixed logic in replica rediscovery, so it works with replicas on the
  same host but with different ports
- Renamed option wait-lost-replicas to fail-on-stopped-replication, so
  it is in line with pt-table-checksum
- Adjusted tests
- Removed debug code for PT-1760
- Added test case for PT-1760
- Added exception for variable Open_tables_with_triggers in
  lib/bash/collect.sh due to failed test in Percona Server 8.0.34+
- Updated pt-stalk

* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Updated modules
- Fixed typo in t/pt-table-sync/bidirectional.t
- Removed trailing whitespaces in lib/MasterSlave.pm

* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Help for option --fail-on-stopped-replication

* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Added check for availability of the simple_rewrite_plugin in t/pt-online-schema-change/pt-2168.t

* PT-2168 - PT-OSC shouldn't fail while unable to monitor a replica node

- Added link to the simple_rewrite_plugin source code
- Removed tests for code that runs only in the beginning of pt-osc
  action, so should not be affected by the option fail-on-stopped-replication
2023-09-19 23:31:33 +03:00

1068 lines
36 KiB
Perl

# This program is copyright 2007-2011 Baron Schwartz, 2011-2012 Percona Ireland Ltd.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# MasterSlave package
# ###########################################################################
{
# Package: MasterSlave
# MasterSlave handles common tasks related to master-slave setups.
package MasterSlave;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
# Sub: check_recursion_method
# Check that the arrayref of recursion methods passed in is valid
sub check_recursion_method {
my ($methods) = @_;
if ( @$methods != 1 ) {
if ( grep({ !m/processlist|hosts/i } @$methods)
&& $methods->[0] !~ /^dsn=/i )
{
die "Invalid combination of recursion methods: "
. join(", ", map { defined($_) ? $_ : 'undef' } @$methods) . ". "
. "Only hosts and processlist may be combined.\n"
}
}
else {
my ($method) = @$methods;
die "Invalid recursion method: " . ( $method || 'undef' )
unless $method && $method =~ m/^(?:processlist$|hosts$|none$|cluster$|dsn=)/i;
}
}
sub new {
my ( $class, %args ) = @_;
my @required_args = qw(OptionParser DSNParser Quoter);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my $self = {
%args,
replication_thread => {},
};
return bless $self, $class;
}
sub get_slaves {
my ($self, %args) = @_;
my @required_args = qw(make_cxn);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($make_cxn) = @args{@required_args};
my $slaves = [];
my $dp = $self->{DSNParser};
my $methods = $self->_resolve_recursion_methods($args{dsn});
return $slaves unless @$methods;
if ( grep { m/processlist|hosts/i } @$methods ) {
my @required_args = qw(dbh dsn);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($dbh, $dsn) = @args{@required_args};
my $o = $self->{OptionParser};
$self->recurse_to_slaves(
{ dbh => $dbh,
dsn => $dsn,
slave_user => $o->got('slave-user') ? $o->get('slave-user') : '',
slave_password => $o->got('slave-password') ? $o->get('slave-password') : '',
slaves => $args{slaves},
callback => sub {
my ( $dsn, $dbh, $level, $parent ) = @_;
return unless $level;
PTDEBUG && _d('Found slave:', $dp->as_string($dsn));
my $slave_dsn = $dsn;
if ($o->got('slave-user')) {
$slave_dsn->{u} = $o->get('slave-user');
PTDEBUG && _d("Using slave user ".$o->get('slave-user')." on ".$slave_dsn->{h}.":".$slave_dsn->{P});
}
if ($o->got('slave-password')) {
$slave_dsn->{p} = $o->get('slave-password');
PTDEBUG && _d("Slave password set");
}
push @$slaves, $make_cxn->(dsn => $slave_dsn, dbh => $dbh, parent => $parent);
return;
},
wait_no_die => $args{'wait_no_die'},
}
);
} elsif ( $methods->[0] =~ m/^dsn=/i ) {
(my $dsn_table_dsn = join ",", @$methods) =~ s/^dsn=//i;
$slaves = $self->get_cxn_from_dsn_table(
%args,
dsn_table_dsn => $dsn_table_dsn,
wait_no_die => $args{'wait_no_die'},
);
}
elsif ( $methods->[0] =~ m/none/i ) {
PTDEBUG && _d('Not getting to slaves');
}
else {
die "Unexpected recursion methods: @$methods";
}
return $slaves;
}
sub _resolve_recursion_methods {
my ($self, $dsn) = @_;
my $o = $self->{OptionParser};
if ( $o->got('recursion-method') ) {
return $o->get('recursion-method');
}
elsif ( $dsn && ($dsn->{P} || 3306) != 3306 ) {
# Special case: hosts is best when port is non-standard.
PTDEBUG && _d('Port number is non-standard; using only hosts method');
return [qw(hosts)];
}
else {
# Use the option's default.
return $o->get('recursion-method');
}
}
# Sub: recurse_to_slaves
# Descend to slaves by examining SHOW SLAVE HOSTS.
# The callback gets the slave's DSN, dbh, parent, and the recursion level
# as args. The recursion is tail recursion.
#
# Parameters:
# $args - Hashref of arguments
# $level - Recursion level
#
# Required Arguments:
# dsn - The DSN to connect to; if no dbh arg, connect using this.
# recurse - How many levels to recurse. 0 = none, undef = infinite.
# callback - Code to execute after finding a new slave.
# dsn_parser - <DSNParser> object
#
# Optional Arguments:
# dbh - dbh
# skip_callback - Execute with slaves that will be skipped.
# method - Whether to prefer HOSTS over PROCESSLIST
# parent - The DSN from which this call descended.
sub recurse_to_slaves {
my ( $self, $args, $level ) = @_;
$level ||= 0;
my $dp = $self->{DSNParser};
my $recurse = $args->{recurse} || $self->{OptionParser}->get('recurse');
my $dsn = $args->{dsn};
my $slave_user = $args->{slave_user} || '';
my $slave_password = $args->{slave_password} || '';
my $methods = $self->_resolve_recursion_methods($dsn);
PTDEBUG && _d('Recursion methods:', @$methods);
if ( lc($methods->[0]) eq 'none' ) {
PTDEBUG && _d('Not recursing to slaves');
return;
}
my $slave_dsn = $dsn;
if ($slave_user) {
$slave_dsn->{u} = $slave_user;
PTDEBUG && _d("Using slave user $slave_user on ".$slave_dsn->{h}.":".$slave_dsn->{P});
}
if ($slave_password) {
$slave_dsn->{p} = $slave_password;
PTDEBUG && _d("Slave password set");
}
my $dbh = $args->{dbh};
my $get_dbh = sub {
eval {
$dbh = $dp->get_dbh(
$dp->get_cxn_params($slave_dsn), { AutoCommit => 1 }
);
PTDEBUG && _d('Connected to', $dp->as_string($slave_dsn));
};
if ( $EVAL_ERROR ) {
print STDERR "Cannot connect to ", $dp->as_string($slave_dsn), ": ", $EVAL_ERROR, "\n"
or die "Cannot print: $OS_ERROR";
return;
}
};
DBH: {
if ( !defined $dbh ) {
foreach my $known_slave ( @{$args->{slaves}} ) {
if ($known_slave->{dsn}->{h} eq $slave_dsn->{h} and
$known_slave->{dsn}->{P} eq $slave_dsn->{P} ) {
$dbh = $known_slave->{dbh};
last DBH;
}
}
$get_dbh->();
}
}
my $sql = 'SELECT @@SERVER_ID';
PTDEBUG && _d($sql);
my $id = undef;
do {
eval {
($id) = $dbh->selectrow_array($sql);
};
if ( $EVAL_ERROR ) {
if ( $args->{wait_no_die} ) {
print STDERR "Error getting server id: ", $EVAL_ERROR,
"\nRetrying query for server ", $slave_dsn->{h}, ":", $slave_dsn->{P}, "\n";
sleep 1;
$dbh->disconnect();
$get_dbh->();
} else {
die $EVAL_ERROR;
}
}
} until ($id);
PTDEBUG && _d('Working on server ID', $id);
my $master_thinks_i_am = $dsn->{server_id};
if ( !defined $id
|| ( defined $master_thinks_i_am && $master_thinks_i_am != $id )
|| $args->{server_ids_seen}->{$id}++
) {
PTDEBUG && _d('Server ID seen, or not what master said');
if ( $args->{skip_callback} ) {
$args->{skip_callback}->($dsn, $dbh, $level, $args->{parent});
}
return;
}
$args->{callback}->($dsn, $dbh, $level, $args->{parent});
if ( !defined $recurse || $level < $recurse ) {
my @slaves =
grep { !$_->{master_id} || $_->{master_id} == $id } # Only my slaves.
$self->find_slave_hosts($dp, $dbh, $dsn, $methods);
foreach my $slave ( @slaves ) {
PTDEBUG && _d('Recursing from',
$dp->as_string($dsn), 'to', $dp->as_string($slave));
$self->recurse_to_slaves(
{ %$args, dsn => $slave, dbh => undef, parent => $dsn, slave_user => $slave_user, $slave_password => $slave_password }, $level + 1 );
}
}
}
# Finds slave hosts by trying different methods. The default preferred method
# is trying SHOW PROCESSLIST (processlist) and guessing which ones are slaves,
# and if that doesn't reveal anything, then try SHOW SLAVE STATUS (hosts).
# One exception is if the port is non-standard (3306), indicating that the port
# from SHOW SLAVE HOSTS may be important. Then only the hosts methods is used.
#
# Returns a list of DSN hashes. Optional extra keys in the DSN hash are
# master_id and server_id. Also, the 'source' key is either 'processlist' or
# 'hosts'.
#
# If a method is given, it becomes the preferred (first tried) method.
# Searching stops as soon as a method finds slaves.
sub find_slave_hosts {
my ( $self, $dsn_parser, $dbh, $dsn, $methods ) = @_;
PTDEBUG && _d('Looking for slaves on', $dsn_parser->as_string($dsn),
'using methods', @$methods);
my @slaves;
METHOD:
foreach my $method ( @$methods ) {
my $find_slaves = "_find_slaves_by_$method";
PTDEBUG && _d('Finding slaves with', $find_slaves);
@slaves = $self->$find_slaves($dsn_parser, $dbh, $dsn);
last METHOD if @slaves;
}
PTDEBUG && _d('Found', scalar(@slaves), 'slaves');
return @slaves;
}
sub _find_slaves_by_processlist {
my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
my @connected_slaves = $self->get_connected_slaves($dbh);
my @slaves = $self->_process_slaves_list($dsn_parser, $dsn, \@connected_slaves);
return @slaves;
}
sub _process_slaves_list {
my ($self, $dsn_parser, $dsn, $connected_slaves) = @_;
my @slaves = map {
my $slave = $dsn_parser->parse("h=$_", $dsn);
$slave->{source} = 'processlist';
$slave;
}
grep { $_ }
map {
my ( $host ) = $_->{host} =~ m/^(.*):\d+$/;
if ( $host eq 'localhost' ) {
$host = '127.0.0.1'; # Replication never uses sockets.
}
if ($host =~ m/::/) {
$host = '['.$host.']';
}
$host;
} @$connected_slaves;
return @slaves;
}
# SHOW SLAVE HOSTS is significantly less reliable.
# Machines tend to share the host list around with every machine in the
# replication hierarchy, but they don't update each other when machines
# disconnect or change to use a different master or something. So there is
# lots of cruft in SHOW SLAVE HOSTS.
sub _find_slaves_by_hosts {
my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
my @slaves;
my $sql = 'SHOW SLAVE HOSTS';
PTDEBUG && _d($dbh, $sql);
@slaves = @{$dbh->selectall_arrayref($sql, { Slice => {} })};
# Convert SHOW SLAVE HOSTS into DSN hashes.
if ( @slaves ) {
PTDEBUG && _d('Found some SHOW SLAVE HOSTS info');
@slaves = map {
my %hash;
@hash{ map { lc $_ } keys %$_ } = values %$_;
my $spec = "h=$hash{host},P=$hash{port}"
. ( $hash{user} ? ",u=$hash{user}" : '')
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{master_id} = $hash{master_id};
$dsn->{source} = 'hosts';
$dsn;
} @slaves;
}
return @slaves;
}
# Returns PROCESSLIST entries of connected slaves, normalized to lowercase
# column names.
sub get_connected_slaves {
my ( $self, $dbh ) = @_;
# Check for the PROCESS privilege.
my $show = "SHOW GRANTS FOR ";
my $user = 'CURRENT_USER()';
my $sql = $show . $user;
PTDEBUG && _d($dbh, $sql);
my $proc;
eval {
$proc = grep {
m/ALL PRIVILEGES.*?\*\.\*|PROCESS/
} @{$dbh->selectcol_arrayref($sql)};
};
if ( $EVAL_ERROR ) {
if ( $EVAL_ERROR =~ m/no such grant defined for user/ ) {
# Try again without a host.
PTDEBUG && _d('Retrying SHOW GRANTS without host; error:',
$EVAL_ERROR);
($user) = split('@', $user);
$sql = $show . $user;
PTDEBUG && _d($sql);
eval {
$proc = grep {
m/ALL PRIVILEGES.*?\*\.\*|PROCESS/
} @{$dbh->selectcol_arrayref($sql)};
};
}
# The 2nd try above might have cleared $EVAL_ERROR.
# If not, die now.
die "Failed to $sql: $EVAL_ERROR" if $EVAL_ERROR;
}
if ( !$proc ) {
die "You do not have the PROCESS privilege";
}
$sql = 'SHOW FULL PROCESSLIST';
PTDEBUG && _d($dbh, $sql);
# It's probably a slave if it's doing a binlog dump.
grep { $_->{command} =~ m/Binlog Dump/i }
map { # Lowercase the column names
my %hash;
@hash{ map { lc $_ } keys %$_ } = values %$_;
\%hash;
}
@{$dbh->selectall_arrayref($sql, { Slice => {} })};
}
# Verifies that $master is really the master of $slave. This is not an exact
# science, but there is a decent chance of catching some obvious cases when it
# is not the master. If not the master, it dies; otherwise returns true.
sub is_master_of {
my ( $self, $master, $slave ) = @_;
my $master_status = $self->get_master_status($master)
or die "The server specified as a master is not a master";
my $slave_status = $self->get_slave_status($slave)
or die "The server specified as a slave is not a slave";
my @connected = $self->get_connected_slaves($master)
or die "The server specified as a master has no connected slaves";
my (undef, $port) = $master->selectrow_array("SHOW VARIABLES LIKE 'port'");
if ( $port != $slave_status->{master_port} ) {
die "The slave is connected to $slave_status->{master_port} "
. "but the master's port is $port";
}
if ( !grep { $slave_status->{master_user} eq $_->{user} } @connected ) {
die "I don't see any slave I/O thread connected with user "
. $slave_status->{master_user};
}
if ( ($slave_status->{slave_io_state} || '')
eq 'Waiting for master to send event' )
{
# The slave thinks its I/O thread is caught up to the master. Let's
# compare and make sure the master and slave are reasonably close to each
# other. Note that this is one of the few places where I check the I/O
# thread positions instead of the SQL thread positions!
# Master_Log_File/Read_Master_Log_Pos is the I/O thread's position on the
# master.
my ( $master_log_name, $master_log_num )
= $master_status->{file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
my ( $slave_log_name, $slave_log_num )
= $slave_status->{master_log_file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
if ( $master_log_name ne $slave_log_name
|| abs($master_log_num - $slave_log_num) > 1 )
{
die "The slave thinks it is reading from "
. "$slave_status->{master_log_file}, but the "
. "master is writing to $master_status->{file}";
}
}
return 1;
}
# Figures out how to connect to the master, by examining SHOW SLAVE STATUS. But
# does NOT use the value from Master_User for the username, because typically we
# want to perform operations as the username that was specified (usually to the
# program's --user option, or in a DSN), rather than as the replication user,
# which is often restricted.
sub get_master_dsn {
my ( $self, $dbh, $dsn, $dsn_parser ) = @_;
my $master = $self->get_slave_status($dbh) or return undef;
my $spec = "h=$master->{master_host},P=$master->{master_port}";
return $dsn_parser->parse($spec, $dsn);
}
# Gets SHOW SLAVE STATUS, with column names all lowercased, as a hashref.
sub get_slave_status {
my ( $self, $dbh ) = @_;
if ( !$self->{not_a_slave}->{$dbh} ) {
my $sth = $self->{sths}->{$dbh}->{SLAVE_STATUS}
||= $dbh->prepare('SHOW SLAVE STATUS');
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
$sth->execute();
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
# If SHOW SLAVE STATUS returns more than one row it means that this slave is connected to more
# than one master using replication channels.
# If we have a channel name as a parameter, we need to select the correct row and return it.
# If we don't have a channel name as a parameter, there is no way to know what the correct master is so,
# return an error.
my $ss;
if ( $sss_rows && @$sss_rows ) {
if (scalar @$sss_rows > 1) {
if (!$self->{channel}) {
die 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
}
my $slave_use_channels;
for my $row (@$sss_rows) {
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
if ($row->{channel_name}) {
$slave_use_channels = 1;
}
if ($row->{channel_name} eq $self->{channel}) {
$ss = $row;
last;
}
}
if (!$ss && $slave_use_channels) {
die 'This server is using replication channels but "channel" was not specified on the command line';
}
} else {
if ($sss_rows->[0]->{channel_name} && $sss_rows->[0]->{channel_name} ne $self->{channel}) {
die 'This server is using replication channels but "channel" was not specified on the command line';
} else {
$ss = $sss_rows->[0];
}
}
if ( $ss && %$ss ) {
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
return $ss;
}
if (!$ss && $self->{channel}) {
die "Specified channel name is invalid";
}
}
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
$self->{not_a_slave}->{$dbh}++;
}
}
# Gets SHOW MASTER STATUS, with column names all lowercased, as a hashref.
sub get_master_status {
my ( $self, $dbh ) = @_;
if ( $self->{not_a_master}->{$dbh} ) {
PTDEBUG && _d('Server on dbh', $dbh, 'is not a master');
return;
}
my $sth = $self->{sths}->{$dbh}->{MASTER_STATUS}
||= $dbh->prepare('SHOW MASTER STATUS');
PTDEBUG && _d($dbh, 'SHOW MASTER STATUS');
$sth->execute();
my ($ms) = @{$sth->fetchall_arrayref({})};
PTDEBUG && _d(
$ms ? map { "$_=" . (defined $ms->{$_} ? $ms->{$_} : '') } keys %$ms
: '');
if ( !$ms || scalar keys %$ms < 2 ) {
PTDEBUG && _d('Server on dbh', $dbh, 'does not seem to be a master');
$self->{not_a_master}->{$dbh}++;
}
return { map { lc($_) => $ms->{$_} } keys %$ms }; # lowercase the keys
}
# Sub: wait_for_master
# Execute MASTER_POS_WAIT() to make slave wait for its master.
#
# Parameters:
# %args - Arguments
#
# Required Arguments:
# * master_status - Hashref returned by <get_master_status()>
# * slave_dbh - dbh for slave host
#
# Optional Arguments:
# * timeout - Wait time in seconds (default 60)
#
# Returns:
# Hashref with result of waiting, like:
# (start code)
# {
# result => the result returned by MASTER_POS_WAIT: -1, undef, 0+
# waited => the number of seconds waited, might be zero
# }
# (end code)
sub wait_for_master {
my ( $self, %args ) = @_;
my @required_args = qw(master_status slave_dbh);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($master_status, $slave_dbh) = @args{@required_args};
my $timeout = $args{timeout} || 60;
my $result;
my $waited;
if ( $master_status ) {
my $slave_status;
eval {
$slave_status = $self->get_slave_status($slave_dbh);
};
if ($EVAL_ERROR) {
return {
result => undef,
waited => 0,
error =>'Wait for master: this is a multi-master slave but "channel" was not specified on the command line',
};
}
my $server_version = VersionParser->new($slave_dbh);
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
PTDEBUG && _d($slave_dbh, $sql);
my $start = time;
($result) = $slave_dbh->selectrow_array($sql);
# If MASTER_POS_WAIT() returned NULL and we waited at least 1s
# and the time we waited is less than the timeout then this is
# a strong indication that the slave was stopped while we were
# waiting.
$waited = time - $start;
PTDEBUG && _d('Result of waiting:', $result);
PTDEBUG && _d("Waited", $waited, "seconds");
}
else {
PTDEBUG && _d('Not waiting: this server is not a master');
}
return {
result => $result,
waited => $waited,
};
}
# Executes STOP SLAVE.
sub stop_slave {
my ( $self, $dbh ) = @_;
my $sth = $self->{sths}->{$dbh}->{STOP_SLAVE}
||= $dbh->prepare('STOP SLAVE');
PTDEBUG && _d($dbh, $sth->{Statement});
$sth->execute();
}
# Executes START SLAVE, optionally with UNTIL.
sub start_slave {
my ( $self, $dbh, $pos ) = @_;
if ( $pos ) {
# Just like with CHANGE MASTER TO, you can't quote the position.
my $sql = "START SLAVE UNTIL MASTER_LOG_FILE='$pos->{file}', "
. "MASTER_LOG_POS=$pos->{position}";
PTDEBUG && _d($dbh, $sql);
$dbh->do($sql);
}
else {
my $sth = $self->{sths}->{$dbh}->{START_SLAVE}
||= $dbh->prepare('START SLAVE');
PTDEBUG && _d($dbh, $sth->{Statement});
$sth->execute();
}
}
# Waits for the slave to catch up to its master, using START SLAVE UNTIL. When
# complete, the slave is caught up to the master, and the slave process is
# stopped on both servers.
sub catchup_to_master {
my ( $self, $slave, $master, $timeout ) = @_;
$self->stop_slave($master);
$self->stop_slave($slave);
my $slave_status = $self->get_slave_status($slave);
my $slave_pos = $self->repl_posn($slave_status);
my $master_status = $self->get_master_status($master);
my $master_pos = $self->repl_posn($master_status);
PTDEBUG && _d('Master position:', $self->pos_to_string($master_pos),
'Slave position:', $self->pos_to_string($slave_pos));
my $result;
if ( $self->pos_cmp($slave_pos, $master_pos) < 0 ) {
PTDEBUG && _d('Waiting for slave to catch up to master');
$self->start_slave($slave, $master_pos);
# The slave may catch up instantly and stop, in which case
# MASTER_POS_WAIT will return NULL and $result->{result} will be undef.
# We must catch this; if it returns NULL, then we check that
# its position is as desired.
# TODO: what if master_pos_wait times out and $result == -1? retry?
$result = $self->wait_for_master(
master_status => $master_status,
slave_dbh => $slave,
timeout => $timeout,
master_status => $master_status
);
if ($result->{error}) {
die $result->{error};
}
if ( !defined $result->{result} ) {
$slave_status = $self->get_slave_status($slave);
if ( !$self->slave_is_running($slave_status) ) {
PTDEBUG && _d('Master position:',
$self->pos_to_string($master_pos),
'Slave position:', $self->pos_to_string($slave_pos));
$slave_pos = $self->repl_posn($slave_status);
if ( $self->pos_cmp($slave_pos, $master_pos) != 0 ) {
die "MASTER_POS_WAIT() returned NULL but slave has not "
. "caught up to master";
}
PTDEBUG && _d('Slave is caught up to master and stopped');
}
else {
die "Slave has not caught up to master and it is still running";
}
}
}
else {
PTDEBUG && _d("Slave is already caught up to master");
}
return $result;
}
# Makes one server catch up to the other in replication. When complete, both
# servers are stopped and at the same position.
sub catchup_to_same_pos {
my ( $self, $s1_dbh, $s2_dbh ) = @_;
$self->stop_slave($s1_dbh);
$self->stop_slave($s2_dbh);
my $s1_status = $self->get_slave_status($s1_dbh);
my $s2_status = $self->get_slave_status($s2_dbh);
my $s1_pos = $self->repl_posn($s1_status);
my $s2_pos = $self->repl_posn($s2_status);
if ( $self->pos_cmp($s1_pos, $s2_pos) < 0 ) {
$self->start_slave($s1_dbh, $s2_pos);
}
elsif ( $self->pos_cmp($s2_pos, $s1_pos) < 0 ) {
$self->start_slave($s2_dbh, $s1_pos);
}
# Re-fetch the replication statuses and positions.
$s1_status = $self->get_slave_status($s1_dbh);
$s2_status = $self->get_slave_status($s2_dbh);
$s1_pos = $self->repl_posn($s1_status);
$s2_pos = $self->repl_posn($s2_status);
# Verify that they are both stopped and are at the same position.
if ( $self->slave_is_running($s1_status)
|| $self->slave_is_running($s2_status)
|| $self->pos_cmp($s1_pos, $s2_pos) != 0)
{
die "The servers aren't both stopped at the same position";
}
}
# Returns true if the slave is running.
sub slave_is_running {
my ( $self, $slave_status ) = @_;
return ($slave_status->{slave_sql_running} || 'No') eq 'Yes';
}
# Returns true if the server's log_slave_updates option is enabled.
sub has_slave_updates {
my ( $self, $dbh ) = @_;
my $sql = q{SHOW VARIABLES LIKE 'log_slave_updates'};
PTDEBUG && _d($dbh, $sql);
my ($name, $value) = $dbh->selectrow_array($sql);
return $value && $value =~ m/^(1|ON)$/;
}
# Extracts the replication position out of either SHOW MASTER STATUS or SHOW
# SLAVE STATUS, and returns it as a hashref { file, position }
sub repl_posn {
my ( $self, $status ) = @_;
if ( exists $status->{file} && exists $status->{position} ) {
# It's the output of SHOW MASTER STATUS
return {
file => $status->{file},
position => $status->{position},
};
}
else {
return {
file => $status->{relay_master_log_file},
position => $status->{exec_master_log_pos},
};
}
}
# Gets the slave's lag. TODO: permit using a heartbeat table.
sub get_slave_lag {
my ( $self, $dbh ) = @_;
my $stat = $self->get_slave_status($dbh);
return unless $stat; # server is not a slave
return $stat->{seconds_behind_master};
}
# Compares two replication positions and returns -1, 0, or 1 just as the cmp
# operator does.
sub pos_cmp {
my ( $self, $a, $b ) = @_;
return $self->pos_to_string($a) cmp $self->pos_to_string($b);
}
# Sub: short_host
# Simplify a hostname as much as possible. For purposes of replication, a
# hostname is really just the combination of hostname and port, since
# replication always uses TCP connections (it does not work via sockets). If
# the port is the default 3306, it is omitted. As a convenience, this sub
# accepts either SHOW SLAVE STATUS or a DSN.
#
# Parameters:
# $dsn - DSN hashref
#
# Returns:
# Short hostname string
sub short_host {
my ( $self, $dsn ) = @_;
my ($host, $port);
if ( $dsn->{master_host} ) {
$host = $dsn->{master_host};
$port = $dsn->{master_port};
}
else {
$host = $dsn->{h};
$port = $dsn->{P};
}
return ($host || '[default]') . ( ($port || 3306) == 3306 ? '' : ":$port" );
}
# Sub: is_replication_thread
# Determine if a processlist item is a replication thread.
#
# Parameters:
# $query - Hashref of a processlist item
# %args - Arguments
#
# Arguments:
# type - Which kind of repl thread to match:
# all, binlog_dump (master), slave_io, or slave_sql
# (default: all)
# check_known_ids - Check known replication thread IDs (default: yes)
#
# Returns:
# True if the proclist item is the given type of replication thread.
sub is_replication_thread {
my ( $self, $query, %args ) = @_;
return unless $query;
my $type = lc($args{type} || 'all');
die "Invalid type: $type"
unless $type =~ m/^binlog_dump|slave_io|slave_sql|all$/i;
my $match = 0;
if ( $type =~ m/binlog_dump|all/i ) {
$match = 1
if ($query->{Command} || $query->{command} || '') eq "Binlog Dump";
}
if ( !$match ) {
# On a slave, there are two threads. Both have user="system user".
if ( ($query->{User} || $query->{user} || '') eq "system user" ) {
PTDEBUG && _d("Slave replication thread");
if ( $type ne 'all' ) {
# Match a particular slave thread.
my $state = $query->{State} || $query->{state} || '';
if ( $state =~ m/^init|end$/ ) {
# http://code.google.com/p/maatkit/issues/detail?id=1121
PTDEBUG && _d("Special state:", $state);
$match = 1;
}
else {
# These patterns are abbreviated because if the first few words
# match chances are very high it's the full slave thd state.
my ($slave_sql) = $state =~ m/
^(Waiting\sfor\sthe\snext\sevent
|Reading\sevent\sfrom\sthe\srelay\slog
|Has\sread\sall\srelay\slog;\swaiting
|Making\stemp\sfile
|Waiting\sfor\sslave\smutex\son\sexit)/xi;
# Type is either "slave_sql" or "slave_io". The second line
# implies that if this isn't the sql thread then it must be
# the io thread, so match is true if we were supposed to match
# the io thread.
$match = $type eq 'slave_sql' && $slave_sql ? 1
: $type eq 'slave_io' && !$slave_sql ? 1
: 0;
}
}
else {
# Type is "all" and it's not a master (binlog_dump) thread,
# else we wouldn't have gotten here. It's either of the 2
# slave threads and we don't care which.
$match = 1;
}
}
else {
PTDEBUG && _d('Not system user');
}
# MySQL loves to trick us. Sometimes a slave replication thread will
# temporarily morph into what looks like a regular user thread when
# really it's still the same slave repl thread. So here we save known
# repl thread IDs and check if a non-matching event is actually a
# known repl thread ID and if yes then we make it match.
if ( !defined $args{check_known_ids} || $args{check_known_ids} ) {
my $id = $query->{Id} || $query->{id};
if ( $match ) {
$self->{replication_thread}->{$id} = 1;
}
else {
if ( $self->{replication_thread}->{$id} ) {
PTDEBUG && _d("Thread ID is a known replication thread ID");
$match = 1;
}
}
}
}
PTDEBUG && _d('Matches', $type, 'replication thread:',
($match ? 'yes' : 'no'), '; match:', $match);
return $match;
}
# Sub: get_replication_filters
# Get any replication filters set on the host.
#
# Parameters:
# %args - Arguments
#
# Required Arguments:
# dbh - dbh, master or slave
#
# Returns:
# Hashref of any replication filters. If none are set, an empty hashref
# is returned.
sub get_replication_filters {
my ( $self, %args ) = @_;
my @required_args = qw(dbh);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($dbh) = @args{@required_args};
my %filters = ();
my $status = $self->get_master_status($dbh);
if ( $status ) {
map { $filters{$_} = $status->{$_} }
grep { defined $status->{$_} && $status->{$_} ne '' }
qw(
binlog_do_db
binlog_ignore_db
);
}
$status = $self->get_slave_status($dbh);
if ( $status ) {
map { $filters{$_} = $status->{$_} }
grep { defined $status->{$_} && $status->{$_} ne '' }
qw(
replicate_do_db
replicate_ignore_db
replicate_do_table
replicate_ignore_table
replicate_wild_do_table
replicate_wild_ignore_table
);
my $sql = "SHOW VARIABLES LIKE 'slave_skip_errors'";
PTDEBUG && _d($dbh, $sql);
my $row = $dbh->selectrow_arrayref($sql);
# "OFF" in 5.0, "" in 5.1
$filters{slave_skip_errors} = $row->[1] if $row->[1] && $row->[1] ne 'OFF';
}
return \%filters;
}
# Sub: pos_to_string
# Stringify a position in a way that's string-comparable.
#
# Parameters:
# $pos - Hashref with file and position
#
# Returns:
# String like "file/posNNNNN"
sub pos_to_string {
my ( $self, $pos ) = @_;
my $fmt = '%s/%020d';
return sprintf($fmt, @{$pos}{qw(file position)});
}
sub reset_known_replication_threads {
my ( $self ) = @_;
$self->{replication_thread} = {};
return;
}
sub get_cxn_from_dsn_table {
my ($self, %args) = @_;
my @required_args = qw(dsn_table_dsn make_cxn);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($dsn_table_dsn, $make_cxn) = @args{@required_args};
PTDEBUG && _d('DSN table DSN:', $dsn_table_dsn);
my $dp = $self->{DSNParser};
my $q = $self->{Quoter};
my $dsn = $dp->parse($dsn_table_dsn);
my $dsn_table;
if ( $dsn->{D} && $dsn->{t} ) {
$dsn_table = $q->quote($dsn->{D}, $dsn->{t});
}
elsif ( $dsn->{t} && $dsn->{t} =~ m/\./ ) {
$dsn_table = $q->quote($q->split_unquote($dsn->{t}));
}
else {
die "DSN table DSN does not specify a database (D) "
. "or a database-qualified table (t)";
}
my $done = 0;
my $dsn_tbl_cxn = $make_cxn->(dsn => $dsn);
my $dbh = $dsn_tbl_cxn->connect();
my $sql = "SELECT dsn FROM $dsn_table ORDER BY id";
PTDEBUG && _d($sql);
my @cxn;
use Data::Dumper;
DSN:
do {
@cxn = ();
my $dsn_strings = $dbh->selectcol_arrayref($sql);
if ( $dsn_strings ) {
foreach my $dsn_string ( @$dsn_strings ) {
PTDEBUG && _d('DSN from DSN table:', $dsn_string);
if ($args{wait_no_die}) {
my $lcxn;
eval {
$lcxn = $make_cxn->(dsn_string => $dsn_string);
};
if ( $EVAL_ERROR && ($dsn_tbl_cxn->lost_connection($EVAL_ERROR)
|| $EVAL_ERROR =~ m/Can't connect to MySQL server/)) {
PTDEBUG && _d("Server is not accessible, waiting when it is online again");
sleep(1);
goto DSN;
}
push @cxn, $lcxn;
} else {
push @cxn, $make_cxn->(dsn_string => $dsn_string);
}
}
}
$done = 1;
} until $done;
return \@cxn;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End MasterSlave package
# ###########################################################################