mirror of
https://github.com/percona/percona-toolkit.git
synced 2026-05-02 01:01:09 +08:00
Merge pull request #215 from percona/PT-139
PT-139 Add support for replication channels (MySQL 5.7+) to pt-table-sync
This commit is contained in:
+27
-7
@@ -3881,16 +3881,35 @@ sub get_slave_status {
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub get_master_status {
|
||||
@@ -3930,8 +3949,9 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
|
||||
+27
-7
@@ -456,16 +456,35 @@ sub get_slave_status {
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub get_master_status {
|
||||
@@ -505,8 +524,9 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
|
||||
+27
-7
@@ -4162,16 +4162,35 @@ sub get_slave_status {
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub get_master_status {
|
||||
@@ -4211,8 +4230,9 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
|
||||
@@ -4484,16 +4484,35 @@ sub get_slave_status {
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub get_master_status {
|
||||
@@ -4533,8 +4552,9 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
|
||||
+27
-7
@@ -10780,16 +10780,35 @@ sub get_slave_status {
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub get_master_status {
|
||||
@@ -10829,8 +10848,9 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
|
||||
+27
-7
@@ -2570,16 +2570,35 @@ sub get_slave_status {
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub get_master_status {
|
||||
@@ -2619,8 +2638,9 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
|
||||
+27
-7
@@ -2981,16 +2981,35 @@ sub get_slave_status {
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub get_master_status {
|
||||
@@ -3030,8 +3049,9 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
|
||||
+42
-7
@@ -5392,16 +5392,35 @@ sub get_slave_status {
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub get_master_status {
|
||||
@@ -5441,8 +5460,9 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
@@ -8513,6 +8533,7 @@ sub wait {
|
||||
PTDEBUG && _d('Checking slave lag');
|
||||
for my $i ( 0..$#lagged_slaves ) {
|
||||
my $lag = $get_lag->($lagged_slaves[$i]->{cxn});
|
||||
die "Please restart the program using --channel=<channel name>" if !defined($lag);
|
||||
PTDEBUG && _d($lagged_slaves[$i]->{cxn}->name(),
|
||||
'slave lag:', $lag);
|
||||
if ( !defined $lag || $lag > $max_lag ) {
|
||||
@@ -9139,6 +9160,7 @@ use POSIX qw(signal_h);
|
||||
use List::Util qw(max);
|
||||
use Time::HiRes qw(sleep time);
|
||||
use Data::Dumper;
|
||||
use Carp;
|
||||
$Data::Dumper::Indent = 1;
|
||||
$Data::Dumper::Sortkeys = 1;
|
||||
$Data::Dumper::Quotekeys = 0;
|
||||
@@ -9482,6 +9504,7 @@ sub main {
|
||||
OptionParser => $o,
|
||||
DSNParser => $dp,
|
||||
Quoter => $q,
|
||||
channel => $o->get('channel')
|
||||
);
|
||||
|
||||
my $slaves = []; # all slaves (that we can find)
|
||||
@@ -12130,6 +12153,18 @@ group: Connection
|
||||
|
||||
Prompt for a password when connecting to MySQL.
|
||||
|
||||
=item --channel
|
||||
|
||||
type: string
|
||||
|
||||
Channel name used when connected to a server using replication channels.
|
||||
Suppose you have two masters, master_a at port 12345, master_b at port 1236 and
|
||||
a slave connected to both masters using channels chan_master_a and chan_master_b.
|
||||
If you want to run pt-table-sync to syncronize the slave against master_a, pt-table-sync
|
||||
won't be able to determine what's the correct master since SHOW SLAVE STATUS
|
||||
will return 2 rows. In this case, you can use --channel=chan_master_a to specify
|
||||
the channel name to use in the SHOW SLAVE STATUS command.
|
||||
|
||||
=item --[no]check-binlog-format
|
||||
|
||||
default: yes
|
||||
|
||||
+57
-8
@@ -6255,6 +6255,9 @@ sub lock_and_wait {
|
||||
slave_dbh => $dst->{dbh},
|
||||
timeout => $timeout,
|
||||
);
|
||||
if ($wait->{error}) {
|
||||
die $result->{error};
|
||||
}
|
||||
if ( defined $wait->{result} && $wait->{result} != -1 ) {
|
||||
return; # slave caught up
|
||||
}
|
||||
@@ -6961,16 +6964,35 @@ sub get_slave_status {
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub get_master_status {
|
||||
@@ -7010,8 +7032,18 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $slave_status = $self->get_slave_status($slave_dbh);
|
||||
if (!$slave_status) {
|
||||
return {
|
||||
result => undef,
|
||||
waited => 0,
|
||||
error =>'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line',
|
||||
};
|
||||
}
|
||||
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
@@ -7028,6 +7060,7 @@ sub wait_for_master {
|
||||
return {
|
||||
result => $result,
|
||||
waited => $waited,
|
||||
error => undef,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -7077,6 +7110,10 @@ sub catchup_to_master {
|
||||
timeout => $timeout,
|
||||
master_status => $master_status
|
||||
);
|
||||
if ($result->{error}) {
|
||||
die $result->{error};
|
||||
}
|
||||
|
||||
if ( !defined $result->{result} ) {
|
||||
$slave_status = $self->get_slave_status($slave);
|
||||
if ( !$self->slave_is_running($slave_status) ) {
|
||||
@@ -9860,7 +9897,7 @@ sub main {
|
||||
# Do the work.
|
||||
# ########################################################################
|
||||
my $tp = new TableParser( Quoter => $q );
|
||||
my $ms = new MasterSlave(OptionParser=>$o,DSNParser=>$dp,Quoter=>$q);
|
||||
my $ms = new MasterSlave(OptionParser=>$o,DSNParser=>$dp,Quoter=>$q, channel=>$o->get('channel'));
|
||||
my $rt = new Retry();
|
||||
my $chunker = new TableChunker( Quoter => $q, TableParser => $tp );
|
||||
my $nibbler = new TableNibbler( Quoter => $q, TableParser => $tp );
|
||||
@@ -12031,6 +12068,18 @@ For most non-trivial data sizes, you want to leave this option enabled.
|
||||
|
||||
This option is disabled when L<"--bidirectional"> is used.
|
||||
|
||||
=item --channel
|
||||
|
||||
type: string
|
||||
|
||||
Channel name used when connected to a server using replication channels.
|
||||
Suppose you have two masters, master_a at port 12345, master_b at port 1236 and
|
||||
a slave connected to both masters using channels chan_master_a and chan_master_b.
|
||||
If you want to run pt-table-sync to syncronize the slave against master_a, pt-table-sync
|
||||
won't be able to determine what's the correct master since SHOW SLAVE STATUS
|
||||
will return 2 rows. In this case, you can use --channel=chan_master_a to specify
|
||||
the channel name to use in the SHOW SLAVE STATUS command.
|
||||
|
||||
=item --charset
|
||||
|
||||
short form: -A; type: string
|
||||
|
||||
+50
-9
@@ -105,8 +105,7 @@ sub get_slaves {
|
||||
},
|
||||
}
|
||||
);
|
||||
}
|
||||
elsif ( $methods->[0] =~ m/^dsn=/i ) {
|
||||
} elsif ( $methods->[0] =~ m/^dsn=/i ) {
|
||||
(my $dsn_table_dsn = join ",", @$methods) =~ s/^dsn=//i;
|
||||
$slaves = $self->get_cxn_from_dsn_table(
|
||||
%args,
|
||||
@@ -430,21 +429,51 @@ sub get_master_dsn {
|
||||
# Gets SHOW SLAVE STATUS, with column names all lowercased, as a hashref.
|
||||
sub get_slave_status {
|
||||
my ( $self, $dbh ) = @_;
|
||||
|
||||
if ( !$self->{not_a_slave}->{$dbh} ) {
|
||||
my $sth = $self->{sths}->{$dbh}->{SLAVE_STATUS}
|
||||
||= $dbh->prepare('SHOW SLAVE STATUS');
|
||||
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
|
||||
$sth->execute();
|
||||
my ($ss) = @{$sth->fetchall_arrayref({})};
|
||||
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
# If SHOW SLAVE STATUS returns more than one row it means that this slave is connected to more
|
||||
# than one master using replication channels.
|
||||
# If we have a channel name as a parameter, we need to select the correct row and return it.
|
||||
# If we don't have a channel name as a parameter, there is no way to know what the correct master is so,
|
||||
# return an error.
|
||||
my $ss;
|
||||
if ( $sss_rows && @$sss_rows ) {
|
||||
if (scalar @$sss_rows > 1) {
|
||||
if (!$self->{channel}) {
|
||||
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
}
|
||||
for my $row (@$sss_rows) {
|
||||
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
||||
if ($row->{channel_name} eq $self->{channel}) {
|
||||
$ss = $row;
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if ($sss_rows->[0]->{channel_name} && $sss_rows->[0]->{channel_name} ne $self->{channel}) {
|
||||
warn 'This server is using replication channels but "channel" was not specified on the command line';
|
||||
return undef;
|
||||
} else {
|
||||
$ss = $sss_rows->[0];
|
||||
}
|
||||
}
|
||||
|
||||
if ( $ss && %$ss ) {
|
||||
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
||||
return $ss;
|
||||
}
|
||||
}
|
||||
|
||||
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
|
||||
$self->{not_a_slave}->{$dbh}++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Gets SHOW MASTER STATUS, with column names all lowercased, as a hashref.
|
||||
@@ -506,8 +535,17 @@ sub wait_for_master {
|
||||
my $result;
|
||||
my $waited;
|
||||
if ( $master_status ) {
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
|
||||
. "$master_status->{position}, $timeout)";
|
||||
my $slave_status = $self->get_slave_status($slave_dbh);
|
||||
if (!$slave_status) {
|
||||
return {
|
||||
result => undef,
|
||||
waited => 0,
|
||||
error =>'Wait for master: this is a multi-master slave but "channel" was not specified on the command line',
|
||||
};
|
||||
}
|
||||
my $server_version = VersionParser->new($slave_dbh);
|
||||
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
||||
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
|
||||
PTDEBUG && _d($slave_dbh, $sql);
|
||||
my $start = time;
|
||||
($result) = $slave_dbh->selectrow_array($sql);
|
||||
@@ -588,6 +626,9 @@ sub catchup_to_master {
|
||||
timeout => $timeout,
|
||||
master_status => $master_status
|
||||
);
|
||||
if ($result->{error}) {
|
||||
die $result->{error};
|
||||
}
|
||||
if ( !defined $result->{result} ) {
|
||||
$slave_status = $self->get_slave_status($slave);
|
||||
if ( !$self->slave_is_running($slave_status) ) {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
STOP SLAVE FOR CHANNEL '';
|
||||
SET GLOBAL master_info_repository = 'TABLE';
|
||||
SET @@GLOBAL.relay_log_info_repository = 'TABLE';
|
||||
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON;
|
||||
|
||||
@@ -750,6 +750,106 @@ like(
|
||||
"--recursion-method none,none"
|
||||
);
|
||||
|
||||
SKIP: {
|
||||
|
||||
skip "Only test on mysql 5.7" if ( $sandbox_version lt '5.7' );
|
||||
|
||||
my ($master1_dbh, $master1_dsn) = $sb->start_sandbox(
|
||||
server => 'chan_master1',
|
||||
type => 'master',
|
||||
);
|
||||
my ($master2_dbh, $master2_dsn) = $sb->start_sandbox(
|
||||
server => 'chan_master2',
|
||||
type => 'master',
|
||||
);
|
||||
my ($slave1_dbh, $slave1_dsn) = $sb->start_sandbox(
|
||||
server => 'chan_slave1',
|
||||
type => 'master',
|
||||
);
|
||||
my $slave1_port = $sb->port_for('chan_slave1');
|
||||
|
||||
$sb->load_file('chan_master1', "sandbox/gtid_on.sql", undef, no_wait => 1);
|
||||
$sb->load_file('chan_master2', "sandbox/gtid_on.sql", undef, no_wait => 1);
|
||||
$sb->load_file('chan_slave1', "sandbox/slave_channels.sql", undef, no_wait => 1);
|
||||
|
||||
my $chan_slaves = $ms->get_slaves(
|
||||
dbh => $master1_dbh,
|
||||
dsn => $master1_dsn,
|
||||
make_cxn => sub {
|
||||
my $cxn = new Cxn(
|
||||
@_,
|
||||
DSNParser => $dp,
|
||||
OptionParser => $o,
|
||||
);
|
||||
$cxn->connect();
|
||||
return $cxn;
|
||||
},
|
||||
);
|
||||
|
||||
our $message;
|
||||
local $SIG{__WARN__} = sub {
|
||||
$message = shift;
|
||||
};
|
||||
my $css = $ms->get_slave_status($slave1_dbh);
|
||||
local $SIG{__WARN__} = undef;
|
||||
is (
|
||||
$css,
|
||||
undef,
|
||||
'Cannot determine slave in a multi source config without --channel param'
|
||||
);
|
||||
like (
|
||||
$message,
|
||||
qr/This server returned more than one row for SHOW SLAVE STATUS/,
|
||||
'Got warning message if we cannot determine slave in a multi source config without --channel param',
|
||||
);
|
||||
|
||||
my $wfm = $ms->wait_for_master(
|
||||
master_status => $ms->get_master_status($dbh),
|
||||
slave_dbh => $slave1_dbh,
|
||||
timeout => 1,
|
||||
);
|
||||
like(
|
||||
$wfm->{error},
|
||||
qr/"channel" was not specified on the command line/,
|
||||
'Wait for master returned error',
|
||||
);
|
||||
|
||||
# After stopping one of the replication channels, show slave status returns only one slave
|
||||
# but it has a channel name and we didn't specified a channels name in the command line.
|
||||
# It should return undef
|
||||
$slave1_dbh->do("STOP SLAVE for channel 'masterchan2'");
|
||||
|
||||
$css = $ms->get_slave_status($slave1_dbh);
|
||||
is (
|
||||
$css,
|
||||
undef,
|
||||
'Cannot determine slave in a multi source config without --channel param (only one server)'
|
||||
);
|
||||
|
||||
$slave1_dbh->do("START SLAVE for channel 'masterchan2'");
|
||||
|
||||
# Now try specifying a channel name
|
||||
$ms->{channel} = 'masterchan1';
|
||||
$css = $ms->get_slave_status($slave1_dbh);
|
||||
is (
|
||||
$css->{channel_name},
|
||||
'masterchan1',
|
||||
'Returned the correct slave',
|
||||
);
|
||||
|
||||
$wfm = $ms->wait_for_master(
|
||||
master_status => $ms->get_master_status($dbh),
|
||||
slave_dbh => $slave1_dbh,
|
||||
timeout => 1,
|
||||
);
|
||||
is(
|
||||
$wfm->{error},
|
||||
undef,
|
||||
'Wait for master returned no error',
|
||||
);
|
||||
|
||||
$sb->stop_sandbox(qw(chan_master1 chan_master2 chan_slave1));
|
||||
}
|
||||
# #############################################################################
|
||||
# Done.
|
||||
# #############################################################################
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
#!/usr/bin/env perl
|
||||
|
||||
BEGIN {
|
||||
die "The PERCONA_TOOLKIT_BRANCH environment variable is not set.\n"
|
||||
unless $ENV{PERCONA_TOOLKIT_BRANCH} && -d $ENV{PERCONA_TOOLKIT_BRANCH};
|
||||
unshift @INC, "$ENV{PERCONA_TOOLKIT_BRANCH}/lib";
|
||||
};
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => 'all';
|
||||
use English qw(-no_match_vars);
|
||||
use Test::More;
|
||||
|
||||
use PerconaTest;
|
||||
use Sandbox;
|
||||
require "$trunk/bin/pt-table-sync";
|
||||
|
||||
my $dp = new DSNParser(opts=>$dsn_opts);
|
||||
my $sb = new Sandbox(basedir => '/tmp', DSNParser => $dp);
|
||||
my $master_dbh = $sb->get_dbh_for('master');
|
||||
my $slave_dbh = $sb->get_dbh_for('slave1');
|
||||
|
||||
if ( !$master_dbh ) {
|
||||
plan skip_all => 'Cannot connect to sandbox master';
|
||||
}
|
||||
elsif ( !$slave_dbh ) {
|
||||
plan skip_all => 'Cannot connect to sandbox slave';
|
||||
} elsif ($sandbox_version lt '5.7') {
|
||||
plan skip_all => 'Only on MySQL 5.7+';
|
||||
} else {
|
||||
plan tests => 2;
|
||||
}
|
||||
|
||||
my ($master1_dbh, $master1_dsn) = $sb->start_sandbox(
|
||||
server => 'chan_master1',
|
||||
type => 'master',
|
||||
);
|
||||
my ($master2_dbh, $master2_dsn) = $sb->start_sandbox(
|
||||
server => 'chan_master2',
|
||||
type => 'master',
|
||||
);
|
||||
my ($slave1_dbh, $slave1_dsn) = $sb->start_sandbox(
|
||||
server => 'chan_slave1',
|
||||
type => 'master',
|
||||
);
|
||||
my $slave1_port = $sb->port_for('chan_slave1');
|
||||
|
||||
$sb->load_file('chan_master1', "sandbox/gtid_on.sql", undef, no_wait => 1);
|
||||
$sb->load_file('chan_master2', "sandbox/gtid_on.sql", undef, no_wait => 1);
|
||||
$sb->load_file('chan_slave1', "sandbox/slave_channels.sql", undef, no_wait => 1);
|
||||
|
||||
my @args = qw(--execute --no-foreign-key-checks --verbose --databases=sakila --tables=actor --sync-to-master --channel=masterchan1);
|
||||
my $exit_status;
|
||||
|
||||
my $output = output(
|
||||
sub { $exit_status = pt_table_sync::main(@args, $slave1_dsn) },
|
||||
stderr => 1,
|
||||
);
|
||||
|
||||
like (
|
||||
$output,
|
||||
qr/sakila.actor/,
|
||||
'Synced actor table'
|
||||
);
|
||||
|
||||
$sb->stop_sandbox(qw(chan_master1 chan_master2 chan_slave1));
|
||||
|
||||
|
||||
# #############################################################################
|
||||
# Done.
|
||||
# #############################################################################
|
||||
$sb->wipe_clean($master_dbh);
|
||||
ok($sb->ok(), "Sandbox servers") or BAIL_OUT(__FILE__ . " broke the sandbox");
|
||||
exit;
|
||||
Reference in New Issue
Block a user