mirror of
				https://github.com/percona/percona-toolkit.git
				synced 2025-10-20 17:49:56 +00:00 
			
		
		
		
	 2bd40d8c39
			
		
	
	2bd40d8c39
	
	
	
		
			
			* Remove trailing spaces * PR-665 - Remove trailing spaces - Updated not stable test t/pt-online-schema-change/preserve_triggers.t - Updated utilities in bin directory * PR-665 - Remove trailing spaces - Fixed typos * PR-665 - Remove trailing spaces - Fixed typos --------- Co-authored-by: Sveta Smirnova <sveta.smirnova@percona.com>
		
			
				
	
	
		
			351 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Perl
		
	
	
	
	
	
			
		
		
	
	
			351 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Perl
		
	
	
	
	
	
| # This program is copyright 2009-2011 Percona Ireland Ltd.
 | |
| # Feedback and improvements are welcome.
 | |
| #
 | |
| # THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
 | |
| # WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
 | |
| # MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
 | |
| #
 | |
| # This program is free software; you can redistribute it and/or modify it under
 | |
| # the terms of the GNU General Public License as published by the Free Software
 | |
| # Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
 | |
| # systems, you can issue `man perlgpl' or `man perlartistic' to read these
 | |
| # licenses.
 | |
| #
 | |
| # You should have received a copy of the GNU General Public License along with
 | |
| # this program; if not, write to the Free Software Foundation, Inc., 59 Temple
 | |
| # Place, Suite 330, Boston, MA  02111-1307  USA.
 | |
| # ###########################################################################
 | |
| # ProtocolParser package
 | |
| # ###########################################################################
 | |
