PT-2340 - Support MySQL 8.4

- Updated modules and tests for pt-query-digest, pt-show-grants, pt-slave-delay, pt-slave-find, pt-slave-restart, pt-stalk, pt-summary, pt-table-checksum
This commit is contained in:
Sveta Smirnova
2024-07-30 18:35:36 +03:00
parent 76c1202cb5
commit d02355f89b
94 changed files with 2930 additions and 2306 deletions

View File

@@ -943,7 +943,7 @@ sub get_cxn_params {
$dsn = 'DBI:mysql:' . ( $info->{D} || '' ) . ';'
. join(';', map { "$opts{$_}->{dsn}=$info->{$_}" }
grep { defined $info->{$_} }
qw(F h P S A))
qw(F h P S A s))
. ';mysql_read_default_group=client'
. ($info->{L} ? ';mysql_local_infile=1' : '');
}
@@ -3812,7 +3812,7 @@ use constant {
COM_BINLOG_DUMP => '12',
COM_TABLE_DUMP => '13',
COM_CONNECT_OUT => '14',
COM_REGISTER_SLAVE => '15',
COM_REGISTER_REPLICA => '15',
COM_STMT_PREPARE => '16',
COM_STMT_EXECUTE => '17',
COM_STMT_SEND_LONG_DATA => '18',
@@ -3846,7 +3846,7 @@ my %com_for = (
'12' => 'COM_BINLOG_DUMP',
'13' => 'COM_TABLE_DUMP',
'14' => 'COM_CONNECT_OUT',
'15' => 'COM_REGISTER_SLAVE',
'15' => 'COM_REGISTER_REPLICA',
'16' => 'COM_STMT_PREPARE',
'17' => 'COM_STMT_EXECUTE',
'18' => 'COM_STMT_SEND_LONG_DATA',
@@ -10553,7 +10553,7 @@ sub new {
return bless $self, $class;
}
sub get_slaves {
sub get_replicas {
my ($self, %args) = @_;
my @required_args = qw(make_cxn);
foreach my $arg ( @required_args ) {
@@ -10561,11 +10561,11 @@ sub get_slaves {
}
my ($make_cxn) = @args{@required_args};
my $slaves = [];
my $replicas = [];
my $dp = $self->{DSNParser};
my $methods = $self->_resolve_recursion_methods($args{dsn});
return $slaves unless @$methods;
return $replicas unless @$methods;
if ( grep { m/processlist|hosts/i } @$methods ) {
my @required_args = qw(dbh dsn);
@@ -10575,26 +10575,26 @@ sub get_slaves {
my ($dbh, $dsn) = @args{@required_args};
my $o = $self->{OptionParser};
$self->recurse_to_slaves(
{ dbh => $dbh,
dsn => $dsn,
slave_user => $o->got('slave-user') ? $o->get('slave-user') : '',
slave_password => $o->got('slave-password') ? $o->get('slave-password') : '',
slaves => $args{slaves},
$self->recurse_to_replicas(
{ dbh => $dbh,
dsn => $dsn,
replica_user => $o->got('replica-user') ? $o->get('replica-user') : '',
replica_password => $o->got('replica-password') ? $o->get('replica-password') : '',
replicas => $args{replicas},
callback => sub {
my ( $dsn, $dbh, $level, $parent ) = @_;
return unless $level;
PTDEBUG && _d('Found slave:', $dp->as_string($dsn));
my $slave_dsn = $dsn;
if ($o->got('slave-user')) {
$slave_dsn->{u} = $o->get('slave-user');
PTDEBUG && _d("Using slave user ".$o->get('slave-user')." on ".$slave_dsn->{h}.":".$slave_dsn->{P});
PTDEBUG && _d('Found replica:', $dp->as_string($dsn));
my $replica_dsn = $dsn;
if ($o->got('replica-user')) {
$replica_dsn->{u} = $o->get('replica-user');
PTDEBUG && _d("Using replica user ".$o->get('replica-user')." on ".$replica_dsn->{h}.":".$replica_dsn->{P});
}
if ($o->got('slave-password')) {
$slave_dsn->{p} = $o->get('slave-password');
PTDEBUG && _d("Slave password set");
if ($o->got('replica-password')) {
$replica_dsn->{p} = $o->get('replica-password');
PTDEBUG && _d("Replica password set");
}
push @$slaves, $make_cxn->(dsn => $slave_dsn, dbh => $dbh, parent => $parent);
push @$replicas, $make_cxn->(dsn => $replica_dsn, dbh => $dbh, parent => $parent);
return;
},
wait_no_die => $args{'wait_no_die'},
@@ -10602,20 +10602,20 @@ sub get_slaves {
);
} elsif ( $methods->[0] =~ m/^dsn=/i ) {
(my $dsn_table_dsn = join ",", @$methods) =~ s/^dsn=//i;
$slaves = $self->get_cxn_from_dsn_table(
$replicas = $self->get_cxn_from_dsn_table(
%args,
dsn_table_dsn => $dsn_table_dsn,
wait_no_die => $args{'wait_no_die'},
);
}
elsif ( $methods->[0] =~ m/none/i ) {
PTDEBUG && _d('Not getting to slaves');
PTDEBUG && _d('Not getting to replicas');
}
else {
die "Unexpected recursion methods: @$methods";
}
return $slaves;
return $replicas;
}
sub _resolve_recursion_methods {
@@ -10633,31 +10633,31 @@ sub _resolve_recursion_methods {
}
}
sub recurse_to_slaves {
sub recurse_to_replicas {
my ( $self, $args, $level ) = @_;
$level ||= 0;
my $dp = $self->{DSNParser};
my $recurse = $args->{recurse} || $self->{OptionParser}->get('recurse');
my $dsn = $args->{dsn};
my $slave_user = $args->{slave_user} || '';
my $slave_password = $args->{slave_password} || '';
my $replica_user = $args->{replica_user} || '';
my $replica_password = $args->{replica_password} || '';
my $methods = $self->_resolve_recursion_methods($dsn);
PTDEBUG && _d('Recursion methods:', @$methods);
if ( lc($methods->[0]) eq 'none' ) {
PTDEBUG && _d('Not recursing to slaves');
PTDEBUG && _d('Not recursing to replicas');
return;
}
my $slave_dsn = $dsn;
if ($slave_user) {
$slave_dsn->{u} = $slave_user;
PTDEBUG && _d("Using slave user $slave_user on "
. $slave_dsn->{h} . ":" . ( $slave_dsn->{P} ? $slave_dsn->{P} : ""));
my $replica_dsn = $dsn;
if ($replica_user) {
$replica_dsn->{u} = $replica_user;
PTDEBUG && _d("Using replica user $replica_user on "
. $replica_dsn->{h} . ":" . ( $replica_dsn->{P} ? $replica_dsn->{P} : ""));
}
if ($slave_password) {
$slave_dsn->{p} = $slave_password;
PTDEBUG && _d("Slave password set");
if ($replica_password) {
$replica_dsn->{p} = $replica_password;
PTDEBUG && _d("Replica password set");
}
my $dbh = $args->{dbh};
@@ -10665,12 +10665,12 @@ sub recurse_to_slaves {
my $get_dbh = sub {
eval {
$dbh = $dp->get_dbh(
$dp->get_cxn_params($slave_dsn), { AutoCommit => 1 }
$dp->get_cxn_params($replica_dsn), { AutoCommit => 1 }
);
PTDEBUG && _d('Connected to', $dp->as_string($slave_dsn));
PTDEBUG && _d('Connected to', $dp->as_string($replica_dsn));
};
if ( $EVAL_ERROR ) {
print STDERR "Cannot connect to ", $dp->as_string($slave_dsn), ": ", $EVAL_ERROR, "\n"
print STDERR "Cannot connect to ", $dp->as_string($replica_dsn), ": ", $EVAL_ERROR, "\n"
or die "Cannot print: $OS_ERROR";
return;
}
@@ -10678,10 +10678,10 @@ sub recurse_to_slaves {
DBH: {
if ( !defined $dbh ) {
foreach my $known_slave ( @{$args->{slaves}} ) {
if ($known_slave->{dsn}->{h} eq $slave_dsn->{h} and
$known_slave->{dsn}->{P} eq $slave_dsn->{P} ) {
$dbh = $known_slave->{dbh};
foreach my $known_replica ( @{$args->{replicas}} ) {
if ($known_replica->{dsn}->{h} eq $replica_dsn->{h} and
$known_replica->{dsn}->{P} eq $replica_dsn->{P} ) {
$dbh = $known_replica->{dbh};
last DBH;
}
}
@@ -10699,7 +10699,7 @@ sub recurse_to_slaves {
if ( $EVAL_ERROR ) {
if ( $args->{wait_no_die} ) {
print STDERR "Error getting server id: ", $EVAL_ERROR,
"\nRetrying query for server ", $slave_dsn->{h}, ":", $slave_dsn->{P}, "\n";
"\nRetrying query for server ", $replica_dsn->{h}, ":", $replica_dsn->{P}, "\n";
sleep 1;
$dbh->disconnect();
$get_dbh->();
@@ -10709,12 +10709,12 @@ sub recurse_to_slaves {
}
} until (defined $id);
PTDEBUG && _d('Working on server ID', $id);
my $master_thinks_i_am = $dsn->{server_id};
my $source_thinks_i_am = $dsn->{server_id};
if ( !defined $id
|| ( defined $master_thinks_i_am && $master_thinks_i_am != $id )
|| ( defined $source_thinks_i_am && $source_thinks_i_am != $id )
|| $args->{server_ids_seen}->{$id}++
) {
PTDEBUG && _d('Server ID seen, or not what master said');
PTDEBUG && _d('Server ID seen, or not what source said');
if ( $args->{skip_callback} ) {
$args->{skip_callback}->($dsn, $dbh, $level, $args->{parent});
}
@@ -10725,51 +10725,51 @@ sub recurse_to_slaves {
if ( !defined $recurse || $level < $recurse ) {
my @slaves =
grep { !$_->{master_id} || $_->{master_id} == $id } # Only my slaves.
$self->find_slave_hosts($dp, $dbh, $dsn, $methods);
my @replicas =
grep { !$_->{source_id} || $_->{source_id} == $id } # Only my replicas.
$self->find_replica_hosts($dp, $dbh, $dsn, $methods);
foreach my $slave ( @slaves ) {
foreach my $replica ( @replicas ) {
PTDEBUG && _d('Recursing from',
$dp->as_string($dsn), 'to', $dp->as_string($slave));
$self->recurse_to_slaves(
{ %$args, dsn => $slave, dbh => undef, parent => $dsn, slave_user => $slave_user, $slave_password => $slave_password }, $level + 1 );
$dp->as_string($dsn), 'to', $dp->as_string($replica));
$self->recurse_to_replicas(
{ %$args, dsn => $replica, dbh => undef, parent => $dsn, replica_user => $replica_user, $replica_password => $replica_password }, $level + 1 );
}
}
}
sub find_slave_hosts {
sub find_replica_hosts {
my ( $self, $dsn_parser, $dbh, $dsn, $methods ) = @_;
PTDEBUG && _d('Looking for slaves on', $dsn_parser->as_string($dsn),
PTDEBUG && _d('Looking for replicas on', $dsn_parser->as_string($dsn),
'using methods', @$methods);
my @slaves;
my @replicas;
METHOD:
foreach my $method ( @$methods ) {
my $find_slaves = "_find_slaves_by_$method";
PTDEBUG && _d('Finding slaves with', $find_slaves);
@slaves = $self->$find_slaves($dsn_parser, $dbh, $dsn);
last METHOD if @slaves;
my $find_replicas = "_find_replicas_by_$method";
PTDEBUG && _d('Finding replicas with', $find_replicas);
@replicas = $self->$find_replicas($dsn_parser, $dbh, $dsn);
last METHOD if @replicas;
}
PTDEBUG && _d('Found', scalar(@slaves), 'slaves');
return @slaves;
PTDEBUG && _d('Found', scalar(@replicas), 'replicas');
return @replicas;
}
sub _find_slaves_by_processlist {
sub _find_replicas_by_processlist {
my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
my @connected_slaves = $self->get_connected_slaves($dbh);
my @slaves = $self->_process_slaves_list($dsn_parser, $dsn, \@connected_slaves);
return @slaves;
my @connected_replicas = $self->get_connected_replicas($dbh);
my @replicas = $self->_process_replicas_list($dsn_parser, $dsn, \@connected_replicas);
return @replicas;
}
sub _process_slaves_list {
my ($self, $dsn_parser, $dsn, $connected_slaves) = @_;
my @slaves = map {
my $slave = $dsn_parser->parse("h=$_", $dsn);
$slave->{source} = 'processlist';
$slave;
sub _process_replicas_list {
my ($self, $dsn_parser, $dsn, $connected_replicas) = @_;
my @replicas = map {
my $replica = $dsn_parser->parse("h=$_", $dsn);
$replica->{source} = 'processlist';
$replica;
}
grep { $_ }
map {
@@ -10781,22 +10781,28 @@ sub _process_slaves_list {
$host = '['.$host.']';
}
$host;
} @$connected_slaves;
} @$connected_replicas;
return @slaves;
return @replicas;
}
sub _find_slaves_by_hosts {
sub _find_replicas_by_hosts {
my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
my @slaves;
my $sql = 'SHOW SLAVE HOSTS';
PTDEBUG && _d($dbh, $sql);
@slaves = @{$dbh->selectall_arrayref($sql, { Slice => {} })};
my @replicas;
if ( @slaves ) {
PTDEBUG && _d('Found some SHOW SLAVE HOSTS info');
@slaves = map {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
}
PTDEBUG && _d($dbh, $sql);
@replicas = @{$dbh->selectall_arrayref($sql, { Slice => {} })};
if ( @replicas ) {
PTDEBUG && _d('Found some SHOW REPLICAS info');
@replicas = map {
my %hash;
@hash{ map { lc $_ } keys %$_ } = values %$_;
my $spec = "h=$hash{host},P=$hash{port}"
@@ -10804,16 +10810,16 @@ sub _find_slaves_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{master_id} = $hash{master_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source} = 'hosts';
$dsn;
} @slaves;
} @replicas;
}
return @slaves;
return @replicas;
}
sub get_connected_slaves {
sub get_connected_replicas {
my ( $self, $dbh ) = @_;
my $show = "SHOW GRANTS FOR ";
@@ -10859,79 +10865,103 @@ sub get_connected_slaves {
@{$dbh->selectall_arrayref($sql, { Slice => {} })};
}
sub is_master_of {
my ( $self, $master, $slave ) = @_;
my $master_status = $self->get_master_status($master)
or die "The server specified as a master is not a master";
my $slave_status = $self->get_slave_status($slave)
or die "The server specified as a slave is not a slave";
my @connected = $self->get_connected_slaves($master)
or die "The server specified as a master has no connected slaves";
my (undef, $port) = $master->selectrow_array("SHOW VARIABLES LIKE 'port'");
sub is_source_of {
my ( $self, $source, $replica ) = @_;
if ( $port != $slave_status->{master_port} ) {
die "The slave is connected to $slave_status->{master_port} "
. "but the master's port is $port";
my $replica_version = VersionParser->new($replica);
my $source_name = 'source';
my $source_port = 'source_port';
if ( $replica_version < '8.1' || $replica_version->flavor() =~ m/maria/ ) {
$source_name = 'master';
$source_port = 'master_port';
}
if ( !grep { $slave_status->{master_user} eq $_->{user} } @connected ) {
die "I don't see any slave I/O thread connected with user "
. $slave_status->{master_user};
my $source_status = $self->get_source_status($source)
or die "The server specified as a source is not a source";
my $replica_status = $self->get_replica_status($replica)
or die "The server specified as a replica is not a replica";
my @connected = $self->get_connected_replicas($source)
or die "The server specified as a source has no connected replicas";
my (undef, $port) = $source->selectrow_array("SHOW VARIABLES LIKE 'port'");
if ( $port != $replica_status->{$source_port} ) {
die "The replica is connected to $replica_status->{$source_port} "
. "but the source's port is $port";
}
if ( ($slave_status->{slave_io_state} || '')
eq 'Waiting for master to send event' )
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
}
if ( ($replica_status->{replica_io_state} || '')
eq 'Waiting for ${source_name} to send event' )
{
my ( $master_log_name, $master_log_num )
= $master_status->{file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
my ( $slave_log_name, $slave_log_num )
= $slave_status->{master_log_file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
if ( $master_log_name ne $slave_log_name
|| abs($master_log_num - $slave_log_num) > 1 )
my ( $source_log_name, $source_log_num )
= $source_status->{file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
my ( $replica_log_name, $replica_log_num )
= $replica_status->{source_log_file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
if ( $source_log_name ne $replica_log_name
|| abs($source_log_num - $replica_log_num) > 1 )
{
die "The slave thinks it is reading from "
. "$slave_status->{master_log_file}, but the "
. "master is writing to $master_status->{file}";
die "The replica thinks it is reading from "
. "$replica_status->{source_log_file}, but the "
. "source is writing to $source_status->{file}";
}
}
return 1;
}
sub get_master_dsn {
sub get_source_dsn {
my ( $self, $dbh, $dsn, $dsn_parser ) = @_;
my $master = $self->get_slave_status($dbh) or return undef;
my $spec = "h=$master->{master_host},P=$master->{master_port}";
my $vp = VersionParser->new($dbh);
my $source_host = 'source_host';
my $source_port = 'source_port';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_host = 'master_host';
$source_port = 'master_port';
}
my $source = $self->get_replica_status($dbh) or return undef;
my $spec = "h=$source->{${source_host}},P=$source->{${source_port}}";
return $dsn_parser->parse($spec, $dsn);
}
sub get_slave_status {
sub get_replica_status {
my ( $self, $dbh ) = @_;
if ( !$self->{not_a_slave}->{$dbh} ) {
my $sth = $self->{sths}->{$dbh}->{SLAVE_STATUS}
||= $dbh->prepare('SHOW SLAVE STATUS');
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
my $server_version = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $server_version < '8.1' || $server_version->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
if ( !$self->{not_a_replica}->{$dbh} ) {
my $sth = $self->{sths}->{$dbh}->{REPLICA_STATUS}
||= $dbh->prepare("SHOW ${replica_name} STATUS");
PTDEBUG && _d($dbh, "SHOW ${replica_name} STATUS");
$sth->execute();
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Replica Status rows
my $ss;
if ( $sss_rows && @$sss_rows ) {
if (scalar @$sss_rows > 1) {
if (!$self->{channel}) {
die 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
die 'This server returned more than one row for SHOW REPLICA STATUS but "channel" was not specified on the command line';
}
my $slave_use_channels;
my $replica_use_channels;
for my $row (@$sss_rows) {
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
if ($row->{channel_name}) {
$slave_use_channels = 1;
$replica_use_channels = 1;
}
if ($row->{channel_name} eq $self->{channel}) {
$ss = $row;
last;
}
}
if (!$ss && $slave_use_channels) {
if (!$ss && $replica_use_channels) {
die 'This server is using replication channels but "channel" was not specified on the command line';
}
} else {
@@ -10951,28 +10981,34 @@ sub get_slave_status {
}
}
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
$self->{not_a_slave}->{$dbh}++;
PTDEBUG && _d('This server returns nothing for SHOW REPLICA STATUS');
$self->{not_a_replica}->{$dbh}++;
}
}
sub get_master_status {
sub get_source_status {
my ( $self, $dbh ) = @_;
if ( $self->{not_a_master}->{$dbh} ) {
PTDEBUG && _d('Server on dbh', $dbh, 'is not a master');
if ( $self->{not_a_source}->{$dbh} ) {
PTDEBUG && _d('Server on dbh', $dbh, 'is not a source');
return;
}
my $vp = VersionParser->new($dbh);
my $source_name = 'binary log';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_name = 'master';
}
my $sth;
if ( $self->{sths}->{$dbh} && $dbh && $self->{sths}->{$dbh} == $dbh ) {
$sth = $self->{sths}->{$dbh}->{MASTER_STATUS}
||= $dbh->prepare('SHOW MASTER STATUS');
$sth = $self->{sths}->{$dbh}->{SOURCE_STATUS}
||= $dbh->prepare("SHOW ${source_name} STATUS");
}
else {
$sth = $dbh->prepare('SHOW MASTER STATUS');
$sth = $dbh->prepare("SHOW ${source_name} STATUS");
}
PTDEBUG && _d($dbh, 'SHOW MASTER STATUS');
PTDEBUG && _d($dbh, "SHOW ${source_name} STATUS");
$sth->execute();
my ($ms) = @{$sth->fetchall_arrayref({})};
PTDEBUG && _d(
@@ -10980,42 +11016,46 @@ sub get_master_status {
: '');
if ( !$ms || scalar keys %$ms < 2 ) {
PTDEBUG && _d('Server on dbh', $dbh, 'does not seem to be a master');
$self->{not_a_master}->{$dbh}++;
PTDEBUG && _d('Server on dbh', $dbh, 'does not seem to be a source');
$self->{not_a_source}->{$dbh}++;
}
return { map { lc($_) => $ms->{$_} } keys %$ms }; # lowercase the keys
}
sub wait_for_master {
sub wait_for_source {
my ( $self, %args ) = @_;
my @required_args = qw(master_status slave_dbh);
my @required_args = qw(source_status replica_dbh);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($master_status, $slave_dbh) = @args{@required_args};
my ($source_status, $replica_dbh) = @args{@required_args};
my $timeout = $args{timeout} || 60;
my $result;
my $waited;
if ( $master_status ) {
my $slave_status;
if ( $source_status ) {
my $replica_status;
eval {
$slave_status = $self->get_slave_status($slave_dbh);
$replica_status = $self->get_replica_status($replica_dbh);
};
if ($EVAL_ERROR) {
return {
result => undef,
waited => 0,
error =>'Wait for master: this is a multi-master slave but "channel" was not specified on the command line',
error =>'Wait for source: this is a multi-source replica but "channel" was not specified on the command line',
};
}
my $server_version = VersionParser->new($slave_dbh);
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
PTDEBUG && _d($slave_dbh, $sql);
my $vp = VersionParser->new($replica_dbh);
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_name = 'master';
}
my $channel_sql = $vp > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
my $sql = "SELECT ${source_name}_POS_WAIT('$source_status->{file}', $source_status->{position}, $timeout $channel_sql)";
PTDEBUG && _d($replica_dbh, $sql);
my $start = time;
($result) = $slave_dbh->selectrow_array($sql);
($result) = $replica_dbh->selectrow_array($sql);
$waited = time - $start;
@@ -11023,7 +11063,7 @@ sub wait_for_master {
PTDEBUG && _d("Waited", $waited, "seconds");
}
else {
PTDEBUG && _d('Not waiting: this server is not a master');
PTDEBUG && _d('Not waiting: this server is not a source');
}
return {
@@ -11032,75 +11072,96 @@ sub wait_for_master {
};
}
sub stop_slave {
sub stop_replica {
my ( $self, $dbh ) = @_;
my $sth = $self->{sths}->{$dbh}->{STOP_SLAVE}
||= $dbh->prepare('STOP SLAVE');
my $vp = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
my $sth = $self->{sths}->{$dbh}->{STOP_REPLICA}
||= $dbh->prepare("STOP ${replica_name}");
PTDEBUG && _d($dbh, $sth->{Statement});
$sth->execute();
}
sub start_slave {
sub start_replica {
my ( $self, $dbh, $pos ) = @_;
my $vp = VersionParser->new($dbh);
my $source_name = 'source';
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_name = 'master';
$replica_name = 'slave';
}
if ( $pos ) {
my $sql = "START SLAVE UNTIL MASTER_LOG_FILE='$pos->{file}', "
. "MASTER_LOG_POS=$pos->{position}";
my $sql = "START ${replica_name} UNTIL ${source_name}_LOG_FILE='$pos->{file}', "
. "${source_name}_LOG_POS=$pos->{position}";
PTDEBUG && _d($dbh, $sql);
$dbh->do($sql);
}
else {
my $sth = $self->{sths}->{$dbh}->{START_SLAVE}
||= $dbh->prepare('START SLAVE');
my $sth = $self->{sths}->{$dbh}->{START_REPLICA}
||= $dbh->prepare("START ${replica_name}");
PTDEBUG && _d($dbh, $sth->{Statement});
$sth->execute();
}
}
sub catchup_to_master {
my ( $self, $slave, $master, $timeout ) = @_;
$self->stop_slave($master);
$self->stop_slave($slave);
my $slave_status = $self->get_slave_status($slave);
my $slave_pos = $self->repl_posn($slave_status);
my $master_status = $self->get_master_status($master);
my $master_pos = $self->repl_posn($master_status);
PTDEBUG && _d('Master position:', $self->pos_to_string($master_pos),
'Slave position:', $self->pos_to_string($slave_pos));
sub catchup_to_source {
my ( $self, $replica, $source, $timeout ) = @_;
$self->stop_replica($source);
$self->stop_replica($replica);
my $replica_status = $self->get_replica_status($replica);
my $replica_pos = $self->repl_posn($replica_status);
my $source_status = $self->get_source_status($source);
my $source_pos = $self->repl_posn($source_status);
PTDEBUG && _d('Source position:', $self->pos_to_string($source_pos),
'Replica position:', $self->pos_to_string($replica_pos));
my $result;
if ( $self->pos_cmp($slave_pos, $master_pos) < 0 ) {
PTDEBUG && _d('Waiting for slave to catch up to master');
$self->start_slave($slave, $master_pos);
if ( $self->pos_cmp($replica_pos, $source_pos) < 0 ) {
PTDEBUG && _d('Waiting for replica to catch up to source');
$self->start_replica($replica, $source_pos);
$result = $self->wait_for_master(
master_status => $master_status,
slave_dbh => $slave,
$result = $self->wait_for_source(
source_status => $source_status,
replica_dbh => $replica,
timeout => $timeout,
master_status => $master_status
source_status => $source_status
);
if ($result->{error}) {
die $result->{error};
}
if ( !defined $result->{result} ) {
$slave_status = $self->get_slave_status($slave);
if ( !$self->slave_is_running($slave_status) ) {
PTDEBUG && _d('Master position:',
$self->pos_to_string($master_pos),
'Slave position:', $self->pos_to_string($slave_pos));
$slave_pos = $self->repl_posn($slave_status);
if ( $self->pos_cmp($slave_pos, $master_pos) != 0 ) {
die "MASTER_POS_WAIT() returned NULL but slave has not "
. "caught up to master";
$replica_status = $self->get_replica_status($replica);
my $vp = VersionParser->new($replica);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
if ( !$self->replica_is_running($replica_status, $replica_name) ) {
PTDEBUG && _d('Source position:',
$self->pos_to_string($source_pos),
'Replica position:', $self->pos_to_string($replica_pos));
$replica_pos = $self->repl_posn($replica_status);
if ( $self->pos_cmp($replica_pos, $source_pos) != 0 ) {
die "SOURCE_POS_WAIT() returned NULL but replica has not "
. "caught up to source";
}
PTDEBUG && _d('Slave is caught up to master and stopped');
PTDEBUG && _d('Replica is caught up to source and stopped');
}
else {
die "Slave has not caught up to master and it is still running";
die "Replica has not caught up to source and it is still running";
}
}
}
else {
PTDEBUG && _d("Slave is already caught up to master");
PTDEBUG && _d("Replica is already caught up to source");
}
return $result;
@@ -11108,26 +11169,38 @@ sub catchup_to_master {
sub catchup_to_same_pos {
my ( $self, $s1_dbh, $s2_dbh ) = @_;
$self->stop_slave($s1_dbh);
$self->stop_slave($s2_dbh);
my $s1_status = $self->get_slave_status($s1_dbh);
my $s2_status = $self->get_slave_status($s2_dbh);
$self->stop_replica($s1_dbh);
$self->stop_replica($s2_dbh);
my $s1_status = $self->get_replica_status($s1_dbh);
my $s2_status = $self->get_replica_status($s2_dbh);
my $s1_pos = $self->repl_posn($s1_status);
my $s2_pos = $self->repl_posn($s2_status);
if ( $self->pos_cmp($s1_pos, $s2_pos) < 0 ) {
$self->start_slave($s1_dbh, $s2_pos);
$self->start_replica($s1_dbh, $s2_pos);
}
elsif ( $self->pos_cmp($s2_pos, $s1_pos) < 0 ) {
$self->start_slave($s2_dbh, $s1_pos);
$self->start_replica($s2_dbh, $s1_pos);
}
$s1_status = $self->get_slave_status($s1_dbh);
$s2_status = $self->get_slave_status($s2_dbh);
$s1_status = $self->get_replica_status($s1_dbh);
$s2_status = $self->get_replica_status($s2_dbh);
$s1_pos = $self->repl_posn($s1_status);
$s2_pos = $self->repl_posn($s2_status);
if ( $self->slave_is_running($s1_status)
|| $self->slave_is_running($s2_status)
my $vp1 = VersionParser->new($s1_dbh);
my $replica1_name = 'replica';
if ( $vp1 < '8.1' || $vp1->flavor() =~ m/maria/ ) {
$replica1_name = 'slave';
}
my $vp2 = VersionParser->new($s2_dbh);
my $replica2_name = 'replica';
if ( $vp2 < '8.1' || $vp2->flavor() =~ m/maria/ ) {
$replica2_name = 'slave';
}
if ( $self->replica_is_running($s1_status, $replica1_name)
|| $self->replica_is_running($s2_status, $replica2_name)
|| $self->pos_cmp($s1_pos, $s2_pos) != 0)
{
die "The servers aren't both stopped at the same position";
@@ -11135,14 +11208,21 @@ sub catchup_to_same_pos {
}
sub slave_is_running {
my ( $self, $slave_status ) = @_;
return ($slave_status->{slave_sql_running} || 'No') eq 'Yes';
sub replica_is_running {
my ( $self, $replica_status, $replica_name ) = @_;
return ($replica_status->{"${replica_name}_sql_running"} || 'No') eq 'Yes';
}
sub has_slave_updates {
sub has_replica_updates {
my ( $self, $dbh ) = @_;
my $sql = q{SHOW VARIABLES LIKE 'log_slave_updates'};
my $vp = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
my $sql = qq{SHOW VARIABLES LIKE 'log_${replica_name}_updates'};
PTDEBUG && _d($dbh, $sql);
my ($name, $value) = $dbh->selectrow_array($sql);
return $value && $value =~ m/^(1|ON)$/;
@@ -11156,6 +11236,12 @@ sub repl_posn {
position => $status->{position},
};
}
elsif ( exists $status->{relay_source_log_file} && exists $status->{exec_source_log_pos} ) {
return {
file => $status->{relay_source_log_file},
position => $status->{exec_source_log_pos},
};
}
else {
return {
file => $status->{relay_master_log_file},
@@ -11164,11 +11250,18 @@ sub repl_posn {
}
}
sub get_slave_lag {
sub get_replica_lag {
my ( $self, $dbh ) = @_;
my $stat = $self->get_slave_status($dbh);
return unless $stat; # server is not a slave
return $stat->{seconds_behind_master};
my $vp = VersionParser->new($dbh);
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_name = 'master';
}
my $stat = $self->get_replica_status($dbh);
return unless $stat; # server is not a replica
return $stat->{"seconds_behind_${source_name}"};
}
sub pos_cmp {
@@ -11179,9 +11272,9 @@ sub pos_cmp {
sub short_host {
my ( $self, $dsn ) = @_;
my ($host, $port);
if ( $dsn->{master_host} ) {
$host = $dsn->{master_host};
$port = $dsn->{master_port};
if ( $dsn->{source_host} ) {
$host = $dsn->{source_host};
$port = $dsn->{source_port};
}
else {
$host = $dsn->{h};
@@ -11196,7 +11289,7 @@ sub is_replication_thread {
my $type = lc($args{type} || 'all');
die "Invalid type: $type"
unless $type =~ m/^binlog_dump|slave_io|slave_sql|all$/i;
unless $type =~ m/^binlog_dump|slave_io|slave_sql|replica_io|replica_sql|all$/i;
my $match = 0;
if ( $type =~ m/binlog_dump|all/i ) {
@@ -11205,7 +11298,7 @@ sub is_replication_thread {
}
if ( !$match ) {
if ( ($query->{User} || $query->{user} || '') eq "system user" ) {
PTDEBUG && _d("Slave replication thread");
PTDEBUG && _d("Replica replication thread");
if ( $type ne 'all' ) {
my $state = $query->{State} || $query->{state} || '';
@@ -11214,15 +11307,16 @@ sub is_replication_thread {
$match = 1;
}
else {
my ($slave_sql) = $state =~ m/
my ($replica_sql) = $state =~ m/
^(Waiting\sfor\sthe\snext\sevent
|Reading\sevent\sfrom\sthe\srelay\slog
|Has\sread\sall\srelay\slog;\swaiting
|Making\stemp\sfile
|Waiting\sfor\sslave\smutex\son\sexit)/xi;
|Waiting\sfor\sslave\smutex\son\sexit
|Waiting\sfor\sreplica\smutex\son\sexit)/xi;
$match = $type eq 'slave_sql' && $slave_sql ? 1
: $type eq 'slave_io' && !$slave_sql ? 1
$match = $type eq 'replica_sql' && $replica_sql ? 1
: $type eq 'replica_io' && !$replica_sql ? 1
: 0;
}
}
@@ -11263,9 +11357,15 @@ sub get_replication_filters {
}
my ($dbh) = @args{@required_args};
my $vp = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
my %filters = ();
my $status = $self->get_master_status($dbh);
my $status = $self->get_source_status($dbh);
if ( $status ) {
map { $filters{$_} = $status->{$_} }
grep { defined $status->{$_} && $status->{$_} ne '' }
@@ -11275,7 +11375,7 @@ sub get_replication_filters {
);
}
$status = $self->get_slave_status($dbh);
$status = $self->get_replica_status($dbh);
if ( $status ) {
map { $filters{$_} = $status->{$_} }
grep { defined $status->{$_} && $status->{$_} ne '' }
@@ -11288,10 +11388,10 @@ sub get_replication_filters {
replicate_wild_ignore_table
);
my $sql = "SHOW VARIABLES LIKE 'slave_skip_errors'";
my $sql = "SHOW VARIABLES LIKE '${replica_name}_skip_errors'";
PTDEBUG && _d($dbh, $sql);
my $row = $dbh->selectrow_arrayref($sql);
$filters{slave_skip_errors} = $row->[1] if $row->[1] && $row->[1] ne 'OFF';
$filters{replica_skip_errors} = $row->[1] if $row->[1] && $row->[1] ne 'OFF';
}
return \%filters;

View File

@@ -1309,7 +1309,7 @@ sub get_cxn_params {
$dsn = 'DBI:mysql:' . ( $info->{D} || '' ) . ';'
. join(';', map { "$opts{$_}->{dsn}=$info->{$_}" }
grep { defined $info->{$_} }
qw(F h P S A))
qw(F h P S A s))
. ';mysql_read_default_group=client'
. ($info->{L} ? ';mysql_local_infile=1' : '');
}

View File

@@ -2030,7 +2030,7 @@ sub get_cxn_params {
$dsn = 'DBI:mysql:' . ( $info->{D} || '' ) . ';'
. join(';', map { "$opts{$_}->{dsn}=$info->{$_}" }
grep { defined $info->{$_} }
qw(F h P S A))
qw(F h P S A s))
. ';mysql_read_default_group=client'
. ($info->{L} ? ';mysql_local_infile=1' : '');
}

View File

@@ -1965,7 +1965,7 @@ sub get_cxn_params {
$dsn = 'DBI:mysql:' . ( $info->{D} || '' ) . ';'
. join(';', map { "$opts{$_}->{dsn}=$info->{$_}" }
grep { defined $info->{$_} }
qw(F h P S A))
qw(F h P S A s))
. ';mysql_read_default_group=client'
. ($info->{L} ? ';mysql_local_infile=1' : '');
}
@@ -2284,7 +2284,7 @@ sub new {
return bless $self, $class;
}
sub get_slaves {
sub get_replicas {
my ($self, %args) = @_;
my @required_args = qw(make_cxn);
foreach my $arg ( @required_args ) {
@@ -2292,11 +2292,11 @@ sub get_slaves {
}
my ($make_cxn) = @args{@required_args};
my $slaves = [];
my $replicas = [];
my $dp = $self->{DSNParser};
my $methods = $self->_resolve_recursion_methods($args{dsn});
return $slaves unless @$methods;
return $replicas unless @$methods;
if ( grep { m/processlist|hosts/i } @$methods ) {
my @required_args = qw(dbh dsn);
@@ -2306,26 +2306,26 @@ sub get_slaves {
my ($dbh, $dsn) = @args{@required_args};
my $o = $self->{OptionParser};
$self->recurse_to_slaves(
{ dbh => $dbh,
dsn => $dsn,
slave_user => $o->got('slave-user') ? $o->get('slave-user') : '',
slave_password => $o->got('slave-password') ? $o->get('slave-password') : '',
slaves => $args{slaves},
$self->recurse_to_replicas(
{ dbh => $dbh,
dsn => $dsn,
replica_user => $o->got('replica-user') ? $o->get('replica-user') : '',
replica_password => $o->got('replica-password') ? $o->get('replica-password') : '',
replicas => $args{replicas},
callback => sub {
my ( $dsn, $dbh, $level, $parent ) = @_;
return unless $level;
PTDEBUG && _d('Found slave:', $dp->as_string($dsn));
my $slave_dsn = $dsn;
if ($o->got('slave-user')) {
$slave_dsn->{u} = $o->get('slave-user');
PTDEBUG && _d("Using slave user ".$o->get('slave-user')." on ".$slave_dsn->{h}.":".$slave_dsn->{P});
PTDEBUG && _d('Found replica:', $dp->as_string($dsn));
my $replica_dsn = $dsn;
if ($o->got('replica-user')) {
$replica_dsn->{u} = $o->get('replica-user');
PTDEBUG && _d("Using replica user ".$o->get('replica-user')." on ".$replica_dsn->{h}.":".$replica_dsn->{P});
}
if ($o->got('slave-password')) {
$slave_dsn->{p} = $o->get('slave-password');
PTDEBUG && _d("Slave password set");
if ($o->got('replica-password')) {
$replica_dsn->{p} = $o->get('replica-password');
PTDEBUG && _d("Replica password set");
}
push @$slaves, $make_cxn->(dsn => $slave_dsn, dbh => $dbh, parent => $parent);
push @$replicas, $make_cxn->(dsn => $replica_dsn, dbh => $dbh, parent => $parent);
return;
},
wait_no_die => $args{'wait_no_die'},
@@ -2333,20 +2333,20 @@ sub get_slaves {
);
} elsif ( $methods->[0] =~ m/^dsn=/i ) {
(my $dsn_table_dsn = join ",", @$methods) =~ s/^dsn=//i;
$slaves = $self->get_cxn_from_dsn_table(
$replicas = $self->get_cxn_from_dsn_table(
%args,
dsn_table_dsn => $dsn_table_dsn,
wait_no_die => $args{'wait_no_die'},
);
}
elsif ( $methods->[0] =~ m/none/i ) {
PTDEBUG && _d('Not getting to slaves');
PTDEBUG && _d('Not getting to replicas');
}
else {
die "Unexpected recursion methods: @$methods";
}
return $slaves;
return $replicas;
}
sub _resolve_recursion_methods {
@@ -2364,31 +2364,31 @@ sub _resolve_recursion_methods {
}
}
sub recurse_to_slaves {
sub recurse_to_replicas {
my ( $self, $args, $level ) = @_;
$level ||= 0;
my $dp = $self->{DSNParser};
my $recurse = $args->{recurse} || $self->{OptionParser}->get('recurse');
my $dsn = $args->{dsn};
my $slave_user = $args->{slave_user} || '';
my $slave_password = $args->{slave_password} || '';
my $replica_user = $args->{replica_user} || '';
my $replica_password = $args->{replica_password} || '';
my $methods = $self->_resolve_recursion_methods($dsn);
PTDEBUG && _d('Recursion methods:', @$methods);
if ( lc($methods->[0]) eq 'none' ) {
PTDEBUG && _d('Not recursing to slaves');
PTDEBUG && _d('Not recursing to replicas');
return;
}
my $slave_dsn = $dsn;
if ($slave_user) {
$slave_dsn->{u} = $slave_user;
PTDEBUG && _d("Using slave user $slave_user on "
. $slave_dsn->{h} . ":" . ( $slave_dsn->{P} ? $slave_dsn->{P} : ""));
my $replica_dsn = $dsn;
if ($replica_user) {
$replica_dsn->{u} = $replica_user;
PTDEBUG && _d("Using replica user $replica_user on "
. $replica_dsn->{h} . ":" . ( $replica_dsn->{P} ? $replica_dsn->{P} : ""));
}
if ($slave_password) {
$slave_dsn->{p} = $slave_password;
PTDEBUG && _d("Slave password set");
if ($replica_password) {
$replica_dsn->{p} = $replica_password;
PTDEBUG && _d("Replica password set");
}
my $dbh = $args->{dbh};
@@ -2396,12 +2396,12 @@ sub recurse_to_slaves {
my $get_dbh = sub {
eval {
$dbh = $dp->get_dbh(
$dp->get_cxn_params($slave_dsn), { AutoCommit => 1 }
$dp->get_cxn_params($replica_dsn), { AutoCommit => 1 }
);
PTDEBUG && _d('Connected to', $dp->as_string($slave_dsn));
PTDEBUG && _d('Connected to', $dp->as_string($replica_dsn));
};
if ( $EVAL_ERROR ) {
print STDERR "Cannot connect to ", $dp->as_string($slave_dsn), ": ", $EVAL_ERROR, "\n"
print STDERR "Cannot connect to ", $dp->as_string($replica_dsn), ": ", $EVAL_ERROR, "\n"
or die "Cannot print: $OS_ERROR";
return;
}
@@ -2409,10 +2409,10 @@ sub recurse_to_slaves {
DBH: {
if ( !defined $dbh ) {
foreach my $known_slave ( @{$args->{slaves}} ) {
if ($known_slave->{dsn}->{h} eq $slave_dsn->{h} and
$known_slave->{dsn}->{P} eq $slave_dsn->{P} ) {
$dbh = $known_slave->{dbh};
foreach my $known_replica ( @{$args->{replicas}} ) {
if ($known_replica->{dsn}->{h} eq $replica_dsn->{h} and
$known_replica->{dsn}->{P} eq $replica_dsn->{P} ) {
$dbh = $known_replica->{dbh};
last DBH;
}
}
@@ -2430,7 +2430,7 @@ sub recurse_to_slaves {
if ( $EVAL_ERROR ) {
if ( $args->{wait_no_die} ) {
print STDERR "Error getting server id: ", $EVAL_ERROR,
"\nRetrying query for server ", $slave_dsn->{h}, ":", $slave_dsn->{P}, "\n";
"\nRetrying query for server ", $replica_dsn->{h}, ":", $replica_dsn->{P}, "\n";
sleep 1;
$dbh->disconnect();
$get_dbh->();
@@ -2440,12 +2440,12 @@ sub recurse_to_slaves {
}
} until (defined $id);
PTDEBUG && _d('Working on server ID', $id);
my $master_thinks_i_am = $dsn->{server_id};
my $source_thinks_i_am = $dsn->{server_id};
if ( !defined $id
|| ( defined $master_thinks_i_am && $master_thinks_i_am != $id )
|| ( defined $source_thinks_i_am && $source_thinks_i_am != $id )
|| $args->{server_ids_seen}->{$id}++
) {
PTDEBUG && _d('Server ID seen, or not what master said');
PTDEBUG && _d('Server ID seen, or not what source said');
if ( $args->{skip_callback} ) {
$args->{skip_callback}->($dsn, $dbh, $level, $args->{parent});
}
@@ -2456,51 +2456,51 @@ sub recurse_to_slaves {
if ( !defined $recurse || $level < $recurse ) {
my @slaves =
grep { !$_->{master_id} || $_->{master_id} == $id } # Only my slaves.
$self->find_slave_hosts($dp, $dbh, $dsn, $methods);
my @replicas =
grep { !$_->{source_id} || $_->{source_id} == $id } # Only my replicas.
$self->find_replica_hosts($dp, $dbh, $dsn, $methods);
foreach my $slave ( @slaves ) {
foreach my $replica ( @replicas ) {
PTDEBUG && _d('Recursing from',
$dp->as_string($dsn), 'to', $dp->as_string($slave));
$self->recurse_to_slaves(
{ %$args, dsn => $slave, dbh => undef, parent => $dsn, slave_user => $slave_user, $slave_password => $slave_password }, $level + 1 );
$dp->as_string($dsn), 'to', $dp->as_string($replica));
$self->recurse_to_replicas(
{ %$args, dsn => $replica, dbh => undef, parent => $dsn, replica_user => $replica_user, $replica_password => $replica_password }, $level + 1 );
}
}
}
sub find_slave_hosts {
sub find_replica_hosts {
my ( $self, $dsn_parser, $dbh, $dsn, $methods ) = @_;
PTDEBUG && _d('Looking for slaves on', $dsn_parser->as_string($dsn),
PTDEBUG && _d('Looking for replicas on', $dsn_parser->as_string($dsn),
'using methods', @$methods);
my @slaves;
my @replicas;
METHOD:
foreach my $method ( @$methods ) {
my $find_slaves = "_find_slaves_by_$method";
PTDEBUG && _d('Finding slaves with', $find_slaves);
@slaves = $self->$find_slaves($dsn_parser, $dbh, $dsn);
last METHOD if @slaves;
my $find_replicas = "_find_replicas_by_$method";
PTDEBUG && _d('Finding replicas with', $find_replicas);
@replicas = $self->$find_replicas($dsn_parser, $dbh, $dsn);
last METHOD if @replicas;
}
PTDEBUG && _d('Found', scalar(@slaves), 'slaves');
return @slaves;
PTDEBUG && _d('Found', scalar(@replicas), 'replicas');
return @replicas;
}
sub _find_slaves_by_processlist {
sub _find_replicas_by_processlist {
my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
my @connected_slaves = $self->get_connected_slaves($dbh);
my @slaves = $self->_process_slaves_list($dsn_parser, $dsn, \@connected_slaves);
return @slaves;
my @connected_replicas = $self->get_connected_replicas($dbh);
my @replicas = $self->_process_replicas_list($dsn_parser, $dsn, \@connected_replicas);
return @replicas;
}
sub _process_slaves_list {
my ($self, $dsn_parser, $dsn, $connected_slaves) = @_;
my @slaves = map {
my $slave = $dsn_parser->parse("h=$_", $dsn);
$slave->{source} = 'processlist';
$slave;
sub _process_replicas_list {
my ($self, $dsn_parser, $dsn, $connected_replicas) = @_;
my @replicas = map {
my $replica = $dsn_parser->parse("h=$_", $dsn);
$replica->{source} = 'processlist';
$replica;
}
grep { $_ }
map {
@@ -2512,22 +2512,28 @@ sub _process_slaves_list {
$host = '['.$host.']';
}
$host;
} @$connected_slaves;
} @$connected_replicas;
return @slaves;
return @replicas;
}
sub _find_slaves_by_hosts {
sub _find_replicas_by_hosts {
my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
my @slaves;
my $sql = 'SHOW SLAVE HOSTS';
PTDEBUG && _d($dbh, $sql);
@slaves = @{$dbh->selectall_arrayref($sql, { Slice => {} })};
my @replicas;
if ( @slaves ) {
PTDEBUG && _d('Found some SHOW SLAVE HOSTS info');
@slaves = map {
my $vp = VersionParser->new($dbh);
my $sql = 'SHOW REPLICAS';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$sql = 'SHOW SLAVE HOSTS';
}
PTDEBUG && _d($dbh, $sql);
@replicas = @{$dbh->selectall_arrayref($sql, { Slice => {} })};
if ( @replicas ) {
PTDEBUG && _d('Found some SHOW REPLICAS info');
@replicas = map {
my %hash;
@hash{ map { lc $_ } keys %$_ } = values %$_;
my $spec = "h=$hash{host},P=$hash{port}"
@@ -2535,16 +2541,16 @@ sub _find_slaves_by_hosts {
. ( $hash{password} ? ",p=$hash{password}" : '');
my $dsn = $dsn_parser->parse($spec, $dsn);
$dsn->{server_id} = $hash{server_id};
$dsn->{master_id} = $hash{master_id};
$dsn->{source_id} = $hash{source_id};
$dsn->{source} = 'hosts';
$dsn;
} @slaves;
} @replicas;
}
return @slaves;
return @replicas;
}
sub get_connected_slaves {
sub get_connected_replicas {
my ( $self, $dbh ) = @_;
my $show = "SHOW GRANTS FOR ";
@@ -2590,79 +2596,103 @@ sub get_connected_slaves {
@{$dbh->selectall_arrayref($sql, { Slice => {} })};
}
sub is_master_of {
my ( $self, $master, $slave ) = @_;
my $master_status = $self->get_master_status($master)
or die "The server specified as a master is not a master";
my $slave_status = $self->get_slave_status($slave)
or die "The server specified as a slave is not a slave";
my @connected = $self->get_connected_slaves($master)
or die "The server specified as a master has no connected slaves";
my (undef, $port) = $master->selectrow_array("SHOW VARIABLES LIKE 'port'");
sub is_source_of {
my ( $self, $source, $replica ) = @_;
if ( $port != $slave_status->{master_port} ) {
die "The slave is connected to $slave_status->{master_port} "
. "but the master's port is $port";
my $replica_version = VersionParser->new($replica);
my $source_name = 'source';
my $source_port = 'source_port';
if ( $replica_version < '8.1' || $replica_version->flavor() =~ m/maria/ ) {
$source_name = 'master';
$source_port = 'master_port';
}
if ( !grep { $slave_status->{master_user} eq $_->{user} } @connected ) {
die "I don't see any slave I/O thread connected with user "
. $slave_status->{master_user};
my $source_status = $self->get_source_status($source)
or die "The server specified as a source is not a source";
my $replica_status = $self->get_replica_status($replica)
or die "The server specified as a replica is not a replica";
my @connected = $self->get_connected_replicas($source)
or die "The server specified as a source has no connected replicas";
my (undef, $port) = $source->selectrow_array("SHOW VARIABLES LIKE 'port'");
if ( $port != $replica_status->{$source_port} ) {
die "The replica is connected to $replica_status->{$source_port} "
. "but the source's port is $port";
}
if ( ($slave_status->{slave_io_state} || '')
eq 'Waiting for master to send event' )
if ( !grep { $replica_status->{source_user} eq $_->{user} } @connected ) {
die "I don't see any replica I/O thread connected with user "
. $replica_status->{source_user};
}
if ( ($replica_status->{replica_io_state} || '')
eq 'Waiting for ${source_name} to send event' )
{
my ( $master_log_name, $master_log_num )
= $master_status->{file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
my ( $slave_log_name, $slave_log_num )
= $slave_status->{master_log_file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
if ( $master_log_name ne $slave_log_name
|| abs($master_log_num - $slave_log_num) > 1 )
my ( $source_log_name, $source_log_num )
= $source_status->{file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
my ( $replica_log_name, $replica_log_num )
= $replica_status->{source_log_file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
if ( $source_log_name ne $replica_log_name
|| abs($source_log_num - $replica_log_num) > 1 )
{
die "The slave thinks it is reading from "
. "$slave_status->{master_log_file}, but the "
. "master is writing to $master_status->{file}";
die "The replica thinks it is reading from "
. "$replica_status->{source_log_file}, but the "
. "source is writing to $source_status->{file}";
}
}
return 1;
}
sub get_master_dsn {
sub get_source_dsn {
my ( $self, $dbh, $dsn, $dsn_parser ) = @_;
my $master = $self->get_slave_status($dbh) or return undef;
my $spec = "h=$master->{master_host},P=$master->{master_port}";
my $vp = VersionParser->new($dbh);
my $source_host = 'source_host';
my $source_port = 'source_port';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_host = 'master_host';
$source_port = 'master_port';
}
my $source = $self->get_replica_status($dbh) or return undef;
my $spec = "h=$source->{${source_host}},P=$source->{${source_port}}";
return $dsn_parser->parse($spec, $dsn);
}
sub get_slave_status {
sub get_replica_status {
my ( $self, $dbh ) = @_;
if ( !$self->{not_a_slave}->{$dbh} ) {
my $sth = $self->{sths}->{$dbh}->{SLAVE_STATUS}
||= $dbh->prepare('SHOW SLAVE STATUS');
PTDEBUG && _d($dbh, 'SHOW SLAVE STATUS');
my $server_version = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $server_version < '8.1' || $server_version->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
if ( !$self->{not_a_replica}->{$dbh} ) {
my $sth = $self->{sths}->{$dbh}->{REPLICA_STATUS}
||= $dbh->prepare("SHOW ${replica_name} STATUS");
PTDEBUG && _d($dbh, "SHOW ${replica_name} STATUS");
$sth->execute();
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Slave Status rows
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Replica Status rows
my $ss;
if ( $sss_rows && @$sss_rows ) {
if (scalar @$sss_rows > 1) {
if (!$self->{channel}) {
die 'This server returned more than one row for SHOW SLAVE STATUS but "channel" was not specified on the command line';
die 'This server returned more than one row for SHOW REPLICA STATUS but "channel" was not specified on the command line';
}
my $slave_use_channels;
my $replica_use_channels;
for my $row (@$sss_rows) {
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
if ($row->{channel_name}) {
$slave_use_channels = 1;
$replica_use_channels = 1;
}
if ($row->{channel_name} eq $self->{channel}) {
$ss = $row;
last;
}
}
if (!$ss && $slave_use_channels) {
if (!$ss && $replica_use_channels) {
die 'This server is using replication channels but "channel" was not specified on the command line';
}
} else {
@@ -2682,28 +2712,34 @@ sub get_slave_status {
}
}
PTDEBUG && _d('This server returns nothing for SHOW SLAVE STATUS');
$self->{not_a_slave}->{$dbh}++;
PTDEBUG && _d('This server returns nothing for SHOW REPLICA STATUS');
$self->{not_a_replica}->{$dbh}++;
}
}
sub get_master_status {
sub get_source_status {
my ( $self, $dbh ) = @_;
if ( $self->{not_a_master}->{$dbh} ) {
PTDEBUG && _d('Server on dbh', $dbh, 'is not a master');
if ( $self->{not_a_source}->{$dbh} ) {
PTDEBUG && _d('Server on dbh', $dbh, 'is not a source');
return;
}
my $vp = VersionParser->new($dbh);
my $source_name = 'binary log';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_name = 'master';
}
my $sth;
if ( $self->{sths}->{$dbh} && $dbh && $self->{sths}->{$dbh} == $dbh ) {
$sth = $self->{sths}->{$dbh}->{MASTER_STATUS}
||= $dbh->prepare('SHOW MASTER STATUS');
$sth = $self->{sths}->{$dbh}->{SOURCE_STATUS}
||= $dbh->prepare("SHOW ${source_name} STATUS");
}
else {
$sth = $dbh->prepare('SHOW MASTER STATUS');
$sth = $dbh->prepare("SHOW ${source_name} STATUS");
}
PTDEBUG && _d($dbh, 'SHOW MASTER STATUS');
PTDEBUG && _d($dbh, "SHOW ${source_name} STATUS");
$sth->execute();
my ($ms) = @{$sth->fetchall_arrayref({})};
PTDEBUG && _d(
@@ -2711,42 +2747,46 @@ sub get_master_status {
: '');
if ( !$ms || scalar keys %$ms < 2 ) {
PTDEBUG && _d('Server on dbh', $dbh, 'does not seem to be a master');
$self->{not_a_master}->{$dbh}++;
PTDEBUG && _d('Server on dbh', $dbh, 'does not seem to be a source');
$self->{not_a_source}->{$dbh}++;
}
return { map { lc($_) => $ms->{$_} } keys %$ms }; # lowercase the keys
}
sub wait_for_master {
sub wait_for_source {
my ( $self, %args ) = @_;
my @required_args = qw(master_status slave_dbh);
my @required_args = qw(source_status replica_dbh);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($master_status, $slave_dbh) = @args{@required_args};
my ($source_status, $replica_dbh) = @args{@required_args};
my $timeout = $args{timeout} || 60;
my $result;
my $waited;
if ( $master_status ) {
my $slave_status;
if ( $source_status ) {
my $replica_status;
eval {
$slave_status = $self->get_slave_status($slave_dbh);
$replica_status = $self->get_replica_status($replica_dbh);
};
if ($EVAL_ERROR) {
return {
result => undef,
waited => 0,
error =>'Wait for master: this is a multi-master slave but "channel" was not specified on the command line',
error =>'Wait for source: this is a multi-source replica but "channel" was not specified on the command line',
};
}
my $server_version = VersionParser->new($slave_dbh);
my $channel_sql = $server_version > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
my $sql = "SELECT MASTER_POS_WAIT('$master_status->{file}', $master_status->{position}, $timeout $channel_sql)";
PTDEBUG && _d($slave_dbh, $sql);
my $vp = VersionParser->new($replica_dbh);
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_name = 'master';
}
my $channel_sql = $vp > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
my $sql = "SELECT ${source_name}_POS_WAIT('$source_status->{file}', $source_status->{position}, $timeout $channel_sql)";
PTDEBUG && _d($replica_dbh, $sql);
my $start = time;
($result) = $slave_dbh->selectrow_array($sql);
($result) = $replica_dbh->selectrow_array($sql);
$waited = time - $start;
@@ -2754,7 +2794,7 @@ sub wait_for_master {
PTDEBUG && _d("Waited", $waited, "seconds");
}
else {
PTDEBUG && _d('Not waiting: this server is not a master');
PTDEBUG && _d('Not waiting: this server is not a source');
}
return {
@@ -2763,75 +2803,96 @@ sub wait_for_master {
};
}
sub stop_slave {
sub stop_replica {
my ( $self, $dbh ) = @_;
my $sth = $self->{sths}->{$dbh}->{STOP_SLAVE}
||= $dbh->prepare('STOP SLAVE');
my $vp = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
my $sth = $self->{sths}->{$dbh}->{STOP_REPLICA}
||= $dbh->prepare("STOP ${replica_name}");
PTDEBUG && _d($dbh, $sth->{Statement});
$sth->execute();
}
sub start_slave {
sub start_replica {
my ( $self, $dbh, $pos ) = @_;
my $vp = VersionParser->new($dbh);
my $source_name = 'source';
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_name = 'master';
$replica_name = 'slave';
}
if ( $pos ) {
my $sql = "START SLAVE UNTIL MASTER_LOG_FILE='$pos->{file}', "
. "MASTER_LOG_POS=$pos->{position}";
my $sql = "START ${replica_name} UNTIL ${source_name}_LOG_FILE='$pos->{file}', "
. "${source_name}_LOG_POS=$pos->{position}";
PTDEBUG && _d($dbh, $sql);
$dbh->do($sql);
}
else {
my $sth = $self->{sths}->{$dbh}->{START_SLAVE}
||= $dbh->prepare('START SLAVE');
my $sth = $self->{sths}->{$dbh}->{START_REPLICA}
||= $dbh->prepare("START ${replica_name}");
PTDEBUG && _d($dbh, $sth->{Statement});
$sth->execute();
}
}
sub catchup_to_master {
my ( $self, $slave, $master, $timeout ) = @_;
$self->stop_slave($master);
$self->stop_slave($slave);
my $slave_status = $self->get_slave_status($slave);
my $slave_pos = $self->repl_posn($slave_status);
my $master_status = $self->get_master_status($master);
my $master_pos = $self->repl_posn($master_status);
PTDEBUG && _d('Master position:', $self->pos_to_string($master_pos),
'Slave position:', $self->pos_to_string($slave_pos));
sub catchup_to_source {
my ( $self, $replica, $source, $timeout ) = @_;
$self->stop_replica($source);
$self->stop_replica($replica);
my $replica_status = $self->get_replica_status($replica);
my $replica_pos = $self->repl_posn($replica_status);
my $source_status = $self->get_source_status($source);
my $source_pos = $self->repl_posn($source_status);
PTDEBUG && _d('Source position:', $self->pos_to_string($source_pos),
'Replica position:', $self->pos_to_string($replica_pos));
my $result;
if ( $self->pos_cmp($slave_pos, $master_pos) < 0 ) {
PTDEBUG && _d('Waiting for slave to catch up to master');
$self->start_slave($slave, $master_pos);
if ( $self->pos_cmp($replica_pos, $source_pos) < 0 ) {
PTDEBUG && _d('Waiting for replica to catch up to source');
$self->start_replica($replica, $source_pos);
$result = $self->wait_for_master(
master_status => $master_status,
slave_dbh => $slave,
$result = $self->wait_for_source(
source_status => $source_status,
replica_dbh => $replica,
timeout => $timeout,
master_status => $master_status
source_status => $source_status
);
if ($result->{error}) {
die $result->{error};
}
if ( !defined $result->{result} ) {
$slave_status = $self->get_slave_status($slave);
if ( !$self->slave_is_running($slave_status) ) {
PTDEBUG && _d('Master position:',
$self->pos_to_string($master_pos),
'Slave position:', $self->pos_to_string($slave_pos));
$slave_pos = $self->repl_posn($slave_status);
if ( $self->pos_cmp($slave_pos, $master_pos) != 0 ) {
die "MASTER_POS_WAIT() returned NULL but slave has not "
. "caught up to master";
$replica_status = $self->get_replica_status($replica);
my $vp = VersionParser->new($replica);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
if ( !$self->replica_is_running($replica_status, $replica_name) ) {
PTDEBUG && _d('Source position:',
$self->pos_to_string($source_pos),
'Replica position:', $self->pos_to_string($replica_pos));
$replica_pos = $self->repl_posn($replica_status);
if ( $self->pos_cmp($replica_pos, $source_pos) != 0 ) {
die "SOURCE_POS_WAIT() returned NULL but replica has not "
. "caught up to source";
}
PTDEBUG && _d('Slave is caught up to master and stopped');
PTDEBUG && _d('Replica is caught up to source and stopped');
}
else {
die "Slave has not caught up to master and it is still running";
die "Replica has not caught up to source and it is still running";
}
}
}
else {
PTDEBUG && _d("Slave is already caught up to master");
PTDEBUG && _d("Replica is already caught up to source");
}
return $result;
@@ -2839,26 +2900,38 @@ sub catchup_to_master {
sub catchup_to_same_pos {
my ( $self, $s1_dbh, $s2_dbh ) = @_;
$self->stop_slave($s1_dbh);
$self->stop_slave($s2_dbh);
my $s1_status = $self->get_slave_status($s1_dbh);
my $s2_status = $self->get_slave_status($s2_dbh);
$self->stop_replica($s1_dbh);
$self->stop_replica($s2_dbh);
my $s1_status = $self->get_replica_status($s1_dbh);
my $s2_status = $self->get_replica_status($s2_dbh);
my $s1_pos = $self->repl_posn($s1_status);
my $s2_pos = $self->repl_posn($s2_status);
if ( $self->pos_cmp($s1_pos, $s2_pos) < 0 ) {
$self->start_slave($s1_dbh, $s2_pos);
$self->start_replica($s1_dbh, $s2_pos);
}
elsif ( $self->pos_cmp($s2_pos, $s1_pos) < 0 ) {
$self->start_slave($s2_dbh, $s1_pos);
$self->start_replica($s2_dbh, $s1_pos);
}
$s1_status = $self->get_slave_status($s1_dbh);
$s2_status = $self->get_slave_status($s2_dbh);
$s1_status = $self->get_replica_status($s1_dbh);
$s2_status = $self->get_replica_status($s2_dbh);
$s1_pos = $self->repl_posn($s1_status);
$s2_pos = $self->repl_posn($s2_status);
if ( $self->slave_is_running($s1_status)
|| $self->slave_is_running($s2_status)
my $vp1 = VersionParser->new($s1_dbh);
my $replica1_name = 'replica';
if ( $vp1 < '8.1' || $vp1->flavor() =~ m/maria/ ) {
$replica1_name = 'slave';
}
my $vp2 = VersionParser->new($s2_dbh);
my $replica2_name = 'replica';
if ( $vp2 < '8.1' || $vp2->flavor() =~ m/maria/ ) {
$replica2_name = 'slave';
}
if ( $self->replica_is_running($s1_status, $replica1_name)
|| $self->replica_is_running($s2_status, $replica2_name)
|| $self->pos_cmp($s1_pos, $s2_pos) != 0)
{
die "The servers aren't both stopped at the same position";
@@ -2866,14 +2939,21 @@ sub catchup_to_same_pos {
}
sub slave_is_running {
my ( $self, $slave_status ) = @_;
return ($slave_status->{slave_sql_running} || 'No') eq 'Yes';
sub replica_is_running {
my ( $self, $replica_status, $replica_name ) = @_;
return ($replica_status->{"${replica_name}_sql_running"} || 'No') eq 'Yes';
}
sub has_slave_updates {
sub has_replica_updates {
my ( $self, $dbh ) = @_;
my $sql = q{SHOW VARIABLES LIKE 'log_slave_updates'};
my $vp = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
my $sql = qq{SHOW VARIABLES LIKE 'log_${replica_name}_updates'};
PTDEBUG && _d($dbh, $sql);
my ($name, $value) = $dbh->selectrow_array($sql);
return $value && $value =~ m/^(1|ON)$/;
@@ -2887,6 +2967,12 @@ sub repl_posn {
position => $status->{position},
};
}
elsif ( exists $status->{relay_source_log_file} && exists $status->{exec_source_log_pos} ) {
return {
file => $status->{relay_source_log_file},
position => $status->{exec_source_log_pos},
};
}
else {
return {
file => $status->{relay_master_log_file},
@@ -2895,11 +2981,18 @@ sub repl_posn {
}
}
sub get_slave_lag {
sub get_replica_lag {
my ( $self, $dbh ) = @_;
my $stat = $self->get_slave_status($dbh);
return unless $stat; # server is not a slave
return $stat->{seconds_behind_master};
my $vp = VersionParser->new($dbh);
my $source_name = 'source';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_name = 'master';
}
my $stat = $self->get_replica_status($dbh);
return unless $stat; # server is not a replica
return $stat->{"seconds_behind_${source_name}"};
}
sub pos_cmp {
@@ -2910,9 +3003,9 @@ sub pos_cmp {
sub short_host {
my ( $self, $dsn ) = @_;
my ($host, $port);
if ( $dsn->{master_host} ) {
$host = $dsn->{master_host};
$port = $dsn->{master_port};
if ( $dsn->{source_host} ) {
$host = $dsn->{source_host};
$port = $dsn->{source_port};
}
else {
$host = $dsn->{h};
@@ -2927,7 +3020,7 @@ sub is_replication_thread {
my $type = lc($args{type} || 'all');
die "Invalid type: $type"
unless $type =~ m/^binlog_dump|slave_io|slave_sql|all$/i;
unless $type =~ m/^binlog_dump|slave_io|slave_sql|replica_io|replica_sql|all$/i;
my $match = 0;
if ( $type =~ m/binlog_dump|all/i ) {
@@ -2936,7 +3029,7 @@ sub is_replication_thread {
}
if ( !$match ) {
if ( ($query->{User} || $query->{user} || '') eq "system user" ) {
PTDEBUG && _d("Slave replication thread");
PTDEBUG && _d("Replica replication thread");
if ( $type ne 'all' ) {
my $state = $query->{State} || $query->{state} || '';
@@ -2945,15 +3038,16 @@ sub is_replication_thread {
$match = 1;
}
else {
my ($slave_sql) = $state =~ m/
my ($replica_sql) = $state =~ m/
^(Waiting\sfor\sthe\snext\sevent
|Reading\sevent\sfrom\sthe\srelay\slog
|Has\sread\sall\srelay\slog;\swaiting
|Making\stemp\sfile
|Waiting\sfor\sslave\smutex\son\sexit)/xi;
|Waiting\sfor\sslave\smutex\son\sexit
|Waiting\sfor\sreplica\smutex\son\sexit)/xi;
$match = $type eq 'slave_sql' && $slave_sql ? 1
: $type eq 'slave_io' && !$slave_sql ? 1
$match = $type eq 'replica_sql' && $replica_sql ? 1
: $type eq 'replica_io' && !$replica_sql ? 1
: 0;
}
}
@@ -2994,9 +3088,15 @@ sub get_replication_filters {
}
my ($dbh) = @args{@required_args};
my $vp = VersionParser->new($dbh);
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$replica_name = 'slave';
}
my %filters = ();
my $status = $self->get_master_status($dbh);
my $status = $self->get_source_status($dbh);
if ( $status ) {
map { $filters{$_} = $status->{$_} }
grep { defined $status->{$_} && $status->{$_} ne '' }
@@ -3006,7 +3106,7 @@ sub get_replication_filters {
);
}
$status = $self->get_slave_status($dbh);
$status = $self->get_replica_status($dbh);
if ( $status ) {
map { $filters{$_} = $status->{$_} }
grep { defined $status->{$_} && $status->{$_} ne '' }
@@ -3019,10 +3119,10 @@ sub get_replication_filters {
replicate_wild_ignore_table
);
my $sql = "SHOW VARIABLES LIKE 'slave_skip_errors'";
my $sql = "SHOW VARIABLES LIKE '${replica_name}_skip_errors'";
PTDEBUG && _d($dbh, $sql);
my $row = $dbh->selectrow_arrayref($sql);
$filters{slave_skip_errors} = $row->[1] if $row->[1] && $row->[1] ne 'OFF';
$filters{replica_skip_errors} = $row->[1] if $row->[1] && $row->[1] ne 'OFF';
}
return \%filters;
@@ -4007,7 +4107,7 @@ sub main {
DSNParser => $dp,
Quoter => "Quoter",
);
$ms->recurse_to_slaves(
$ms->recurse_to_replicas(
{ dbh => $dbh,
dsn => $master_dsn,
callback => sub {
@@ -4123,12 +4223,20 @@ sub print_node_summary {
return;
}
$dbh->{FetchHashKeyName} = 'NAME_lc';
my $vp = VersionParser->new($dbh);
my $source_name = 'source';
my $replica_name = 'replica';
if ( $vp < '8.1' || $vp->flavor() =~ m/maria/ ) {
$source_name = 'master';
$replica_name = 'slave';
}
# Build the summary line by line, then join and print the lines.
my @lines;
my $vars = $dbh->selectall_hashref("SHOW VARIABLES", 'variable_name');
my $stats = $dbh->selectall_hashref("SHOW STATUS", 'variable_name');
my $ss = $dbh->selectrow_hashref("SHOW SLAVE STATUS");
my $ss = $dbh->selectrow_hashref("SHOW ${replica_name} STATUS");
my $proc = $dbh->selectall_hashref("SHOW PROCESSLIST", 'id');
push @lines, ['Version', $vars->{version}->{value}];
@@ -4163,8 +4271,8 @@ sub print_node_summary {
my $slave_status = '';
if ( $ss ) {
$slave_status = sprintf("%s seconds behind, %s, %s",
defined $ss->{seconds_behind_master} ? $ss->{seconds_behind_master} : '',
$ss->{slave_io_running} eq 'Yes' && $ss->{slave_sql_running} eq 'Yes' ? 'running' : 'not running',
defined $ss->{"seconds_behind_${source_name}"} ? $ss->{"seconds_behind_${source_name}"} : '',
$ss->{"${replica_name}_io_running"} eq 'Yes' && $ss->{"${replica_name}_sql_running"} eq 'Yes' ? 'running' : 'not running',
$ss->{last_errno} ? "error $ss->{last_errno}" : "no errors",
);
}
@@ -4563,6 +4671,12 @@ dsn: user; copy: yes
User for login if not current user.
=item * s
dsn: mysql_ssl; copy: yes
Create SSL connection
=back
=head1 ENVIRONMENT

File diff suppressed because it is too large Load Diff

View File

@@ -1028,7 +1028,11 @@ collect_mysql_data_loop() {
(echo $ts; ps_prepared_statements "$d/prepared_statements.isrunnning") >> "$d/$p-prepared-statements" &
fi
slave_status "$d/$p-slave-status" "${mysql_version}"
local replica_name='replica'
if [ "${mysql_version}" '<' "8.1" ]; then
replica_name="slave"
fi
replica_status "$d/$p-${replica_name}-status" "${mysql_version}"
}
collect_system_data_loop() {
@@ -1312,14 +1316,13 @@ ps_prepared_statements() {
fi
}
slave_status() {
replica_status() {
local outfile=$1
local mysql_version=$2
local sql="SHOW REPLICA STATUS\G"
if [ "${mysql_version}" '<' "8.1" ]; then
local sql="SHOW SLAVE STATUS\G"
else
local sql="SHOW REPLICA STATUS\G"
sql="SHOW SLAVE STATUS\G"
fi
echo -e "\n$sql\n" >> $outfile

View File

@@ -761,7 +761,7 @@ section () {
}
NAME_VAL_LEN=12
name_val () {
name_val() {
printf "%+*s | %s\n" "${NAME_VAL_LEN}" "$1" "$2"
}
@@ -787,7 +787,7 @@ shorten() {
'
}
group_concat () {
group_concat() {
sed -e '{H; $!d;}' -e 'x' -e 's/\n[[:space:]]*\([[:digit:]]*\)[[:space:]]*/, \1x/g' -e 's/[[:space:]][[:space:]]*/ /g' -e 's/, //' "${1}"
}

File diff suppressed because it is too large Load Diff