mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-10-22 19:04:30 +00:00
First working scaffolding and fondation of host-to-host comparison.
This commit is contained in:
@@ -25,7 +25,20 @@ use warnings FATAL => 'all';
|
||||
use English qw(-no_match_vars);
|
||||
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
|
||||
|
||||
use Mo;
|
||||
use POSIX qw(signal_h);
|
||||
use Data::Dumper;
|
||||
|
||||
use Lmo;
|
||||
|
||||
##
|
||||
# Required
|
||||
##
|
||||
|
||||
has 'file_iter' => (
|
||||
is => 'ro',
|
||||
isa => 'CodeRef',
|
||||
required => 1,
|
||||
);
|
||||
|
||||
has 'parser' => (
|
||||
is => 'ro',
|
||||
@@ -33,13 +46,23 @@ has 'parser' => (
|
||||
required => 1,
|
||||
);
|
||||
|
||||
has 'fingerprint' => (
|
||||
is => 'ro',
|
||||
isa => 'CodeRef',
|
||||
required => 1,
|
||||
);
|
||||
|
||||
has 'oktorun' => (
|
||||
is => 'ro',
|
||||
isa => 'CodeRef',
|
||||
required => 1,
|
||||
);
|
||||
|
||||
has 'database' => (
|
||||
##
|
||||
# Optional
|
||||
##
|
||||
|
||||
has 'default_database' => (
|
||||
is => 'rw',
|
||||
isa => 'Maybe[Str]',
|
||||
required => 0,
|
||||
@@ -58,10 +81,66 @@ has 'read_only' => (
|
||||
default => 0,
|
||||
);
|
||||
|
||||
has 'read_timeout' => (
|
||||
is => 'ro',
|
||||
isa => 'Int',
|
||||
required => 0,
|
||||
default => 0,
|
||||
);
|
||||
|
||||
##
|
||||
# Private
|
||||
##
|
||||
|
||||
has 'stats' => (
|
||||
is => 'ro',
|
||||
isa => 'HashRef',
|
||||
required => 0,
|
||||
default => sub { return {} },
|
||||
);
|
||||
|
||||
has 'database' => (
|
||||
is => 'rw',
|
||||
isa => 'Maybe[Str]',
|
||||
required => 0,
|
||||
);
|
||||
|
||||
has '_fh' => (
|
||||
is => 'rw',
|
||||
isa => 'Maybe[FileHandle]',
|
||||
required => 0,
|
||||
);
|
||||
|
||||
has '_file_name' => (
|
||||
is => 'rw',
|
||||
isa => 'Maybe[Str]',
|
||||
required => 0,
|
||||
);
|
||||
|
||||
has '_file_size' => (
|
||||
is => 'rw',
|
||||
isa => 'Maybe[Int]',
|
||||
required => 0,
|
||||
);
|
||||
|
||||
has '_offset' => (
|
||||
is => 'rw',
|
||||
isa => 'Maybe[Int]',
|
||||
required => 0,
|
||||
);
|
||||
|
||||
has '_parser_args' => (
|
||||
is => 'rw',
|
||||
isa => 'HashRef',
|
||||
required => 0,
|
||||
);
|
||||
|
||||
sub BUILDARGS {
|
||||
my $class = shift;
|
||||
my $args = $class->SUPER::BUILDARGS(@_);
|
||||
|
||||
my $filter_code;
|
||||
if ( my $filter = $args{filter} ) {
|
||||
if ( my $filter = $args->{filter} ) {
|
||||
if ( -f $filter && -r $filter ) {
|
||||
PTDEBUG && _d('Reading file', $filter, 'for --filter code');
|
||||
open my $fh, "<", $filter or die "Cannot open $filter: $OS_ERROR";
|
||||
@@ -71,7 +150,11 @@ sub BUILDARGS {
|
||||
else {
|
||||
$filter = "( $filter )"; # issue 565
|
||||
}
|
||||
my $code = "sub { PTDEBUG && _d('callback: filter'); my(\$event) = shift; $filter && return \$event; };";
|
||||
my $code = "sub {
|
||||
PTDEBUG && _d('callback: filter');
|
||||
my(\$event) = shift;
|
||||
$filter && return \$event;
|
||||
};";
|
||||
PTDEBUG && _d('--filter code:', $code);
|
||||
$filter_code = eval $code
|
||||
or die "Error compiling --filter code: $code\n$EVAL_ERROR";
|
||||
@@ -80,45 +163,82 @@ sub BUILDARGS {
|
||||
$filter_code = sub { return 1 };
|
||||
}
|
||||
|
||||
my $self = {
|
||||
%$args,
|
||||
filter => $filter_code,
|
||||
};
|
||||
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub next {
|
||||
my ($self) = @_;
|
||||
|
||||
if ( !$self->_fh ) {
|
||||
my ($fh, $file_name, $file_size) = $self->file_iter->();
|
||||
return unless $fh;
|
||||
|
||||
PTDEBUG && _d('Reading', $file_name);
|
||||
$self->_fh($fh);
|
||||
$self->_file_name($file_name);
|
||||
$self->_file_size($file_size);
|
||||
|
||||
my $parser_args = {};
|
||||
|
||||
if ( my $read_timeout = $self->read_timeout ) {
|
||||
$parser_args->{next_event}
|
||||
= sub { return _read_timeout($fh, $read_timeout); };
|
||||
}
|
||||
else {
|
||||
$parser_args->{next_event} = sub { return <$fh>; };
|
||||
}
|
||||
|
||||
$parser_args->{tell} = sub {
|
||||
my $offset = tell $fh; # update global $offset
|
||||
$self->_offset($offset);
|
||||
return $offset; # legacy: return global $offset
|
||||
};
|
||||
|
||||
$self->_parser_args($parser_args);
|
||||
}
|
||||
|
||||
EVENT:
|
||||
while (
|
||||
$self->oktorun()
|
||||
&& (my $event = $parser->parse_event(%args))
|
||||
$self->oktorun
|
||||
&& (my $event = $self->parser->parse_event(%{ $self->_parser_args }) )
|
||||
) {
|
||||
|
||||
$self->stats->{events}++;
|
||||
$self->stats->{queries_read}++;
|
||||
|
||||
if ( ($event->{cmd} || '') ne 'Query' ) {
|
||||
PTDEBUG && _d('Skipping non-Query cmd');
|
||||
$stats->{not_query}++;
|
||||
$self->stats->{not_query}++;
|
||||
next EVENT;
|
||||
}
|
||||
|
||||
if ( !$event->{arg} ) {
|
||||
PTDEBUG && _d('Skipping empty arg');
|
||||
$stats->{empty_query}++;
|
||||
$self->stats->{empty_query}++;
|
||||
next EVENT;
|
||||
}
|
||||
|
||||
if ( !$self->filter->($event) ) {
|
||||
$self->stats->{queries_filtered}++;
|
||||
next EVENT;
|
||||
}
|
||||
|
||||
next EVENT unless $self->filter->();
|
||||
|
||||
if ( $self->read_only ) {
|
||||
if ( $event->{arg} !~ m/(?:^SELECT|(?:\*\/\s*SELECT))/i ) {
|
||||
PTDEBUG && _d('Skipping non-SELECT query');
|
||||
$stats->{not_select}++;
|
||||
$self->stats->{not_select}++;
|
||||
next EVENT;
|
||||
}
|
||||
}
|
||||
|
||||
$event->{fingerprint} = $qr->fingerprint($event->{arg});
|
||||
$event->{fingerprint} = $self->fingerprint->($event->{arg});
|
||||
|
||||
my $db = $event->{db} || $event->{Schema} || $hosts->[0]->{dsn}->{D};
|
||||
if ( $db && (!$current_db || $db ne $current_db) ) {
|
||||
my $current_db = $self->database;
|
||||
my $db = $event->{db} || $event->{Schema} || $self->default_database;
|
||||
if ( $db && (!$current_db || $current_db ne $db) ) {
|
||||
$self->database($db);
|
||||
}
|
||||
else {
|
||||
@@ -126,12 +246,58 @@ sub next {
|
||||
}
|
||||
|
||||
return $event;
|
||||
} # EVENT
|
||||
}
|
||||
|
||||
PTDEBUG && _d('Done reading', $self->_file_name);
|
||||
close $self->_fh if $self->_fh;
|
||||
$self->_fh(undef);
|
||||
$self->_file_name(undef);
|
||||
$self->_file_size(undef);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
no Mo;
|
||||
# Read the fh and timeout after t seconds.
|
||||
sub _read_timeout {
|
||||
my ( $fh, $t ) = @_;
|
||||
return unless $fh;
|
||||
$t ||= 0; # will reset alarm and cause read to wait forever
|
||||
|
||||
# Set the SIGALRM handler.
|
||||
my $mask = POSIX::SigSet->new(&POSIX::SIGALRM);
|
||||
my $action = POSIX::SigAction->new(
|
||||
sub {
|
||||
# This sub is called when a SIGALRM is received.
|
||||
die 'read timeout';
|
||||
},
|
||||
$mask,
|
||||
);
|
||||
my $oldaction = POSIX::SigAction->new();
|
||||
sigaction(&POSIX::SIGALRM, $action, $oldaction);
|
||||
|
||||
my $res;
|
||||
eval {
|
||||
alarm $t;
|
||||
$res = <$fh>;
|
||||
alarm 0;
|
||||
};
|
||||
if ( $EVAL_ERROR ) {
|
||||
PTDEBUG && _d('Read error:', $EVAL_ERROR);
|
||||
die $EVAL_ERROR unless $EVAL_ERROR =~ m/read timeout/;
|
||||
$res = undef; # res is a blank string after a timeout
|
||||
}
|
||||
return $res;
|
||||
}
|
||||
|
||||
sub _d {
|
||||
my ($package, undef, $line) = caller 0;
|
||||
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
||||
map { defined $_ ? $_ : 'undef' }
|
||||
@_;
|
||||
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
|
||||
}
|
||||
|
||||
no Lmo;
|
||||
1;
|
||||
}
|
||||
# ###########################################################################
|
||||
|
Reference in New Issue
Block a user