mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-01 18:25:59 +00:00
221 lines
7.1 KiB
Perl
221 lines
7.1 KiB
Perl
# This program is copyright 2011 Percona Ireland Ltd.
|
|
# Feedback and improvements are welcome.
|
|
#
|
|
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
|
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
|
|
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
|
|
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
|
|
# licenses.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
|
|
# Place, Suite 330, Boston, MA 02111-1307 USA.
|
|
# ###########################################################################
|
|
# Pipeline package
|
|
# ###########################################################################
|
|
{
|
|
# Package: Pipeline
|
|
# Pipeline executes and controls a list of pipeline processes.
|
|
package Pipeline;
|
|
|
|
use strict;
|
|
use warnings FATAL => 'all';
|
|
use English qw(-no_match_vars);
|
|
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
|
|
|
|
use Data::Dumper;
|
|
$Data::Dumper::Indent = 1;
|
|
$Data::Dumper::Sortkeys = 1;
|
|
$Data::Dumper::Quotekeys = 0;
|
|
use Time::HiRes qw(time);
|
|
|
|
sub new {
|
|
my ( $class, %args ) = @_;
|
|
my @required_args = qw();
|
|
foreach my $arg ( @required_args ) {
|
|
die "I need a $arg argument" unless defined $args{$arg};
|
|
}
|
|
|
|
my $self = {
|
|
# default values for optional args
|
|
instrument => PTDEBUG,
|
|
continue_on_error => 0,
|
|
|
|
# specified arg values override defaults
|
|
%args,
|
|
|
|
# private/internal vars
|
|
procs => [], # coderefs for pipeline processes
|
|
names => [], # names for each ^ pipeline proc
|
|
instrumentation => { # keyed on proc index in procs
|
|
Pipeline => {
|
|
time => 0,
|
|
calls => 0,
|
|
},
|
|
},
|
|
};
|
|
return bless $self, $class;
|
|
}
|
|
|
|
sub add {
|
|
my ( $self, %args ) = @_;
|
|
my @required_args = qw(process name);
|
|
foreach my $arg ( @required_args ) {
|
|
die "I need a $arg argument" unless defined $args{$arg};
|
|
}
|
|
my ($process, $name) = @args{@required_args};
|
|
|
|
push @{$self->{procs}}, $process;
|
|
push @{$self->{names}}, $name;
|
|
$self->{retries}->{$name} = $args{retry_on_error} || 100;
|
|
if ( $self->{instrument} ) {
|
|
$self->{instrumentation}->{$name} = { time => 0, calls => 0 };
|
|
}
|
|
PTDEBUG && _d("Added pipeline process", $name);
|
|
|
|
return;
|
|
}
|
|
|
|
sub processes {
|
|
my ( $self ) = @_;
|
|
return @{$self->{names}};
|
|
}
|
|
|
|
# Sub: execute
|
|
# Execute all pipeline processes until not oktorun. The oktorun arg
|
|
# must be a reference. The pipeline will run until oktorun is false.
|
|
# The oktorun ref is passed to every pipeline proc so they can completely
|
|
# terminate pipeline execution. A proc signals that it wants to restart
|
|
# execution of the pipeline from the first proc by returning undef.
|
|
# If a proc both sets oktorun to false and returns undef, this sub will
|
|
# return with some info about where the pipeline stopped.
|
|
#
|
|
# Parameters:
|
|
# %args - Arguments passed to each pipeline process.
|
|
#
|
|
# Required Arguments:
|
|
# oktorun - Scalar ref that indicates it's ok to run when true.
|
|
#
|
|
# Optional Arguments:
|
|
# pipeline_data - Hashref passed through all processes.
|
|
#
|
|
# Returns:
|
|
# Hashref with information about where and why the pipeline terminated.
|
|
sub execute {
|
|
my ( $self, %args ) = @_;
|
|
|
|
die "Cannot execute pipeline because no process have been added"
|
|
unless scalar @{$self->{procs}};
|
|
|
|
my $oktorun = $args{oktorun};
|
|
die "I need an oktorun argument" unless $oktorun;
|
|
die '$oktorun argument must be a reference' unless ref $oktorun;
|
|
|
|
my $pipeline_data = $args{pipeline_data} || {};
|
|
$pipeline_data->{oktorun} = $oktorun;
|
|
|
|
my $stats = $args{stats}; # optional
|
|
|
|
PTDEBUG && _d("Pipeline starting at", time);
|
|
my $instrument = $self->{instrument};
|
|
my $processes = $self->{procs};
|
|
EVENT:
|
|
while ( $$oktorun ) {
|
|
my $procno = 0; # so we can see which proc if one causes an error
|
|
my $output;
|
|
eval {
|
|
PIPELINE_PROCESS:
|
|
while ( $procno < scalar @{$self->{procs}} ) {
|
|
my $call_start = $instrument ? time : 0;
|
|
|
|
# Execute this pipeline process.
|
|
PTDEBUG && _d("Pipeline process", $self->{names}->[$procno]);
|
|
$output = $processes->[$procno]->($pipeline_data);
|
|
|
|
if ( $instrument ) {
|
|
my $call_end = time;
|
|
my $call_t = $call_end - $call_start;
|
|
$self->{instrumentation}->{$self->{names}->[$procno]}->{time} += $call_t;
|
|
$self->{instrumentation}->{$self->{names}->[$procno]}->{count}++;
|
|
$self->{instrumentation}->{Pipeline}->{time} += $call_t;
|
|
$self->{instrumentation}->{Pipeline}->{count}++;
|
|
}
|
|
if ( !$output ) {
|
|
PTDEBUG && _d("Pipeline restarting early after",
|
|
$self->{names}->[$procno]);
|
|
if ( $stats ) {
|
|
$stats->{"pipeline_restarted_after_"
|
|
.$self->{names}->[$procno]}++;
|
|
}
|
|
last PIPELINE_PROCESS;
|
|
}
|
|
$procno++;
|
|
}
|
|
};
|
|
if ( $EVAL_ERROR ) {
|
|
my $name = $self->{names}->[$procno] || "";
|
|
my $msg = "Pipeline process " . ($procno + 1)
|
|
. " ($name) caused an error: "
|
|
. $EVAL_ERROR;
|
|
if ( !$self->{continue_on_error} ) {
|
|
die $msg . "Terminating pipeline because --continue-on-error "
|
|
. "is false.\n";
|
|
}
|
|
elsif ( defined $self->{retries}->{$name} ) {
|
|
my $n = $self->{retries}->{$name};
|
|
if ( $n ) {
|
|
warn $msg . "Will retry pipeline process $procno ($name) "
|
|
. "$n more " . ($n > 1 ? "times" : "time") . ".\n";
|
|
$self->{retries}->{$name}--;
|
|
}
|
|
else {
|
|
die $msg . "Terminating pipeline because process $procno "
|
|
. "($name) caused too many errors.\n";
|
|
}
|
|
}
|
|
else {
|
|
warn $msg;
|
|
}
|
|
}
|
|
}
|
|
|
|
PTDEBUG && _d("Pipeline stopped at", time);
|
|
return;
|
|
}
|
|
|
|
sub instrumentation {
|
|
my ( $self ) = @_;
|
|
return $self->{instrumentation};
|
|
}
|
|
|
|
sub reset {
|
|
my ( $self ) = @_;
|
|
foreach my $proc_name ( @{$self->{names}} ) {
|
|
if ( exists $self->{instrumentation}->{$proc_name} ) {
|
|
$self->{instrumentation}->{$proc_name}->{calls} = 0;
|
|
$self->{instrumentation}->{$proc_name}->{time} = 0;
|
|
}
|
|
}
|
|
$self->{instrumentation}->{Pipeline}->{calls} = 0;
|
|
$self->{instrumentation}->{Pipeline}->{time} = 0;
|
|
return;
|
|
}
|
|
|
|
sub _d {
|
|
my ($package, undef, $line) = caller 0;
|
|
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
|
map { defined $_ ? $_ : 'undef' }
|
|
@_;
|
|
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
|
|
}
|
|
|
|
1;
|
|
}
|
|
# ###########################################################################
|
|
# End Pipeline package
|
|
# ###########################################################################
|