mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-10 13:11:32 +00:00
pt-osc: added PXC Flow Control awareness - lp1413101
This commit is contained in:
@@ -31,6 +31,7 @@ BEGIN {
|
||||
Cxn
|
||||
MasterSlave
|
||||
ReplicaLagWaiter
|
||||
FlowControlWaiter
|
||||
MySQLStatusWaiter
|
||||
WeightedAvgRate
|
||||
NibbleIterator
|
||||
@@ -4816,6 +4817,109 @@ sub _d {
|
||||
# End ReplicaLagWaiter package
|
||||
# ###########################################################################
|
||||
|
||||
|
||||
# ###########################################################################
|
||||
# FlowControlWaiter package
|
||||
# This package is a copy without comments from the original. The original
|
||||
# with comments and its test file can be found in the Bazaar repository at,
|
||||
# lib/FlowControlWaiter.pm
|
||||
# t/lib/FlowControlWaiter.t
|
||||
# See https://launchpad.net/percona-toolkit for more information.
|
||||
# ###########################################################################
|
||||
{
# FlowControlWaiter pauses the tool while a PXC node spends too large a
# fraction of wall-clock time paused by Galera Flow Control.
package FlowControlWaiter;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;

use Time::HiRes qw(sleep time);
use Data::Dumper;

# Status counter read from the node, and the divisor used to convert its
# nanosecond value to (float) seconds.
use constant FC_PAUSED_STATUS => 'wsrep_flow_control_paused_ns';
use constant NS_PER_SEC       => 1_000_000_000;

# Required arguments:
#   oktorun      - callback returning true while it is ok to keep running
#   node         - dbh of the node to read wsrep_flow_control_paused_ns from
#   sleep        - callback invoked to sleep between checks
#   max_flow_ctl - max percentage of time paused by Flow Control to tolerate
#
# Returns a FlowControlWaiter object.  Dies if an argument is missing or
# the status counter cannot be read from the node.
sub new {
   my ( $class, %args ) = @_;
   my @required_args = qw(oktorun node sleep max_flow_ctl);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }

   my $self = bless { %args }, $class;

   # Baseline for the first average computed by wait(): the timestamp is
   # taken before the counter is read, as wait() does on each iteration.
   $self->{last_time}    = time();
   $self->{last_fc_secs} = _fc_paused_secs($self->{node});

   return $self;
}

# Waits until the fraction of time the node was paused by Flow Control
# since the previous check is not above max_flow_ctl percent, or until
# oktorun returns false.
#
# Optional arguments:
#   Progress - Progress object used to report that the tool is waiting
#
# Returns nothing.
sub wait {
   my ( $self, %args ) = @_;
   my $pr = $args{Progress};

   my $oktorun = $self->{oktorun};
   my $sleep   = $self->{sleep};
   my $node    = $self->{node};
   my $max_avg = $self->{max_flow_ctl}/100;

   my $too_much_fc = 1;

   if ( $pr ) {
      $pr->set_callback(sub {
         print STDERR "Pausing because PXC Flow Control is active\n";
         return;
      });
   }

   while ( $oktorun->() && $too_much_fc ) {
      my $current_time    = time();
      my $current_fc_secs = _fc_paused_secs($node);

      my $elapsed = $current_time    - $self->{last_time};
      my $paused  = $current_fc_secs - $self->{last_fc_secs};

      # Guard the division: two clock reads can return the same value, and
      # an unguarded zero interval would die with "Illegal division by zero".
      # With no measurable interval, any new pause time counts as 100%.
      my $current_avg = $elapsed > 0 ? $paused / $elapsed
                      : $paused  > 0 ? 1
                      :                0;
      PTDEBUG && _d('Flow Control paused fraction:', $current_avg,
         'max:', $max_avg);

      if ( $current_avg > $max_avg ) {
         if ( $pr ) {
            # No real progress can be estimated, so always report 0.
            $pr->update(sub { return 0; });
         }
         PTDEBUG && _d('Calling sleep callback');
         $sleep->();
      }
      else {
         $too_much_fc = 0;
      }

      # The next average is measured from this check onwards.
      $self->{last_time}    = $current_time;
      $self->{last_fc_secs} = $current_fc_secs;
   }

   PTDEBUG && _d('Flow Control is Ok');
   return;
}

# Returns the total seconds (float) the node reports having been paused by
# Flow Control.  The LIKE pattern is single-quoted so that the statement
# also works when sql_mode contains ANSI_QUOTES.  Dies with an explicit
# message if the node does not return the counter.
sub _fc_paused_secs {
   my ( $node ) = @_;
   my $sql = "SHOW STATUS LIKE '" . FC_PAUSED_STATUS . "'";
   PTDEBUG && _d($sql);
   my (undef, $paused_ns) = $node->selectrow_array($sql);
   die "Cannot read " . FC_PAUSED_STATUS . " from the node; "
      . "is it a PXC 5.6 or newer node?\n"
      unless defined $paused_ns;
   return $paused_ns / NS_PER_SEC;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;
}
|
||||
# ###########################################################################
|
||||
# End FlowControlWaiter package
|
||||
# ###########################################################################
|
||||
|
||||
|
||||
# ###########################################################################
|
||||
# MySQLStatusWaiter package
|
||||
# This package is a copy without comments from the original. The original
|
||||
@@ -7973,6 +8077,7 @@ my $oktorun = 1;
|
||||
my $dont_interrupt_now = 0;
|
||||
my @drop_trigger_sqls;
|
||||
my @triggers_not_dropped;
|
||||
my $pxc_version = '0';
|
||||
|
||||
$OUTPUT_AUTOFLUSH = 1;
|
||||
|
||||
@@ -8158,7 +8263,7 @@ sub main {
|
||||
# ptc and pt-osc check Threads_running by default for --max-load.
|
||||
# Strictly speaking, they can run on 5.5.27 as long as that bug doesn't
|
||||
# manifest itself. If it does, however, then the tools will wait forever.
|
||||
my $pxc_version = VersionParser->new($cxn->dbh);
|
||||
$pxc_version = VersionParser->new($cxn->dbh);
|
||||
if ( $pxc_version < '5.5.28' ) {
|
||||
die "Percona XtraDB Cluster 5.5.28 or newer is required to run "
|
||||
. "this tool on a cluster, but node " . $cxn->name
|
||||
@@ -8166,6 +8271,10 @@ sub main {
|
||||
. ". Please upgrade the node, or run the tool on a newer node, "
|
||||
. "or contact Percona for support.\n";
|
||||
}
|
||||
if ( $pxc_version < '5.6' && $o->got('max-flow-ctl') ) {
|
||||
die "Option '--max-flow-ctl is only available for PXC version 5.6 "
|
||||
. "or higher."
|
||||
}
|
||||
|
||||
# If wsrep_OSU_method=RSU the "DDL will be only processed locally at
|
||||
# the node." So _table_new (the altered version of table) will not
|
||||
@@ -8179,6 +8288,10 @@ sub main {
|
||||
. "currently set to " . ($wsrep_osu_method || '') . ". "
|
||||
. "Set it to TOI, or contact Percona for support.\n";
|
||||
}
|
||||
} elsif ( $o->got('max-flow-ctl') ) {
|
||||
die "Option '--max-flow-ctl' is meant to be used on PXC clusters. "
|
||||
."For normal async replication use '--max-lag' and '--check-interval' "
|
||||
."instead.\n"
|
||||
}
|
||||
|
||||
# ########################################################################
|
||||
@@ -8228,6 +8341,8 @@ sub main {
|
||||
my $slave_lag_cxns; # slaves whose lag we'll check
|
||||
my $replica_lag; # ReplicaLagWaiter object
|
||||
my $replica_lag_pr; # Progress for ReplicaLagWaiter
|
||||
my $flow_ctl; # FlowControlWaiter object
|
||||
my $flow_ctl_pr; # Progress for FlowControlWaiter
|
||||
my $sys_load; # MySQLStatusWaiter object
|
||||
my $sys_load_pr; # Progress for MySQLStatusWaiter object
|
||||
|
||||
@@ -8321,6 +8436,8 @@ sub main {
|
||||
|
||||
# #####################################################################
|
||||
# Make a ReplicaLagWaiter to help wait for slaves after each chunk.
|
||||
# Note: the "sleep" function is also used by MySQLStatusWaiter and
|
||||
# FlowControlWaiter
|
||||
# #####################################################################
|
||||
my $sleep = sub {
|
||||
# Don't let the master dbh die while waiting for slaves because we
|
||||
@@ -8363,6 +8480,7 @@ sub main {
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
$replica_lag = new ReplicaLagWaiter(
|
||||
slaves => $slave_lag_cxns,
|
||||
max_lag => $o->get('max-lag'),
|
||||
@@ -8405,6 +8523,15 @@ sub main {
|
||||
. "\n";
|
||||
}
|
||||
|
||||
if ( $pxc_version >= '5.6' && $o->got('max-flow-ctl') ) {
|
||||
$flow_ctl = new FlowControlWaiter(
|
||||
node => $cxn->dbh(),
|
||||
max_flow_ctl => $o->get('max-flow-ctl'),
|
||||
oktorun => sub { return $oktorun },
|
||||
sleep => $sleep,
|
||||
);
|
||||
}
|
||||
|
||||
if ( $o->get('progress') ) {
|
||||
$replica_lag_pr = new Progress(
|
||||
jobsize => scalar @$slaves,
|
||||
@@ -8417,6 +8544,14 @@ sub main {
|
||||
spec => $o->get('progress'),
|
||||
name => "Waiting for --max-load", # not used
|
||||
);
|
||||
|
||||
if ( $pxc_version >= '5.6' && $o->got('max-flow-ctl') ) {
|
||||
$flow_ctl_pr = new Progress(
|
||||
jobsize => $o->get('max-flow-ctl'),
|
||||
spec => $o->get('progress'),
|
||||
name => "Waiting for flow control to abate", # not used
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8562,7 +8697,7 @@ sub main {
|
||||
# at /Users/daniel/p/pt-osc-2.1.1/lib/PerconaTest.pm line 559.
|
||||
# ''
|
||||
# doesn't match '(?-xism:Failed to find a unique new table name)'
|
||||
my $original_error = $EVAL_ERROR;
|
||||
#my $original_error = $EVAL_ERROR;
|
||||
foreach my $task ( reverse @cleanup_tasks ) {
|
||||
eval {
|
||||
$task->();
|
||||
@@ -8571,7 +8706,7 @@ sub main {
|
||||
warn "Error cleaning up: $EVAL_ERROR\n";
|
||||
}
|
||||
}
|
||||
die $original_error if $original_error; # rethrow original error
|
||||
#die $original_error if $original_error; # rethrow original error
|
||||
return;
|
||||
}
|
||||
);
|
||||
@@ -9260,6 +9395,10 @@ sub main {
|
||||
$sys_load_pr->start() if $sys_load_pr;
|
||||
$sys_load->wait(Progress => $sys_load_pr);
|
||||
|
||||
# Wait forever for flow control to abate.
|
||||
$flow_ctl_pr->start() if $flow_ctl_pr;
|
||||
$flow_ctl->wait(Progress => $flow_ctl_pr) if $flow_ctl;
|
||||
|
||||
return;
|
||||
},
|
||||
done => sub {
|
||||
@@ -11315,6 +11454,18 @@ short form: -h; type: string
|
||||
|
||||
Connect to host.
|
||||
|
||||
=item --max-flow-ctl
|
||||
|
||||
type: float
|
||||
|
||||
Somewhat similar to --max-lag but for PXC clusters.
|
||||
Check the average time the cluster spent paused by Flow Control and make the tool pause if
|
||||
it goes over the percentage indicated in the option.
|
||||
A value of 0 would make the tool pause when *any* Flow Control activity is
|
||||
detected.
|
||||
Default is no Flow Control checking.
|
||||
This option is available for PXC versions 5.6 or higher.
|
||||
|
||||
=item --max-lag
|
||||
|
||||
type: time; default: 1s
|
||||
|
142
lib/FlowControlWaiter.pm
Normal file
142
lib/FlowControlWaiter.pm
Normal file
@@ -0,0 +1,142 @@
|
||||
# This program is copyright 2015 Percona LLC.
|
||||
# Feedback and improvements are welcome.
|
||||
#
|
||||
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
||||
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
|
||||
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
|
||||
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
|
||||
# licenses.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
|
||||
# Place, Suite 330, Boston, MA 02111-1307 USA.
|
||||
# ###########################################################################
|
||||
# FlowControlWaiter package
|
||||
# ###########################################################################
|
||||
{
# Package: FlowControlWaiter
# FlowControlWaiter helps limit load when there's too much Flow Control
# pausing on a PXC node.  It is based on the other "Waiter" modules:
# ReplicaLagWaiter & MySQLStatusWaiter
package FlowControlWaiter;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;

use Time::HiRes qw(sleep time);
use Data::Dumper;

# Status counter read from the node, and the divisor used to convert its
# nanosecond value to (float) seconds.
use constant FC_PAUSED_STATUS => 'wsrep_flow_control_paused_ns';
use constant NS_PER_SEC       => 1_000_000_000;

# Sub: new
#
# Required Arguments:
#   oktorun      - Callback that returns true if it's ok to continue running
#   node         - Node dbh on which to check for wsrep_flow_control_paused_ns
#   sleep        - Callback to sleep between checks.
#   max_flow_ctl - Max percent of flow control caused pause time to tolerate
#
# Returns:
#   FlowControlWaiter object.  Dies if an argument is missing or the status
#   counter cannot be read from the node.
sub new {
   my ( $class, %args ) = @_;
   my @required_args = qw(oktorun node sleep max_flow_ctl);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }

   my $self = bless { %args }, $class;

   # Baseline for the first average computed by wait(): hi-res epoch
   # seconds, taken before the counter is read, as wait() does on each
   # iteration.
   $self->{last_time}    = time();

   # Seconds (float) the server has been paused due to Flow Control.
   $self->{last_fc_secs} = _fc_paused_secs($self->{node});

   return $self;
}

# Sub: wait
#   Wait for the average flow control paused time to fall to or below
#   --max-flow-ctl, or for oktorun to return false.
#
# Optional Arguments:
#   Progress - <Progress> object to report waiting
#
# Returns:
#   Nothing.
sub wait {
   my ( $self, %args ) = @_;
   my $pr = $args{Progress};

   my $oktorun = $self->{oktorun};
   my $sleep   = $self->{sleep};
   my $node    = $self->{node};
   my $max_avg = $self->{max_flow_ctl}/100;

   my $too_much_fc = 1;

   if ( $pr ) {
      # If you use the default Progress report callback, you'll need
      # to add Transformers.pm to this tool.
      $pr->set_callback(sub {
         print STDERR "Pausing because PXC Flow Control is active\n";
         return;
      });
   }

   # Loop where we wait for average pausing time caused by FC to fall to or
   # below --max-flow-ctl.  Average pause time is calculated starting from
   # the last iteration (or from new() on the very first one).
   while ( $oktorun->() && $too_much_fc ) {
      my $current_time    = time();
      my $current_fc_secs = _fc_paused_secs($node);

      my $elapsed = $current_time    - $self->{last_time};
      my $paused  = $current_fc_secs - $self->{last_fc_secs};

      # Guard the division: two clock reads can return the same value, and
      # an unguarded zero interval would die with "Illegal division by zero".
      # With no measurable interval, any new pause time counts as 100%.
      my $current_avg = $elapsed > 0 ? $paused / $elapsed
                      : $paused  > 0 ? 1
                      :                0;
      PTDEBUG && _d('Flow Control paused fraction:', $current_avg,
         'max:', $max_avg);

      if ( $current_avg > $max_avg ) {
         if ( $pr ) {
            # There's no real progress because we can't estimate how long
            # it will take the values to abate.
            $pr->update(sub { return 0; });
         }
         PTDEBUG && _d('Calling sleep callback');
         $sleep->();
      }
      else {
         $too_much_fc = 0;
      }

      # The next average is measured from this check onwards.
      $self->{last_time}    = $current_time;
      $self->{last_fc_secs} = $current_fc_secs;
   }

   PTDEBUG && _d('Flow Control is Ok');
   return;
}

# Sub: _fc_paused_secs
#   Read how long the node reports having been paused by Flow Control.
#   The LIKE pattern is single-quoted so that the statement also works
#   when sql_mode contains ANSI_QUOTES.
#
# Parameters:
#   $node - dbh of the node to query
#
# Returns:
#   Paused time in seconds (float).  Dies with an explicit message if the
#   node does not return the counter.
sub _fc_paused_secs {
   my ( $node ) = @_;
   my $sql = "SHOW STATUS LIKE '" . FC_PAUSED_STATUS . "'";
   PTDEBUG && _d($sql);
   my (undef, $paused_ns) = $node->selectrow_array($sql);
   die "Cannot read " . FC_PAUSED_STATUS . " from the node; "
      . "is it a PXC 5.6 or newer node?\n"
      unless defined $paused_ns;
   return $paused_ns / NS_PER_SEC;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;
}
|
||||
# ###########################################################################
|
||||
# End FlowControlWaiter package
|
||||
# ###########################################################################
|
Reference in New Issue
Block a user