Files
percona-toolkit/lib/QueryIterator.pm
Daniel Nichter 51af4bde2f Fix --progress.
2013-03-11 17:38:42 -06:00

313 lines
7.0 KiB
Perl

# This program is copyright 2013 Percona Ireland Ltd.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# QueryIterator package
# ###########################################################################
{
package QueryIterator;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use POSIX qw(signal_h);
use Data::Dumper;
use Lmo;
##
# Required
##
has 'file_iter' => (
is => 'ro',
isa => 'CodeRef',
required => 1,
);
has 'parser' => (
is => 'ro',
isa => 'CodeRef',
required => 1,
);
has 'fingerprint' => (
is => 'ro',
isa => 'CodeRef',
required => 1,
);
has 'oktorun' => (
is => 'ro',
isa => 'CodeRef',
required => 1,
);
##
# Optional
##
has 'filter' => (
is => 'ro',
isa => 'CodeRef',
required => 0,
);
has 'read_only' => (
is => 'ro',
isa => 'Bool',
required => 0,
default => 0,
);
has 'read_timeout' => (
is => 'ro',
isa => 'Int',
required => 0,
default => 0,
);
has 'progress' => (
is => 'ro',
isa => 'Maybe[Str]',
required => 0,
default => sub { return },
);
##
# Private
##
has '_progress' => (
is => 'rw',
isa => 'Maybe[Object]',
required => 0,
default => sub { return },
);
has 'stats' => (
is => 'ro',
isa => 'HashRef',
required => 0,
default => sub { return {} },
);
has '_fh' => (
is => 'rw',
isa => 'Maybe[FileHandle]',
required => 0,
);
has '_file_name' => (
is => 'rw',
isa => 'Maybe[Str]',
required => 0,
);
has '_file_size' => (
is => 'rw',
isa => 'Maybe[Int]',
required => 0,
);
has '_offset' => (
is => 'rw',
isa => 'Maybe[Int]',
required => 0,
);
has '_parser_args' => (
is => 'rw',
isa => 'HashRef',
required => 0,
);
sub BUILDARGS {
my $class = shift;
my $args = $class->SUPER::BUILDARGS(@_);
my $filter_code;
if ( my $filter = $args->{filter} ) {
if ( -f $filter && -r $filter ) {
PTDEBUG && _d('Reading file', $filter, 'for --filter code');
open my $fh, "<", $filter or die "Cannot open $filter: $OS_ERROR";
$filter = do { local $/ = undef; <$fh> };
close $fh;
}
else {
$filter = "( $filter )"; # issue 565
}
my $code = "sub {
PTDEBUG && _d('callback: filter');
my(\$event) = shift;
$filter && return \$event;
};";
PTDEBUG && _d('--filter code:', $code);
$filter_code = eval $code
or die "Error compiling --filter code: $code\n$EVAL_ERROR";
}
else {
$filter_code = sub { return 1 };
}
my $self = {
%$args,
filter => $filter_code,
};
return $self;
}
sub next {
my ($self) = @_;
if ( !$self->_fh ) {
my ($fh, $file_name, $file_size) = $self->file_iter->();
return unless $fh;
PTDEBUG && _d('Reading', $file_name);
$self->_fh($fh);
$self->_file_name($file_name);
$self->_file_size($file_size);
my $parser_args = {};
if ( my $read_timeout = $self->read_timeout ) {
$parser_args->{next_event}
= sub { return _read_timeout($fh, $read_timeout); };
}
else {
$parser_args->{next_event} = sub { return <$fh>; };
}
$parser_args->{tell} = sub {
my $offset = tell $fh; # update global $offset
$self->_offset($offset);
return $offset; # legacy: return global $offset
};
my $_progress;
if ( my $spec = $self->progress ) {
$_progress = new Progress(
jobsize => $file_size,
spec => $spec,
name => $file_name,
);
}
$self->_progress($_progress);
$self->_parser_args($parser_args);
}
EVENT:
while (
$self->oktorun
&& (my $event = $self->parser->(%{ $self->_parser_args }) )
) {
$self->stats->{queries_read}++;
if ( my $pr = $self->_progress ) {
$pr->update($self->_parser_args->{tell});
}
if ( ($event->{cmd} || '') ne 'Query' ) {
PTDEBUG && _d('Skipping non-Query cmd');
$self->stats->{not_query}++;
next EVENT;
}
if ( !$event->{arg} ) {
PTDEBUG && _d('Skipping empty arg');
$self->stats->{empty_query}++;
next EVENT;
}
if ( !$self->filter->($event) ) {
$self->stats->{queries_filtered}++;
next EVENT;
}
if ( $self->read_only ) {
if ( $event->{arg} !~ m{^(?:/\*[^!].*?\*/)?\s*(?:SELECT|SET)}i ) {
PTDEBUG && _d('Skipping non-SELECT query');
$self->stats->{not_select}++;
next EVENT;
}
}
$event->{fingerprint} = $self->fingerprint->($event->{arg});
return $event;
}
PTDEBUG && _d('Done reading', $self->_file_name);
close $self->_fh if $self->_fh;
$self->_fh(undef);
$self->_file_name(undef);
$self->_file_size(undef);
return;
}
# Read the fh and timeout after t seconds.
sub _read_timeout {
my ( $fh, $t ) = @_;
return unless $fh;
$t ||= 0; # will reset alarm and cause read to wait forever
# Set the SIGALRM handler.
my $mask = POSIX::SigSet->new(&POSIX::SIGALRM);
my $action = POSIX::SigAction->new(
sub {
# This sub is called when a SIGALRM is received.
die 'read timeout';
},
$mask,
);
my $oldaction = POSIX::SigAction->new();
sigaction(&POSIX::SIGALRM, $action, $oldaction);
my $res;
eval {
alarm $t;
$res = <$fh>;
alarm 0;
};
if ( $EVAL_ERROR ) {
PTDEBUG && _d('Read error:', $EVAL_ERROR);
die $EVAL_ERROR unless $EVAL_ERROR =~ m/read timeout/;
$res = undef; # res is a blank string after a timeout
}
return $res;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
no Lmo;
1;
}
# ###########################################################################
# End QueryIterator package
# ###########################################################################