PT-139 Added replication channels suport to pt-table-sync

This commit is contained in:
Carlos Salguero
2017-05-12 11:55:52 -03:00
parent 7dcef2671c
commit 5744fdf583
12 changed files with 350 additions and 102 deletions

View File

@@ -430,21 +430,46 @@ sub get_master_dsn {
# Gets SHOW SLAVE STATUS, with column names all lowercased, as a hashref.
sub get_slave_status {
my ( $self, $dbh ) = @_;
if ( !$self->{not_a_slave}->{$dbh} ) {
my $sth = $self->{sths}->{$dbh}->{SLAVE_STATUS}
||= $dbh->prepare('SHOW SLAVE STATUS');
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
$sth->execute();
my ($ss) = @{$sth->fetchall_arrayref({})};
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
if ( $ss && %$ss ) {
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
return $ss;
# If SHOW SLAVE STATUS returns more than one row it means that this slave is connected to more
# than one master using replication channels.
# If we have a channel name as a parameter, we need to select the correct row and return it.
# If we don't have a channel name as a parameter, there is no way to know what the correct master is so,
# return an error.
my $ss;
if ( $sss_rows && @$sss_rows ) {
if (scalar @$sss_rows > 1) {
if (!$self->{channel}) {
warn 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
return undef;
}
for my $row (@$sss_rows) {
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
if ($row->{channel_name} eq $self->{channel}) {
$ss = $row;
last;
}
}
} else {
$ss = $sss_rows->[0];
}
if ( $ss && %$ss ) {
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
return $ss;
}
}
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
$self->{not_a_slave}->{$dbh}++;
}
}
}
# Gets SHOW MASTER STATUS, with column names all lowercased, as a hashref.
@@ -506,8 +531,9 @@ sub wait_for_master {
my $result;
my $waited;
if ( $master_status ) {
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', "
. "$master_status->{position}, $timeout)";
my $server_version = VersionParser->new($slave_dbh);
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
PTDEBUG && _d($slave_dbh, $sql);
my $start = time;
($result) = $slave_dbh->selectrow_array($sql);