diff --git a/bin/pt-archiver b/bin/pt-archiver index 7de572c6..1ecbba38 100755 --- a/bin/pt-archiver +++ b/bin/pt-archiver @@ -27,6 +27,8 @@ BEGIN { TableNibbler Daemon MasterSlave + FlowControlWaiter + Cxn HTTP::Micro VersionCheck )); @@ -4198,6 +4200,369 @@ sub _d { # End MasterSlave package # ########################################################################### +# ########################################################################### +# FlowControlWaiter package +# This package is a copy without comments from the original. The original +# with comments and its test file can be found in the Bazaar repository at, +# lib/FlowControlWaiter.pm +# t/lib/FlowControlWaiter.t +# See https://launchpad.net/percona-toolkit for more information. +# ########################################################################### +{ +package FlowControlWaiter; + +use strict; +use warnings FATAL => 'all'; +use English qw(-no_match_vars); +use constant PTDEBUG => $ENV{PTDEBUG} || 0; + +use Time::HiRes qw(sleep time); +use Data::Dumper; + +sub new { + my ( $class, %args ) = @_; + my @required_args = qw(oktorun node sleep max_flow_ctl); + foreach my $arg ( @required_args ) { + die "I need a $arg argument" unless defined $args{$arg}; + } + + my $self = { + %args + }; + + $self->{last_time} = time(); + + my (undef, $last_fc_ns) = $self->{node}->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"'); + + $self->{last_fc_secs} = $last_fc_ns/1000_000_000; + + return bless $self, $class; +} + +sub wait { + my ( $self, %args ) = @_; + my @required_args = qw(); + foreach my $arg ( @required_args ) { + die "I need a $arg argument" unless $args{$arg}; + } + my $pr = $args{Progress}; + + my $oktorun = $self->{oktorun}; + my $sleep = $self->{sleep}; + my $node = $self->{node}; + my $max_avg = $self->{max_flow_ctl}/100; + + my $too_much_fc = 1; + + my $pr_callback; + if ( $pr ) { + $pr_callback = sub { + print STDERR "Pausing because PXC Flow Control is active\n"; + return; + }; + $pr->set_callback($pr_callback); + } + + while ( $oktorun->() && $too_much_fc ) { + my $current_time = time(); + my (undef, $current_fc_ns) = $node->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"'); + my $current_fc_secs = $current_fc_ns/1000_000_000; + my $current_avg = ($current_fc_secs - $self->{last_fc_secs}) / ($current_time - $self->{last_time}); + if ( $current_avg > $max_avg ) { + if ( $pr ) { + $pr->update(sub { return 0; }); + } + PTDEBUG && _d('Calling sleep callback'); + if ( $self->{simple_progress} ) { + print STDERR "Waiting for Flow Control to abate\n"; + } + $sleep->(); + } else { + $too_much_fc = 0; + } + $self->{last_time} = $current_time; + $self->{last_fc_secs} = $current_fc_secs; + + + } + + PTDEBUG && _d('Flow Control is Ok'); + return; +} + +sub _d { + my ($package, undef, $line) = caller 0; + @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; } + map { defined $_ ? $_ : 'undef' } + @_; + print STDERR "# $package:$line $PID ", join(' ', @_), "\n"; +} + +1; +} +# ########################################################################### +# End FlowControlWaiter package +# ########################################################################### + +# ########################################################################### +# Cxn package +# This package is a copy without comments from the original. The original +# with comments and its test file can be found in the Bazaar repository at, +# lib/Cxn.pm +# t/lib/Cxn.t +# See https://launchpad.net/percona-toolkit for more information. +# ########################################################################### +{ +package Cxn; + +use strict; +use warnings FATAL => 'all'; +use English qw(-no_match_vars); +use Scalar::Util qw(blessed); +use constant { + PTDEBUG => $ENV{PTDEBUG} || 0, + PERCONA_TOOLKIT_TEST_USE_DSN_NAMES => $ENV{PERCONA_TOOLKIT_TEST_USE_DSN_NAMES} || 0, +}; + +sub new { + my ( $class, %args ) = @_; + my @required_args = qw(DSNParser OptionParser); + foreach my $arg ( @required_args ) { + die "I need a $arg argument" unless $args{$arg}; + }; + my ($dp, $o) = @args{@required_args}; + + my $dsn_defaults = $dp->parse_options($o); + my $prev_dsn = $args{prev_dsn}; + my $dsn = $args{dsn}; + if ( !$dsn ) { + $args{dsn_string} ||= 'h=' . ($dsn_defaults->{h} || 'localhost'); + + $dsn = $dp->parse( + $args{dsn_string}, $prev_dsn, $dsn_defaults); + } + elsif ( $prev_dsn ) { + $dsn = $dp->copy($prev_dsn, $dsn); + } + + my $dsn_name = $dp->as_string($dsn, [qw(h P S)]) + || $dp->as_string($dsn, [qw(F)]) + || ''; + + my $self = { + dsn => $dsn, + dbh => $args{dbh}, + dsn_name => $dsn_name, + hostname => '', + set => $args{set}, + NAME_lc => defined($args{NAME_lc}) ? $args{NAME_lc} : 1, + dbh_set => 0, + ask_pass => $o->get('ask-pass'), + DSNParser => $dp, + is_cluster_node => undef, + parent => $args{parent}, + }; + + return bless $self, $class; +} + +sub connect { + my ( $self, %opts ) = @_; + my $dsn = $opts{dsn} || $self->{dsn}; + my $dp = $self->{DSNParser}; + + my $dbh = $self->{dbh}; + if ( !$dbh || !$dbh->ping() ) { + if ( $self->{ask_pass} && !$self->{asked_for_pass} && !defined $dsn->{p} ) { + $dsn->{p} = OptionParser::prompt_noecho("Enter MySQL password: "); + $self->{asked_for_pass} = 1; + } + $dbh = $dp->get_dbh( + $dp->get_cxn_params($dsn), + { + AutoCommit => 1, + %opts, + }, + ); + } + + $dbh = $self->set_dbh($dbh); + if ( $opts{dsn} ) { + $self->{dsn} = $dsn; + $self->{dsn_name} = $dp->as_string($dsn, [qw(h P S)]) + || $dp->as_string($dsn, [qw(F)]) + || ''; + + } + PTDEBUG && _d($dbh, 'Connected dbh to', $self->{hostname},$self->{dsn_name}); + return $dbh; +} + +sub set_dbh { + my ($self, $dbh) = @_; + + if ( $self->{dbh} && $self->{dbh} == $dbh && $self->{dbh_set} ) { + PTDEBUG && _d($dbh, 'Already set dbh'); + return $dbh; + } + + PTDEBUG && _d($dbh, 'Setting dbh'); + + $dbh->{FetchHashKeyName} = 'NAME_lc' if $self->{NAME_lc}; + + my $sql = 'SELECT @@server_id /*!50038 , @@hostname*/'; + PTDEBUG && _d($dbh, $sql); + my ($server_id, $hostname) = $dbh->selectrow_array($sql); + PTDEBUG && _d($dbh, 'hostname:', $hostname, $server_id); + if ( $hostname ) { + $self->{hostname} = $hostname; + } + + if ( $self->{parent} ) { + PTDEBUG && _d($dbh, 'Setting InactiveDestroy=1 in parent'); + $dbh->{InactiveDestroy} = 1; + } + + if ( my $set = $self->{set}) { + $set->($dbh); + } + + $self->{dbh} = $dbh; + $self->{dbh_set} = 1; + return $dbh; +} + +sub lost_connection { + my ($self, $e) = @_; + return 0 unless $e; + return $e =~ m/MySQL server has gone away/ + || $e =~ m/Lost connection to MySQL server/; +} + +sub dbh { + my ($self) = @_; + return $self->{dbh}; +} + +sub dsn { + my ($self) = @_; + return $self->{dsn}; +} + +sub name { + my ($self) = @_; + return $self->{dsn_name} if PERCONA_TOOLKIT_TEST_USE_DSN_NAMES; + return $self->{hostname} || $self->{dsn_name} || 'unknown host'; +} + +sub get_id { + my ($self, $cxn) = @_; + + $cxn ||= $self; + + my $unique_id; + if ($cxn->is_cluster_node()) { # for cluster we concatenate various variables to maximize id 'uniqueness' across versions + my $sql = q{SHOW STATUS LIKE 'wsrep\_local\_index'}; + my (undef, $wsrep_local_index) = $cxn->dbh->selectrow_array($sql); + PTDEBUG && _d("Got cluster wsrep_local_index: ",$wsrep_local_index); + $unique_id = $wsrep_local_index."|"; + foreach my $val ('server\_id', 'wsrep\_sst\_receive\_address', 'wsrep\_node\_name', 'wsrep\_node\_address') { + my $sql = "SHOW VARIABLES LIKE '$val'"; + PTDEBUG && _d($cxn->name, $sql); + my (undef, $val) = $cxn->dbh->selectrow_array($sql); + $unique_id .= "|$val"; + } + } else { + my $sql = 'SELECT @@SERVER_ID'; + PTDEBUG && _d($sql); + $unique_id = $cxn->dbh->selectrow_array($sql); + } + PTDEBUG && _d("Generated unique id for cluster:", $unique_id); + return $unique_id; +} + + +sub is_cluster_node { + my ($self, $cxn) = @_; + + $cxn ||= $self; + + my $sql = "SHOW VARIABLES LIKE 'wsrep\_on'"; + + my $dbh; + if ($cxn->isa('DBI::db')) { + $dbh = $cxn; + PTDEBUG && _d($sql); #don't invoke name() if it's not a Cxn! + } + else { + $dbh = $cxn->dbh(); + PTDEBUG && _d($cxn->name, $sql); + } + + my $row = $dbh->selectrow_arrayref($sql); + return $row && $row->[1] && ($row->[1] eq 'ON' || $row->[1] eq '1') ? 1 : 0; + +} + +sub remove_duplicate_cxns { + my ($self, %args) = @_; + my @cxns = @{$args{cxns}}; + my $seen_ids = $args{seen_ids} || {}; + PTDEBUG && _d("Removing duplicates from ", join(" ", map { $_->name } @cxns)); + my @trimmed_cxns; + + for my $cxn ( @cxns ) { + + my $id = $cxn->get_id(); + PTDEBUG && _d('Server ID for ', $cxn->name, ': ', $id); + + if ( ! $seen_ids->{$id}++ ) { + push @trimmed_cxns, $cxn + } + else { + PTDEBUG && _d("Removing ", $cxn->name, + ", ID ", $id, ", because we've already seen it"); + } + } + + return \@trimmed_cxns; +} + +sub DESTROY { + my ($self) = @_; + + PTDEBUG && _d('Destroying cxn'); + + if ( $self->{parent} ) { + PTDEBUG && _d($self->{dbh}, 'Not disconnecting dbh in parent'); + } + elsif ( $self->{dbh} + && blessed($self->{dbh}) + && $self->{dbh}->can("disconnect") ) + { + PTDEBUG && _d($self->{dbh}, 'Disconnecting dbh on', $self->{hostname}, + $self->{dsn_name}); + $self->{dbh}->disconnect(); + } + + return; +} + +sub _d { + my ($package, undef, $line) = caller 0; + @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; } + map { defined $_ ? $_ : 'undef' } + @_; + print STDERR "# $package:$line $PID ", join(' ', @_), "\n"; +} + +1; +} +# ########################################################################### +# End Cxn package +# ########################################################################### + + # ########################################################################### # HTTP::Micro package # This package is a copy without comments from the original. The original @@ -5460,6 +5825,7 @@ my $archive_fh; my $get_sth; my ( $OUT_OF_RETRIES, $ROLLED_BACK, $ALL_IS_WELL ) = ( 0, -1, 1 ); my ( $src, $dst ); +my $pxc_version = '0'; # Holds the arguments for the $sth's bind variables, so it can be re-tried # easily. @@ -5482,6 +5848,7 @@ sub main { undef *trace; ($OUT_OF_RETRIES, $ROLLED_BACK, $ALL_IS_WELL ) = (0, -1, 1); + # ######################################################################## # Get configuration information. # ######################################################################## @@ -5726,6 +6093,35 @@ sub main { ); } + # ####################################################################### + # Check if it's a cluster and if so get version + # Create FlowControlWaiter object if max-flow-ctl was specified and + # PXC version supports it + # ####################################################################### + + my $flow_ctl; + if ( $src && $src->{dbh} && Cxn::is_cluster_node($src->{dbh}) ) { + $pxc_version = VersionParser->new($src->{'dbh'}); + if ( $o->got('max-flow-ctl') ) { + if ( $pxc_version < '5.6' ) { + die "Option '--max-flow-ctl' is only available for PXC version 5.6 " + . "or higher." + } else { + $flow_ctl = new FlowControlWaiter( + node => $src->{'dbh'}, + max_flow_ctl => $o->get('max-flow-ctl'), + oktorun => sub { return $oktorun }, + sleep => sub { sleep($o->get('check-interval')) }, + simple_progress => $o->got('progress') ? 1 : 0, + ); + } + } + } + + if ( $src && $src->{dbh} && !Cxn::is_cluster_node($src->{dbh}) && $o->got('max-flow-ctl') ) { + die "Option '--max-flow-ctl' is for use with PXC clusters." + } + # ######################################################################## # Set up general plugin. # ######################################################################## @@ -6074,6 +6470,7 @@ sub main { # This row is the first row fetched from each 'chunk'. my $first_row = [ @$row ]; my $csv_row; + my $flow_ctl_count = 0; ROW: while ( # Quit if: @@ -6123,8 +6520,8 @@ sub main { } $ins_sth ||= $ins_row; # Default to the sth decided before. my $success = do_with_retries($o, 'inserting', sub { - $ins_sth->execute(@{$row}[@ins_slice]); - PTDEBUG && _d('Inserted', $del_row->rows, 'rows'); + my $ins_cnt = $ins_sth->execute(@{$row}[@ins_slice]); + PTDEBUG && _d('Inserted', $ins_cnt, 'rows'); $statistics{INSERT} += $ins_sth->rows; }); if ( $success == $OUT_OF_RETRIES ) { @@ -6313,6 +6710,12 @@ sub main { $lag = $ms->get_slave_lag($lag_dbh); } } + + # if it's a cluster, check for flow control every 100 rows + if ( $flow_ctl && $flow_ctl_count++ % 100 == 0) { + $flow_ctl->wait(); + } + } # ROW PTDEBUG && _d('Done fetching rows'); @@ -7074,6 +7477,16 @@ Adds the LOW_PRIORITY modifier to INSERT or REPLACE statements. See L for details. +=item --max-flow-ctl + +type: float + +Somewhat similar to --max-lag but for PXC clusters. +Check average time cluster spent pausing for Flow Control and make tool pause if +it goes over the percentage indicated in the option. +Default is no Flow Control checking. +This option is available for PXC versions 5.6 or higher. + =item --max-lag type: time; default: 1s diff --git a/lib/Cxn.pm b/lib/Cxn.pm index 4fbc1a8b..d9fa90fd 100644 --- a/lib/Cxn.pm +++ b/lib/Cxn.pm @@ -262,9 +262,22 @@ sub is_cluster_node { my ($self, $cxn) = @_; $cxn ||= $self; + my $sql = "SHOW VARIABLES LIKE 'wsrep\_on'"; - PTDEBUG && _d($cxn->name, $sql); - my $row = $cxn->dbh->selectrow_arrayref($sql); + + # here we check if a DBI object was passed instead if a Cxn + # just a convenience for tools that don't use a proper Cxn + my $dbh; + if ($cxn->isa('DBI::db')) { + $dbh = $cxn; + PTDEBUG && _d($sql); #don't invoke name() if it's not a Cxn! + } + else { + $dbh = $cxn->dbh(); + PTDEBUG && _d($cxn->name, $sql); + } + + my $row = $dbh->selectrow_arrayref($sql); return $row && $row->[1] && ($row->[1] eq 'ON' || $row->[1] eq '1') ? 1 : 0; } diff --git a/lib/FlowControlWaiter.pm b/lib/FlowControlWaiter.pm new file mode 100644 index 00000000..33a21f7e --- /dev/null +++ b/lib/FlowControlWaiter.pm @@ -0,0 +1,145 @@ +# This program is copyright 2015 Percona LLC. +# Feedback and improvements are welcome. +# +# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED +# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF +# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar +# systems, you can issue `man perlgpl' or `man perlartistic' to read these +# licenses. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 59 Temple +# Place, Suite 330, Boston, MA 02111-1307 USA. +# ########################################################################### +# FlowControlWaiter package +# ########################################################################### +{ +# Package: FlowControlWaiter +# FlowControlWaiter helps limit load when there's too much Flow Control pausing +# It is based on the other "Waiter" modules: +# ReplicaLagWaiter & MySQLStatusWaiter +package FlowControlWaiter; + +use strict; +use warnings FATAL => 'all'; +use English qw(-no_match_vars); +use constant PTDEBUG => $ENV{PTDEBUG} || 0; + +use Time::HiRes qw(sleep time); +use Data::Dumper; + +# Sub: new +# +# Required Arguments: +# oktorun - Callback that returns true if it's ok to continue running +# node - Node dbh on which to check for wsrep_flow_control_paused_ns +# sleep - Callback to sleep between checks. +# max_pct - Max percent of flow control caused pause time to tolerate +# +# Returns: +# FlowControlWaiter object +sub new { + my ( $class, %args ) = @_; + my @required_args = qw(oktorun node sleep max_flow_ctl); + foreach my $arg ( @required_args ) { + die "I need a $arg argument" unless defined $args{$arg}; + } + + my $self = { + %args + }; + + # Get current hi-res epoch seconds + $self->{last_time} = time(); + + # Get nanoseconds server has been paused due to Flow Control + my (undef, $last_fc_ns) = $self->{node}->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"'); + + # Convert to seconds (float) + $self->{last_fc_secs} = $last_fc_ns/1000_000_000; + + return bless $self, $class; +} + +# Sub: wait +# Wait for average flow control paused time fall below --max-flow-ctl +# +# Optional Arguments: +# Progress - object to report waiting +# +# Returns: +# 1 if average falls below max before timeout, else 0 if continue=yes, else die. +sub wait { + my ( $self, %args ) = @_; + my @required_args = qw(); + foreach my $arg ( @required_args ) { + die "I need a $arg argument" unless $args{$arg}; + } + my $pr = $args{Progress}; + + my $oktorun = $self->{oktorun}; + my $sleep = $self->{sleep}; + my $node = $self->{node}; + my $max_avg = $self->{max_flow_ctl}/100; + + my $too_much_fc = 1; + + my $pr_callback; + if ( $pr ) { + # If you use the default Progress report callback, you'll need to + # to add Transformers.pm to this tool. + $pr_callback = sub { + print STDERR "Pausing because PXC Flow Control is active\n"; + return; + }; + $pr->set_callback($pr_callback); + } + + # Loop where we wait for average pausing time caused by FC to fall below --max-flow-ctl + # Average pause time is calculated starting from the last iteration. + while ( $oktorun->() && $too_much_fc ) { + my $current_time = time(); + my (undef, $current_fc_ns) = $node->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"'); + my $current_fc_secs = $current_fc_ns/1000_000_000; + my $current_avg = ($current_fc_secs - $self->{last_fc_secs}) / ($current_time - $self->{last_time}); + if ( $current_avg > $max_avg ) { + if ( $pr ) { + # There's no real progress because we can't estimate how long + # it will take the values to abate. + $pr->update(sub { return 0; }); + } + PTDEBUG && _d('Calling sleep callback'); + if ( $self->{simple_progress} ) { + print STDERR "Waiting for Flow Control to abate\n"; + } + $sleep->(); + } else { + $too_much_fc = 0; + } + $self->{last_time} = $current_time; + $self->{last_fc_secs} = $current_fc_secs; + + + } + + PTDEBUG && _d('Flow Control is Ok'); + return; +} + +sub _d { + my ($package, undef, $line) = caller 0; + @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; } + map { defined $_ ? $_ : 'undef' } + @_; + print STDERR "# $package:$line $PID ", join(' ', @_), "\n"; +} + +1; +} +# ########################################################################### +# End FlowControlWaiter package +# ###########################################################################