Test and fix all the log --types. Add backwards-compat null_event arg to MySQLProtocolParser.

This commit is contained in:
Daniel Nichter
2013-03-11 16:13:38 -06:00
parent f00d53adc3
commit 3c07da8499
8 changed files with 610 additions and 35 deletions

View File

@@ -6655,9 +6655,6 @@ sub main {
# just tweak the code below to be like pt-query-digest.
my %alias_for = (
slowlog => ['SlowLogParser'],
# binlog => ['BinaryLogParser'],
# genlog => ['GeneralLogParser'],
# tcpdump => ['TcpdumpParser','MySQLProtocolParser'],
);
my $type = ['slowlog'];
$type = $alias_for{$type->[0]} if $alias_for{$type->[0]};

View File

@@ -41,6 +41,8 @@ BEGIN {
SlowLogParser
GeneralLogParser
BinaryLogParser
RawLogParser
ProtocolParser
TcpdumpParser
MySQLProtocolParser
Runtime
@@ -5104,7 +5106,7 @@ has 'file_iter' => (
has 'parser' => (
is => 'ro',
isa => 'Object',
isa => 'CodeRef',
required => 1,
);
@@ -5249,7 +5251,7 @@ sub next {
EVENT:
while (
$self->oktorun
&& (my $event = $self->parser->parse_event(%{ $self->_parser_args }) )
&& (my $event = $self->parser->(%{ $self->_parser_args }) )
) {
$self->stats->{queries_read}++;
@@ -6897,6 +6899,389 @@ sub _d {
# End BinaryLogParser package
# ###########################################################################
# ###########################################################################
# RawLogParser package
# This package is a copy without comments from the original. The original
# with comments and its test file can be found in the Bazaar repository at,
# lib/RawLogParser.pm
# t/lib/RawLogParser.t
# See https://launchpad.net/percona-toolkit for more information.
# ###########################################################################
{
package RawLogParser;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use Data::Dumper;
$Data::Dumper::Indent = 1;
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Quotekeys = 0;
sub new {
my ( $class ) = @_;
my $self = {
};
return bless $self, $class;
}
sub parse_event {
my ( $self, %args ) = @_;
my @required_args = qw(next_event tell);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($next_event, $tell) = @args{@required_args};
my $line;
my $pos_in_log = $tell->();
LINE:
while ( defined($line = $next_event->()) ) {
PTDEBUG && _d($line);
chomp($line);
my @properties = (
'pos_in_log', $pos_in_log,
'cmd', 'Query',
'bytes', length($line),
'Query_time', 0,
'arg', $line,
);
$pos_in_log = $tell->();
PTDEBUG && _d('Properties of event:', Dumper(\@properties));
my $event = { @properties };
if ( $args{stats} ) {
$args{stats}->{events_read}++;
$args{stats}->{events_parsed}++;
}
return $event;
}
$args{oktorun}->(0) if $args{oktorun};
return;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End RawLogParser package
# ###########################################################################
# ###########################################################################
# ProtocolParser package
# This package is a copy without comments from the original. The original
# with comments and its test file can be found in the Bazaar repository at,
# lib/ProtocolParser.pm
# t/lib/ProtocolParser.t
# See https://launchpad.net/percona-toolkit for more information.
# ###########################################################################
{
package ProtocolParser;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use File::Basename qw(basename);
use File::Temp qw(tempfile);
eval {
require IO::Uncompress::Inflate; # yum: perl-IO-Compress-Zlib
IO::Uncompress::Inflate->import(qw(inflate $InflateError));
};
use Data::Dumper;
$Data::Dumper::Indent = 1;
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Quotekeys = 0;
sub new {
my ( $class, %args ) = @_;
my $self = {
server => $args{server},
port => $args{port},
sessions => {},
o => $args{o},
};
return bless $self, $class;
}
sub parse_event {
my ( $self, %args ) = @_;
my @required_args = qw(event);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my $packet = @args{@required_args};
if ( $self->{buffer} ) {
my ($packet_from, $session) = $self->_get_session($packet);
if ( $packet->{data_len} ) {
if ( $packet_from eq 'client' ) {
push @{$session->{client_packets}}, $packet;
PTDEBUG && _d('Saved client packet');
}
else {
push @{$session->{server_packets}}, $packet;
PTDEBUG && _d('Saved server packet');
}
}
return unless ($packet_from eq 'client')
&& ($packet->{fin} || $packet->{rst});
my $event;
map {
$event = $self->_parse_packet($_, $args{misc});
$args{stats}->{events_parsed}++ if $args{stats};
} sort { $a->{seq} <=> $b->{seq} }
@{$session->{client_packets}};
map {
$event = $self->_parse_packet($_, $args{misc});
$args{stats}->{events_parsed}++ if $args{stats};
} sort { $a->{seq} <=> $b->{seq} }
@{$session->{server_packets}};
return $event;
}
if ( $packet->{data_len} == 0 ) {
PTDEBUG && _d('No TCP data');
return;
}
my $event = $self->_parse_packet($packet, $args{misc});
$args{stats}->{events_parsed}++ if $args{stats};
return $event;
}
sub _parse_packet {
my ( $self, $packet, $misc ) = @_;
my ($packet_from, $session) = $self->_get_session($packet);
PTDEBUG && _d('State:', $session->{state});
push @{$session->{raw_packets}}, $packet->{raw_packet}
unless $misc->{recurse};
if ( $session->{buff} ) {
$session->{buff_left} -= $packet->{data_len};
if ( $session->{buff_left} > 0 ) {
PTDEBUG && _d('Added data to buff; expecting', $session->{buff_left},
'more bytes');
return;
}
PTDEBUG && _d('Got all data; buff left:', $session->{buff_left});
$packet->{data} = $session->{buff} . $packet->{data};
$packet->{data_len} += length $session->{buff};
$session->{buff} = '';
$session->{buff_left} = 0;
}
$packet->{data} = pack('H*', $packet->{data}) unless $misc->{recurse};
my $event;
if ( $packet_from eq 'server' ) {
$event = $self->_packet_from_server($packet, $session, $misc);
}
elsif ( $packet_from eq 'client' ) {
$event = $self->_packet_from_client($packet, $session, $misc);
}
else {
die 'Packet origin unknown';
}
PTDEBUG && _d('State:', $session->{state});
if ( $session->{out_of_order} ) {
PTDEBUG && _d('Session packets are out of order');
push @{$session->{packets}}, $packet;
$session->{ts_min}
= $packet->{ts} if $packet->{ts} lt ($session->{ts_min} || '');
$session->{ts_max}
= $packet->{ts} if $packet->{ts} gt ($session->{ts_max} || '');
if ( $session->{have_all_packets} ) {
PTDEBUG && _d('Have all packets; ordering and processing');
delete $session->{out_of_order};
delete $session->{have_all_packets};
map {
$event = $self->_parse_packet($_, { recurse => 1 });
} sort { $a->{seq} <=> $b->{seq} } @{$session->{packets}};
}
}
PTDEBUG && _d('Done with packet; event:', Dumper($event));
return $event;
}
sub _get_session {
my ( $self, $packet ) = @_;
my $src_host = "$packet->{src_host}:$packet->{src_port}";
my $dst_host = "$packet->{dst_host}:$packet->{dst_port}";
if ( my $server = $self->{server} ) { # Watch only the given server.
$server .= ":$self->{port}";
if ( $src_host ne $server && $dst_host ne $server ) {
PTDEBUG && _d('Packet is not to or from', $server);
return;
}
}
my $packet_from;
my $client;
if ( $src_host =~ m/:$self->{port}$/ ) {
$packet_from = 'server';
$client = $dst_host;
}
elsif ( $dst_host =~ m/:$self->{port}$/ ) {
$packet_from = 'client';
$client = $src_host;
}
else {
warn 'Packet is not to or from server: ', Dumper($packet);
return;
}
PTDEBUG && _d('Client:', $client);
if ( !exists $self->{sessions}->{$client} ) {
PTDEBUG && _d('New session');
$self->{sessions}->{$client} = {
client => $client,
state => undef,
raw_packets => [],
};
};
my $session = $self->{sessions}->{$client};
return $packet_from, $session;
}
sub _packet_from_server {
die "Don't call parent class _packet_from_server()";
}
sub _packet_from_client {
die "Don't call parent class _packet_from_client()";
}
sub make_event {
my ( $self, $session, $packet ) = @_;
die "Event has no attributes" unless scalar keys %{$session->{attribs}};
die "Query has no arg attribute" unless $session->{attribs}->{arg};
my $start_request = $session->{start_request} || 0;
my $start_reply = $session->{start_reply} || 0;
my $end_reply = $session->{end_reply} || 0;
PTDEBUG && _d('Request start:', $start_request,
'reply start:', $start_reply, 'reply end:', $end_reply);
my $event = {
Query_time => $self->timestamp_diff($start_request, $start_reply),
Transmit_time => $self->timestamp_diff($start_reply, $end_reply),
};
@{$event}{keys %{$session->{attribs}}} = values %{$session->{attribs}};
return $event;
}
sub _get_errors_fh {
my ( $self ) = @_;
return $self->{errors_fh} if $self->{errors_fh};
my $exec = basename($0);
my ($errors_fh, $filename) = tempfile("/tmp/$exec-errors.XXXXXXX", UNLINK => 0);
$self->{errors_file} = $filename;
$self->{errors_fh} = $errors_fh;
return $errors_fh;
}
sub fail_session {
my ( $self, $session, $reason ) = @_;
PTDEBUG && _d('Failed session', $session->{client}, 'because', $reason);
delete $self->{sessions}->{$session->{client}};
return if $self->{_no_save_error};
my $errors_fh = $self->_get_errors_fh();
print "Session $session->{client} had errors, will save them in $self->{errors_file}\n";
my $raw_packets = delete $session->{raw_packets};
$session->{reason_for_failure} = $reason;
my $session_dump = '# ' . Dumper($session);
chomp $session_dump;
$session_dump =~ s/\n/\n# /g;
print $errors_fh join("\n", $session_dump, @$raw_packets), "\n";
return;
}
sub timestamp_diff {
my ( $self, $start, $end ) = @_;
return 0 unless $start && $end;
my $sd = substr($start, 0, 11, '');
my $ed = substr($end, 0, 11, '');
my ( $sh, $sm, $ss ) = split(/:/, $start);
my ( $eh, $em, $es ) = split(/:/, $end);
my $esecs = ($eh * 3600 + $em * 60 + $es);
my $ssecs = ($sh * 3600 + $sm * 60 + $ss);
if ( $sd eq $ed ) {
return sprintf '%.6f', $esecs - $ssecs;
}
else { # Assume only one day boundary has been crossed, no DST, etc
return sprintf '%.6f', ( 86_400 - $ssecs ) + $esecs;
}
}
sub uncompress_data {
my ( $self, $data, $len ) = @_;
die "I need data" unless $data;
die "I need a len argument" unless $len;
die "I need a scalar reference to data" unless ref $data eq 'SCALAR';
PTDEBUG && _d('Uncompressing data');
our $InflateError;
my $comp_bin_data = pack('H*', $$data);
my $uncomp_bin_data = '';
my $z = new IO::Uncompress::Inflate(
\$comp_bin_data
) or die "IO::Uncompress::Inflate failed: $InflateError";
my $status = $z->read(\$uncomp_bin_data, $len)
or die "IO::Uncompress::Inflate failed: $InflateError";
my $uncomp_data = unpack('H*', $uncomp_bin_data);
return \$uncomp_data;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End ProtocolParser package
# ###########################################################################
# ###########################################################################
# TcpdumpParser package
# This package is a copy without comments from the original. The original
@@ -7225,6 +7610,7 @@ sub new {
sessions => {},
o => $args{o},
fake_thread_id => 2**32, # see _make_event()
null_event => $args{null_event},
};
PTDEBUG && $self->{server} && _d('Watching only server', $self->{server});
return bless $self, $class;
@@ -7245,7 +7631,7 @@ sub parse_event {
$server .= ":$self->{port}";
if ( $src_host ne $server && $dst_host ne $server ) {
PTDEBUG && _d('Packet is not to or from', $server);
return;
return $self->{null_event};
}
}
@@ -7261,7 +7647,7 @@ sub parse_event {
}
else {
PTDEBUG && _d('Packet is not to or from a MySQL server');
return;
return $self->{null_event};
}
PTDEBUG && _d('Client', $client);
@@ -7279,7 +7665,7 @@ sub parse_event {
else {
PTDEBUG && _d('Ignoring mid-stream', $packet_from, 'data,',
'packetno', $packetno);
return;
return $self->{null_event};
}
$self->{sessions}->{$client} = {
@@ -7322,7 +7708,7 @@ sub parse_event {
delete $self->{sessions}->{$session->{client}};
return $event;
}
return;
return $self->{null_event};
}
if ( $session->{compress} ) {
@@ -7348,7 +7734,7 @@ sub parse_event {
PTDEBUG && _d('remove_mysql_header() failed; failing session');
$session->{EVAL_ERROR} = $EVAL_ERROR;
$self->fail_session($session, 'remove_mysql_header() failed');
return;
return $self->{null_event};
}
}
@@ -7363,7 +7749,7 @@ sub parse_event {
$self->_delete_buff($session);
}
else {
return; # waiting for more data; buff_left was reported earlier
return $self->{null_event}; # waiting for more data; buff_left was reported earlier
}
}
elsif ( $packet->{mysql_data_len} > ($packet->{data_len} - 4) ) {
@@ -7384,7 +7770,7 @@ sub parse_event {
PTDEBUG && _d('Data not complete; expecting',
$session->{buff_left}, 'more bytes');
return;
return $self->{null_event};
}
if ( $session->{cmd} && ($session->{state} || '') eq 'awaiting_reply' ) {
@@ -7407,7 +7793,7 @@ sub parse_event {
}
$args{stats}->{events_parsed}++ if $args{stats};
return $event;
return $event || $self->{null_event};
}
sub _packet_from_server {
@@ -8958,6 +9344,7 @@ sub make_parser {
$module->new(
server => $server,
port => $port,
null_event => {},
);
};
if ( $EVAL_ERROR ) {
@@ -8967,17 +9354,21 @@ sub make_parser {
}
if ( @parsers == 1 ) {
return $parsers[0];
return sub {
my (%args) = @_;
return $parsers[0]->parse_event(%args);
};
}
my $parser = sub {
my (%args) = @_;
my $event = $parsers[0]->parse_event(%args);
if ( $event ) {
while ( my $event = $parsers[0]->parse_event(%args) ) {
$args{event} = $event;
$event = $parsers[1]->parse_event(%args);
}
if ( $event && scalar %$event ) {
return $event;
}
}
};
return $parser;
@@ -9283,6 +9674,8 @@ sub save_results {
default_database => $database,
);
$stats->{queries_written} = 0;
# Execute queries and save the results.
my $errors = 0;
TRY:
@@ -9307,6 +9700,8 @@ sub save_results {
event => $event,
results => $host_results,
);
$stats->{queries_written}++;
}
};
if ( $EVAL_ERROR ) {
@@ -9322,9 +9717,6 @@ sub save_results {
# Did we finish because time ran out?
$run_time->have_time() or $exit_status |= 8;
# Report whatever is left.
$results->report_unreported_classes() or $exit_status |= 1;
return;
}

View File

@@ -225,6 +225,7 @@ sub new {
sessions => {},
o => $args{o},
fake_thread_id => 2**32, # see _make_event()
null_event => $args{null_event},
};
PTDEBUG && $self->{server} && _d('Watching only server', $self->{server});
return bless $self, $class;
@@ -247,7 +248,7 @@ sub parse_event {
$server .= ":$self->{port}";
if ( $src_host ne $server && $dst_host ne $server ) {
PTDEBUG && _d('Packet is not to or from', $server);
return;
return $self->{null_event};
}
}
@@ -265,7 +266,7 @@ sub parse_event {
}
else {
PTDEBUG && _d('Packet is not to or from a MySQL server');
return;
return $self->{null_event};
}
PTDEBUG && _d('Client', $client);
@@ -290,7 +291,7 @@ sub parse_event {
else {
PTDEBUG && _d('Ignoring mid-stream', $packet_from, 'data,',
'packetno', $packetno);
return;
return $self->{null_event};
}
$self->{sessions}->{$client} = {
@@ -341,7 +342,7 @@ sub parse_event {
delete $self->{sessions}->{$session->{client}};
return $event;
}
return;
return $self->{null_event};
}
# Return unless the compressed packet can be uncompressed.
@@ -380,7 +381,7 @@ sub parse_event {
PTDEBUG && _d('remove_mysql_header() failed; failing session');
$session->{EVAL_ERROR} = $EVAL_ERROR;
$self->fail_session($session, 'remove_mysql_header() failed');
return;
return $self->{null_event};
}
}
@@ -397,7 +398,7 @@ sub parse_event {
$self->_delete_buff($session);
}
else {
return; # waiting for more data; buff_left was reported earlier
return $self->{null_event}; # waiting for more data; buff_left was reported earlier
}
}
elsif ( $packet->{mysql_data_len} > ($packet->{data_len} - 4) ) {
@@ -425,7 +426,7 @@ sub parse_event {
PTDEBUG && _d('Data not complete; expecting',
$session->{buff_left}, 'more bytes');
return;
return $self->{null_event};
}
if ( $session->{cmd} && ($session->{state} || '') eq 'awaiting_reply' ) {
@@ -456,7 +457,7 @@ sub parse_event {
}
$args{stats}->{events_parsed}++ if $args{stats};
return $event;
return $event || $self->{null_event};
}
# Handles a packet from the server given the state of the session.

View File

@@ -42,7 +42,7 @@ has 'file_iter' => (
has 'parser' => (
is => 'ro',
isa => 'Object',
isa => 'CodeRef',
required => 1,
);
@@ -193,7 +193,7 @@ sub next {
EVENT:
while (
$self->oktorun
&& (my $event = $self->parser->parse_event(%{ $self->_parser_args }) )
&& (my $event = $self->parser->(%{ $self->_parser_args }) )
) {
$self->stats->{queries_read}++;

View File

@@ -54,7 +54,7 @@ my $files = $file_iter->get_file_itr(
$query_iter = QueryIterator->new(
file_iter => $files,
parser => $parser,
parser => sub { return $parser->parse_event(@_) },
fingerprint => sub { return $qr->fingerprint(@_) },
oktorun => sub { return 1 },
read_only => 1,

178
t/pt-upgrade/log_types.t Normal file
View File

@@ -0,0 +1,178 @@
#!/usr/bin/env perl
BEGIN {
die "The PERCONA_TOOLKIT_BRANCH environment variable is not set.\n"
unless $ENV{PERCONA_TOOLKIT_BRANCH} && -d $ENV{PERCONA_TOOLKIT_BRANCH};
unshift @INC, "$ENV{PERCONA_TOOLKIT_BRANCH}/lib";
};
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Test::More;
use File::Basename;
use File::Temp qw(tempdir);
$ENV{PERCONA_TOOLKIT_TEST_USE_DSN_NAMES} = 1;
$ENV{PRETTY_RESULTS} = 1;
use PerconaTest;
use Sandbox;
require "$trunk/bin/pt-upgrade";
my $dp = new DSNParser(opts=>$dsn_opts);
my $sb = new Sandbox(basedir => '/tmp', DSNParser => $dp);
my $dbh1 = $sb->get_dbh_for('host1');
# Just testing that the other log types work, so we don't need
# the second host. By "other" I mean gen, bin, tcpdump, and raw
# because other tests make extensive use of slow logs.
# DO NOT test the results here. That's better done in compare_hosts.t
# or compare_results.t by creating a numbered dir (e.g. 005/) with
# sample log and output files.
if ( !$dbh1 ) {
plan skip_all => 'Cannot connect to sandbox host1';
}
my $host1_dsn = $sb->dsn_for('host1');
my $tmpdir = tempdir("/tmp/pt-upgrade.$PID.XXXXXX", CLEANUP => 1);
my $samples = "$trunk/t/pt-upgrade/samples";
my $lib_samples = "$trunk/t/lib/samples";
my $exit_status = 0;
my $output;
# #############################################################################
# genlog
# #############################################################################
$output = output(
sub {
$exit_status = pt_upgrade::main($host1_dsn, '--save-results', $tmpdir,
qw(--type genlog),
"$samples/genlog001.txt",
)},
stderr => 1,
);
is(
$exit_status,
0,
"genlog001: exit 0"
);
# There are 7 events, but only 1 SELECT query. The INSERT query
# should be filtered out by default.
like(
$output,
qr/queries_written\s+1/,
"genlog001: wrote 1 query"
);
# #############################################################################
# binlog
# #############################################################################
$output = output(
sub {
$exit_status = pt_upgrade::main($host1_dsn, '--save-results', $tmpdir,
qw(--type binlog),
"$lib_samples/binlogs/binlog001.txt",
)},
stderr => 1,
);
is(
$exit_status,
0,
"binlog001: exit 0 (read-only)"
);
# There are 7 events, but only 1 SELECT query. The INSERT query
# should be filtered out by default.
like(
$output,
qr/queries_written\s+0/,
"binlog001: no queries (read-only)"
);
$output = output(
sub {
$exit_status = pt_upgrade::main($host1_dsn, '--save-results', $tmpdir,
qw(--type binlog --no-read-only),
"$lib_samples/binlogs/binlog001.txt",
)},
stderr => 1,
);
is(
$exit_status,
0,
"binlog001: exit 0"
);
# There are 7 events, but only 1 SELECT query. The INSERT query
# should be filtered out by default.
like(
$output,
qr/queries_written\s+10/,
"binlog001: wrote 10 queries"
);
# #############################################################################
# tcpdump
# #############################################################################
$output = output(
sub {
$exit_status = pt_upgrade::main($host1_dsn, '--save-results', $tmpdir,
qw(--type tcpdump),
"$lib_samples/tcpdump/tcpdump002.txt",
)},
stderr => 1,
);
is(
$exit_status,
0,
"tcpdump001: exit 0",
);
like(
$output,
qr/queries_written\s+2/,
"tcpdump002: wrote 2 queries"
);
# #############################################################################
# rawlog
# #############################################################################
$output = output(
sub {
$exit_status = pt_upgrade::main($host1_dsn, '--save-results', $tmpdir,
qw(--type rawlog),
"$lib_samples/rawlogs/rawlog002.txt",
)},
stderr => 1,
);
is(
$exit_status,
0,
"rawlog001: exit 0",
);
like(
$output,
qr/queries_written\s+2/,
"rawlog002: wrote 2 queries"
);
# #############################################################################
# Done.
# #############################################################################
$sb->wipe_clean($dbh1);
ok($sb->ok(), "Sandbox servers") or BAIL_OUT(__FILE__ . " broke the sandbox");
done_testing;

View File

@@ -0,0 +1,7 @@
051007 21:55:24 42 Connect root@localhost on db1
42 Query SELECT * FROM test.t WHERE id > 5
42 Quit
061226 15:42:36 11 Connect root@localhost on
11 Init DB test
11 Query INSERT INTO t VALUES (null, 'zzz')
061226 16:44:48 11 Quit

View File

@@ -104,6 +104,6 @@ close $dh;
# #############################################################################
# Done.
# #############################################################################
#$sb->wipe_clean($dbh1);
#ok($sb->ok(), "Sandbox servers") or BAIL_OUT(__FILE__ . " broke the sandbox");
$sb->wipe_clean($dbh1);
ok($sb->ok(), "Sandbox servers") or BAIL_OUT(__FILE__ . " broke the sandbox");
done_testing;