Files
percona-toolkit/lib/ProtocolParser.pm
Viktor Szépe 2bd40d8c39 Remove trailing spaces (#665)
* Remove trailing spaces

* PR-665 -  Remove trailing spaces

- Updated not stable test t/pt-online-schema-change/preserve_triggers.t
- Updated utilities in bin directory

* PR-665 -  Remove trailing spaces

- Fixed typos

* PR-665 -  Remove trailing spaces

- Fixed typos

---------

Co-authored-by: Sveta Smirnova <sveta.smirnova@percona.com>
2023-09-06 01:15:12 +03:00

351 lines
11 KiB
Perl

# This program is copyright 2009-2011 Percona Ireland Ltd.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# ProtocolParser package
# ###########################################################################
{
# Package: ProtocolParser
# ProtocolParser is a parent class for protocol-specific parsers.
package ProtocolParser;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use File::Basename qw(basename);
use File::Temp qw(tempfile);
eval {
require IO::Uncompress::Inflate; # yum: perl-IO-Compress-Zlib
IO::Uncompress::Inflate->import(qw(inflate $InflateError));
};
use Data::Dumper;
$Data::Dumper::Indent = 1;
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Quotekeys = 0;
sub new {
my ( $class, %args ) = @_;
my $self = {
server => $args{server},
port => $args{port},
sessions => {},
o => $args{o},
};
return bless $self, $class;
}
sub parse_event {
my ( $self, %args ) = @_;
my @required_args = qw(event);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my $packet = @args{@required_args};
# Save each session's packets until its closed by the client.
# This allows us to ensure that packets are processed in order.
if ( $self->{buffer} ) {
my ($packet_from, $session) = $self->_get_session($packet);
if ( $packet->{data_len} ) {
if ( $packet_from eq 'client' ) {
push @{$session->{client_packets}}, $packet;
PTDEBUG && _d('Saved client packet');
}
else {
push @{$session->{server_packets}}, $packet;
PTDEBUG && _d('Saved server packet');
}
}
# Process the session's packets when the client closes the connection.
return unless ($packet_from eq 'client')
&& ($packet->{fin} || $packet->{rst});
my $event;
map {
$event = $self->_parse_packet($_, $args{misc});
$args{stats}->{events_parsed}++ if $args{stats};
} sort { $a->{seq} <=> $b->{seq} }
@{$session->{client_packets}};
map {
$event = $self->_parse_packet($_, $args{misc});
$args{stats}->{events_parsed}++ if $args{stats};
} sort { $a->{seq} <=> $b->{seq} }
@{$session->{server_packets}};
return $event;
}
if ( $packet->{data_len} == 0 ) {
# Return early if there's no TCP data. These are usually ACK packets, but
# they could also be FINs in which case, we should close and delete the
# client's session.
PTDEBUG && _d('No TCP data');
return;
}
my $event = $self->_parse_packet($packet, $args{misc});
$args{stats}->{events_parsed}++ if $args{stats};
return $event;
}
# The packet arg should be a hashref from TcpdumpParser::parse_event().
# misc is a placeholder for future features.
sub _parse_packet {
my ( $self, $packet, $misc ) = @_;
my ($packet_from, $session) = $self->_get_session($packet);
PTDEBUG && _d('State:', $session->{state});
# Save raw packets to dump later in case something fails.
push @{$session->{raw_packets}}, $packet->{raw_packet}
unless $misc->{recurse};
if ( $session->{buff} ) {
# Previous packets were not complete so append this data
# to what we've been buffering.
$session->{buff_left} -= $packet->{data_len};
if ( $session->{buff_left} > 0 ) {
PTDEBUG && _d('Added data to buff; expecting', $session->{buff_left},
'more bytes');
return;
}
PTDEBUG && _d('Got all data; buff left:', $session->{buff_left});
$packet->{data} = $session->{buff} . $packet->{data};
$packet->{data_len} += length $session->{buff};
$session->{buff} = '';
$session->{buff_left} = 0;
}
# Finally, parse the packet and maybe create an event.
$packet->{data} = pack('H*', $packet->{data}) unless $misc->{recurse};
my $event;
if ( $packet_from eq 'server' ) {
$event = $self->_packet_from_server($packet, $session, $misc);
}
elsif ( $packet_from eq 'client' ) {
$event = $self->_packet_from_client($packet, $session, $misc);
}
else {
# Should not get here.
die 'Packet origin unknown';
}
PTDEBUG && _d('State:', $session->{state});
if ( $session->{out_of_order} ) {
PTDEBUG && _d('Session packets are out of order');
push @{$session->{packets}}, $packet;
$session->{ts_min}
= $packet->{ts} if $packet->{ts} lt ($session->{ts_min} || '');
$session->{ts_max}
= $packet->{ts} if $packet->{ts} gt ($session->{ts_max} || '');
if ( $session->{have_all_packets} ) {
PTDEBUG && _d('Have all packets; ordering and processing');
delete $session->{out_of_order};
delete $session->{have_all_packets};
map {
$event = $self->_parse_packet($_, { recurse => 1 });
} sort { $a->{seq} <=> $b->{seq} } @{$session->{packets}};
}
}
PTDEBUG && _d('Done with packet; event:', Dumper($event));
return $event;
}
sub _get_session {
my ( $self, $packet ) = @_;
my $src_host = "$packet->{src_host}:$packet->{src_port}";
my $dst_host = "$packet->{dst_host}:$packet->{dst_port}";
if ( my $server = $self->{server} ) { # Watch only the given server.
$server .= ":$self->{port}";
if ( $src_host ne $server && $dst_host ne $server ) {
PTDEBUG && _d('Packet is not to or from', $server);
return;
}
}
# Auto-detect the server by looking for its port.
my $packet_from;
my $client;
if ( $src_host =~ m/:$self->{port}$/ ) {
$packet_from = 'server';
$client = $dst_host;
}
elsif ( $dst_host =~ m/:$self->{port}$/ ) {
$packet_from = 'client';
$client = $src_host;
}
else {
warn 'Packet is not to or from server: ', Dumper($packet);
return;
}
PTDEBUG && _d('Client:', $client);
# Get the client's session info or create a new session if the
# client hasn't been seen before.
if ( !exists $self->{sessions}->{$client} ) {
PTDEBUG && _d('New session');
$self->{sessions}->{$client} = {
client => $client,
state => undef,
raw_packets => [],
# ts -- wait for ts later.
};
};
my $session = $self->{sessions}->{$client};
return $packet_from, $session;
}
sub _packet_from_server {
die "Don't call parent class _packet_from_server()";
}
sub _packet_from_client {
die "Don't call parent class _packet_from_client()";
}
sub make_event {
my ( $self, $session, $packet ) = @_;
die "Event has no attributes" unless scalar keys %{$session->{attribs}};
die "Query has no arg attribute" unless $session->{attribs}->{arg};
my $start_request = $session->{start_request} || 0;
my $start_reply = $session->{start_reply} || 0;
my $end_reply = $session->{end_reply} || 0;
PTDEBUG && _d('Request start:', $start_request,
'reply start:', $start_reply, 'reply end:', $end_reply);
my $event = {
Query_time => $self->timestamp_diff($start_request, $start_reply),
Transmit_time => $self->timestamp_diff($start_reply, $end_reply),
};
@{$event}{keys %{$session->{attribs}}} = values %{$session->{attribs}};
return $event;
}
sub _get_errors_fh {
my ( $self ) = @_;
return $self->{errors_fh} if $self->{errors_fh};
my $exec = basename($0);
# Errors file isn't open yet; try to open it.
my ($errors_fh, $filename);
if ( $filename = $ENV{PERCONA_TOOLKIT_TCP_ERRORS_FILE} ) {
open $errors_fh, ">", $filename
or die "Cannot open $filename for writing (supplied from "
. "PERCONA_TOOLKIT_TCP_ERRORS_FILE): $OS_ERROR";
}
else {
($errors_fh, $filename) = tempfile("/tmp/$exec-errors.XXXXXXX", UNLINK => 0);
}
$self->{errors_file} = $filename;
$self->{errors_fh} = $errors_fh;
return $errors_fh;
}
sub fail_session {
my ( $self, $session, $reason ) = @_;
PTDEBUG && _d('Failed session', $session->{client}, 'because', $reason);
delete $self->{sessions}->{$session->{client}};
return if $self->{_no_save_error};
my $errors_fh = $self->_get_errors_fh();
warn "TCP session $session->{client} had errors, will save them in $self->{errors_file}\n"
unless $self->{_warned_for}->{$self->{errors_file}}++;
my $raw_packets = delete $session->{raw_packets};
$session->{reason_for_failure} = $reason;
my $session_dump = '# ' . Dumper($session);
chomp $session_dump;
$session_dump =~ s/\n/\n# /g;
print $errors_fh join("\n", $session_dump, @$raw_packets), "\n";
return;
}
# Returns the difference between two tcpdump timestamps.
sub timestamp_diff {
my ( $self, $start, $end ) = @_;
return 0 unless $start && $end;
my $sd = substr($start, 0, 11, '');
my $ed = substr($end, 0, 11, '');
my ( $sh, $sm, $ss ) = split(/:/, $start);
my ( $eh, $em, $es ) = split(/:/, $end);
my $esecs = ($eh * 3600 + $em * 60 + $es);
my $ssecs = ($sh * 3600 + $sm * 60 + $ss);
if ( $sd eq $ed ) {
return sprintf '%.6f', $esecs - $ssecs;
}
else { # Assume only one day boundary has been crossed, no DST, etc
return sprintf '%.6f', ( 86_400 - $ssecs ) + $esecs;
}
}
# Takes a scalarref to a hex string of compressed data.
# Returns a scalarref to a hex string of the uncompressed data.
# The given hex string of compressed data is not modified.
sub uncompress_data {
my ( $self, $data, $len ) = @_;
die "I need data" unless $data;
die "I need a len argument" unless $len;
die "I need a scalar reference to data" unless ref $data eq 'SCALAR';
PTDEBUG && _d('Uncompressing data');
our $InflateError;
# Pack hex string into compressed binary data.
my $comp_bin_data = pack('H*', $$data);
# Uncompress the compressed binary data.
my $uncomp_bin_data = '';
my $z = new IO::Uncompress::Inflate(
\$comp_bin_data
) or die "IO::Uncompress::Inflate failed: $InflateError";
my $status = $z->read(\$uncomp_bin_data, $len)
or die "IO::Uncompress::Inflate failed: $InflateError";
# Unpack the uncompressed binary data back into a hex string.
# This is the original MySQL packet(s).
my $uncomp_data = unpack('H*', $uncomp_bin_data);
return \$uncomp_data;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End ProtocolParser package
# ###########################################################################