Files
percona-toolkit/lib/Pipeline.pm
2013-01-31 14:52:34 -03:00

221 lines
7.1 KiB
Perl

# This program is copyright 2011 Percona Ireland Ltd.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# Pipeline package
# ###########################################################################
{
# Package: Pipeline
# Pipeline executes and controls a list of pipeline processes.
package Pipeline;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use Data::Dumper;
$Data::Dumper::Indent = 1;
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Quotekeys = 0;
use Time::HiRes qw(time);
# Sub: new
#   Create a Pipeline object that has no processes attached yet.
#
# Parameters:
#   %args - Arguments
#
# Optional Arguments:
#   instrument        - True to record per-process time and call data
#                       (default: the PTDEBUG constant).
#   continue_on_error - Consulted by execute() when a process dies
#                       (default: 0).
#
# Returns:
#   Pipeline object
sub new {
   my $class = shift;
   my %args  = @_;

   # No argument is mandatory, so there is nothing to validate here.

   # Defaults come first; anything the caller passed replaces them.
   my %self = (
      instrument        => PTDEBUG,
      continue_on_error => 0,
      %args,
   );

   # Private/internal state is assigned last, so a caller-supplied key of
   # the same name can never override it.
   $self{procs} = [];   # coderefs for pipeline processes
   $self{names} = [];   # names for each pipeline proc, same order as procs
   $self{instrumentation} = {   # keyed on proc name, plus the total
      Pipeline => {
         time  => 0,
         calls => 0,
      },
   };

   return bless \%self, $class;
}
# Sub: add
#   Append a process to the end of the pipeline.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   process - Stored in the procs list; execute() invokes it as a coderef.
#   name    - Name of the process.  Used as the key for its retry budget
#             and, when instrumenting, for its instrumentation entry.
#
# Optional Arguments:
#   retry_on_error - Retry budget recorded for this process (default 100).
#                    An explicit 0 is honored and means "no retries".
#
# Returns:
#   Nothing
sub add {
   my ( $self, %args ) = @_;
   my @required_args = qw(process name);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }
   my ($process, $name) = @args{@required_args};

   push @{$self->{procs}}, $process;
   push @{$self->{names}}, $name;

   # Test definedness rather than truth: "|| 100" would silently turn an
   # explicit retry_on_error => 0 into 100 retries.
   $self->{retries}->{$name} = defined $args{retry_on_error}
                             ? $args{retry_on_error}
                             : 100;

   if ( $self->{instrument} ) {
      $self->{instrumentation}->{$name} = { time => 0, calls => 0 };
   }
   PTDEBUG && _d("Added pipeline process", $name);
   return;
}
# Sub: processes
#   List the names of the pipeline processes in the order they were added.
#
# Returns:
#   List of process names (the number of names in scalar context)
sub processes {
   my $self  = shift;
   my $names = $self->{names};
   return @$names;
}
# Sub: execute
#   Execute all pipeline processes, in the order they were added, until
#   not oktorun.  The oktorun arg must be a reference to a scalar; the
#   pipeline keeps running passes for as long as that scalar is true.
#   The oktorun ref is stored in pipeline_data->{oktorun}, which is passed
#   to every pipeline proc, so any proc can terminate pipeline execution.
#   A proc signals that it wants to restart execution of the pipeline from
#   the first proc by returning a false value.
#
#   If a proc dies and continue_on_error is false, this sub dies.
#   Otherwise the error is warned about and the pipeline restarts from the
#   first proc, until the failing proc's retry budget (see add) is used
#   up, at which point this sub dies.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   oktorun - Scalar ref that indicates it's ok to run when true.
#
# Optional Arguments:
#   pipeline_data - Hashref passed through all processes.
#   stats         - Hashref in which pipeline_restarted_after_NAME is
#                   incremented each time proc NAME restarts the pipeline.
#
# Returns:
#   Nothing
sub execute {
   my ( $self, %args ) = @_;

   die "Cannot execute pipeline because no process have been added"
      unless scalar @{$self->{procs}};

   my $oktorun = $args{oktorun};
   die "I need an oktorun argument" unless $oktorun;
   die '$oktorun argument must be a reference' unless ref $oktorun;

   my $pipeline_data = $args{pipeline_data} || {};
   $pipeline_data->{oktorun} = $oktorun;

   my $stats = $args{stats};  # optional

   PTDEBUG && _d("Pipeline starting at", time);
   my $instrument = $self->{instrument};
   my $processes  = $self->{procs};
   my $names      = $self->{names};

   EVENT:
   while ( $$oktorun ) {
      my $procno = 0;  # so we can see which proc if one causes an error
      my $output;
      eval {
         PIPELINE_PROCESS:
         while ( $procno < scalar @$processes ) {
            my $name       = $names->[$procno];
            my $call_start = $instrument ? time() : 0;

            # Execute this pipeline process.
            PTDEBUG && _d("Pipeline process", $name);
            $output = $processes->[$procno]->($pipeline_data);

            if ( $instrument ) {
               my $call_t = time() - $call_start;
               foreach my $key ( $name, 'Pipeline' ) {
                  my $counters = $self->{instrumentation}->{$key} ||= {};
                  $counters->{time} += $call_t;
                  # "calls" is the counter that new(), add() and reset()
                  # initialize.  "count" is the key this sub has always
                  # incremented; it is still maintained so that anything
                  # reading the old key keeps working.
                  $counters->{calls}++;
                  $counters->{count}++;
               }
            }

            if ( !$output ) {
               PTDEBUG && _d("Pipeline restarting early after", $name);
               if ( $stats ) {
                  $stats->{"pipeline_restarted_after_" . $name}++;
               }
               last PIPELINE_PROCESS;
            }
            $procno++;
         }
      };
      # Copy the error immediately; $EVAL_ERROR is a global.
      my $error = $EVAL_ERROR;
      if ( $error ) {
         my $name    = $names->[$procno] || "";
         my $proc_id = $procno + 1;  # 1-based in every message below
         my $msg     = "Pipeline process $proc_id ($name) caused an error: "
                     . $error;
         if ( !$self->{continue_on_error} ) {
            die $msg . "Terminating pipeline because --continue-on-error "
               . "is false.\n";
         }
         elsif ( defined $self->{retries}->{$name} ) {
            my $n = $self->{retries}->{$name};
            if ( $n ) {
               warn $msg . "Will retry pipeline process $proc_id ($name) "
                  . "$n more " . ($n > 1 ? "times" : "time") . ".\n";
               $self->{retries}->{$name}--;
            }
            else {
               die $msg . "Terminating pipeline because process $proc_id "
                  . "($name) caused too many errors.\n";
            }
         }
         else {
            warn $msg;
         }
      }
   }

   PTDEBUG && _d("Pipeline stopped at", time);
   return;
}
# Sub: instrumentation
#   Accessor for the instrumentation data.
#
# Returns:
#   The object's instrumentation hashref itself (not a copy), keyed on
#   process name plus the "Pipeline" entry.
sub instrumentation {
   my $self = shift;
   my $data = $self->{instrumentation};
   return $data;
}
# Sub: reset
#   Zero the instrumentation counters of every process that has an
#   instrumentation entry, and of the "Pipeline" total.
#
#   Both call counters are cleared: "calls", which new() and add()
#   initialize, and "count", which execute() increments.  Clearing only
#   "calls" would leave the counter that execute() maintains untouched.
#
# Returns:
#   Nothing
sub reset {
   my ( $self ) = @_;
   my $instrumentation = $self->{instrumentation} ||= {};

   # Processes without an entry are skipped so that no entries are created
   # for them; the Pipeline total is always (re)initialized.
   my @entries = grep { exists $instrumentation->{$_} } @{$self->{names}};
   foreach my $entry ( @entries, 'Pipeline' ) {
      $instrumentation->{$entry}->{calls} = 0;
      $instrumentation->{$entry}->{count} = 0;
      $instrumentation->{$entry}->{time}  = 0;
   }
   return;
}
# Sub: _d
#   Print a debug line to STDERR, prefixed with "# ", the calling package,
#   the calling line number and the process ID.  Undefined arguments are
#   printed as "undef"; a "# " is inserted after every embedded newline so
#   that continuation lines carry the same prefix.
#
# Parameters:
#   @_ - Values to print, joined with single spaces
sub _d {
   my ($package, $line) = (caller 0)[0, 2];
   my @parts;
   foreach my $arg ( @_ ) {
      # Work on a copy so the caller's variables are never modified.
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @parts, $text;
   }
   print STDERR "# $package:$line $PID ", join(' ', @parts), "\n";
}
1;
}
# ###########################################################################
# End Pipeline package
# ###########################################################################