pt-archiver added flow control awareness for PXC

This commit is contained in:
frank-cizmich
2015-07-31 14:32:43 -03:00
parent 66c74af47b
commit e7052c013b
3 changed files with 575 additions and 4 deletions

View File

@@ -27,6 +27,8 @@ BEGIN {
TableNibbler
Daemon
MasterSlave
FlowControlWaiter
Cxn
HTTP::Micro
VersionCheck
));
@@ -4198,6 +4200,369 @@ sub _d {
# End MasterSlave package
# ###########################################################################
# ###########################################################################
# FlowControlWaiter package
# This package is a copy without comments from the original. The original
# with comments and its test file can be found in the Bazaar repository at,
# lib/FlowControlWaiter.pm
# t/lib/FlowControlWaiter.t
# See https://launchpad.net/percona-toolkit for more information.
# ###########################################################################
{
package FlowControlWaiter;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use Time::HiRes qw(sleep time);
use Data::Dumper;
sub new {
my ( $class, %args ) = @_;
my @required_args = qw(oktorun node sleep max_flow_ctl);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless defined $args{$arg};
}
my $self = {
%args
};
$self->{last_time} = time();
my (undef, $last_fc_ns) = $self->{node}->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"');
$self->{last_fc_secs} = $last_fc_ns/1000_000_000;
return bless $self, $class;
}
sub wait {
my ( $self, %args ) = @_;
my @required_args = qw();
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my $pr = $args{Progress};
my $oktorun = $self->{oktorun};
my $sleep = $self->{sleep};
my $node = $self->{node};
my $max_avg = $self->{max_flow_ctl}/100;
my $too_much_fc = 1;
my $pr_callback;
if ( $pr ) {
$pr_callback = sub {
print STDERR "Pausing because PXC Flow Control is active\n";
return;
};
$pr->set_callback($pr_callback);
}
while ( $oktorun->() && $too_much_fc ) {
my $current_time = time();
my (undef, $current_fc_ns) = $node->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"');
my $current_fc_secs = $current_fc_ns/1000_000_000;
my $current_avg = ($current_fc_secs - $self->{last_fc_secs}) / ($current_time - $self->{last_time});
if ( $current_avg > $max_avg ) {
if ( $pr ) {
$pr->update(sub { return 0; });
}
PTDEBUG && _d('Calling sleep callback');
if ( $self->{simple_progress} ) {
print STDERR "Waiting for Flow Control to abate\n";
}
$sleep->();
} else {
$too_much_fc = 0;
}
$self->{last_time} = $current_time;
$self->{last_fc_secs} = $current_fc_secs;
}
PTDEBUG && _d('Flow Control is Ok');
return;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End FlowControlWaiter package
# ###########################################################################
# ###########################################################################
# Cxn package
# This package is a copy without comments from the original. The original
# with comments and its test file can be found in the Bazaar repository at,
# lib/Cxn.pm
# t/lib/Cxn.t
# See https://launchpad.net/percona-toolkit for more information.
# ###########################################################################
{
package Cxn;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Scalar::Util qw(blessed);
use constant {
PTDEBUG => $ENV{PTDEBUG} || 0,
PERCONA_TOOLKIT_TEST_USE_DSN_NAMES => $ENV{PERCONA_TOOLKIT_TEST_USE_DSN_NAMES} || 0,
};
sub new {
my ( $class, %args ) = @_;
my @required_args = qw(DSNParser OptionParser);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
};
my ($dp, $o) = @args{@required_args};
my $dsn_defaults = $dp->parse_options($o);
my $prev_dsn = $args{prev_dsn};
my $dsn = $args{dsn};
if ( !$dsn ) {
$args{dsn_string} ||= 'h=' . ($dsn_defaults->{h} || 'localhost');
$dsn = $dp->parse(
$args{dsn_string}, $prev_dsn, $dsn_defaults);
}
elsif ( $prev_dsn ) {
$dsn = $dp->copy($prev_dsn, $dsn);
}
my $dsn_name = $dp->as_string($dsn, [qw(h P S)])
|| $dp->as_string($dsn, [qw(F)])
|| '';
my $self = {
dsn => $dsn,
dbh => $args{dbh},
dsn_name => $dsn_name,
hostname => '',
set => $args{set},
NAME_lc => defined($args{NAME_lc}) ? $args{NAME_lc} : 1,
dbh_set => 0,
ask_pass => $o->get('ask-pass'),
DSNParser => $dp,
is_cluster_node => undef,
parent => $args{parent},
};
return bless $self, $class;
}
sub connect {
my ( $self, %opts ) = @_;
my $dsn = $opts{dsn} || $self->{dsn};
my $dp = $self->{DSNParser};
my $dbh = $self->{dbh};
if ( !$dbh || !$dbh->ping() ) {
if ( $self->{ask_pass} && !$self->{asked_for_pass} && !defined $dsn->{p} ) {
$dsn->{p} = OptionParser::prompt_noecho("Enter MySQL password: ");
$self->{asked_for_pass} = 1;
}
$dbh = $dp->get_dbh(
$dp->get_cxn_params($dsn),
{
AutoCommit => 1,
%opts,
},
);
}
$dbh = $self->set_dbh($dbh);
if ( $opts{dsn} ) {
$self->{dsn} = $dsn;
$self->{dsn_name} = $dp->as_string($dsn, [qw(h P S)])
|| $dp->as_string($dsn, [qw(F)])
|| '';
}
PTDEBUG && _d($dbh, 'Connected dbh to', $self->{hostname},$self->{dsn_name});
return $dbh;
}
sub set_dbh {
my ($self, $dbh) = @_;
if ( $self->{dbh} && $self->{dbh} == $dbh && $self->{dbh_set} ) {
PTDEBUG && _d($dbh, 'Already set dbh');
return $dbh;
}
PTDEBUG && _d($dbh, 'Setting dbh');
$dbh->{FetchHashKeyName} = 'NAME_lc' if $self->{NAME_lc};
my $sql = 'SELECT @@server_id /*!50038 , @@hostname*/';
PTDEBUG && _d($dbh, $sql);
my ($server_id, $hostname) = $dbh->selectrow_array($sql);
PTDEBUG && _d($dbh, 'hostname:', $hostname, $server_id);
if ( $hostname ) {
$self->{hostname} = $hostname;
}
if ( $self->{parent} ) {
PTDEBUG && _d($dbh, 'Setting InactiveDestroy=1 in parent');
$dbh->{InactiveDestroy} = 1;
}
if ( my $set = $self->{set}) {
$set->($dbh);
}
$self->{dbh} = $dbh;
$self->{dbh_set} = 1;
return $dbh;
}
sub lost_connection {
my ($self, $e) = @_;
return 0 unless $e;
return $e =~ m/MySQL server has gone away/
|| $e =~ m/Lost connection to MySQL server/;
}
sub dbh {
my ($self) = @_;
return $self->{dbh};
}
sub dsn {
my ($self) = @_;
return $self->{dsn};
}
sub name {
my ($self) = @_;
return $self->{dsn_name} if PERCONA_TOOLKIT_TEST_USE_DSN_NAMES;
return $self->{hostname} || $self->{dsn_name} || 'unknown host';
}
sub get_id {
my ($self, $cxn) = @_;
$cxn ||= $self;
my $unique_id;
if ($cxn->is_cluster_node()) { # for cluster we concatenate various variables to maximize id 'uniqueness' across versions
my $sql = q{SHOW STATUS LIKE 'wsrep\_local\_index'};
my (undef, $wsrep_local_index) = $cxn->dbh->selectrow_array($sql);
PTDEBUG && _d("Got cluster wsrep_local_index: ",$wsrep_local_index);
$unique_id = $wsrep_local_index."|";
foreach my $val ('server\_id', 'wsrep\_sst\_receive\_address', 'wsrep\_node\_name', 'wsrep\_node\_address') {
my $sql = "SHOW VARIABLES LIKE '$val'";
PTDEBUG && _d($cxn->name, $sql);
my (undef, $val) = $cxn->dbh->selectrow_array($sql);
$unique_id .= "|$val";
}
} else {
my $sql = 'SELECT @@SERVER_ID';
PTDEBUG && _d($sql);
$unique_id = $cxn->dbh->selectrow_array($sql);
}
PTDEBUG && _d("Generated unique id for cluster:", $unique_id);
return $unique_id;
}
sub is_cluster_node {
my ($self, $cxn) = @_;
$cxn ||= $self;
my $sql = "SHOW VARIABLES LIKE 'wsrep\_on'";
my $dbh;
if ($cxn->isa('DBI::db')) {
$dbh = $cxn;
PTDEBUG && _d($sql); #don't invoke name() if it's not a Cxn!
}
else {
$dbh = $cxn->dbh();
PTDEBUG && _d($cxn->name, $sql);
}
my $row = $dbh->selectrow_arrayref($sql);
return $row && $row->[1] && ($row->[1] eq 'ON' || $row->[1] eq '1') ? 1 : 0;
}
sub remove_duplicate_cxns {
my ($self, %args) = @_;
my @cxns = @{$args{cxns}};
my $seen_ids = $args{seen_ids} || {};
PTDEBUG && _d("Removing duplicates from ", join(" ", map { $_->name } @cxns));
my @trimmed_cxns;
for my $cxn ( @cxns ) {
my $id = $cxn->get_id();
PTDEBUG && _d('Server ID for ', $cxn->name, ': ', $id);
if ( ! $seen_ids->{$id}++ ) {
push @trimmed_cxns, $cxn
}
else {
PTDEBUG && _d("Removing ", $cxn->name,
", ID ", $id, ", because we've already seen it");
}
}
return \@trimmed_cxns;
}
sub DESTROY {
my ($self) = @_;
PTDEBUG && _d('Destroying cxn');
if ( $self->{parent} ) {
PTDEBUG && _d($self->{dbh}, 'Not disconnecting dbh in parent');
}
elsif ( $self->{dbh}
&& blessed($self->{dbh})
&& $self->{dbh}->can("disconnect") )
{
PTDEBUG && _d($self->{dbh}, 'Disconnecting dbh on', $self->{hostname},
$self->{dsn_name});
$self->{dbh}->disconnect();
}
return;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End Cxn package
# ###########################################################################
# ###########################################################################
# HTTP::Micro package
# This package is a copy without comments from the original. The original
@@ -5460,6 +5825,7 @@ my $archive_fh;
my $get_sth;
my ( $OUT_OF_RETRIES, $ROLLED_BACK, $ALL_IS_WELL ) = ( 0, -1, 1 );
my ( $src, $dst );
my $pxc_version = '0';
# Holds the arguments for the $sth's bind variables, so it can be re-tried
# easily.
@@ -5482,6 +5848,7 @@ sub main {
undef *trace;
($OUT_OF_RETRIES, $ROLLED_BACK, $ALL_IS_WELL ) = (0, -1, 1);
# ########################################################################
# Get configuration information.
# ########################################################################
@@ -5726,6 +6093,35 @@ sub main {
);
}
# #######################################################################
# Check if it's a cluster and if so get version
# Create FlowControlWaiter object if max-flow-ctl was specified and
# PXC version supports it
# #######################################################################
my $flow_ctl;
if ( $src && $src->{dbh} && Cxn::is_cluster_node($src->{dbh}) ) {
$pxc_version = VersionParser->new($src->{'dbh'});
if ( $o->got('max-flow-ctl') ) {
if ( $pxc_version < '5.6' ) {
die "Option '--max-flow-ctl' is only available for PXC version 5.6 "
. "or higher."
} else {
$flow_ctl = new FlowControlWaiter(
node => $src->{'dbh'},
max_flow_ctl => $o->get('max-flow-ctl'),
oktorun => sub { return $oktorun },
sleep => sub { sleep($o->get('check-interval')) },
simple_progress => $o->got('progress') ? 1 : 0,
);
}
}
}
if ( $src && $src->{dbh} && !Cxn::is_cluster_node($src->{dbh}) && $o->got('max-flow-ctl') ) {
die "Option '--max-flow-ctl' is for use with PXC clusters."
}
# ########################################################################
# Set up general plugin.
# ########################################################################
@@ -6074,6 +6470,7 @@ sub main {
# This row is the first row fetched from each 'chunk'.
my $first_row = [ @$row ];
my $csv_row;
my $flow_ctl_count = 0;
ROW:
while ( # Quit if:
@@ -6123,8 +6520,8 @@ sub main {
}
$ins_sth ||= $ins_row; # Default to the sth decided before.
my $success = do_with_retries($o, 'inserting', sub {
$ins_sth->execute(@{$row}[@ins_slice]);
PTDEBUG && _d('Inserted', $del_row->rows, 'rows');
my $ins_cnt = $ins_sth->execute(@{$row}[@ins_slice]);
PTDEBUG && _d('Inserted', $ins_cnt, 'rows');
$statistics{INSERT} += $ins_sth->rows;
});
if ( $success == $OUT_OF_RETRIES ) {
@@ -6313,6 +6710,12 @@ sub main {
$lag = $ms->get_slave_lag($lag_dbh);
}
}
# if it's a cluster, check for flow control every 100 rows
if ( $flow_ctl && $flow_ctl_count++ % 100 == 0) {
$flow_ctl->wait();
}
} # ROW
PTDEBUG && _d('Done fetching rows');
@@ -7074,6 +7477,16 @@ Adds the LOW_PRIORITY modifier to INSERT or REPLACE statements.
See L<http://dev.mysql.com/doc/en/insert.html> for details.
=item --max-flow-ctl
type: float
Somewhat similar to --max-lag but for PXC clusters.
Check average time cluster spent pausing for Flow Control and make tool pause if
it goes over the percentage indicated in the option.
Default is no Flow Control checking.
This option is available for PXC versions 5.6 or higher.
=item --max-lag
type: time; default: 1s