| {
 | |
| # Package: ProtocolParser
 | |
| # ProtocolParser is a parent class for protocol-specific parsers.
 | |
| package ProtocolParser;
 | |
| 
 | |
| use strict;
 | |
| use warnings FATAL => 'all';
 | |
| use English qw(-no_match_vars);
 | |
| use constant PTDEBUG => $ENV{PTDEBUG} || 0;
 | |
| 
 | |
| use File::Basename qw(basename);
 | |
| use File::Temp qw(tempfile);
 | |
| 
 | |
| eval {
 | |
|    require IO::Uncompress::Inflate; # yum: perl-IO-Compress-Zlib
 | |
|    IO::Uncompress::Inflate->import(qw(inflate $InflateError));
 | |
| };
 | |
| 
 | |
| use Data::Dumper;
 | |
| $Data::Dumper::Indent    = 1;
 | |
| $Data::Dumper::Sortkeys  = 1;
 | |
| $Data::Dumper::Quotekeys = 0;
 | |
| 
 | |
| sub new {
 | |
|    my ( $class, %args ) = @_;
 | |
| 
 | |
|    my $self = {
 | |
|       server      => $args{server},
 | |
|       port        => $args{port},
 | |
|       sessions    => {},
 | |
|       o           => $args{o},
 | |
|    };
 | |
| 
 | |
|    return bless $self, $class;
 | |
| }
 | |
| 
 | |
| sub parse_event {
 | |
|    my ( $self, %args ) = @_;
 | |
|    my @required_args = qw(event);
 | |
|    foreach my $arg ( @required_args ) {
 | |
|       die "I need a $arg argument" unless $args{$arg};
 | |
|    }
 | |
|    my $packet = @args{@required_args};
 | |
| 
 | |
|    # Save each session's packets until its closed by the client.
 | |
|    # This allows us to ensure that packets are processed in order.
 | |
|    if ( $self->{buffer} ) {
 | |
|       my ($packet_from, $session) = $self->_get_session($packet);
 | |
|       if ( $packet->{data_len} ) {
 | |
|          if ( $packet_from eq 'client' ) {
 | |
|             push @{$session->{client_packets}}, $packet;
 | |
|             PTDEBUG && _d('Saved client packet');
 | |
|          }
 | |
|          else {
 | |
|             push @{$session->{server_packets}}, $packet;
 | |
|             PTDEBUG && _d('Saved server packet');
 | |
|          }
 | |
|       }
 | |
| 
 | |
|       # Process the session's packets when the client closes the connection.
 | |
|       return unless ($packet_from eq 'client')
 | |
|                     && ($packet->{fin} || $packet->{rst});
 | |
| 
 | |
|       my $event;
 | |
|       map {
 | |
|          $event = $self->_parse_packet($_, $args{misc});
 | |
|          $args{stats}->{events_parsed}++ if $args{stats};
 | |
|       } sort { $a->{seq} <=> $b->{seq} }
 | |
|       @{$session->{client_packets}};
 | |
| 
 | |
|       map {
 | |
|          $event = $self->_parse_packet($_, $args{misc});
 | |
|          $args{stats}->{events_parsed}++ if $args{stats};
 | |
|       } sort { $a->{seq} <=> $b->{seq} }
 | |
|       @{$session->{server_packets}};
 | |
| 
 | |
|       return $event;
 | |
|    }
 | |
| 
 | |
|    if ( $packet->{data_len} == 0 ) {
 | |
|       # Return early if there's no TCP data.  These are usually ACK packets, but
 | |
|       # they could also be FINs in which case, we should close and delete the
 | |
|       # client's session.
 | |
|       PTDEBUG && _d('No TCP data');
 | |
|       return;
 | |
|    }
 | |
| 
 | |
|    my $event = $self->_parse_packet($packet, $args{misc});
 | |
|    $args{stats}->{events_parsed}++ if $args{stats};
 | |
|    return $event;
 | |
| }
 | |
| 
 | |
| # The packet arg should be a hashref from TcpdumpParser::parse_event().
 | |
| # misc is a placeholder for future features.
 | |
| sub _parse_packet {
 | |
|    my ( $self, $packet, $misc ) = @_;
 | |
| 
 | |
|    my ($packet_from, $session) = $self->_get_session($packet);
 | |
|    PTDEBUG && _d('State:', $session->{state});
 | |
| 
 | |
|    # Save raw packets to dump later in case something fails.
 | |
|    push @{$session->{raw_packets}}, $packet->{raw_packet}
 | |
|       unless $misc->{recurse};
 | |
| 
 | |
|    if ( $session->{buff} ) {
 | |
|       # Previous packets were not complete so append this data
 | |
|       # to what we've been buffering.
 | |
|       $session->{buff_left} -= $packet->{data_len};
 | |
|       if ( $session->{buff_left} > 0 ) {
 | |
|          PTDEBUG && _d('Added data to buff; expecting', $session->{buff_left},
 | |
|             'more bytes');
 | |
|          return;
 | |
|       }
 | |
| 
 | |
|       PTDEBUG && _d('Got all data; buff left:', $session->{buff_left});
 | |
|       $packet->{data}       = $session->{buff} . $packet->{data};
 | |
|       $packet->{data_len}  += length $session->{buff};
 | |
|       $session->{buff}      = '';
 | |
|       $session->{buff_left} = 0;
 | |
|    }
 | |
| 
 | |
|    # Finally, parse the packet and maybe create an event.
 | |
|    $packet->{data} = pack('H*', $packet->{data}) unless $misc->{recurse};
 | |
|    my $event;
 | |
|    if ( $packet_from eq 'server' ) {
 | |
|       $event = $self->_packet_from_server($packet, $session, $misc);
 | |
|    }
 | |
|    elsif ( $packet_from eq 'client' ) {
 | |
|       $event = $self->_packet_from_client($packet, $session, $misc);
 | |
|    }
 | |
|    else {
 | |
|       # Should not get here.
 | |
|       die 'Packet origin unknown';
 | |
|    }
 | |
|    PTDEBUG && _d('State:', $session->{state});
 | |
| 
 | |
|    if ( $session->{out_of_order} ) {
 | |
|       PTDEBUG && _d('Session packets are out of order');
 | |
|       push @{$session->{packets}}, $packet;
 | |
|       $session->{ts_min}
 | |
|          = $packet->{ts} if $packet->{ts} lt ($session->{ts_min} || '');
 | |
|       $session->{ts_max}
 | |
|          = $packet->{ts} if $packet->{ts} gt ($session->{ts_max} || '');
 | |
|       if ( $session->{have_all_packets} ) {
 | |
|          PTDEBUG && _d('Have all packets; ordering and processing');
 | |
|          delete $session->{out_of_order};
 | |
|          delete $session->{have_all_packets};
 | |
|          map {
 | |
|             $event = $self->_parse_packet($_, { recurse => 1 });
 | |
|          } sort { $a->{seq} <=> $b->{seq} } @{$session->{packets}};
 | |
|       }
 | |
|    }
 | |
| 
 | |
|    PTDEBUG && _d('Done with packet; event:', Dumper($event));
 | |
|    return $event;
 | |
| }
 | |
| 
 | |
| sub _get_session {
 | |
|    my ( $self, $packet ) = @_;
 | |
| 
 | |
|    my $src_host = "$packet->{src_host}:$packet->{src_port}";
 | |
|    my $dst_host = "$packet->{dst_host}:$packet->{dst_port}";
 | |
| 
 | |
|    if ( my $server = $self->{server} ) {  # Watch only the given server.
 | |
|       $server .= ":$self->{port}";
 | |
|       if ( $src_host ne $server && $dst_host ne $server ) {
 | |
|          PTDEBUG && _d('Packet is not to or from', $server);
 | |
|          return;
 | |
|       }
 | |
|    }
 | |
| 
 | |
|    # Auto-detect the server by looking for its port.
 | |
|    my $packet_from;
 | |
|    my $client;
 | |
|    if ( $src_host =~ m/:$self->{port}$/ ) {
 | |
|       $packet_from = 'server';
 | |
|       $client      = $dst_host;
 | |
|    }
 | |
|    elsif ( $dst_host =~ m/:$self->{port}$/ ) {
 | |
|       $packet_from = 'client';
 | |
|       $client      = $src_host;
 | |
|    }
 | |
|    else {
 | |
|       warn 'Packet is not to or from server: ', Dumper($packet);
 | |
|       return;
 | |
|    }
 | |
|    PTDEBUG && _d('Client:', $client);
 | |
| 
 | |
|    # Get the client's session info or create a new session if the
 | |
|    # client hasn't been seen before.
 | |
|    if ( !exists $self->{sessions}->{$client} ) {
 | |
|       PTDEBUG && _d('New session');
 | |
|       $self->{sessions}->{$client} = {
 | |
|          client      => $client,
 | |
|          state       => undef,
 | |
|          raw_packets => [],
 | |
|          # ts -- wait for ts later.
 | |
|       };
 | |
|    };
 | |
|    my $session = $self->{sessions}->{$client};
 | |
| 
 | |
|    return $packet_from, $session;
 | |
| }
 | |
| 
 | |
| sub _packet_from_server {
 | |
|    die "Don't call parent class _packet_from_server()";
 | |
| }
 | |
| 
 | |
| sub _packet_from_client {
 | |
|    die "Don't call parent class _packet_from_client()";
 | |
| }
 | |
| 
 | |
| sub make_event {
 | |
|    my ( $self, $session, $packet ) = @_;
 | |
|    die "Event has no attributes" unless scalar keys %{$session->{attribs}};
 | |
|    die "Query has no arg attribute" unless $session->{attribs}->{arg};
 | |
|    my $start_request = $session->{start_request} || 0;
 | |
|    my $start_reply   = $session->{start_reply}   || 0;
 | |
|    my $end_reply     = $session->{end_reply}     || 0;
 | |
|    PTDEBUG && _d('Request start:', $start_request,
 | |
|       'reply start:', $start_reply, 'reply end:', $end_reply);
 | |
|    my $event = {
 | |
|       Query_time    => $self->timestamp_diff($start_request, $start_reply),
 | |
|       Transmit_time => $self->timestamp_diff($start_reply, $end_reply),
 | |
|    };
 | |
|    @{$event}{keys %{$session->{attribs}}} = values %{$session->{attribs}};
 | |
|    return $event;
 | |
| }
 | |
| 
 | |
| sub _get_errors_fh {
 | |
|    my ( $self ) = @_;
 | |
|    return $self->{errors_fh} if $self->{errors_fh};
 | |
| 
 | |
|    my $exec = basename($0);
 | |
|    # Errors file isn't open yet; try to open it.
 | |
|    my ($errors_fh, $filename);
 | |
|    if ( $filename = $ENV{PERCONA_TOOLKIT_TCP_ERRORS_FILE} ) {
 | |
|       open $errors_fh, ">", $filename
 | |
|          or die "Cannot open $filename for writing (supplied from "
 | |
|               . "PERCONA_TOOLKIT_TCP_ERRORS_FILE): $OS_ERROR";
 | |
|    }
 | |
|    else {
 | |
|       ($errors_fh, $filename) = tempfile("/tmp/$exec-errors.XXXXXXX", UNLINK => 0);
 | |
|    }
 | |
| 
 | |
|    $self->{errors_file} = $filename;
 | |
|    $self->{errors_fh}   = $errors_fh;
 | |
|    return $errors_fh;
 | |
| }
 | |
| 
 | |
| sub fail_session {
 | |
|    my ( $self, $session, $reason ) = @_;
 | |
|    PTDEBUG && _d('Failed session', $session->{client}, 'because', $reason);
 | |
|    delete $self->{sessions}->{$session->{client}};
 | |
| 
 | |
|    return if $self->{_no_save_error};
 | |
| 
 | |
|    my $errors_fh = $self->_get_errors_fh();
 | |
| 
 | |
|    warn "TCP session $session->{client} had errors, will save them in $self->{errors_file}\n"
 | |
|       unless $self->{_warned_for}->{$self->{errors_file}}++;
 | |
| 
 | |
|    my $raw_packets = delete $session->{raw_packets};
 | |
|    $session->{reason_for_failure} = $reason;
 | |
|    my $session_dump = '# ' . Dumper($session);
 | |
|    chomp $session_dump;
 | |
|    $session_dump =~ s/\n/\n# /g;
 | |
|    print $errors_fh join("\n", $session_dump, @$raw_packets), "\n";
 | |
|    return;
 | |
| }
 | |
| 
 | |
| # Returns the difference between two tcpdump timestamps.
 | |
| sub timestamp_diff {
 | |
|    my ( $self, $start, $end ) = @_;
 | |
|    return 0 unless $start && $end;
 | |
|    my $sd = substr($start, 0, 11, '');
 | |
|    my $ed = substr($end,   0, 11, '');
 | |
|    my ( $sh, $sm, $ss ) = split(/:/, $start);
 | |
|    my ( $eh, $em, $es ) = split(/:/, $end);
 | |
|    my $esecs = ($eh * 3600 + $em * 60 + $es);
 | |
|    my $ssecs = ($sh * 3600 + $sm * 60 + $ss);
 | |
|    if ( $sd eq $ed ) {
 | |
|       return sprintf '%.6f', $esecs - $ssecs;
 | |
|    }
 | |
|    else { # Assume only one day boundary has been crossed, no DST, etc
 | |
|       return sprintf '%.6f', ( 86_400 - $ssecs ) + $esecs;
 | |
|    }
 | |
| }
 | |
| 
 | |
| # Takes a scalarref to a hex string of compressed data.
 | |
| # Returns a scalarref to a hex string of the uncompressed data.
 | |
| # The given hex string of compressed data is not modified.
 | |
| sub uncompress_data {
 | |
|    my ( $self, $data, $len ) = @_;
 | |
|    die "I need data" unless $data;
 | |
|    die "I need a len argument" unless $len;
 | |
|    die "I need a scalar reference to data" unless ref $data eq 'SCALAR';
 | |
|    PTDEBUG && _d('Uncompressing data');
 | |
|    our $InflateError;
 | |
| 
 | |
|    # Pack hex string into compressed binary data.
 | |
|    my $comp_bin_data = pack('H*', $$data);
 | |
| 
 | |
|    # Uncompress the compressed binary data.
 | |
|    my $uncomp_bin_data = '';
 | |
|    my $z = new IO::Uncompress::Inflate(
 | |
|       \$comp_bin_data
 | |
|    ) or die "IO::Uncompress::Inflate failed: $InflateError";
 | |
|    my $status = $z->read(\$uncomp_bin_data, $len)
 | |
|       or die "IO::Uncompress::Inflate failed: $InflateError";
 | |
| 
 | |
|    # Unpack the uncompressed binary data back into a hex string.
 | |
|    # This is the original MySQL packet(s).
 | |
|    my $uncomp_data = unpack('H*', $uncomp_bin_data);
 | |
| 
 | |
|    return \$uncomp_data;
 | |
| }
 | |
| 
 | |
| sub _d {
 | |
|    my ($package, undef, $line) = caller 0;
 | |
|    @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
 | |
|         map { defined $_ ? $_ : 'undef' }
 | |
|         @_;
 | |
|    print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
 | |
| }
 | |
| 
 | |
| 1;
 | |
| }
 | |
| # ###########################################################################
 | |
| # End ProtocolParser package
 | |
| # ###########################################################################
 |