From 334f358a5c81b37025d5ef414c5df01df26dc16a Mon Sep 17 00:00:00 2001
From: frank-cizmich
Date: Wed, 22 Jul 2015 16:37:05 -0300
Subject: [PATCH] pt-osc: added PXC Flow Control awareness - lp1413101

---
 bin/pt-online-schema-change | 153 +++++++++++++++++++++++++++++++++++-
 lib/FlowControlWaiter.pm    | 142 ++++++++++++++++++++++++++++++++
 2 files changed, 294 insertions(+), 1 deletion(-)
 create mode 100644 lib/FlowControlWaiter.pm

diff --git a/bin/pt-online-schema-change b/bin/pt-online-schema-change
index ead42cfd..ba94a27c 100755
--- a/bin/pt-online-schema-change
+++ b/bin/pt-online-schema-change
@@ -31,6 +31,7 @@ BEGIN {
       Cxn
       MasterSlave
       ReplicaLagWaiter
+      FlowControlWaiter
       MySQLStatusWaiter
       WeightedAvgRate
       NibbleIterator
@@ -4816,6 +4817,109 @@ sub _d {
 # End ReplicaLagWaiter package
 # ###########################################################################
 
+
+# ###########################################################################
+# FlowControlWaiter package
+# This package is a copy without comments from the original. The original
+# with comments and its test file can be found in the Bazaar repository at,
+# lib/FlowControlWaiter.pm
+# t/lib/FlowControlWaiter.t
+# See https://launchpad.net/percona-toolkit for more information.
+# ###########################################################################
+{
+package FlowControlWaiter;
+
+use strict;
+use warnings FATAL => 'all';
+use English qw(-no_match_vars);
+use constant PTDEBUG => $ENV{PTDEBUG} || 0;
+
+use Time::HiRes qw(sleep time);
+use Data::Dumper;
+
+sub new {
+   my ( $class, %args ) = @_;
+   my @required_args = qw(oktorun node sleep max_flow_ctl);
+   foreach my $arg ( @required_args ) {
+      die "I need a $arg argument" unless defined $args{$arg};
+   }
+
+   my $self = {
+      %args
+   };
+
+   $self->{last_time} = time();
+
+   my (undef, $last_fc_ns) = $self->{node}->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"');
+   die "Cannot read wsrep_flow_control_paused_ns from node" unless defined $last_fc_ns;
+   $self->{last_fc_secs} = $last_fc_ns/1000_000_000;
+
+   return bless $self, $class;
+}
+
+sub wait {
+   my ( $self, %args ) = @_;
+   my @required_args = qw();
+   foreach my $arg ( @required_args ) {
+      die "I need a $arg argument" unless $args{$arg};
+   }
+   my $pr = $args{Progress};
+
+   my $oktorun = $self->{oktorun};
+   my $sleep   = $self->{sleep};
+   my $node    = $self->{node};
+   my $max_avg = $self->{max_flow_ctl}/100;
+
+   my $too_much_fc = 1;
+
+   my $pr_callback;
+   if ( $pr ) {
+      $pr_callback = sub {
+         print STDERR "Pausing because PXC Flow Control is active\n";
+         return;
+      };
+      $pr->set_callback($pr_callback);
+   }
+
+   while ( $oktorun->() && $too_much_fc ) {
+      my $current_time = time();
+      my (undef, $current_fc_ns) = $node->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"');
+      my $current_fc_secs = $current_fc_ns/1000_000_000;
+      my $elapsed = $current_time - $self->{last_time};
+      my $current_avg = $elapsed > 0 ? ($current_fc_secs - $self->{last_fc_secs}) / $elapsed : 0;
+      if ( $current_avg > $max_avg ) {
+         if ( $pr ) {
+            $pr->update(sub { return 0; });
+         }
+         PTDEBUG && _d('Calling sleep callback');
+         $sleep->();
+      } else {
+         $too_much_fc = 0;
+      }
+      $self->{last_time} = $current_time;
+      $self->{last_fc_secs} = $current_fc_secs;
+
+   }
+
+   PTDEBUG && _d('Flow Control is Ok');
+   return;
+}
+
+sub _d {
+   my ($package, undef, $line) = caller 0;
+   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
+        map { defined $_ ? $_ : 'undef' }
+        @_;
+   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
+}
+
+1;
+}
+# ###########################################################################
+# End FlowControlWaiter package
+# ###########################################################################
+
+
 # ###########################################################################
 # MySQLStatusWaiter package
 # This package is a copy without comments from the original. The original
@@ -7973,6 +8077,7 @@ my $oktorun = 1;
 my $dont_interrupt_now = 0;
 my @drop_trigger_sqls;
 my @triggers_not_dropped;
+my $pxc_version = '0';
 
 $OUTPUT_AUTOFLUSH = 1;
 
@@ -8158,7 +8263,7 @@ sub main {
       # ptc and pt-osc check Threads_running by default for --max-load.
       # Strictly speaking, they can run on 5.5.27 as long as that bug doesn't
       # manifest itself. If it does, however, then the tools will wait forever.
-      my $pxc_version = VersionParser->new($cxn->dbh);
+      $pxc_version = VersionParser->new($cxn->dbh);
       if ( $pxc_version < '5.5.28' ) {
          die "Percona XtraDB Cluster 5.5.28 or newer is required to run "
             . "this tool on a cluster, but node " . $cxn->name
@@ -8166,6 +8271,10 @@ sub main {
             . ". Please upgrade the node, or run the tool on a newer node, "
             . "or contact Percona for support.\n";
       }
+      if ( $pxc_version < '5.6' && $o->got('max-flow-ctl') ) {
+         die "Option '--max-flow-ctl' is only available for PXC version 5.6 "
+            . "or higher.\n";
+      }
 
       # If wsrep_OSU_method=RSU the "DDL will be only processed locally at
       # the node." So _table_new (the altered version of table) will not
@@ -8179,6 +8288,10 @@ sub main {
             . "currently set to " . ($wsrep_osu_method || '') . ". "
             . "Set it to TOI, or contact Percona for support.\n";
       }
+   } elsif ( $o->got('max-flow-ctl') ) {
+      die "Option '--max-flow-ctl' is meant to be used on PXC clusters. "
+         . "For normal async replication use '--max-lag' and '--check-interval' "
+         . "instead.\n";
    }
 
    # ########################################################################
@@ -8228,6 +8341,8 @@ sub main {
    my $slave_lag_cxns;  # slaves whose lag we'll check
    my $replica_lag;     # ReplicaLagWaiter object
    my $replica_lag_pr;  # Progress for ReplicaLagWaiter
+   my $flow_ctl;        # FlowControlWaiter object
+   my $flow_ctl_pr;     # Progress for FlowControlWaiter
    my $sys_load;        # MySQLStatusWaiter object
    my $sys_load_pr;     # Progress for MySQLStatusWaiter object
 
@@ -8321,6 +8436,8 @@ sub main {
 
       # #####################################################################
       # Make a ReplicaLagWaiter to help wait for slaves after each chunk.
+      # Note: the "sleep" function is also used by MySQLStatusWaiter and
+      # FlowControlWaiter
       # #####################################################################
       my $sleep = sub {
          # Don't let the master dbh die while waiting for slaves because we
@@ -8363,6 +8480,7 @@ sub main {
          };
       }
 
+
       $replica_lag = new ReplicaLagWaiter(
          slaves   => $slave_lag_cxns,
          max_lag  => $o->get('max-lag'),
@@ -8404,6 +8522,15 @@ sub main {
             . " --critial-load " . (join(',', @{$o->get('critical-load')}))
             . "\n";
       }
+
+      if ( $pxc_version >= '5.6' && $o->got('max-flow-ctl') ) {
+         $flow_ctl = new FlowControlWaiter(
+            node         => $cxn->dbh(),
+            max_flow_ctl => $o->get('max-flow-ctl'),
+            oktorun      => sub { return $oktorun },
+            sleep        => $sleep,
+         );
+      }
 
       if ( $o->get('progress') ) {
          $replica_lag_pr = new Progress(
@@ -8417,6 +8544,14 @@ sub main {
             spec    => $o->get('progress'),
             name    => "Waiting for --max-load", # not used
          );
+
+         if ( $pxc_version >= '5.6' && $o->got('max-flow-ctl') ) {
+            $flow_ctl_pr = new Progress(
+               jobsize => $o->get('max-flow-ctl'),
+               spec    => $o->get('progress'),
+               name    => "Waiting for flow control to abate", # not used
+            );
+         }
       }
    }
 
@@ -9260,6 +9395,10 @@ sub main {
             $sys_load_pr->start() if $sys_load_pr;
             $sys_load->wait(Progress => $sys_load_pr);
 
+            # Wait forever for flow control to abate.
+            $flow_ctl_pr->start() if $flow_ctl_pr;
+            $flow_ctl->wait(Progress => $flow_ctl_pr) if $flow_ctl;
+
             return;
          },
          done => sub {
@@ -11315,6 +11454,18 @@ short form: -h; type: string
 
 Connect to host.
 
+=item --max-flow-ctl
+
+type: float
+
+Somewhat similar to --max-lag but for PXC clusters.
+Check the average time the cluster spent paused for Flow Control and make the
+tool pause if it goes over the percentage indicated in the option.
+A value of 0 would make the tool pause when *any* Flow Control activity is
+detected.
+Default is no Flow Control checking.
+This option is available for PXC versions 5.6 or higher.
+
 =item --max-lag
 
 type: time; default: 1s
diff --git a/lib/FlowControlWaiter.pm b/lib/FlowControlWaiter.pm
new file mode 100644
index 00000000..5df08ac1
--- /dev/null
+++ b/lib/FlowControlWaiter.pm
@@ -0,0 +1,142 @@
+# This program is copyright 2015 Percona LLC.
+# Feedback and improvements are welcome.
+#
+# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
+# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
+# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
+# systems, you can issue `man perlgpl' or `man perlartistic' to read these
+# licenses.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+# Place, Suite 330, Boston, MA 02111-1307 USA.
+# ###########################################################################
+# FlowControlWaiter package
+# ###########################################################################
+{
+# Package: FlowControlWaiter
+# FlowControlWaiter helps limit load when there's too much Flow Control pausing.
+# It is based on the other "Waiter" modules:
+# ReplicaLagWaiter & MySQLStatusWaiter
+package FlowControlWaiter;
+
+use strict;
+use warnings FATAL => 'all';
+use English qw(-no_match_vars);
+use constant PTDEBUG => $ENV{PTDEBUG} || 0;
+
+use Time::HiRes qw(sleep time);
+use Data::Dumper;
+
+# Sub: new
+#
+# Required Arguments:
+#   oktorun      - Callback that returns true if it's ok to continue running
+#   node         - Node dbh on which to check for wsrep_flow_control_paused_ns
+#   sleep        - Callback to sleep between checks.
+#   max_flow_ctl - Max percent of flow control caused pause time to tolerate
+#
+# Returns:
+#   FlowControlWaiter object
+sub new {
+   my ( $class, %args ) = @_;
+   my @required_args = qw(oktorun node sleep max_flow_ctl);
+   foreach my $arg ( @required_args ) {
+      die "I need a $arg argument" unless defined $args{$arg};
+   }
+
+   my $self = {
+      %args
+   };
+
+   # Get current hi-res epoch seconds
+   $self->{last_time} = time();
+
+   # Get nanoseconds server has been paused due to Flow Control
+   my (undef, $last_fc_ns) = $self->{node}->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"');
+   die "Cannot read wsrep_flow_control_paused_ns from node" unless defined $last_fc_ns;
+   # Convert to seconds (float)
+   $self->{last_fc_secs} = $last_fc_ns/1000_000_000;
+
+   return bless $self, $class;
+}
+
+# Sub: wait
+#   Wait for the average flow control paused time to fall below --max-flow-ctl
+#
+# Optional Arguments:
+#   Progress - object to report waiting
+#
+# Returns:
+#   Nothing; returns once the average is at or below max or oktorun is false.
+sub wait {
+   my ( $self, %args ) = @_;
+   my @required_args = qw();
+   foreach my $arg ( @required_args ) {
+      die "I need a $arg argument" unless $args{$arg};
+   }
+   my $pr = $args{Progress};
+
+   my $oktorun = $self->{oktorun};
+   my $sleep   = $self->{sleep};
+   my $node    = $self->{node};
+   my $max_avg = $self->{max_flow_ctl}/100;
+
+   my $too_much_fc = 1;
+
+   my $pr_callback;
+   if ( $pr ) {
+      # If you use the default Progress report callback, you'll need to
+      # add Transformers.pm to this tool.
+      $pr_callback = sub {
+         print STDERR "Pausing because PXC Flow Control is active\n";
+         return;
+      };
+      $pr->set_callback($pr_callback);
+   }
+
+   # Wait until the fraction of time paused by Flow Control since the previous
+   # check is at or below --max-flow-ctl; a zero-length interval counts as no pause.
+   while ( $oktorun->() && $too_much_fc ) {
+      my $current_time = time();
+      my (undef, $current_fc_ns) = $node->selectrow_array('SHOW STATUS LIKE "wsrep_flow_control_paused_ns"');
+      my $current_fc_secs = $current_fc_ns/1000_000_000;
+      my $elapsed = $current_time - $self->{last_time};
+      my $current_avg = $elapsed > 0 ? ($current_fc_secs - $self->{last_fc_secs}) / $elapsed : 0;
+      if ( $current_avg > $max_avg ) {
+         if ( $pr ) {
+            # There's no real progress because we can't estimate how long
+            # it will take the values to abate.
+            $pr->update(sub { return 0; });
+         }
+         PTDEBUG && _d('Calling sleep callback');
+         $sleep->();
+      } else {
+         $too_much_fc = 0;
+      }
+      $self->{last_time} = $current_time;
+      $self->{last_fc_secs} = $current_fc_secs;
+
+   }
+
+   PTDEBUG && _d('Flow Control is Ok');
+   return;
+}
+
+sub _d {
+   my ($package, undef, $line) = caller 0;
+   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
+        map { defined $_ ? $_ : 'undef' }
+        @_;
+   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
+}
+
+1;
+}
+# ###########################################################################
+# End FlowControlWaiter package
+# ###########################################################################