mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-01 18:25:59 +00:00
1148 lines
39 KiB
Perl
1148 lines
39 KiB
Perl
# This program is copyright 2007-2011 Baron Schwartz, 2011-2012 Percona Ireland Ltd.
|
|
# Feedback and improvements are welcome.
|
|
#
|
|
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
|
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
|
|
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
|
|
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
|
|
# licenses.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
|
|
# Place, Suite 330, Boston, MA 02111-1307 USA.
|
|
# ###########################################################################
|
|
# MasterSlave package
|
|
# ###########################################################################
|
|
{
|
|
# Package: MasterSlave
|
|
# MasterSlave handles common tasks related to source-replica setups.
|
|
package MasterSlave;
|
|
|
|
use strict;
|
|
use warnings FATAL => 'all';
|
|
use English qw(-no_match_vars);
|
|
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
|
|
|
|
# Sub: check_recursion_method
|
|
# Check that the arrayref of recursion methods passed in is valid
|
|
sub check_recursion_method {
|
|
my ($methods) = @_;
|
|
if ( @$methods != 1 ) {
|
|
if ( grep({ !m/processlist|hosts/i } @$methods)
|
|
&& $methods->[0] !~ /^dsn=/i )
|
|
{
|
|
die "Invalid combination of recursion methods: "
|
|
. join(", ", map { defined($_) ? $_ : 'undef' } @$methods) . ". "
|
|
. "Only hosts and processlist may be combined.\n"
|
|
}
|
|
}
|
|
else {
|
|
my ($method) = @$methods;
|
|
die "Invalid recursion method: " . ( $method || 'undef' )
|
|
unless $method && $method =~ m/^(?:processlist$|hosts$|none$|cluster$|dsn=)/i;
|
|
}
|
|
}
|
|
|
|
sub new {
|
|
my ( $class, %args ) = @_;
|
|
my @required_args = qw(OptionParser DSNParser Quoter);
|
|
foreach my $arg ( @required_args ) {
|
|
die "I need a $arg argument" unless $args{$arg};
|
|
}
|
|
my $self = {
|
|
%args,
|
|
replication_thread => {},
|
|
};
|
|
return bless $self, $class;
|
|
}
|
|
|
|
sub get_replicas {
|
|
my ($self, %args) = @_;
|
|
my @required_args = qw(make_cxn);
|
|
foreach my $arg ( @required_args ) {
|
|
die "I need a $arg argument" unless $args{$arg};
|
|
}
|
|
my ($make_cxn) = @args{@required_args};
|
|
|
|
my $replicas = [];
|
|
my $dp = $self->{DSNParser};
|
|
my $methods = $self->_resolve_recursion_methods($args{dsn});
|
|
|
|
return $replicas unless @$methods;
|
|
|
|
if ( grep { m/processlist|hosts/i } @$methods ) {
|
|
my @required_args = qw(dbh dsn);
|
|
foreach my $arg ( @required_args ) {
|
|
die "I need a $arg argument" unless $args{$arg};
|
|
}
|
|
my ($dbh, $dsn) = @args{@required_args};
|
|
my $o = $self->{OptionParser};
|
|
|
|
$self->recurse_to_replicas(
|
|
{ dbh => $dbh,
|
|
dsn => $dsn,
|
|
replica_user => ( $o->got('replica-user') or $o->got('slave-user') ) ? $o->get('replica-user') : '',
|
|
replica_password => ( $o->got('replica-password') or $o->got('slave-password') ) ? $o->get('replica-password') : '',
|
|
replicas => $args{replicas},
|
|
callback => sub {
|
|
my ( $dsn, $dbh, $level, $parent ) = @_;
|
|
return unless $level;
|
|
PTDEBUG && _d('Found replica:', $dp->as_string($dsn));
|
|
my $replica_dsn = $dsn;
|
|
if ( $o->got('replica-user') or $o->got('slave-user') ) {
|
|
$replica_dsn->{u} = $o->get('replica-user');
|
|
PTDEBUG && _d("Using replica user ".$o->get('replica-user')." on ".$replica_dsn->{h}.":".$replica_dsn->{P});
|
|
}
|
|
if ( $o->got('replica-password') or $o->got('slave-password') ) {
|
|
$replica_dsn->{p} = $o->get('replica-password');
|
|
PTDEBUG && _d("Replica password set");
|
|
}
|
|
push @$replicas, $make_cxn->(dsn => $replica_dsn, dbh => $dbh, parent => $parent);
|
|
return;
|
|
},
|
|
wait_no_die => $args{'wait_no_die'},
|
|
}
|
|
);
|
|
} elsif ( $methods->[0] =~ m/^dsn=/i ) {
|
|
(my $dsn_table_dsn = join ",", @$methods) =~ s/^dsn=//i;
|
|
$replicas = $self->get_cxn_from_dsn_table(
|
|
%args,
|
|
dsn_table_dsn => $dsn_table_dsn,
|
|
wait_no_die => $args{'wait_no_die'},
|
|
);
|
|
}
|
|
elsif ( $methods->[0] =~ m/none/i ) {
|
|
PTDEBUG && _d('Not getting to replicas');
|
|
}
|
|
else {
|
|
die "Unexpected recursion methods: @$methods";
|
|
}
|
|
|
|
return $replicas;
|
|
}
|
|
|
|
sub _resolve_recursion_methods {
|
|
my ($self, $dsn) = @_;
|
|
my $o = $self->{OptionParser};
|
|
if ( $o->got('recursion-method') ) {
|
|
return $o->get('recursion-method');
|
|
}
|
|
elsif ( $dsn && ($dsn->{P} || 3306) != 3306 ) {
|
|
# Special case: hosts is best when port is non-standard.
|
|
PTDEBUG && _d('Port number is non-standard; using only hosts method');
|
|
return [qw(hosts)];
|
|
}
|
|
else {
|
|
# Use the option's default.
|
|
return $o->get('recursion-method');
|
|
}
|
|
}
|
|
|
|
# Sub: recurse_to_replicas
|
|
# Descend to replicas by examining SHOW REPLICAS.
|
|
# The callback gets the replica's DSN, dbh, parent, and the recursion level
|
|
# as args. The recursion is tail recursion.
|
|
#
|
|
# Parameters:
|
|
# $args - Hashref of arguments
|
|
# $level - Recursion level
|
|
#
|
|
# Required Arguments:
|
|
# dsn - The DSN to connect to; if no dbh arg, connect using this.
|
|
# recurse - How many levels to recurse. 0 = none, undef = infinite.
|
|
# callback - Code to execute after finding a new replica.
|
|
# dsn_parser - <DSNParser> object
|
|
#
|
|
# Optional Arguments:
|
|
# dbh - dbh
|
|
# skip_callback - Execute with replicas that will be skipped.
|
|
# method - Whether to prefer HOSTS over PROCESSLIST
|
|
# parent - The DSN from which this call descended.
|
|
sub recurse_to_replicas {
|
|
my ( $self, $args, $level ) = @_;
|
|
$level ||= 0;
|
|
my $dp = $self->{DSNParser};
|
|
my $recurse = $args->{recurse} || $self->{OptionParser}->get('recurse');
|
|
my $dsn = $args->{dsn};
|
|
my $replica_user = $args->{replica_user} || '';
|
|
my $replica_password = $args->{replica_password} || '';
|
|
|
|
my $methods = $self->_resolve_recursion_methods($dsn);
|
|
PTDEBUG && _d('Recursion methods:', @$methods);
|
|
if ( lc($methods->[0]) eq 'none' ) {
|
|
PTDEBUG && _d('Not recursing to replicas');
|
|
return;
|
|
}
|
|
|
|
my $replica_dsn = $dsn;
|
|
if ($replica_user) {
|
|
$replica_dsn->{u} = $replica_user;
|
|
PTDEBUG && _d("Using replica user $replica_user on "
|
|
. $replica_dsn->{h} . ":" . ( $replica_dsn->{P} ? $replica_dsn->{P} : ""));
|
|
}
|
|
if ($replica_password) {
|
|
$replica_dsn->{p} = $replica_password;
|
|
PTDEBUG && _d("Replica password set");
|
|
}
|
|
|
|
my $dbh = $args->{dbh};
|
|
|
|
my $get_dbh = sub {
|
|
eval {
|
|
$dbh = $dp->get_dbh(
|
|
$dp->get_cxn_params($replica_dsn), { AutoCommit => 1 }
|
|
);
|
|
PTDEBUG && _d('Connected to', $dp->as_string($replica_dsn));
|
|
};
|
|
if ( $EVAL_ERROR ) {
|
|
print STDERR "Cannot connect to ", $dp->as_string($replica_dsn), ": ", $EVAL_ERROR, "\n"
|
|
or die "Cannot print: $OS_ERROR";
|
|
return;
|
|
}
|
|
};
|
|
|
|
DBH: {
|
|
if ( !defined $dbh ) {
|
|
foreach my $known_replica ( @{$args->{replicas}} ) {
|
|
if ($known_replica->{dsn}->{h} eq $replica_dsn->{h} and
|
|
$known_replica->{dsn}->{P} eq $replica_dsn->{P} ) {
|
|
$dbh = $known_replica->{dbh};
|
|
last DBH;
|
|
}
|
|
}
|
|
$get_dbh->();
|
|
}
|
|
}
|
|
|
|
my $sql = 'SELECT @@SERVER_ID';
|
|
PTDEBUG && _d($sql);
|
|
my $id = undef;
|
|
do {
|
|
eval {
|
|
($id) = $dbh->selectrow_array($sql);
|
|
};
|
|
if ( $EVAL_ERROR ) {
|
|
if ( $args->{wait_no_die} ) {
|
|
print STDERR "Error getting server id: ", $EVAL_ERROR,
|
|
"\nRetrying query for server ", $replica_dsn->{h}, ":", $replica_dsn->{P}, "\n";
|
|
sleep 1;
|
|
$dbh->disconnect();
|
|
$get_dbh->();
|
|
} else {
|
|
die $EVAL_ERROR;
|
|
}
|
|
}
|
|
} until (defined $id);
|
|
PTDEBUG && _d('Working on server ID', $id);
|
|
my $source_thinks_i_am = $dsn->{server_id};
|
|
if ( !defined $id
|
|
|| ( defined $source_thinks_i_am && $source_thinks_i_am != $id )
|
|
|| $args->{server_ids_seen}->{$id}++
|
|
) {
|
|
PTDEBUG && _d('Server ID seen, or not what source said');
|
|
if ( $args->{skip_callback} ) {
|
|
$args->{skip_callback}->($dsn, $dbh, $level, $args->{parent});
|
|
}
|
|
return;
|
|
}
|
|
|
|
$args->{callback}->($dsn, $dbh, $level, $args->{parent});
|
|
|
|
if ( !defined $recurse || $level < $recurse ) {
|
|
|
|
my @replicas =
|
|
grep { !$_->{source_id} || $_->{source_id} == $id } # Only my replicas.
|
|
$self->find_replica_hosts($dp, $dbh, $dsn, $methods);
|
|
|
|
foreach my $replica ( @replicas ) {
|
|
PTDEBUG && _d('Recursing from',
|
|
$dp->as_string($dsn), 'to', $dp->as_string($replica));
|
|
$self->recurse_to_replicas(
|
|
{ %$args, dsn => $replica, dbh => undef, parent => $dsn, replica_user => $replica_user, $replica_password => $replica_password }, $level + 1 );
|
|
}
|
|
}
|
|
}
|
|
|
|
# Finds replica hosts by trying different methods. The default preferred method
|
|
# is trying SHOW PROCESSLIST (processlist) and guessing which ones are replicas,
|
|
# and if that doesn't reveal anything, then try SHOW REPLICA STATUS (hosts).
|
|
# One exception is if the port is non-standard (3306), indicating that the port
|
|
# from SHOW REPLICAS may be important. Then only the hosts methods is used.
|
|
#
|
|
# Returns a list of DSN hashes. Optional extra keys in the DSN hash are
|
|
# source_id and server_id. Also, the 'source' key is either 'processlist' or
|
|
# 'hosts'.
|
|
#
|
|
# If a method is given, it becomes the preferred (first tried) method.
|
|
# Searching stops as soon as a method finds replicas.
|
|
sub find_replica_hosts {
|
|
my ( $self, $dsn_parser, $dbh, $dsn, $methods ) = @_;
|
|
|
|
PTDEBUG && _d('Looking for replicas on', $dsn_parser->as_string($dsn),
|
|
'using methods', @$methods);
|
|
|
|
my @replicas;
|
|
METHOD:
|
|
foreach my $method ( @$methods ) {
|
|
my $find_replicas = "_find_replicas_by_$method";
|
|
PTDEBUG && _d('Finding replicas with', $find_replicas);
|
|
@replicas = $self->$find_replicas($dsn_parser, $dbh, $dsn);
|
|
last METHOD if @replicas;
|
|
}
|
|
|
|
PTDEBUG && _d('Found', scalar(@replicas), 'replicas');
|
|
return @replicas;
|
|
}
|
|
|
|
sub _find_replicas_by_processlist {
|
|
my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
|
|
my @connected_replicas = $self->get_connected_replicas($dbh);
|
|
my @replicas = $self->_process_replicas_list($dsn_parser, $dsn, \@connected_replicas);
|
|
return @replicas;
|
|
}
|
|
|
|
sub _process_replicas_list {
|
|
my ($self, $dsn_parser, $dsn, $connected_replicas) = @_;
|
|
my @replicas = map {
|
|
my $replica = $dsn_parser->parse("h=$_", $dsn);
|
|
$replica->{source} = 'processlist';
|
|
$replica;
|
|
}
|
|
grep { $_ }
|
|
map {
|
|
my ( $host ) = $_->{host} =~ m/^(.*):\d+$/;
|
|
if ( $host eq 'localhost' ) {
|
|
$host = '127.0.0.1'; # Replication never uses sockets.
|
|
}
|
|
if ($host =~ m/::/) {
|
|
$host = '['.$host.']';
|
|
}
|
|
$host;
|
|
} @$connected_replicas;
|
|
|
|
return @replicas;
|
|
}
|
|
|
|
# SHOW REPLICAS is significantly less reliable.
|
|
# Machines tend to share the host list around with every machine in the
|
|
# replication hierarchy, but they don't update each other when machines
|
|
# disconnect or change to use a different source or something. So there is
|
|
# lots of cruft in SHOW REPLICAS.
|
|
sub _find_replicas_by_hosts {
|
|
my ( $self, $dsn_parser, $dbh, $dsn ) = @_;
|
|
|
|
my @replicas;
|
|
|
|
my $vp = VersionParser->new($dbh);
|
|
my $sql = 'SHOW REPLICAS';
|
|
my $source_name = 'source';
|
|
if ( $vp lt '8.1' || $vp->flavor() =~ m/maria/i ) {
|
|
$sql = 'SHOW SLAVE HOSTS';
|
|
$source_name='master';
|
|
}
|
|
|
|
PTDEBUG && _d($dbh, $sql);
|
|
@replicas = @{$dbh->selectall_arrayref($sql, { Slice => {} })};
|
|
|
|
# Convert SHOW REPLICAS into DSN hashes.
|
|
if ( @replicas ) {
|
|
PTDEBUG && _d('Found some SHOW REPLICAS info');
|
|
@replicas = map {
|
|
my %hash;
|
|
@hash{ map { lc $_ } keys %$_ } = values %$_;
|
|
my $spec = "h=$hash{host},P=$hash{port}"
|
|
. ( $hash{user} ? ",u=$hash{user}" : '')
|
|
. ( $hash{password} ? ",p=$hash{password}" : '');
|
|
my $dsn = $dsn_parser->parse($spec, $dsn);
|
|
$dsn->{server_id} = $hash{server_id};
|
|
$dsn->{source_id} = $hash{"${source_name}_id"};
|
|
$dsn->{source} = 'hosts';
|
|
$dsn;
|
|
} @replicas;
|
|
}
|
|
|
|
return @replicas;
|
|
}
|
|
|
|
# Returns PROCESSLIST entries of connected replicas, normalized to lowercase
|
|
# column names.
|
|
sub get_connected_replicas {
|
|
my ( $self, $dbh ) = @_;
|
|
|
|
# Check for the PROCESS privilege.
|
|
my $show = "SHOW GRANTS FOR ";
|
|
my $user = 'CURRENT_USER()';
|
|
my $sql = $show . $user;
|
|
PTDEBUG && _d($dbh, $sql);
|
|
|
|
my $proc;
|
|
eval {
|
|
$proc = grep {
|
|
m/ALL PRIVILEGES.*?\*\.\*|PROCESS/
|
|
} @{$dbh->selectcol_arrayref($sql)};
|
|
};
|
|
if ( $EVAL_ERROR ) {
|
|
|
|
if ( $EVAL_ERROR =~ m/no such grant defined for user/ ) {
|
|
# Try again without a host.
|
|
PTDEBUG && _d('Retrying SHOW GRANTS without host; error:',
|
|
$EVAL_ERROR);
|
|
($user) = split('@', $user);
|
|
$sql = $show . $user;
|
|
PTDEBUG && _d($sql);
|
|
eval {
|
|
$proc = grep {
|
|
m/ALL PRIVILEGES.*?\*\.\*|PROCESS/
|
|
} @{$dbh->selectcol_arrayref($sql)};
|
|
};
|
|
}
|
|
|
|
# The 2nd try above might have cleared $EVAL_ERROR.
|
|
# If not, die now.
|
|
die "Failed to $sql: $EVAL_ERROR" if $EVAL_ERROR;
|
|
}
|
|
if ( !$proc ) {
|
|
die "You do not have the PROCESS privilege";
|
|
}
|
|
|
|
$sql = 'SHOW FULL PROCESSLIST';
|
|
PTDEBUG && _d($dbh, $sql);
|
|
# It's probably a replica if it's doing a binlog dump.
|
|
grep { $_->{command} =~ m/Binlog Dump/i }
|
|
map { # Lowercase the column names
|
|
my %hash;
|
|
@hash{ map { lc $_ } keys %$_ } = values %$_;
|
|
\%hash;
|
|
}
|
|
@{$dbh->selectall_arrayref($sql, { Slice => {} })};
|
|
}
|
|
|
|
# Verifies that $source is really the source of $replica. This is not an exact
|
|
# science, but there is a decent chance of catching some obvious cases when it
|
|
# is not the source. If not the source, it dies; otherwise returns true.
|
|
sub is_source_of {
|
|
my ( $self, $source, $replica ) = @_;
|
|
|
|
my $source_name = get_source_name($replica);
|
|
my $source_port = "${source_name}_port";
|
|
|
|
my $source_status = $self->get_source_status($source)
|
|
or die "The server specified as a source is not a source";
|
|
my $replica_status = $self->get_replica_status($replica)
|
|
or die "The server specified as a replica is not a replica";
|
|
my @connected = $self->get_connected_replicas($source)
|
|
or die "The server specified as a source has no connected replicas";
|
|
my (undef, $port) = $source->selectrow_array("SHOW VARIABLES LIKE 'port'");
|
|
|
|
if ( $port != $replica_status->{$source_port} ) {
|
|
die "The replica is connected to $replica_status->{$source_port} "
|
|
. "but the source's port is $port";
|
|
}
|
|
|
|
if ( !grep { $replica_status->{"${source_name}_user"} eq $_->{user} } @connected ) {
|
|
die "I don't see any replica I/O thread connected with user "
|
|
. $replica_status->{"${source_name}_user"};
|
|
}
|
|
|
|
if ( ( ($replica_status->{replica_io_state} || '')
|
|
eq 'Waiting for source to send event' )
|
|
|| ( ($replica_status->{replica_io_state} || '')
|
|
eq 'Waiting for master to send event' ) )
|
|
{
|
|
# The replica thinks its I/O thread is caught up to the source. Let's
|
|
# compare and make sure the source and replica are reasonably close to each
|
|
# other. Note that this is one of the few places where I check the I/O
|
|
# thread positions instead of the SQL thread positions!
|
|
# Source_Log_File/Read_Source_Log_Pos is the I/O thread's position on the
|
|
# source.
|
|
my ( $source_log_name, $source_log_num )
|
|
= $source_status->{file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
|
|
my ( $replica_log_name, $replica_log_num )
|
|
= $replica_status->{source_log_file} =~ m/^(.*?)\.0*([1-9][0-9]*)$/;
|
|
if ( $source_log_name ne $replica_log_name
|
|
|| abs($source_log_num - $replica_log_num) > 1 )
|
|
{
|
|
die "The replica thinks it is reading from "
|
|
. "$replica_status->{source_log_file}, but the "
|
|
. "source is writing to $source_status->{file}";
|
|
}
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
# Figures out how to connect to the source, by examining SHOW REPLICA STATUS. But
|
|
# does NOT use the value from Source_User for the username, because typically we
|
|
# want to perform operations as the username that was specified (usually to the
|
|
# program's --user option, or in a DSN), rather than as the replication user,
|
|
# which is often restricted.
|
|
sub get_source_dsn {
|
|
my ( $self, $dbh, $dsn, $dsn_parser ) = @_;
|
|
|
|
my $vp = VersionParser->new($dbh);
|
|
my $source_host = 'source_host';
|
|
my $source_port = 'source_port';
|
|
if ( $vp lt '8.1' || $vp->flavor() =~ m/maria/i ) {
|
|
$source_host = 'master_host';
|
|
$source_port = 'master_port';
|
|
}
|
|
|
|
my $source = $self->get_replica_status($dbh) or return undef;
|
|
my $spec = "h=$source->{${source_host}},P=$source->{${source_port}}";
|
|
return $dsn_parser->parse($spec, $dsn);
|
|
}
|
|
|
|
# Gets SHOW REPLICA STATUS, with column names all lowercased, as a hashref.
|
|
sub get_replica_status {
|
|
my ( $self, $dbh ) = @_;
|
|
|
|
my $replica_name = get_replica_name($dbh);
|
|
|
|
if ( !$self->{not_a_replica}->{$dbh} ) {
|
|
my $sth = $self->{sths}->{$dbh}->{REPLICA_STATUS}
|
|
||= $dbh->prepare("SHOW ${replica_name} STATUS");
|
|
PTDEBUG && _d($dbh, "SHOW ${replica_name} STATUS");
|
|
$sth->execute();
|
|
my ($sss_rows) = $sth->fetchall_arrayref({}); # Show Replica Status rows
|
|
|
|
# If SHOW REPLICA STATUS returns more than one row it means that this replica is connected to more
|
|
# than one source using replication channels.
|
|
# If we have a channel name as a parameter, we need to select the correct row and return it.
|
|
# If we don't have a channel name as a parameter, there is no way to know what the correct source is so,
|
|
# return an error.
|
|
my $ss;
|
|
if ( $sss_rows && @$sss_rows ) {
|
|
if (scalar @$sss_rows > 1) {
|
|
if (!$self->{channel}) {
|
|
die 'This server returned more than one row for SHOW REPLICA STATUS but "channel" was not specified on the command line';
|
|
}
|
|
my $replica_use_channels;
|
|
for my $row (@$sss_rows) {
|
|
$row = { map { lc($_) => $row->{$_} } keys %$row }; # lowercase the keys
|
|
if ($row->{channel_name}) {
|
|
$replica_use_channels = 1;
|
|
}
|
|
if ($row->{channel_name} eq $self->{channel}) {
|
|
$ss = $row;
|
|
last;
|
|
}
|
|
}
|
|
if (!$ss && $replica_use_channels) {
|
|
die 'This server is using replication channels but "channel" was not specified on the command line';
|
|
}
|
|
} else {
|
|
if ($sss_rows->[0]->{channel_name} && $sss_rows->[0]->{channel_name} ne $self->{channel}) {
|
|
die 'This server is using replication channels but "channel" was not specified on the command line';
|
|
} else {
|
|
$ss = $sss_rows->[0];
|
|
}
|
|
}
|
|
|
|
if ( $ss && %$ss ) {
|
|
$ss = { map { lc($_) => $ss->{$_} } keys %$ss }; # lowercase the keys
|
|
return $ss;
|
|
}
|
|
if (!$ss && $self->{channel}) {
|
|
die "Specified channel name is invalid";
|
|
}
|
|
}
|
|
|
|
PTDEBUG && _d('This server returns nothing for SHOW REPLICA STATUS');
|
|
$self->{not_a_replica}->{$dbh}++;
|
|
}
|
|
}
|
|
|
|
# Gets SHOW BINARY LOGS, with column names all lowercased, as a hashref.
|
|
sub get_source_status {
|
|
my ( $self, $dbh ) = @_;
|
|
|
|
if ( $self->{not_a_source}->{$dbh} ) {
|
|
PTDEBUG && _d('Server on dbh', $dbh, 'is not a source');
|
|
return;
|
|
}
|
|
|
|
my $vp = VersionParser->new($dbh);
|
|
my $source_name = 'binary log';
|
|
if ( $vp lt '8.1' || $vp->flavor() =~ m/maria/i ) {
|
|
$source_name = 'master';
|
|
}
|
|
|
|
my $sth;
|
|
if ( $self->{sths}->{$dbh} && $dbh && $self->{sths}->{$dbh} == $dbh ) {
|
|
$sth = $self->{sths}->{$dbh}->{SOURCE_STATUS}
|
|
||= $dbh->prepare("SHOW ${source_name} STATUS");
|
|
}
|
|
else {
|
|
$sth = $dbh->prepare("SHOW ${source_name} STATUS");
|
|
}
|
|
PTDEBUG && _d($dbh, "SHOW ${source_name} STATUS");
|
|
$sth->execute();
|
|
my ($ms) = @{$sth->fetchall_arrayref({})};
|
|
PTDEBUG && _d(
|
|
$ms ? map { "$_=" . (defined $ms->{$_} ? $ms->{$_} : '') } keys %$ms
|
|
: '');
|
|
|
|
if ( !$ms || scalar keys %$ms < 2 ) {
|
|
PTDEBUG && _d('Server on dbh', $dbh, 'does not seem to be a source');
|
|
$self->{not_a_source}->{$dbh}++;
|
|
}
|
|
|
|
return { map { lc($_) => $ms->{$_} } keys %$ms }; # lowercase the keys
|
|
}
|
|
|
|
# Sub: wait_for_source
|
|
# Execute SOURCE_POS_WAIT() to make replica wait for its source.
|
|
#
|
|
# Parameters:
|
|
# %args - Arguments
|
|
#
|
|
# Required Arguments:
|
|
# * source_status - Hashref returned by <get_source_status()>
|
|
# * replica_dbh - dbh for replica host
|
|
#
|
|
# Optional Arguments:
|
|
# * timeout - Wait time in seconds (default 60)
|
|
#
|
|
# Returns:
|
|
# Hashref with result of waiting, like:
|
|
# (start code)
|
|
# {
|
|
# result => the result returned by SOURCE_POS_WAIT: -1, undef, 0+
|
|
# waited => the number of seconds waited, might be zero
|
|
# }
|
|
# (end code)
|
|
sub wait_for_source {
|
|
my ( $self, %args ) = @_;
|
|
my @required_args = qw(source_status replica_dbh);
|
|
foreach my $arg ( @required_args ) {
|
|
die "I need a $arg argument" unless $args{$arg};
|
|
}
|
|
my ($source_status, $replica_dbh) = @args{@required_args};
|
|
my $timeout = $args{timeout} || 60;
|
|
|
|
my $result;
|
|
my $waited;
|
|
if ( $source_status ) {
|
|
my $replica_status;
|
|
eval {
|
|
$replica_status = $self->get_replica_status($replica_dbh);
|
|
};
|
|
if ($EVAL_ERROR) {
|
|
return {
|
|
result => undef,
|
|
waited => 0,
|
|
error =>'Wait for source: this is a multi-source replica but "channel" was not specified on the command line',
|
|
};
|
|
}
|
|
my $source_name = get_source_name($replica_dbh);
|
|
my $vp = VersionParser->new($replica_dbh);
|
|
my $channel_sql = $vp > '5.6' && $self->{channel} ? ", '$self->{channel}'" : '';
|
|
my $sql = "SELECT ${source_name}_POS_WAIT('$source_status->{file}', $source_status->{position}, $timeout $channel_sql)";
|
|
PTDEBUG && _d($replica_dbh, $sql);
|
|
my $start = time;
|
|
($result) = $replica_dbh->selectrow_array($sql);
|
|
|
|
# If SOURCE_POS_WAIT() returned NULL and we waited at least 1s
|
|
# and the time we waited is less than the timeout then this is
|
|
# a strong indication that the replica was stopped while we were
|
|
# waiting.
|
|
$waited = time - $start;
|
|
|
|
PTDEBUG && _d('Result of waiting:', $result);
|
|
PTDEBUG && _d("Waited", $waited, "seconds");
|
|
}
|
|
else {
|
|
PTDEBUG && _d('Not waiting: this server is not a source');
|
|
}
|
|
|
|
return {
|
|
result => $result,
|
|
waited => $waited,
|
|
};
|
|
}
|
|
|
|
# Executes STOP REPLICA.
|
|
sub stop_replica {
|
|
my ( $self, $dbh ) = @_;
|
|
my $replica_name = get_replica_name($dbh);
|
|
|
|
my $sth = $self->{sths}->{$dbh}->{STOP_REPLICA}
|
|
||= $dbh->prepare("STOP ${replica_name}");
|
|
PTDEBUG && _d($dbh, $sth->{Statement});
|
|
$sth->execute();
|
|
}
|
|
|
|
# Executes START REPLICA, optionally with UNTIL.
|
|
sub start_replica {
|
|
my ( $self, $dbh, $pos ) = @_;
|
|
|
|
my $source_name = get_source_name($dbh);
|
|
my $replica_name = get_replica_name($dbh);
|
|
|
|
if ( $pos ) {
|
|
# Just like with CHANGE REPLICATION SOURCE TO, you can't quote the position.
|
|
my $sql = "START ${replica_name} UNTIL ${source_name}_LOG_FILE='$pos->{file}', "
|
|
. "${source_name}_LOG_POS=$pos->{position}";
|
|
PTDEBUG && _d($dbh, $sql);
|
|
$dbh->do($sql);
|
|
}
|
|
else {
|
|
my $sth = $self->{sths}->{$dbh}->{START_REPLICA}
|
|
||= $dbh->prepare("START ${replica_name}");
|
|
PTDEBUG && _d($dbh, $sth->{Statement});
|
|
$sth->execute();
|
|
}
|
|
}
|
|
|
|
# Waits for the replica to catch up to its source, using START REPLICA UNTIL. When
|
|
# complete, the replica is caught up to the source, and the replica process is
|
|
# stopped on both servers.
|
|
sub catchup_to_source {
|
|
my ( $self, $replica, $source, $timeout ) = @_;
|
|
$self->stop_replica($source);
|
|
$self->stop_replica($replica);
|
|
my $replica_status = $self->get_replica_status($replica);
|
|
my $replica_pos = $self->repl_posn($replica_status);
|
|
my $source_status = $self->get_source_status($source);
|
|
my $source_pos = $self->repl_posn($source_status);
|
|
PTDEBUG && _d('Source position:', $self->pos_to_string($source_pos),
|
|
'Replica position:', $self->pos_to_string($replica_pos));
|
|
|
|
my $result;
|
|
if ( $self->pos_cmp($replica_pos, $source_pos) < 0 ) {
|
|
PTDEBUG && _d('Waiting for replica to catch up to source');
|
|
$self->start_replica($replica, $source_pos);
|
|
|
|
# The replica may catch up instantly and stop, in which case
|
|
# SOURCE_POS_WAIT will return NULL and $result->{result} will be undef.
|
|
# We must catch this; if it returns NULL, then we check that
|
|
# its position is as desired.
|
|
# TODO: what if source_pos_wait times out and $result == -1? retry?
|
|
$result = $self->wait_for_source(
|
|
source_status => $source_status,
|
|
replica_dbh => $replica,
|
|
timeout => $timeout,
|
|
source_status => $source_status
|
|
);
|
|
if ($result->{error}) {
|
|
die $result->{error};
|
|
}
|
|
if ( !defined $result->{result} ) {
|
|
$replica_status = $self->get_replica_status($replica);
|
|
|
|
my $replica_name = get_replica_name($replica);
|
|
|
|
if ( !$self->replica_is_running($replica_status, $replica_name) ) {
|
|
PTDEBUG && _d('Source position:',
|
|
$self->pos_to_string($source_pos),
|
|
'Replica position:', $self->pos_to_string($replica_pos));
|
|
$replica_pos = $self->repl_posn($replica_status);
|
|
if ( $self->pos_cmp($replica_pos, $source_pos) != 0 ) {
|
|
die "SOURCE_POS_WAIT() returned NULL but replica has not "
|
|
. "caught up to source";
|
|
}
|
|
PTDEBUG && _d('Replica is caught up to source and stopped');
|
|
}
|
|
else {
|
|
die "Replica has not caught up to source and it is still running";
|
|
}
|
|
}
|
|
}
|
|
else {
|
|
PTDEBUG && _d("Replica is already caught up to source");
|
|
}
|
|
|
|
return $result;
|
|
}
|
|
|
|
# Makes one server catch up to the other in replication. When complete, both
|
|
# servers are stopped and at the same position.
|
|
sub catchup_to_same_pos {
|
|
my ( $self, $s1_dbh, $s2_dbh ) = @_;
|
|
$self->stop_replica($s1_dbh);
|
|
$self->stop_replica($s2_dbh);
|
|
my $s1_status = $self->get_replica_status($s1_dbh);
|
|
my $s2_status = $self->get_replica_status($s2_dbh);
|
|
my $s1_pos = $self->repl_posn($s1_status);
|
|
my $s2_pos = $self->repl_posn($s2_status);
|
|
if ( $self->pos_cmp($s1_pos, $s2_pos) < 0 ) {
|
|
$self->start_replica($s1_dbh, $s2_pos);
|
|
}
|
|
elsif ( $self->pos_cmp($s2_pos, $s1_pos) < 0 ) {
|
|
$self->start_replica($s2_dbh, $s1_pos);
|
|
}
|
|
|
|
# Re-fetch the replication statuses and positions.
|
|
$s1_status = $self->get_replica_status($s1_dbh);
|
|
$s2_status = $self->get_replica_status($s2_dbh);
|
|
$s1_pos = $self->repl_posn($s1_status);
|
|
$s2_pos = $self->repl_posn($s2_status);
|
|
|
|
my $replica1_name = get_replica_name($s1_dbh);
|
|
|
|
my $replica2_name = get_replica_name($s2_dbh);
|
|
|
|
# Verify that they are both stopped and are at the same position.
|
|
if ( $self->replica_is_running($s1_status, $replica1_name)
|
|
|| $self->replica_is_running($s2_status, $replica2_name)
|
|
|| $self->pos_cmp($s1_pos, $s2_pos) != 0)
|
|
{
|
|
die "The servers aren't both stopped at the same position";
|
|
}
|
|
|
|
}
|
|
|
|
# Returns true if the replica is running.
|
|
sub replica_is_running {
|
|
my ( $self, $replica_status, $replica_name ) = @_;
|
|
return ($replica_status->{"${replica_name}_sql_running"} || 'No') eq 'Yes';
|
|
}
|
|
|
|
# Returns true if the server's log_replica_updates option is enabled.
|
|
sub has_replica_updates {
|
|
my ( $self, $dbh ) = @_;
|
|
|
|
my $replica_name = get_replica_name($dbh);
|
|
|
|
my $sql = qq{SHOW VARIABLES LIKE 'log_${replica_name}_updates'};
|
|
PTDEBUG && _d($dbh, $sql);
|
|
my ($name, $value) = $dbh->selectrow_array($sql);
|
|
return $value && $value =~ m/^(1|ON)$/;
|
|
}
|
|
|
|
# Extracts the replication position out of either SHOW REPLICATION SOURCE STATUS or SHOW
|
|
# REPLICA STATUS, and returns it as a hashref { file, position }
|
|
sub repl_posn {
|
|
my ( $self, $status ) = @_;
|
|
if ( exists $status->{file} && exists $status->{position} ) {
|
|
# It's the output of SHOW BINARY LOG STATUS
|
|
return {
|
|
file => $status->{file},
|
|
position => $status->{position},
|
|
};
|
|
}
|
|
elsif ( exists $status->{relay_source_log_file} && exists $status->{exec_source_log_pos} ) {
|
|
# We are on MySQL 8.0.22+
|
|
return {
|
|
file => $status->{relay_source_log_file},
|
|
position => $status->{exec_source_log_pos},
|
|
};
|
|
}
|
|
else {
|
|
return {
|
|
file => $status->{relay_master_log_file},
|
|
position => $status->{exec_master_log_pos},
|
|
};
|
|
}
|
|
}
|
|
|
|
# Gets the replica's lag. TODO: permit using a heartbeat table.
|
|
sub get_replica_lag {
|
|
my ( $self, $dbh ) = @_;
|
|
|
|
my $source_name = get_source_name($dbh);
|
|
|
|
my $stat = $self->get_replica_status($dbh);
|
|
return unless $stat; # server is not a replica
|
|
return $stat->{"seconds_behind_${source_name}"};
|
|
}
|
|
|
|
# Compares two replication positions and returns -1, 0, or 1 just as the cmp
|
|
# operator does.
|
|
sub pos_cmp {
|
|
my ( $self, $a, $b ) = @_;
|
|
return $self->pos_to_string($a) cmp $self->pos_to_string($b);
|
|
}
|
|
|
|
# Sub: short_host
|
|
# Simplify a hostname as much as possible. For purposes of replication, a
|
|
# hostname is really just the combination of hostname and port, since
|
|
# replication always uses TCP connections (it does not work via sockets). If
|
|
# the port is the default 3306, it is omitted. As a convenience, this sub
|
|
# accepts either SHOW REPLICA STATUS or a DSN.
|
|
#
|
|
# Parameters:
|
|
# $dsn - DSN hashref
|
|
#
|
|
# Returns:
|
|
# Short hostname string
|
|
sub short_host {
|
|
my ( $self, $dsn ) = @_;
|
|
my ($host, $port);
|
|
if ( $dsn->{source_host} ) {
|
|
$host = $dsn->{source_host};
|
|
$port = $dsn->{source_port};
|
|
}
|
|
else {
|
|
$host = $dsn->{h};
|
|
$port = $dsn->{P};
|
|
}
|
|
return ($host || '[default]') . ( ($port || 3306) == 3306 ? '' : ":$port" );
|
|
}
|
|
|
|
# Sub: is_replication_thread
|
|
# Determine if a processlist item is a replication thread.
|
|
#
|
|
# Parameters:
|
|
# $query - Hashref of a processlist item
|
|
# %args - Arguments
|
|
#
|
|
# Arguments:
|
|
# type - Which kind of repl thread to match:
|
|
# all, binlog_dump (source), replica_io, or replica_sql
|
|
# (default: all)
|
|
# check_known_ids - Check known replication thread IDs (default: yes)
|
|
#
|
|
# Returns:
|
|
# True if the proclist item is the given type of replication thread.
|
|
sub is_replication_thread {
|
|
my ( $self, $query, %args ) = @_;
|
|
return unless $query;
|
|
|
|
my $type = lc($args{type} || 'all');
|
|
die "Invalid type: $type"
|
|
unless $type =~ m/^binlog_dump|slave_io|slave_sql|replica_io|replica_sql|all$/i;
|
|
|
|
my $match = 0;
|
|
if ( $type =~ m/binlog_dump|all/i ) {
|
|
$match = 1
|
|
if ($query->{Command} || $query->{command} || '') eq "Binlog Dump";
|
|
}
|
|
if ( !$match ) {
|
|
# On a replica, there are two threads. Both have user="system user".
|
|
if ( ($query->{User} || $query->{user} || '') eq "system user" ) {
|
|
PTDEBUG && _d("Replica replication thread");
|
|
if ( $type ne 'all' ) {
|
|
# Match a particular replica thread.
|
|
my $state = $query->{State} || $query->{state} || '';
|
|
|
|
if ( $state =~ m/^init|end$/ ) {
|
|
# http://code.google.com/p/maatkit/issues/detail?id=1121
|
|
PTDEBUG && _d("Special state:", $state);
|
|
$match = 1;
|
|
}
|
|
else {
|
|
# These patterns are abbreviated because if the first few words
|
|
# match chances are very high it's the full replica thd state.
|
|
my ($replica_sql) = $state =~ m/
|
|
^(Waiting\sfor\sthe\snext\sevent
|
|
|Reading\sevent\sfrom\sthe\srelay\slog
|
|
|Has\sread\sall\srelay\slog;\swaiting
|
|
|Making\stemp\sfile
|
|
|Waiting\sfor\sslave\smutex\son\sexit
|
|
|Waiting\sfor\sreplica\smutex\son\sexit)/xi;
|
|
|
|
# Type is either "replica_sql" or "replica_io". The second line
|
|
# implies that if this isn't the sql thread then it must be
|
|
# the io thread, so match is true if we were supposed to match
|
|
# the io thread.
|
|
$match = $type eq 'replica_sql' && $replica_sql ? 1
|
|
: $type eq 'replica_io' && !$replica_sql ? 1
|
|
: 0;
|
|
}
|
|
}
|
|
else {
|
|
# Type is "all" and it's not a source (binlog_dump) thread,
|
|
# else we wouldn't have gotten here. It's either of the 2
|
|
# replica threads and we don't care which.
|
|
$match = 1;
|
|
}
|
|
}
|
|
else {
|
|
PTDEBUG && _d('Not system user');
|
|
}
|
|
|
|
# MySQL loves to trick us. Sometimes a replica replication thread will
|
|
# temporarily morph into what looks like a regular user thread when
|
|
# really it's still the same replica repl thread. So here we save known
|
|
# repl thread IDs and check if a non-matching event is actually a
|
|
# known repl thread ID and if yes then we make it match.
|
|
if ( !defined $args{check_known_ids} || $args{check_known_ids} ) {
|
|
my $id = $query->{Id} || $query->{id};
|
|
if ( $match ) {
|
|
$self->{replication_thread}->{$id} = 1;
|
|
}
|
|
else {
|
|
if ( $self->{replication_thread}->{$id} ) {
|
|
PTDEBUG && _d("Thread ID is a known replication thread ID");
|
|
$match = 1;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
PTDEBUG && _d('Matches', $type, 'replication thread:',
|
|
($match ? 'yes' : 'no'), '; match:', $match);
|
|
|
|
return $match;
|
|
}
|
|
|
|
|
|
# Sub: get_replication_filters
|
|
# Get any replication filters set on the host.
|
|
#
|
|
# Parameters:
|
|
# %args - Arguments
|
|
#
|
|
# Required Arguments:
|
|
# dbh - dbh, source or replica
|
|
#
|
|
# Returns:
|
|
# Hashref of any replication filters. If none are set, an empty hashref
|
|
# is returned.
|
|
sub get_replication_filters {
|
|
my ( $self, %args ) = @_;
|
|
my @required_args = qw(dbh);
|
|
foreach my $arg ( @required_args ) {
|
|
die "I need a $arg argument" unless $args{$arg};
|
|
}
|
|
my ($dbh) = @args{@required_args};
|
|
|
|
my $replica_name = get_replica_name($dbh);
|
|
|
|
my %filters = ();
|
|
|
|
my $status = $self->get_source_status($dbh);
|
|
if ( $status ) {
|
|
map { $filters{$_} = $status->{$_} }
|
|
grep { defined $status->{$_} && $status->{$_} ne '' }
|
|
qw(
|
|
binlog_do_db
|
|
binlog_ignore_db
|
|
);
|
|
}
|
|
|
|
$status = $self->get_replica_status($dbh);
|
|
if ( $status ) {
|
|
map { $filters{$_} = $status->{$_} }
|
|
grep { defined $status->{$_} && $status->{$_} ne '' }
|
|
qw(
|
|
replicate_do_db
|
|
replicate_ignore_db
|
|
replicate_do_table
|
|
replicate_ignore_table
|
|
replicate_wild_do_table
|
|
replicate_wild_ignore_table
|
|
);
|
|
|
|
my $sql = "SHOW VARIABLES LIKE '${replica_name}_skip_errors'";
|
|
PTDEBUG && _d($dbh, $sql);
|
|
my $row = $dbh->selectrow_arrayref($sql);
|
|
# "OFF" in 5.0, "" in 5.1
|
|
$filters{replica_skip_errors} = $row->[1] if $row->[1] && $row->[1] ne 'OFF';
|
|
}
|
|
|
|
return \%filters;
|
|
}
|
|
|
|
|
|
# Sub: pos_to_string
|
|
# Stringify a position in a way that's string-comparable.
|
|
#
|
|
# Parameters:
|
|
# $pos - Hashref with file and position
|
|
#
|
|
# Returns:
|
|
# String like "file/posNNNNN"
|
|
sub pos_to_string {
|
|
my ( $self, $pos ) = @_;
|
|
my $fmt = '%s/%020d';
|
|
return sprintf($fmt, @{$pos}{qw(file position)});
|
|
}
|
|
|
|
sub reset_known_replication_threads {
|
|
my ( $self ) = @_;
|
|
$self->{replication_thread} = {};
|
|
return;
|
|
}
|
|
|
|
sub get_cxn_from_dsn_table {
|
|
my ($self, %args) = @_;
|
|
my @required_args = qw(dsn_table_dsn make_cxn);
|
|
foreach my $arg ( @required_args ) {
|
|
die "I need a $arg argument" unless $args{$arg};
|
|
}
|
|
my ($dsn_table_dsn, $make_cxn) = @args{@required_args};
|
|
PTDEBUG && _d('DSN table DSN:', $dsn_table_dsn);
|
|
|
|
my $dp = $self->{DSNParser};
|
|
my $q = $self->{Quoter};
|
|
|
|
my $dsn = $dp->parse($dsn_table_dsn);
|
|
my $dsn_table;
|
|
if ( $dsn->{D} && $dsn->{t} ) {
|
|
$dsn_table = $q->quote($dsn->{D}, $dsn->{t});
|
|
}
|
|
elsif ( $dsn->{t} && $dsn->{t} =~ m/\./ ) {
|
|
$dsn_table = $q->quote($q->split_unquote($dsn->{t}));
|
|
}
|
|
else {
|
|
die "DSN table DSN does not specify a database (D) "
|
|
. "or a database-qualified table (t)";
|
|
}
|
|
|
|
my $done = 0;
|
|
my $dsn_tbl_cxn = $make_cxn->(dsn => $dsn);
|
|
my $dbh = $dsn_tbl_cxn->connect();
|
|
my $sql = "SELECT dsn FROM $dsn_table ORDER BY id";
|
|
PTDEBUG && _d($sql);
|
|
my @cxn;
|
|
use Data::Dumper;
|
|
DSN:
|
|
do {
|
|
@cxn = ();
|
|
my $dsn_strings = $dbh->selectcol_arrayref($sql);
|
|
if ( $dsn_strings ) {
|
|
foreach my $dsn_string ( @$dsn_strings ) {
|
|
PTDEBUG && _d('DSN from DSN table:', $dsn_string);
|
|
if ($args{wait_no_die}) {
|
|
my $lcxn;
|
|
eval {
|
|
$lcxn = $make_cxn->(dsn_string => $dsn_string);
|
|
};
|
|
if ( $EVAL_ERROR && ($dsn_tbl_cxn->lost_connection($EVAL_ERROR)
|
|
|| $EVAL_ERROR =~ m/Can't connect to MySQL server/)) {
|
|
PTDEBUG && _d("Server is not accessible, waiting when it is online again");
|
|
sleep(1);
|
|
goto DSN;
|
|
}
|
|
push @cxn, $lcxn;
|
|
} else {
|
|
push @cxn, $make_cxn->(dsn_string => $dsn_string);
|
|
}
|
|
}
|
|
}
|
|
$done = 1;
|
|
} until $done;
|
|
return \@cxn;
|
|
}
|
|
|
|
sub get_source_name {
|
|
my ($dbh) = @_;
|
|
my $vp = VersionParser->new($dbh);
|
|
return ($vp lt '8.1' || $vp->flavor() =~ m/maria/i) ? 'master' : 'source';
|
|
}
|
|
|
|
sub get_replica_name {
|
|
my ($dbh) = @_;
|
|
my $vp = VersionParser->new($dbh);
|
|
return ($vp lt '8.1' || $vp->flavor() =~ m/maria/i) ? 'slave' : 'replica';
|
|
}
|
|
|
|
sub _d {
|
|
my ($package, undef, $line) = caller 0;
|
|
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
|
map { defined $_ ? $_ : 'undef' }
|
|
@_;
|
|
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
|
|
}
|
|
|
|
1;
|
|
}
|
|
# ###########################################################################
|
|
# End MasterSlave package
|
|
# ###########################################################################
|