PT-2340 - Support MySQL 8.4

- Fixed staff I broke for 8.0
This commit is contained in:
Sveta Smirnova
2024-08-22 14:29:53 +03:00
parent a38bee6d60
commit b332537481
31 changed files with 222 additions and 160 deletions

View File

@@ -3922,8 +3922,10 @@ sub _find_replicas_by_hosts {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
$source_name='master';
}
PTDEBUG && _d($dbh, $sql);
@@ -3939,7 +3941,7 @@ sub _find_replicas_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source_id} = $hash{"${source_name}_id"};
$dsn->{source} = 'hosts';
$dsn;
} @replicas;
@@ -4018,9 +4020,9 @@ sub is_source_of {
. "but the source's port is $port";
}
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
. $replica_status->{"${source_name}_user"};
}
if ( ($replica_status->{replica_io_state} || '')
@@ -4063,7 +4065,7 @@ sub get_replica_status {
my $server_version = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $server_version < '8.1' || $server_version->flavor() =~ m/maria/ ) {
$replica_name = 'replica';
$replica_name = 'slave';
}
if ( !$self->{not_a_replica}->{$dbh} ) {
@@ -4206,7 +4208,7 @@ sub stop_replica {
my $vp = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'replica';
$replica_name = 'slave';
}
my $sth = $self->{sths}->{$dbh}->{STOP_REPLICA}
||= $dbh->prepare("STOP ${replica_name}");

View File

@@ -383,8 +383,10 @@ sub _find_replicas_by_hosts {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
$source_name='master';
}
PTDEBUG && _d($dbh, $sql);
@@ -400,7 +402,7 @@ sub _find_replicas_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source_id} = $hash{"${source_name}_id"};
$dsn->{source} = 'hosts';
$dsn;
} @replicas;
@@ -479,9 +481,9 @@ sub is_source_of {
. "but the source's port is $port";
}
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
. $replica_status->{"${source_name}_user"};
}
if ( ($replica_status->{replica_io_state} || '')
@@ -524,7 +526,7 @@ sub get_replica_status {
my $server_version = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $server_version < '8.1' || $server_version->flavor() =~ m/maria/ ) {
$replica_name = 'replica';
$replica_name = 'slave';
}
if ( !$self->{not_a_replica}->{$dbh} ) {
@@ -667,7 +669,7 @@ sub stop_replica {
my $vp = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'replica';
$replica_name = 'slave';
}
my $sth = $self->{sths}->{$dbh}->{STOP_REPLICA}
||= $dbh->prepare("STOP ${replica_name}");
@@ -6105,6 +6107,9 @@ sub main {
@dbhs = ();
@sths = ();
my $relay_source_log_file_col = 'relay_source_log_file';
my $exec_source_log_pos_col = 'exec_source_log_pos';
# ########################################################################
# Get configuration information.
# ########################################################################
@@ -6311,6 +6316,16 @@ sub main {
die "Heartbeat table $db_tbl does not have a ts column"
unless $tbl_struct->{is_col}->{ts};
if ( $tbl_struct->{is_col}->{exec_master_log_file}
&& !$tbl_struct->{is_col}->{exec_source_log_file} ) {
$exec_source_log_pos_col = 'exec_master_log_file';
}
if ( $tbl_struct->{is_col}->{exec_master_log_pos}
&& !$tbl_struct->{is_col}->{exec_source_log_pos} ) {
$exec_source_log_pos_col = 'exec_master_log_pos';
}
my $hires_ts = $tbl_struct->{type_for}->{ts} =~ m/char/i ? 1 : 0;
PTDEBUG && _d("Hi-res ts:", ($hires_ts ? 'yes' : 'no'));
@@ -6429,7 +6444,7 @@ sub main {
PTDEBUG && _d("Master status columns:", join(', ', @master_status_cols));
my @slave_status_cols = grep { $tbl_struct->{is_col}->{$_} }
("relay_${source_name}_log_file", "exec_${source_name}_log_pos");
($relay_source_log_file_col, $exec_source_log_pos_col);
PTDEBUG && _d("Slave status columns:", join(', ', @slave_status_cols));
# Just a shortcut so I don't have to check both arrays when creating
@@ -6486,6 +6501,7 @@ sub main {
}
else {
$sql = 'SHOW SLAVE STATUS';
s/source/master/ for @slave_status_cols;
}
PTDEBUG && _d($dbh, $sql);
my $row = $dbh->selectrow_hashref($sql);

View File

@@ -4199,8 +4199,10 @@ sub _find_replicas_by_hosts {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
$source_name='master';
}
PTDEBUG && _d($dbh, $sql);
@@ -4216,7 +4218,7 @@ sub _find_replicas_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source_id} = $hash{"${source_name}_id"};
$dsn->{source} = 'hosts';
$dsn;
} @replicas;
@@ -4295,9 +4297,9 @@ sub is_source_of {
. "but the source's port is $port";
}
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
. $replica_status->{"${source_name}_user"};
}
if ( ($replica_status->{replica_io_state} || '')
@@ -4340,7 +4342,7 @@ sub get_replica_status {
my $server_version = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $server_version < '8.1' || $server_version->flavor() =~ m/maria/ ) {
$replica_name = 'replica';
$replica_name = 'slave';
}
if ( !$self->{not_a_replica}->{$dbh} ) {
@@ -4483,7 +4485,7 @@ sub stop_replica {
my $vp = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'replica';
$replica_name = 'slave';
}
my $sth = $self->{sths}->{$dbh}->{STOP_REPLICA}
||= $dbh->prepare("STOP ${replica_name}");

View File

@@ -981,7 +981,7 @@ collect_mysql_show_replica_hosts () {
local replicas='replicas'
if [ "$version" '<' "8.1" ]; then
replicas='slave hosts'
replicas='slave hosts';
fi
$CMD_MYSQL $EXT_ARGV -ssE -e "SHOW ${replicas}" 2>/dev/null
@@ -995,8 +995,8 @@ collect_source_logs_status () {
local source_log='binary'
local source_status='binary log'
if [ "$version" '<' "8.1" ]; then
source_log = 'master'
source_status = 'master'
source_log='master';
source_status='master';
fi
$CMD_MYSQL $EXT_ARGV -ss -e "SHOW ${source_log} LOGS" > "$source_logs_file" 2>/dev/null
$CMD_MYSQL $EXT_ARGV -ss -e "SHOW ${source_status} STATUS" > "$source_status_file" 2>/dev/null

View File

@@ -4487,8 +4487,10 @@ sub _find_replicas_by_hosts {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
$source_name='master';
}
PTDEBUG && _d($dbh, $sql);
@@ -4504,7 +4506,7 @@ sub _find_replicas_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source_id} = $hash{"${source_name}_id"};
$dsn->{source} = 'hosts';
$dsn;
} @replicas;
@@ -4583,9 +4585,9 @@ sub is_source_of {
. "but the source's port is $port";
}
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
. $replica_status->{"${source_name}_user"};
}
if ( ($replica_status->{replica_io_state} || '')

View File

@@ -10793,8 +10793,10 @@ sub _find_replicas_by_hosts {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
$source_name='master';
}
PTDEBUG && _d($dbh, $sql);
@@ -10810,7 +10812,7 @@ sub _find_replicas_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source_id} = $hash{"${source_name}_id"};
$dsn->{source} = 'hosts';
$dsn;
} @replicas;
@@ -10889,9 +10891,9 @@ sub is_source_of {
. "but the source's port is $port";
}
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
. $replica_status->{"${source_name}_user"};
}
if ( ($replica_status->{replica_io_state} || '')

View File

@@ -2524,8 +2524,10 @@ sub _find_replicas_by_hosts {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
$source_name='master';
}
PTDEBUG && _d($dbh, $sql);
@@ -2541,7 +2543,7 @@ sub _find_replicas_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source_id} = $hash{"${source_name}_id"};
$dsn->{source} = 'hosts';
$dsn;
} @replicas;
@@ -2620,9 +2622,9 @@ sub is_source_of {
. "but the source's port is $port";
}
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
. $replica_status->{"${source_name}_user"};
}
if ( ($replica_status->{replica_io_state} || '')

View File

@@ -2935,8 +2935,10 @@ sub _find_replicas_by_hosts {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
$source_name='master';
}
PTDEBUG && _d($dbh, $sql);
@@ -2952,7 +2954,7 @@ sub _find_replicas_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source_id} = $hash{"${source_name}_id"};
$dsn->{source} = 'hosts';
$dsn;
} @replicas;
@@ -3031,9 +3033,9 @@ sub is_source_of {
. "but the source's port is $port";
}
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
. $replica_status->{"${source_name}_user"};
}
if ( ($replica_status->{replica_io_state} || '')
@@ -5518,7 +5520,7 @@ sub watch_server {
# Get master_uuid from SHOW REPLICA STATUS if a UUID is not specified
# with --source-uuid.
my $gtid_uuid = $o->get("${source_name}-uuid");
my $gtid_uuid = $o->get("source-uuid");
if ( !$gtid_uuid ) {
$gtid_uuid = $stat->{"${source_name}_uuid"};
die "No ${source_name}_uuid" unless $gtid_uuid; # shouldn't happen

View File

@@ -5442,8 +5442,10 @@ sub _find_replicas_by_hosts {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
$source_name='master';
}
PTDEBUG && _d($dbh, $sql);
@@ -5459,7 +5461,7 @@ sub _find_replicas_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source_id} = $hash{"${source_name}_id"};
$dsn->{source} = 'hosts';
$dsn;
} @replicas;
@@ -5538,9 +5540,9 @@ sub is_source_of {
. "but the source's port is $port";
}
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
. $replica_status->{"${source_name}_user"};
}
if ( ($replica_status->{replica_io_state} || '')
@@ -6499,11 +6501,11 @@ sub _make_xor_slices {
sub find_replication_differences {
my ($self, %args) = @_;
my @required_args = qw(dbh repl_table);
my @required_args = qw(dbh repl_table source_crc_name source_cnt_name);
foreach my $arg( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($dbh, $repl_table) = @args{@required_args};
my ($dbh, $repl_table, $source_crc_name, $source_cnt_name) = @args{@required_args};
my $tries = $self->{'OptionParser'}->get('replicate-check-retries') || 1;
@@ -6512,13 +6514,13 @@ sub find_replication_differences {
my $sql
= "SELECT CONCAT(db, '.', tbl) AS `table`, "
. "chunk, chunk_index, lower_boundary, upper_boundary, "
. "COALESCE(this_cnt-source_cnt, 0) AS cnt_diff, "
. "COALESCE(this_cnt-${source_cnt_name}, 0) AS cnt_diff, "
. "COALESCE("
. "this_crc <> source_crc OR ISNULL(source_crc) <> ISNULL(this_crc), 0"
. ") AS crc_diff, this_cnt, source_cnt, this_crc, source_crc "
. "this_crc <> ${source_crc_name} OR ISNULL(${source_crc_name}) <> ISNULL(this_crc), 0"
. ") AS crc_diff, this_cnt, ${source_cnt_name}, this_crc, ${source_crc_name} "
. "FROM $repl_table "
. "WHERE (source_cnt <> this_cnt OR source_crc <> this_crc "
. "OR ISNULL(source_crc) <> ISNULL(this_crc)) "
. "WHERE (${source_cnt_name} <> this_cnt OR ${source_crc_name} <> this_crc "
. "OR ISNULL(${source_crc_name}) <> ISNULL(this_crc)) "
. ($args{where} ? " AND ($args{where})" : "");
PTDEBUG && _d($sql);
$diffs = $dbh->selectall_arrayref($sql, { Slice => {} });
@@ -10558,6 +10560,9 @@ sub main {
my $update_sth; # update source_cnt and source_cnt in repl table
my $delete_sth; # delete checksums for one db.tbl from repl table
my ($source_crc_name, $source_cnt_name); # do we use legacy checksum table definition?
if ( $o->get('truncate-replicate-table') ) {
eval {
$source_dbh->do("TRUNCATE TABLE $repl_table");
@@ -10829,45 +10834,6 @@ sub main {
}
}
# #####################################################################
# Possibly check replication replicas and exit.
# #####################################################################
if ( $o->get('replicate-check') && $o->get('replicate-check-only') ) {
PTDEBUG && _d('Will --replicate-check and exit');
# --plugin hook
if ( $plugin && $plugin->can('before_replicate_check') ) {
$plugin->before_replicate_check();
}
foreach my $replica ( @$replicas ) {
my $diffs = $rc->find_replication_differences(
dbh => $replica->dbh(),
repl_table => $repl_table,
);
PTDEBUG && _d(scalar @$diffs, 'checksum diffs on',
$replica->name());
$diffs = filter_tables_replicate_check_only($diffs, $o);
if ( @$diffs ) {
$exit_status |= $PTC_EXIT_STATUS{TABLE_DIFF};
if ( $o->get('quiet') < 2 ) {
print_checksum_diffs(
cxn => $replica,
diffs => $diffs,
);
}
}
}
# --plugin hook
if ( $plugin && $plugin->can('after_replicate_check') ) {
$plugin->after_replicate_check();
}
PTDEBUG && _d('Exit status', $exit_status, 'oktorun', $oktorun);
return $exit_status;
}
# #####################################################################
# Check for replication filters.
# #####################################################################
@@ -10904,10 +10870,10 @@ sub main {
# Check that the replication table exists, or possibly create it.
# #####################################################################
eval {
check_repl_table(
($source_crc_name, $source_cnt_name) = check_repl_table(
dbh => $source_dbh,
repl_table => $repl_table,
replicas => $replicas,
replicas => $replicas,
have_time => $have_time,
OptionParser => $o,
TableParser => $tp,
@@ -10918,6 +10884,47 @@ sub main {
die ts($EVAL_ERROR);
}
# #####################################################################
# Possibly check replication replicas and exit.
# #####################################################################
if ( $o->get('replicate-check') && $o->get('replicate-check-only') ) {
PTDEBUG && _d('Will --replicate-check and exit');
# --plugin hook
if ( $plugin && $plugin->can('before_replicate_check') ) {
$plugin->before_replicate_check();
}
foreach my $replica ( @$replicas ) {
my $diffs = $rc->find_replication_differences(
dbh => $replica->dbh(),
repl_table => $repl_table,
source_crc_name => $source_crc_name,
source_cnt_name => $source_cnt_name,
);
PTDEBUG && _d(scalar @$diffs, 'checksum diffs on',
$replica->name());
$diffs = filter_tables_replicate_check_only($diffs, $o);
if ( @$diffs ) {
$exit_status |= $PTC_EXIT_STATUS{TABLE_DIFF};
if ( $o->get('quiet') < 2 ) {
print_checksum_diffs(
cxn => $replica,
diffs => $diffs,
);
}
}
}
# --plugin hook
if ( $plugin && $plugin->can('after_replicate_check') ) {
$plugin->after_replicate_check();
}
PTDEBUG && _d('Exit status', $exit_status, 'oktorun', $oktorun);
return $exit_status;
}
# #####################################################################
# Make a ReplicaLagWaiter to help wait for replicas after each chunk.
# #####################################################################
@@ -11037,7 +11044,7 @@ sub main {
"SELECT this_crc, this_cnt FROM $repl_table "
. "WHERE db = ? AND tbl = ? AND chunk = ?");
$update_sth = $source_dbh->prepare(
"UPDATE $repl_table SET chunk_time = ?, ${source_name}_crc = ?, ${source_name}_cnt = ? "
"UPDATE $repl_table SET chunk_time = ?, ${source_crc_name} = ?, ${source_cnt_name} = ? "
. "WHERE db = ? AND tbl = ? AND chunk = ?");
$delete_sth = $source_dbh->prepare(
"DELETE FROM $repl_table WHERE db = ? AND tbl = ?");
@@ -11074,8 +11081,9 @@ sub main {
my $last_chunk;
if ( $o->get('resume') ) {
$last_chunk = last_chunk(
dbh => $source_dbh,
repl_table => $repl_table,
dbh => $source_dbh,
repl_table => $repl_table,
source_cnt_name => $source_cnt_name,
);
}
@@ -11567,14 +11575,14 @@ sub main {
wait_for_replicas(source_dbh => $args{Cxn}->dbh(), source_replica => $ms, replicas => $replicas);
}
wait_for_last_checksum(
tbl => $tbl,
repl_table => $repl_table,
replicas => $replicas,
max_chunk => $max_chunk,
check_pr => $check_pr,
have_time => $have_time,
OptionParser => $o,
source_name => $source_name,
tbl => $tbl,
repl_table => $repl_table,
replicas => $replicas,
max_chunk => $max_chunk,
check_pr => $check_pr,
have_time => $have_time,
OptionParser => $o,
source_crc_name => $source_crc_name,
);
# Check each replica for checksum diffs.
@@ -11585,6 +11593,8 @@ sub main {
dbh => $replica->dbh(),
repl_table => $repl_table,
where => "db='$tbl->{db}' AND tbl='$tbl->{tbl}'",
source_crc_name => $source_crc_name,
source_cnt_name => $source_cnt_name,
);
PTDEBUG && _d(scalar @$diffs, 'checksum diffs on',
$replica->name());
@@ -12328,9 +12338,24 @@ sub check_repl_table {
}
}
my $create_table = $tp->get_create_table( $dbh, $db, $tbl );
my $source_crc_name = 'source_crc';
my $source_cnt_name = 'source_cnt';
if ( $create_table =~ /master_crc/si
|| $create_table =~ /master_cnt/si )
{
warn "The current checksum table uses deprecated column names. Support for these names "
. "will be occasionally removed. Please update the table definition with command:\n"
. "ALTER TABLE `$db`.`$tbl` RENAME COLUMN master_crc TO source_crc, "
. "RENAME COLUMN master_cnt TO source_cnt;\n";
$source_crc_name = 'master_crc';
$source_cnt_name = 'master_cnt';
}
if ( $o->get('binary-index') ) {
PTDEBUG && _d('--binary-index : checking if replicate table has binary type columns');
my $create_table = $tp->get_create_table( $dbh, $db, $tbl );
if ( $create_table !~ /lower_boundary`?\s+BLOB/si
|| $create_table !~ /upper_boundary`?\s+BLOB/si )
{
@@ -12340,7 +12365,7 @@ sub check_repl_table {
}
}
return; # success, repl table is ready to go
return ($source_crc_name, $source_cnt_name); # success, repl table is ready to go
}
# Check that db.tbl exists on all replicas and has the checksum cols,
@@ -12578,16 +12603,16 @@ sub explain_statement {
sub last_chunk {
my (%args) = @_;
my @required_args = qw(dbh repl_table);
my @required_args = qw(dbh repl_table source_cnt_name);
my $source_name = $args{source_name} ? $args{source_name} : 'source';
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($dbh, $repl_table, $q) = @args{@required_args};
my ($dbh, $repl_table, $source_cnt_name) = @args{@required_args};
PTDEBUG && _d('Getting last chunk for --resume');
my $sql = "SELECT * FROM $repl_table FORCE INDEX (ts_db_tbl) "
. "WHERE ${source_name}_cnt IS NOT NULL "
. "WHERE ${source_cnt_name} IS NOT NULL "
. "ORDER BY ts DESC, db DESC, tbl DESC LIMIT 1";
PTDEBUG && _d($sql);
my $sth = $dbh->prepare($sql);
@@ -12660,13 +12685,12 @@ sub wait_for_replicas {
sub wait_for_last_checksum {
my (%args) = @_;
my @required_args = qw(tbl repl_table replicas max_chunk have_time OptionParser);
my @required_args = qw(tbl repl_table replicas max_chunk have_time OptionParser source_crc_name);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless defined $args{$arg};
}
my ($tbl, $repl_table, $replicas, $max_chunk, $have_time, $o) = @args{@required_args};
my ($tbl, $repl_table, $replicas, $max_chunk, $have_time, $o, $source_crc_name) = @args{@required_args};
my $check_pr = $args{check_pr};
my $source_name = $args{source_name} ? $args{source_name} : 'source';
# Requiring "AND source_crc IS NOT NULL" avoids a race condition
# when the system is fast but replication is slow. In such cases,
@@ -12674,7 +12698,7 @@ sub wait_for_last_checksum {
# replicates; this causes a false-positive diff.
my $sql = "SELECT MAX(chunk) FROM $repl_table "
. "WHERE db='$tbl->{db}' AND tbl='$tbl->{tbl}' "
. "AND ${source_name}_crc IS NOT NULL";
. "AND ${source_crc_name} IS NOT NULL";
PTDEBUG && _d($sql);
my $sleep_time = 0;

View File

@@ -6953,8 +6953,10 @@ sub _find_replicas_by_hosts {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
$source_name='master';
}
PTDEBUG && _d($dbh, $sql);
@@ -6970,7 +6972,7 @@ sub _find_replicas_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source_id} = $hash{"${source_name}_id"};
$dsn->{source} = 'hosts';
$dsn;
} @replicas;
@@ -7049,9 +7051,9 @@ sub is_source_of {
. "but the source's port is $port";
}
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
. $replica_status->{"${source_name}_user"};
}
if ( ($replica_status->{replica_io_state} || '')
@@ -10349,7 +10351,7 @@ sub main {
# We should not die if replica connected via tunnel or port redirection
if ( $EVAL_ERROR ) {
$EVAL_ERROR =~ m/The ${replica_name} is connected to (\d+) but the ${source_name}'s port is \d+/;
$EVAL_ERROR =~ m/The ${replica_name} is connected to (\d+) but the source's port is \d+/;
if ( !$1 || $1 != $dsns[0]->{P} ) {
die $EVAL_ERROR;
}