# This program is copyright 2007-2011 Percona Ireland Ltd. # Feedback and improvements are welcome. # # THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED # WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF # MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE. # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation, version 2; OR the Perl Artistic License. On UNIX and similar # systems, you can issue `man perlgpl' or `man perlartistic' to read these # licenses. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 59 Temple # Place, Suite 330, Boston, MA 02111-1307 USA. # ########################################################################### # MySQLProtocolParser package # ########################################################################### { # Package: MySQLProtocolParser # MySQLProtocolParser parses MySQL events from tcpdump files. # The packets come from TcpdumpParser. MySQLProtocolParse::parse_packet() # should be first in the callback chain because it creates events for # subsequent callbacks. So the sequence is: # 1. mk-query-digest calls TcpdumpParser::parse_event($fh, ..., @callbacks) # 2. TcpdumpParser::parse_event() extracts raw MySQL packets from $fh and # passes them to the callbacks, the first of which is # MySQLProtocolParser::parse_packet(). # 3. MySQLProtocolParser::parse_packet() makes events from the packets # and returns them to TcpdumpParser::parse_event(). # 4. TcpdumpParser::parse_event() passes the newly created events to # the subsequent callbacks. # At times MySQLProtocolParser::parse_packet() will not return an event # because it usually takes a few packets to create one event. In such # cases, TcpdumpParser::parse_event() will not call the other callbacks. 
package MySQLProtocolParser;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;

# The load is wrapped in eval so that a missing IO::Uncompress::Inflate does
# not prevent this package from loading; any load error is ignored here.
eval {
   require IO::Uncompress::Inflate; # yum: perl-IO-Compress-Zlib
   IO::Uncompress::Inflate->import(qw(inflate $InflateError));
};

use Data::Dumper;
$Data::Dumper::Indent    = 1;
$Data::Dumper::Sortkeys  = 1;
$Data::Dumper::Quotekeys = 0;

BEGIN { our @ISA = 'ProtocolParser'; }

# Command codes as two-character lowercase hex strings, i.e. in the same
# form in which packet bytes are compared elsewhere in this package.
# The two SERVER_QUERY_* values are bit masks applied to server status flags.
use constant {
   COM_SLEEP               => '00',
   COM_QUIT                => '01',
   COM_INIT_DB             => '02',
   COM_QUERY               => '03',
   COM_FIELD_LIST          => '04',
   COM_CREATE_DB           => '05',
   COM_DROP_DB             => '06',
   COM_REFRESH             => '07',
   COM_SHUTDOWN            => '08',
   COM_STATISTICS          => '09',
   COM_PROCESS_INFO        => '0a',
   COM_CONNECT             => '0b',
   COM_PROCESS_KILL        => '0c',
   COM_DEBUG               => '0d',
   COM_PING                => '0e',
   COM_TIME                => '0f',
   COM_DELAYED_INSERT      => '10',
   COM_CHANGE_USER         => '11',
   COM_BINLOG_DUMP         => '12',
   COM_TABLE_DUMP          => '13',
   COM_CONNECT_OUT         => '14',
   COM_REGISTER_SLAVE      => '15',
   COM_STMT_PREPARE        => '16',
   COM_STMT_EXECUTE        => '17',
   COM_STMT_SEND_LONG_DATA => '18',
   COM_STMT_CLOSE          => '19',
   COM_STMT_RESET          => '1a',
   COM_SET_OPTION          => '1b',
   COM_STMT_FETCH          => '1c',
   SERVER_QUERY_NO_GOOD_INDEX_USED => 16,
   SERVER_QUERY_NO_INDEX_USED      => 32,
};

# Reverse lookup: hex command code => command name.
my %com_for = (
   '00' => 'COM_SLEEP',
   '01' => 'COM_QUIT',
   '02' => 'COM_INIT_DB',
   '03' => 'COM_QUERY',
   '04' => 'COM_FIELD_LIST',
   '05' => 'COM_CREATE_DB',
   '06' => 'COM_DROP_DB',
   '07' => 'COM_REFRESH',
   '08' => 'COM_SHUTDOWN',
   '09' => 'COM_STATISTICS',
   '0a' => 'COM_PROCESS_INFO',
   '0b' => 'COM_CONNECT',
   '0c' => 'COM_PROCESS_KILL',
   '0d' => 'COM_DEBUG',
   '0e' => 'COM_PING',
   '0f' => 'COM_TIME',
   '10' => 'COM_DELAYED_INSERT',
   '11' => 'COM_CHANGE_USER',
   '12' => 'COM_BINLOG_DUMP',
   '13' => 'COM_TABLE_DUMP',
   '14' => 'COM_CONNECT_OUT',
   '15' => 'COM_REGISTER_SLAVE',
   '16' => 'COM_STMT_PREPARE',
   '17' => 'COM_STMT_EXECUTE',
   '18' => 'COM_STMT_SEND_LONG_DATA',
   '19' => 'COM_STMT_CLOSE',
   '1a' => 'COM_STMT_RESET',
   '1b' => 'COM_SET_OPTION',
   '1c' => 'COM_STMT_FETCH',
);

# Client capability flag name => bit value.
my %flag_for = (
   'CLIENT_LONG_PASSWORD'     => 1,       # new more secure passwords
   'CLIENT_FOUND_ROWS'        => 2,       # Found instead of affected rows
   'CLIENT_LONG_FLAG'         => 4,       # Get all column flags
   'CLIENT_CONNECT_WITH_DB'   => 8,       # One can specify db on connect
   'CLIENT_NO_SCHEMA'         => 16,      # Don't allow database.table.column
   'CLIENT_COMPRESS'          => 32,      # Can use compression protocol
   'CLIENT_ODBC'              => 64,      # Odbc client
   'CLIENT_LOCAL_FILES'       => 128,     # Can use LOAD DATA LOCAL
   'CLIENT_IGNORE_SPACE'      => 256,     # Ignore spaces before '('
   'CLIENT_PROTOCOL_41'       => 512,     # New 4.1 protocol
   'CLIENT_INTERACTIVE'       => 1024,    # This is an interactive client
   'CLIENT_SSL'               => 2048,    # Switch to SSL after handshake
   'CLIENT_IGNORE_SIGPIPE'    => 4096,    # IGNORE sigpipes
   'CLIENT_TRANSACTIONS'      => 8192,    # Client knows about transactions
   'CLIENT_RESERVED'          => 16384,   # Old flag for 4.1 protocol
   'CLIENT_SECURE_CONNECTION' => 32768,   # New 4.1 authentication
   'CLIENT_MULTI_STATEMENTS'  => 65536,   # Enable/disable multi-stmt support
   'CLIENT_MULTI_RESULTS'     => 131072,  # Enable/disable multi-results
);

# Column/parameter type codes.
use constant {
   MYSQL_TYPE_DECIMAL      => 0,
   MYSQL_TYPE_TINY         => 1,
   MYSQL_TYPE_SHORT        => 2,
   MYSQL_TYPE_LONG         => 3,
   MYSQL_TYPE_FLOAT        => 4,
   MYSQL_TYPE_DOUBLE       => 5,
   MYSQL_TYPE_NULL         => 6,
   MYSQL_TYPE_TIMESTAMP    => 7,
   MYSQL_TYPE_LONGLONG     => 8,
   MYSQL_TYPE_INT24        => 9,
   MYSQL_TYPE_DATE         => 10,
   MYSQL_TYPE_TIME         => 11,
   MYSQL_TYPE_DATETIME     => 12,
   MYSQL_TYPE_YEAR         => 13,
   MYSQL_TYPE_NEWDATE      => 14,
   MYSQL_TYPE_VARCHAR      => 15,
   MYSQL_TYPE_BIT          => 16,
   MYSQL_TYPE_NEWDECIMAL   => 246,
   MYSQL_TYPE_ENUM         => 247,
   MYSQL_TYPE_SET          => 248,
   MYSQL_TYPE_TINY_BLOB    => 249,
   MYSQL_TYPE_MEDIUM_BLOB  => 250,
   MYSQL_TYPE_LONG_BLOB    => 251,
   MYSQL_TYPE_BLOB         => 252,
   MYSQL_TYPE_VAR_STRING   => 253,
   MYSQL_TYPE_STRING       => 254,
   MYSQL_TYPE_GEOMETRY     => 255,
};

# Reverse lookup: numeric type code => type name.
my %type_for = (
   0   => 'MYSQL_TYPE_DECIMAL',
   1   => 'MYSQL_TYPE_TINY',
   2   => 'MYSQL_TYPE_SHORT',
   3   => 'MYSQL_TYPE_LONG',
   4   => 'MYSQL_TYPE_FLOAT',
   5   => 'MYSQL_TYPE_DOUBLE',
   6   => 'MYSQL_TYPE_NULL',
   7   => 'MYSQL_TYPE_TIMESTAMP',
   8   => 'MYSQL_TYPE_LONGLONG',
   9   => 'MYSQL_TYPE_INT24',
   10  => 'MYSQL_TYPE_DATE',
   11  => 'MYSQL_TYPE_TIME',
   12  => 'MYSQL_TYPE_DATETIME',
   13  => 'MYSQL_TYPE_YEAR',
   14  => 'MYSQL_TYPE_NEWDATE',
   15  => 'MYSQL_TYPE_VARCHAR',
   16  => 'MYSQL_TYPE_BIT',
   246 => 'MYSQL_TYPE_NEWDECIMAL',
   247 => 'MYSQL_TYPE_ENUM',
   248 => 'MYSQL_TYPE_SET',
   249 => 'MYSQL_TYPE_TINY_BLOB',
   250 => 'MYSQL_TYPE_MEDIUM_BLOB',
   251 => 'MYSQL_TYPE_LONG_BLOB',
   252 => 'MYSQL_TYPE_BLOB',
   253 => 'MYSQL_TYPE_VAR_STRING',
   254 => 'MYSQL_TYPE_STRING',
   255 => 'MYSQL_TYPE_GEOMETRY',
);

# Type name => coderef that unpacks one value of that type from hex data.
# Each coderef returns (value, number of bytes the value occupied).
# Because of the fat comma the keys are plain strings, so they must be
# spelled exactly like the names in %type_for; the SHORT entry used to be
# spelled "MySQL_TYPE_SHORT" and could therefore never match that name.
my %unpack_type = (
   MYSQL_TYPE_NULL       => sub { return 'NULL', 0; },
   MYSQL_TYPE_TINY       => sub { return to_num(@_, 1), 1; },
   MYSQL_TYPE_SHORT      => sub { return to_num(@_, 2), 2; },
   MYSQL_TYPE_LONG       => sub { return to_num(@_, 4), 4; },
   MYSQL_TYPE_LONGLONG   => sub { return to_num(@_, 8), 8; },
   MYSQL_TYPE_DOUBLE     => sub { return to_double(@_), 8; },
   MYSQL_TYPE_VARCHAR    => \&unpack_string,
   MYSQL_TYPE_VAR_STRING => \&unpack_string,
   MYSQL_TYPE_STRING     => \&unpack_string,
);

# server is the "host:port" of the server being watched.  It's auto-guessed if
# not specified.  version is a placeholder for handling differences between
# MySQL v4.0 and older and v4.1 and newer.  Currently, we only handle v4.1.
#
# Optional args:
#   server     - restrict parsing to packets to or from this server
#   port       - server port (default '3306')
#   o          - stored as-is in the object
#   null_event - value parse_event() returns when there is no event
# Returns a new MySQLProtocolParser object.
sub new {
   my ( $class, %args ) = @_;

   my $self = {
      server         => $args{server},
      port           => $args{port} || '3306',
      version        => '41',    # MySQL proto version; not used yet
      sessions       => {},
      o              => $args{o},
      fake_thread_id => 2**32,   # see _make_event()
      null_event     => $args{null_event},
   };
   PTDEBUG && $self->{server} && _d('Watching only server', $self->{server});
   return bless $self, $class;
}

# The packet arg should be a hashref from TcpdumpParser::parse_event().
# misc is a placeholder for future features.
# Parses one TCP packet and returns an event if this packet completed one.
#
# Required args:
#   event - the packet hashref (src_host, src_port, dst_host, dst_port,
#           data, data_len, ts, seq, syn, fin, raw_packet, ... are read)
# Optional args:
#   misc  - passed through to the client/server packet handlers
#   stats - hashref; its events_parsed counter is incremented when the
#           packet reaches the end of this sub
# Returns the event hashref, or $self->{null_event} when no event is ready.
# Note: returns nothing (not null_event) when a compressed packet cannot be
# uncompressed.  Dies if the event arg is missing.
sub parse_event {
   my ( $self, %args ) = @_;
   my @required_args = qw(event);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $packet = $args{event};

   my $src_host = "$packet->{src_host}:$packet->{src_port}";
   my $dst_host = "$packet->{dst_host}:$packet->{dst_port}";

   if ( my $server = $self->{server} ) {  # Watch only the given server.
      $server .= ":$self->{port}";
      if ( $src_host ne $server && $dst_host ne $server ) {
         PTDEBUG && _d('Packet is not to or from', $server);
         return $self->{null_event};
      }
   }

   # Decide which side sent the packet by looking for the configured server
   # port ($self->{port}, 3306 by default) at the end of the source or
   # destination "host:port".  Only that configured value is matched.
   my $packet_from;
   my $client;
   if ( $src_host =~ m/:$self->{port}$/ ) {
      $packet_from = 'server';
      $client      = $dst_host;
   }
   elsif ( $dst_host =~ m/:$self->{port}$/ ) {
      $packet_from = 'client';
      $client      = $src_host;
   }
   else {
      PTDEBUG && _d('Packet is not to or from a MySQL server');
      return $self->{null_event};
   }
   PTDEBUG && _d('Client', $client);

   # Get the client's session info or create a new session if
   # we catch the TCP SYN sequence or the packetno is 0.
   my $packetno = -1;
   if ( $packet->{data_len} >= 5 ) {
      # 5 bytes is the minimum length of any valid MySQL packet.
      # If there's less, it's probably some TCP control packet
      # with other data.  Peek at the MySQL packet number.  The
      # only time a server sends packetno 0 is for its handshake.
      # Client packetno 0 marks start of new query.
      # The data is a hex string: 6 chars of length, then 2 chars of number.
      $packetno = to_num(substr($packet->{data}, 6, 2));
   }
   if ( !exists $self->{sessions}->{$client} ) {
      if ( $packet->{syn} ) {
         PTDEBUG && _d('New session (SYN)');
      }
      elsif ( $packetno == 0 ) {
         PTDEBUG && _d('New session (packetno 0)');
      }
      else {
         PTDEBUG && _d('Ignoring mid-stream', $packet_from, 'data,',
            'packetno', $packetno);
         return $self->{null_event};
      }

      $self->{sessions}->{$client} = {
         client      => $client,
         ts          => $packet->{ts},
         state       => undef,
         compress    => undef,
         raw_packets => [],
         buff        => '',
         sths        => {},
         attribs     => {},
         n_queries   => 0,
      };
   }
   my $session = $self->{sessions}->{$client};
   PTDEBUG && _d('Client state:', $session->{state});

   # Save raw packets to dump later in case something fails.
   push @{$session->{raw_packets}}, $packet->{raw_packet};

   # Check client port reuse.
   # http://code.google.com/p/maatkit/issues/detail?id=794
   if ( $packet->{syn} && ($session->{n_queries} > 0 || $session->{state}) ) {
      PTDEBUG && _d('Client port reuse and last session did not quit');
      # Fail the session so we can see the last thing the previous
      # session was doing.
      $self->fail_session($session,
         'client port reuse and last session did not quit');
      # Then recurse to create a New session.
      return $self->parse_event(%args);
   }

   # Return early if there's no TCP/MySQL data.  These are usually
   # TCP control packets: SYN, ACK, FIN, etc.
   if ( $packet->{data_len} == 0 ) {
      PTDEBUG && _d('TCP control:',
         map { uc $_ } grep { $packet->{$_} } qw(syn ack fin rst));
      if ( $packet->{'fin'}
           && ($session->{state} || '') eq 'server_handshake' ) {
         PTDEBUG && _d('Client aborted connection');
         my $event = {
            cmd => 'Admin',
            arg => 'administrator command: Connect',
            ts  => $packet->{ts},
         };
         $session->{attribs}->{Error_msg}
            = 'Client closed connection during handshake';
         $event = $self->_make_event($event, $packet, $session);
         delete $self->{sessions}->{$session->{client}};
         return $event;
      }
      return $self->{null_event};
   }

   # Return unless the compressed packet can be uncompressed.
   # If it cannot, then we're helpless and must return.
   if ( $session->{compress} ) {
      return unless $self->uncompress_packet($packet, $session);
   }

   if ( $session->{buff} && $packet_from eq 'client' ) {
      # Previous packets were not complete so append this data
      # to what we've been buffering.  Afterwards, do *not* attempt
      # to remove_mysql_header() because it was already done (from
      # the first packet).
      $session->{buff}      .= $packet->{data};
      $packet->{data}        = $session->{buff};
      $session->{buff_left} -= $packet->{data_len};

      # We didn't remove_mysql_header(), so mysql_data_len isn't set.
      # So set it to the real, complete data len (from the first
      # packet's MySQL header).
      $packet->{mysql_data_len} = $session->{mysql_data_len};
      $packet->{number}         = $session->{number};

      PTDEBUG && _d('Appending data to buff; expecting',
         $session->{buff_left}, 'more bytes');
   }
   else {
      # Remove the first MySQL header.  A single TCP packet can contain many
      # MySQL packets, but we only look at the first.  The 2nd and subsequent
      # packets are usually parts of a result set returned by the server, but
      # we're not interested in result sets.
      eval {
         remove_mysql_header($packet);
      };
      if ( $EVAL_ERROR ) {
         PTDEBUG && _d('remove_mysql_header() failed; failing session');
         $session->{EVAL_ERROR} = $EVAL_ERROR;
         $self->fail_session($session, 'remove_mysql_header() failed');
         return $self->{null_event};
      }
   }

   # Finally, parse the packet and maybe create an event.
   # The returned event may be empty if no event was ready to be created.
   my $event;
   if ( $packet_from eq 'server' ) {
      $event = $self->_packet_from_server($packet, $session, $args{misc});
   }
   elsif ( $packet_from eq 'client' ) {
      if ( $session->{buff} ) {
         if ( $session->{buff_left} <= 0 ) {
            PTDEBUG && _d('Data is complete');
            $self->_delete_buff($session);
         }
         else {
            # Waiting for more data; buff_left was reported earlier.
            return $self->{null_event};
         }
      }
      elsif ( $packet->{mysql_data_len} > ($packet->{data_len} - 4) ) {

         # http://code.google.com/p/maatkit/issues/detail?id=832
         if ( $session->{cmd}
              && ($session->{state} || '') eq 'awaiting_reply' ) {
            PTDEBUG && _d('No server OK to previous command (frag)');
            $self->fail_session($session, 'no server OK to previous command');
            # The MySQL header is removed by this point, so put it back.
            $packet->{data} = $packet->{mysql_hdr} . $packet->{data};
            return $self->parse_event(%args);
         }

         # There is more MySQL data than this packet contains.
         # Save the data and the original MySQL header values
         # then wait for the rest of the data.
         $session->{buff}           = $packet->{data};
         $session->{mysql_data_len} = $packet->{mysql_data_len};
         $session->{number}         = $packet->{number};

         # Do this just once here.  For the next packets, buff_left
         # will be decremented above.
         $session->{buff_left}
            ||= $packet->{mysql_data_len} - ($packet->{data_len} - 4);

         PTDEBUG && _d('Data not complete; expecting',
            $session->{buff_left}, 'more bytes');
         return $self->{null_event};
      }

      if ( $session->{cmd}
           && ($session->{state} || '') eq 'awaiting_reply' ) {
         # Buffer handling above should ensure that by this point we have
         # the full client query.  If there's a previous client query for
         # which we're "awaiting_reply" and then we get another client
         # query, chances are we missed the server's OK response to the
         # first query.  So fail the first query and re-parse this second
         # query.
         PTDEBUG && _d('No server OK to previous command');
         $self->fail_session($session, 'no server OK to previous command');
         # The MySQL header is removed by this point, so put it back.
         $packet->{data} = $packet->{mysql_hdr} . $packet->{data};
         return $self->parse_event(%args);
      }

      $event = $self->_packet_from_client($packet, $session, $args{misc});
   }
   else {
      # Should not get here.
      die 'Packet origin unknown';
   }

   PTDEBUG && _d('Done parsing packet; client state:', $session->{state});
   if ( $session->{closed} ) {
      delete $self->{sessions}->{$session->{client}};
      PTDEBUG && _d('Session deleted');
   }

   $args{stats}->{events_parsed}++ if $args{stats};
   return $event || $self->{null_event};
}

# Handles a packet from the server given the state of the session.
# The server can send back a lot of different stuff, but luckily
# we're only interested in
#   * Connection handshake packets for the thread_id
#   * OK and Error packets for errors, warnings, etc.
# Anything else is ignored.  Returns an event if one was ready to be
# created, otherwise returns nothing.
#
# Args: $packet (hashref; its MySQL header is already removed), $session
# (hashref from $self->{sessions}), $misc (unused here).
# Dies if $packet or $session is missing.
sub _packet_from_server {
   my ( $self, $packet, $session, $misc ) = @_;
   die "I need a packet"  unless $packet;
   die "I need a session" unless $session;

   PTDEBUG && _d('Packet is from server; client state:', $session->{state});

   # A packet with the same TCP seq as the previous server packet is
   # recorded as a retransmission and otherwise ignored.
   if ( ($session->{server_seq} || '') eq $packet->{seq} ) {
      push @{ $session->{server_retransmissions} }, $packet->{seq};
      PTDEBUG && _d('TCP retransmission');
      return;
   }
   $session->{server_seq} = $packet->{seq};

   my $data = $packet->{data};

   # The first byte in the packet indicates whether it's an OK,
   # ERROR, EOF packet.  If it's not one of those, we test
   # whether it's an initialization packet (the first thing the
   # server ever sends the client).  If it's not that, it could
   # be a result set header, field, row data, etc.
   # The 4-arg substr also strips that byte (2 hex chars) from $data.
   my ( $first_byte ) = substr($data, 0, 2, '');
   PTDEBUG && _d('First byte of packet:', $first_byte);
   if ( !$first_byte ) {
      $self->fail_session($session, 'no first byte');
      return;
   }

   # If there's no session state, then we're catching a server response
   # mid-stream.  It's only safe to wait until the client sends a command
   # or to look for the server handshake.
   if ( !$session->{state} ) {
      # NOTE(review): m/00{13}/ matches one "0" followed by thirteen "0"s
      # (14 hex zeros), not thirteen "00" bytes; kept as-is because the
      # check is only a heuristic.
      if ( $first_byte eq '0a' && length $data >= 33 && $data =~ m/00{13}/ ) {
         # It's the handshake packet from the server to the client.
         # 0a is protocol v10 which is essentially the only version used
         # today.  33 is the minimum possible length for a valid server
         # handshake packet.  It's probably a lot longer.  Other packets
         # may start with 0a, but none that can would be >= 33.  The 13-byte
         # 00 scramble buffer is another indicator.
         my $handshake = parse_server_handshake_packet($data);
         if ( !$handshake ) {
            $self->fail_session($session, 'failed to parse server handshake');
            return;
         }
         $session->{state}     = 'server_handshake';
         $session->{thread_id} = $handshake->{thread_id};

         # See http://code.google.com/p/maatkit/issues/detail?id=794
         $session->{ts} = $packet->{ts} unless $session->{ts};
      }
      elsif ( $session->{buff} ) {
         $self->fail_session($session,
            'got server response before full buffer');
         return;
      }
      else {
         PTDEBUG && _d('Ignoring mid-stream server response');
         return;
      }
   }
   else {
      if ( $first_byte eq '00' ) {
         if ( ($session->{state} || '') eq 'client_auth' ) {
            # We logged in OK!  Trigger an admin Connect command.

            $session->{compress} = $session->{will_compress};
            delete $session->{will_compress};
            PTDEBUG && $session->{compress} && _d('Packets will be compressed');

            PTDEBUG && _d('Admin command: Connect');
            return $self->_make_event(
               {  cmd => 'Admin',
                  arg => 'administrator command: Connect',
                  ts  => $packet->{ts}, # Events are timestamped when they end
               },
               $packet, $session
            );
         }
         elsif ( $session->{cmd} ) {
            # This OK should be ack'ing a query or something sent earlier
            # by the client.  OK for prepared statement are special.
            my $com = $session->{cmd}->{cmd};
            my $ok;
            if ( $com eq COM_STMT_PREPARE ) {
               PTDEBUG && _d('OK for prepared statement');
               $ok = parse_ok_prepared_statement_packet($data);
               if ( !$ok ) {
                  $self->fail_session($session,
                     'failed to parse OK prepared statement packet');
                  return;
               }
               my $sth_id = $ok->{sth_id};
               $session->{attribs}->{Statement_id} = $sth_id;

               # Save all sth info, used in parse_execute_packet().
               $session->{sths}->{$sth_id} = $ok;
               $session->{sths}->{$sth_id}->{statement}
                  = $session->{cmd}->{arg};
            }
            else {
               $ok = parse_ok_packet($data);
               if ( !$ok ) {
                  $self->fail_session($session, 'failed to parse OK packet');
                  return;
               }
            }

            my $arg;
            if ( $com eq COM_QUERY
                 || $com eq COM_STMT_EXECUTE
                 || $com eq COM_STMT_RESET ) {
               $com = 'Query';
               $arg = $session->{cmd}->{arg};
            }
            elsif ( $com eq COM_STMT_PREPARE ) {
               $com = 'Query';
               $arg = "PREPARE $session->{cmd}->{arg}";
            }
            else {
               # E.g. COM_PING becomes "administrator command: Ping".
               $arg = 'administrator command: '
                    . ucfirst(lc(substr($com_for{$com}, 4)));
               $com = 'Admin';
            }

            return $self->_make_event(
               {  cmd           => $com,
                  arg           => $arg,
                  ts            => $packet->{ts},
                  Insert_id     => $ok->{insert_id},
                  Warning_count => $ok->{warnings},
                  Rows_affected => $ok->{affected_rows},
               },
               $packet, $session
            );
         }
         else {
            PTDEBUG && _d('Looks like an OK packet but session has no cmd');
         }
      }
      elsif ( $first_byte eq 'ff' ) {
         my $error = parse_error_packet($data);
         if ( !$error ) {
            $self->fail_session($session, 'failed to parse error packet');
            return;
         }
         my $event;

         if (   $session->{state} eq 'client_auth'
             || $session->{state} eq 'server_handshake' ) {
            PTDEBUG && _d('Connection failed');
            $event = {
               cmd      => 'Admin',
               arg      => 'administrator command: Connect',
               ts       => $packet->{ts},
               Error_no => $error->{errno},
            };
            $session->{attribs}->{Error_msg} = $error->{message};
            $session->{closed} = 1;  # delete session when done
            return $self->_make_event($event, $packet, $session);
         }
         elsif ( $session->{cmd} ) {
            # This error should be in response to a query or something
            # sent earlier by the client.
            my $com = $session->{cmd}->{cmd};
            my $arg;

            if ( $com eq COM_QUERY || $com eq COM_STMT_EXECUTE ) {
               $com = 'Query';
               $arg = $session->{cmd}->{arg};
            }
            else {
               $arg = 'administrator command: '
                    . ucfirst(lc(substr($com_for{$com}, 4)));
               $com = 'Admin';
            }

            $event = {
               cmd => $com,
               arg => $arg,
               ts  => $packet->{ts},
            };
            if ( $error->{errno} ) {
               # https://bugs.launchpad.net/percona-toolkit/+bug/823411
               $event->{Error_no} = $error->{errno};
            }
            $session->{attribs}->{Error_msg} = $error->{message};
            return $self->_make_event($event, $packet, $session);
         }
         else {
            PTDEBUG && _d('Looks like an error packet but client is not '
               . 'authenticating and session has no cmd');
         }
      }
      elsif ( $first_byte eq 'fe' && $packet->{mysql_data_len} < 9 ) {
         # EOF packet
         if ( $packet->{mysql_data_len} == 1
              && $session->{state} eq 'client_auth'
              && $packet->{number} == 2 )
         {
            PTDEBUG && _d('Server has old password table;',
               'client will resend password using old algorithm');
            $session->{state} = 'client_auth_resend';
         }
         else {
            PTDEBUG && _d('Got an EOF packet');
            $self->fail_session($session, 'got an unexpected EOF packet');
            # ^^^ We shouldn't reach this because EOF should come after a
            # header, field, or row data packet; and we should be firing the
            # event and returning when we see that.  See SVN history for some
            # good stuff we could do if we wanted to handle EOF packets.
         }
      }
      else {
         # Since we do NOT always have all the data the server sent to the
         # client, we can't always do any processing of results.  So when
         # we get one of these, we just fire the event even if the query
         # is not done.  This means we will NOT process EOF packets
         # themselves (see above).
         if ( $session->{cmd} ) {
            PTDEBUG && _d('Got a row/field/result packet');
            my $com = $session->{cmd}->{cmd};
            PTDEBUG && _d('Responding to client', $com_for{$com});
            my $event = { ts => $packet->{ts} };
            if ( $com eq COM_QUERY || $com eq COM_STMT_EXECUTE ) {
               $event->{cmd} = 'Query';
               $event->{arg} = $session->{cmd}->{arg};
            }
            else {
               $event->{arg} = 'administrator command: '
                    . ucfirst(lc(substr($com_for{$com}, 4)));
               $event->{cmd} = 'Admin';
            }

            # We DID get all the data in the packet.
            if ( $packet->{complete} ) {
               # Look to see if the end of the data appears to be an EOF
               # packet.
               my ( $warning_count, $status_flags )
                  = $data =~ m/fe(.{4})(.{4})\Z/;
               if ( $warning_count ) {
                  # Store under Warning_count, the key _make_event() reads;
                  # the former key "Warnings" was never copied to the event.
                  $event->{Warning_count} = to_num($warning_count);
                  my $flags = to_num($status_flags); # TODO set all flags?
                  $event->{No_good_index_used}
                     = $flags & SERVER_QUERY_NO_GOOD_INDEX_USED ? 1 : 0;
                  $event->{No_index_used}
                     = $flags & SERVER_QUERY_NO_INDEX_USED ? 1 : 0;
               }
            }

            return $self->_make_event($event, $packet, $session);
         }
         else {
            PTDEBUG && _d('Unknown in-stream server response');
         }
      }
   }

   return;
}

# Handles a packet from the client given the state of the session.
# The client doesn't send a wide and exotic array of packets like
# the server.  Even so, we're only interested in:
#   * Users and dbs from connection handshake packets
#   * SQL statements from COM_QUERY commands
# Anything else is ignored.  Returns an event if one was ready to be
# created, otherwise returns nothing.
# Args: $packet (hashref; its MySQL header is already removed), $session
# (hashref from $self->{sessions}), $misc (unused here).
# Dies if $packet or $session is missing.
sub _packet_from_client {
   my ( $self, $packet, $session, $misc ) = @_;
   die "I need a packet"  unless $packet;
   die "I need a session" unless $session;

   PTDEBUG && _d('Packet is from client; state:', $session->{state});

   # A packet with the same TCP seq as the previous client packet is
   # recorded as a retransmission and otherwise ignored.
   if ( ($session->{client_seq} || '') eq $packet->{seq} ) {
      push @{ $session->{client_retransmissions} }, $packet->{seq};
      PTDEBUG && _d('TCP retransmission');
      return;
   }
   $session->{client_seq} = $packet->{seq};

   my $data = $packet->{data};
   my $ts   = $packet->{ts};

   if ( ($session->{state} || '') eq 'server_handshake' ) {
      PTDEBUG && _d('Expecting client authentication packet');
      # The connection is a 3-way handshake:
      #    server > client  (protocol version, thread id, etc.)
      #    client > server  (user, pass, default db, etc.)
      #    server > client  OK if login succeeds
      # pos_in_log refers to 2nd handshake from the client.
      # A connection is logged even if the client fails to
      # login (bad password, etc.).
      my $handshake = parse_client_handshake_packet($data);
      if ( !$handshake ) {
         $self->fail_session($session, 'failed to parse client handshake');
         return;
      }
      $session->{state}      = 'client_auth';
      $session->{pos_in_log} = $packet->{pos_in_log};
      $session->{user}       = $handshake->{user};
      $session->{db}         = $handshake->{db};

      # $session->{will_compress} will become $session->{compress} when
      # the server's final handshake packet is received.  This prevents
      # parse_packet() from trying to decompress that final packet.
      # Compressed packets can only begin after the full handshake is done.
      $session->{will_compress} = $handshake->{flags}->{CLIENT_COMPRESS};
   }
   elsif ( ($session->{state} || '') eq 'client_auth_resend' ) {
      # Don't know how to parse this packet.
      PTDEBUG && _d('Client resending password using old algorithm');
      $session->{state} = 'client_auth';
   }
   elsif ( ($session->{state} || '') eq 'awaiting_reply' ) {
      my $arg = $session->{cmd}->{arg}
              ? substr($session->{cmd}->{arg}, 0, 50)
              : 'unknown';
      PTDEBUG && _d('More data for previous command:', $arg, '...');
      return;
   }
   else {
      # Otherwise, it should be a query if its the first packet (number 0).
      # We ignore the commands that take arguments (COM_CHANGE_USER,
      # COM_PROCESS_KILL).
      if ( $packet->{number} != 0 ) {
         $self->fail_session($session, 'client cmd not packet 0');
         return;
      }

      # Detect compression in-stream only if $session->{compress} is
      # not defined.  This means we didn't see the client handshake.
      # If we had seen it, $session->{compress} would be defined as 0 or 1.
      if ( !defined $session->{compress} ) {
         return unless $self->detect_compression($packet, $session);
         $data = $packet->{data};
      }

      my $com = parse_com_packet($data, $packet->{mysql_data_len});
      if ( !$com ) {
         $self->fail_session($session, 'failed to parse COM packet');
         return;
      }

      if ( $com->{code} eq COM_STMT_EXECUTE ) {
         PTDEBUG && _d('Execute prepared statement');
         my $exec = parse_execute_packet($com->{data}, $session->{sths});
         if ( !$exec ) {
            # This does not signal a failure, it could just be that
            # the statement handle ID is unknown.
            PTDEBUG && _d('Failed to parse execute packet');
            $session->{state} = undef;
            return;
         }
         $com->{data} = $exec->{arg};
         $session->{attribs}->{Statement_id} = $exec->{sth_id};
      }
      elsif ( $com->{code} eq COM_STMT_RESET ) {
         my $sth_id = get_sth_id($com->{data});
         if ( !$sth_id ) {
            $self->fail_session($session,
               'failed to parse prepared statement reset packet');
            return;
         }
         $com->{data} = "RESET $sth_id";
         $session->{attribs}->{Statement_id} = $sth_id;
      }

      $session->{state}      = 'awaiting_reply';
      $session->{pos_in_log} = $packet->{pos_in_log};
      $session->{ts}         = $ts;
      $session->{cmd}        = {
         cmd => $com->{code},
         arg => $com->{data},
      };

      if ( $com->{code} eq COM_QUIT ) { # Fire right away; will cleanup later.
         PTDEBUG && _d('Got a COM_QUIT');

         # See http://code.google.com/p/maatkit/issues/detail?id=794
         $session->{closed} = 1;  # delete session when done

         return $self->_make_event(
            {  cmd => 'Admin',
               arg => 'administrator command: Quit',
               ts  => $ts,
            },
            $packet, $session
         );
      }
      elsif ( $com->{code} eq COM_STMT_CLOSE ) {
         # Apparently, these are not acknowledged by the server.
         my $sth_id = get_sth_id($com->{data});
         if ( !$sth_id ) {
            $self->fail_session($session,
               'failed to parse prepared statement close packet');
            return;
         }
         delete $session->{sths}->{$sth_id};
         return $self->_make_event(
            {  cmd => 'Query',
               arg => "DEALLOCATE PREPARE $sth_id",
               ts  => $ts,
            },
            $packet, $session
         );
      }
   }

   return;
}

# Make and return an event from the given packet and session.
#
# Args: $event (hashref with cmd, arg, ts and optionally Rows_affected,
# Warning_count, No_good_index_used, No_index_used, Error_no), $packet,
# $session.  Any other key in $event is not copied to the returned event.
# Side effects on $session: raw_packets and the buffer are cleared, cmd is
# deleted, state is set to undef, attribs are reset and n_queries is
# incremented.
sub _make_event {
   my ( $self, $event, $packet, $session ) = @_;
   PTDEBUG && _d('Making event');

   # Clear packets that preceded this event.
   $session->{raw_packets} = [];
   $self->_delete_buff($session);

   if ( !$session->{thread_id} ) {
      # Only the server handshake packet gives the thread id, so for
      # sessions caught mid-stream we assign a fake thread id.
      PTDEBUG && _d('Giving session fake thread id', $self->{fake_thread_id});
      $session->{thread_id} = $self->{fake_thread_id}++;
   }

   # $session->{client} is "host:port"; only dotted-quad hosts match.
   my ($host, $port) = $session->{client} =~ m/((?:\d+\.){3}\d+)\:(\w+)/;
   my $new_event = {
      cmd        => $event->{cmd},
      arg        => $event->{arg},
      bytes      => length( $event->{arg} ),
      ts         => tcp_timestamp( $event->{ts} ),
      host       => $host,
      ip         => $host,
      port       => $port,
      db         => $session->{db},
      user       => $session->{user},
      Thread_id  => $session->{thread_id},
      pos_in_log => $session->{pos_in_log},
      Query_time => timestamp_diff($session->{ts}, $packet->{ts}),
      Rows_affected      => ($event->{Rows_affected} || 0),
      Warning_count      => ($event->{Warning_count} || 0),
      No_good_index_used => ($event->{No_good_index_used} ? 'Yes' : 'No'),
      No_index_used      => ($event->{No_index_used}      ? 'Yes' : 'No'),
   };
   # Session attribs (e.g. Error_msg, Statement_id) are merged in and can
   # overwrite the keys set above.
   @{$new_event}{keys %{$session->{attribs}}} = values %{$session->{attribs}};
   # https://bugs.launchpad.net/percona-toolkit/+bug/823411
   foreach my $opt_attrib ( qw(Error_no) ) {
      if ( defined $event->{$opt_attrib} ) {
         $new_event->{$opt_attrib} = $event->{$opt_attrib};
      }
   }
   PTDEBUG && _d('Properties of event:', Dumper($new_event));

   # Delete cmd to prevent re-making the same event if the
   # server sends extra stuff that looks like a result set, etc.
   delete $session->{cmd};

   # Undef the session state so that we ignore everything from
   # the server and wait until the client says something again.
   $session->{state} = undef;

   # Clear the attribs for this event.
   $session->{attribs} = {};

   $session->{n_queries}++;
   $session->{server_retransmissions} = [];
   $session->{client_retransmissions} = [];

   return $new_event;
}

# Extracts a slow-log-formatted timestamp from the tcpdump timestamp format.
# E.g. "2009-04-12 11:00:13.118191" becomes "090412 11:00:13.118191".
sub tcp_timestamp {
   my ( $ts ) = @_;
   $ts =~ s/^\d\d(\d\d)-(\d\d)-(\d\d)/$1$2$3/;
   return $ts;
}

# Returns the difference between two tcpdump timestamps, in seconds,
# formatted with six decimal places.
sub timestamp_diff {
   my ( $start, $end ) = @_;
   # Strip the leading 11 characters ("YYYY-MM-DD ") off each local copy,
   # leaving "HH:MM:SS.ffffff" in $start and $end.
   my $sd = substr($start, 0, 11, '');
   my $ed = substr($end,   0, 11, '');
   my ( $sh, $sm, $ss ) = split(/:/, $start);
   my ( $eh, $em, $es ) = split(/:/, $end);
   my $esecs = ($eh * 3600 + $em * 60 + $es);
   my $ssecs = ($sh * 3600 + $sm * 60 + $ss);
   if ( $sd eq $ed ) {
      return sprintf '%.6f', $esecs - $ssecs;
   }
   else { # Assume only one day boundary has been crossed, no DST, etc
      return sprintf '%.6f', ( 86_400 - $ssecs ) + $esecs;
   }
}

# Converts hexadecimal to string.
sub to_string {
   my ( $data ) = @_;
   return pack('H*', $data);
}

# Unpacks a length-encoded string from the front of the hex string $data.
# Returns (the string wrapped in double quotes, number of bytes consumed
# including the length prefix).
sub unpack_string {
   my ( $data ) = @_;
   my $len        = 0;
   my $encode_len = 0;
   ($data, $len, $encode_len) = decode_len($data);
   # A decoded length of 0 is a real, empty string, so it must yield "H0"
   # (empty result).  Testing the truth of $len here used to select "H*",
   # which packed all the remaining data into the value.
   my $t = 'H' . (defined $len ? $len * 2 : '*');
   $data = pack($t, $data);
   return "\"$data\"", $encode_len + $len;
}

# Decodes the length prefix at the front of the hex string $data.
# Returns ($data without the prefix, decoded length in bytes, number of
# bytes the prefix occupied), or nothing if $data is empty/false.
# Dies on the invalid marker byte 255.
sub decode_len {
   my ( $data ) = @_;
   return unless $data;

   # first byte   hex   len
   # ==========   ====  =============
   # 0-251        0-FB  Same
   # 252          FC    Len in next 2
   # 253          FD    Len in next 3
   # 254          FE    Len in next 8
   my $first_byte = to_num(substr($data, 0, 2, ''));

   my $len;
   my $encode_len;
   if ( $first_byte <= 251 ) {
      $len        = $first_byte;
      $encode_len = 1;
   }
   # For the multi-byte forms the length bytes are removed from the front
   # of $data with 4-arg substr (offset 0, N hex chars).  encode_len counts
   # the marker byte plus the length bytes, consistent with the 1-byte case.
   elsif ( $first_byte == 252 ) {
      $len        = to_num(substr($data, 0, 4, ''));
      $encode_len = 3;
   }
   elsif ( $first_byte == 253 ) {
      $len        = to_num(substr($data, 0, 6, ''));
      $encode_len = 4;
   }
   elsif ( $first_byte == 254 ) {
      $len        = to_num(substr($data, 0, 16, ''));
      $encode_len = 9;
   }
   else {
      # This shouldn't happen, but it may if we're passed data
      # that isn't length encoded.
      PTDEBUG && _d('data:', $data, 'first byte:', $first_byte);
      die "Invalid length encoded byte: $first_byte";
   }

   PTDEBUG && _d('len:', $len, 'encode len', $encode_len);
   return $data, $len, $encode_len;
}

# All numbers are stored with the least significant byte first in the MySQL
# protocol (little endian).
# This function converts a little-endian hex string to a number.  If $len
# is given, only the first $len bytes (2 * $len hex chars) are used.
sub to_num {
   my ( $str, $len ) = @_;
   if ( $len ) {
      $str = substr($str, 0, $len * 2);
   }
   my @bytes = $str =~ m/(..)/g;
   my $result = 0;
   foreach my $i ( 0 .. $#bytes ) {
      # Byte $i carries weight 256 ** $i, written here as 16 ** (2 * $i).
      $result += hex($bytes[$i]) * (16 ** ($i * 2));
   }
   return $result;
}

# Converts a hex string to a double.
# NOTE(review): unpack 'd' uses the host's native byte order - confirm this
# is acceptable on big-endian hosts.
sub to_double {
   my ( $str ) = @_;
   return unpack('d', pack('H*', $str));
}

# Accepts a reference to a string, which it will modify.  Extracts a
# length-coded binary off the front of the string and returns that value as an
# integer.
sub get_lcb {
    my ( $string ) = @_;
    # The first byte selects the encoding:
    #   0-250  the value itself
    #   251    NULL
    #   252    value is in the next 2 bytes
    #   253    value is in the next 3 bytes
    #   254    value is in the next 8 bytes
    # Everything read here is also removed from the caller's string.
    my $first_byte = hex(substr($$string, 0, 2, ''));
    if ( $first_byte < 251 ) {
        return $first_byte;
    }
    elsif ( $first_byte == 252 ) {
        return to_num(substr($$string, 0, 4, ''));
    }
    elsif ( $first_byte == 253 ) {
        return to_num(substr($$string, 0, 6, ''));
    }
    elsif ( $first_byte == 254 ) {
        return to_num(substr($$string, 0, 16, ''));
    }
    # 251 (NULL) or 255 (not a length-coded binary): there is no integer to
    # return, so say so explicitly instead of falling off the end of the sub.
    return;
}

# Error packet structure:
# Offset  Bytes               Field
# ======  =================   ====================================
#         00 00 00 01         MySQL proto header (already removed)
#         ff                  Error  (already removed)
# 0       00 00               Error number
# 4       23                  SQL state marker, always '#'
# 6       00 00 00 00 00      SQL state
# 16      00 ...              Error message
# The sqlstate marker and actual sqlstate are combined into one value.
# Offsets are positions in the hex string (two hex digits per byte).
#
# Returns a hashref with errno, sqlstate and message, or nothing if the data
# is empty, shorter than 16 hex digits, or carries no message.
sub parse_error_packet {
    my ( $data ) = @_;
    return unless $data;
    PTDEBUG && _d('ERROR data:', $data);
    if ( length $data < 16 ) {
        PTDEBUG && _d('Error packet is too short:', $data);
        return;
    }
    my $errno    = to_num(substr($data, 0, 4));
    my $marker   = to_string(substr($data, 4, 2));
    my $sqlstate = '';
    my $message  = '';
    if ( $marker eq '#' ) {
        $sqlstate = to_string(substr($data, 6, 10));
        $message  = to_string(substr($data, 16));
    }
    else {
        # No SQL state marker: everything after the error number is the
        # message.
        $marker  = '';
        $message = to_string(substr($data, 4));
    }
    return unless $message;
    my $pkt = {
        errno    => $errno,
        sqlstate => $marker . $sqlstate,
        message  => $message,
    };
    PTDEBUG && _d('Error packet:', Dumper($pkt));
    return $pkt;
}

# OK packet structure:
# Bytes         Field
# ===========   ====================================
# 00 00 00 01   MySQL proto header (already removed)
# 00            OK/Field count (already removed)
# 1-9           Affected rows (LCB)
# 1-9           Insert ID (LCB)
# 00 00         Server status
# 00 00         Warning count
# 00 ...        Message (optional)
#
# Returns a hashref with affected_rows, insert_id, status, warnings and
# message, or nothing if the data is empty or shorter than 12 hex digits.
sub parse_ok_packet {
    my ( $data ) = @_;
    return unless $data;
    PTDEBUG && _d('OK data:', $data);
    if ( length $data < 12 ) {
        PTDEBUG && _d('OK packet is too short:', $data);
        return;
    }
    # Each call below consumes its field from the front of $data.
    my $affected_rows = get_lcb(\$data);
    my $insert_id     = get_lcb(\$data);
    my $status        = to_num(substr($data, 0, 4, ''));
    my $warnings      = to_num(substr($data, 0, 4, ''));
    # Whatever is left is the optional message.  It might be something like
    # Records: 2 Duplicates: 0 Warnings: 0
    my $message       = to_string($data);
    my $pkt = {
        affected_rows => $affected_rows,
        insert_id     => $insert_id,
        status        => $status,
        warnings      => $warnings,
        message       => $message,
    };
    PTDEBUG && _d('OK packet:', Dumper($pkt));
    return $pkt;
}

# OK prepared statement packet structure:
# Bytes         Field
# ===========   ====================================
# 00            OK  (already removed)
# 00 00 00 00   Statement handler ID
# 00 00         Number of columns in result set
# 00 00         Number of parameters (?) in query
#
# Returns a hashref with sth_id, num_cols and num_params, or nothing if the
# data is empty or too short to hold all three fields.
sub parse_ok_prepared_statement_packet {
    my ( $data ) = @_;
    return unless $data;
    PTDEBUG && _d('OK prepared statement data:', $data);
    # The three fields take 4 + 2 + 2 bytes, i.e. 16 hex digits.  With less
    # than that the counts below would silently be read as 0.
    if ( length $data < 16 ) {
        PTDEBUG && _d('OK prepared statement packet is too short:', $data);
        return;
    }
    my $sth_id     = to_num(substr($data, 0, 8, ''));
    my $num_cols   = to_num(substr($data, 0, 4, ''));
    my $num_params = to_num(substr($data, 0, 4, ''));
    my $pkt = {
        sth_id     => $sth_id,
        num_cols   => $num_cols,
        num_params => $num_params,
    };
    PTDEBUG && _d('OK prepared packet:', Dumper($pkt));
    return $pkt;
}

# Parses the server's handshake packet.  Currently we capture and return the
# server version, the thread id and the server capability flags.
sub parse_server_handshake_packet {
    my ( $data ) = @_;
    return unless $data;
    PTDEBUG && _d('Server handshake data:', $data);
    # All widths in the pattern are hex digits, i.e. twice the byte counts
    # given in the comments.  The server version is matched in whole bytes
    # so that its 00 terminator cannot be found one hex digit early when the
    # last byte of the version ends in 0 (e.g. the character "0" is 30).
    my $handshake_pattern = qr{
                            # Bytes                Name
        ^                   # -----                ----
        ((?:..)+?)00        # n Null-Term String   server_version
        (.{8})              # 4                    thread_id
        .{16}               # 8                    scramble_buff
        .{2}                # 1                    filler: always 0x00
        (.{4})              # 2                    server_capabilities
        .{2}                # 1                    server_language
        .{4}                # 2                    server_status
        .{26}               # 13                   filler: always 0x00
                            # 13                   rest of scramble_buff
    }x;
    my ( $server_version, $thread_id, $flags ) = $data =~ m/$handshake_pattern/;
    if ( !defined $flags ) {
        # Without this guard the undefined captures would make the
        # conversions below die.
        PTDEBUG && _d('Did not match server handshake packet');
        return;
    }
    my $pkt = {
        server_version => to_string($server_version),
        thread_id      => to_num($thread_id),
        flags          => parse_flags($flags),
    };
    PTDEBUG && _d('Server handshake packet:', Dumper($pkt));
    return $pkt;
}

# Parses the client's handshake packet.  Currently we capture and return the
# user, the default database (empty string if none) and the client flags.
# Returns nothing if the data is empty or does not look like a client
# handshake.
sub parse_client_handshake_packet {
    my ( $data ) = @_;
    return unless $data;
    PTDEBUG && _d('Client handshake data:', $data);
    my ( $flags, $user, $buff_len ) = $data =~ m{
        ^
        (.{8})         # Client flags
        .{10}          # Max packet size, charset
        (?:00){23}     # Filler
        ((?:..)+?)00   # Null-terminated user name
        (..)           # Length-coding byte for scramble buff
    }x;

    # This packet is easy to detect because it's the only case where
    # the server sends the client a packet first (its handshake) and
    # then the client only and ever sends back its handshake.
    if ( !$buff_len ) {
        PTDEBUG && _d('Did not match client handshake packet');
        return;
    }

    my $code_len = hex($buff_len);
    my $db;

    # Only try to get the db if CLIENT_CONNECT_WITH_DB flag is set
    # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse41
    my $capability_flags = to_num($flags); # $flags is stored as little endian.

    if ($capability_flags & $flag_for{CLIENT_CONNECT_WITH_DB}) {
        # $user is a hex string captured above, so it is safe to interpolate.
        # The name is matched in whole bytes, like the user name.
        ( $db ) = $data =~ m!
            ^.{64}${user}00..   # Everything matched before
            (?:..){$code_len}   # The scramble buffer
            ((?:..)*?)00.*\Z    # The database name
        !x;
    }

    my $pkt = {
        user  => to_string($user),
        db    => $db ? to_string($db) : '',
        flags => parse_flags($flags),
    };
    PTDEBUG && _d('Client handshake packet:', Dumper($pkt));
    return $pkt;
}

# COM data is not 00-terminated, but the MySQL client appends \0,
# so we have to use the packet length to know where the data ends.
#   $data - hex string of the packet, starting with the command byte
#   $len  - length of the packet in bytes, command byte included
# Returns a hashref with code (hex command byte), com (command name) and
# data, or nothing if an argument is missing or the command byte is unknown.
sub parse_com_packet {
    my ( $data, $len ) = @_;
    return unless $data && $len;
    PTDEBUG && _d('COM data:',
        (substr($data, 0, 100).(length $data > 100 ? '...' : '')),
        'len:', $len);
    my $code = substr($data, 0, 2);
    my $com  = $com_for{$code};
    if ( !$com ) {
        PTDEBUG && _d('Did not match COM packet');
        return;
    }
    if (    $code ne COM_STMT_EXECUTE
         && $code ne COM_STMT_CLOSE
         && $code ne COM_STMT_RESET )
    {
        # Data for the most common COM, e.g. COM_QUERY, is text.
        # COM_STMT_EXECUTE is not, so we leave it binary; it can
        # be parsed by parse_execute_packet().
        # Skip the command byte and take the remaining $len - 1 bytes.
        $data = to_string(substr($data, 2, ($len - 1) * 2));
    }
    my $pkt = {
        code => $code,
        com  => $com,
        data => $data,
    };
    PTDEBUG && _d('COM packet:', Dumper($pkt));
    return $pkt;
}

# Execute prepared statement packet structure:
# Bytes              Field
# ===========        ========================================
# 00                 Code 17, COM_STMT_EXECUTE
# 00 00 00 00        Statement handler ID
# 00                 flags
# 00 00 00 00        Iteration count (reserved, always 1)
# (param_count+7)/8  NULL bitmap
# 00                 1 if new parameters, else 0
# n*2                Parameter types (only if new parameters)
#
#   $data - hex string of the packet, starting with the command byte
#   $sths - hashref of known statement handles keyed by statement id; each
#           handle supplies statement and num_params, and its types entry
#           is read and updated here
# Returns a hashref with sth_id and arg ("EXECUTE " followed by the statement
# with its placeholders replaced by the parameter values), or nothing if an
# argument is missing or the statement handle is unknown.
sub parse_execute_packet {
    my ( $data, $sths ) = @_;
    return unless $data && $sths;

    my $sth_id = to_num(substr($data, 2, 8));
    return unless defined $sth_id;

    my $sth = $sths->{$sth_id};
    if ( !$sth ) {
        PTDEBUG && _d('Skipping unknown statement handle', $sth_id);
        return;
    }
    my $null_count  = int(($sth->{num_params} + 7) / 8) || 1;
    my $null_bitmap = to_num(substr($data, 20, $null_count * 2));
    PTDEBUG && _d('NULL bitmap:', $null_bitmap, 'count:', $null_count);

    # This chops off everything up to the byte for new params.
    substr($data, 0, 20 + ($null_count * 2), '');

    my $new_params = to_num(substr($data, 0, 2, ''));
    my @types;
    if ( $new_params ) {
        PTDEBUG && _d('New param types');
        # It seems all params are type 254, MYSQL_TYPE_STRING.  Perhaps
        # this depends on the client.  If we ever need these types, they
        # can be saved here.  Otherwise for now I just want to see the
        # types in debug output.
        for my $i ( 0..($sth->{num_params}-1) ) {
            my $type = to_num(substr($data, 0, 4, ''));
            push @types, $type_for{$type};
            PTDEBUG && _d('Param', $i, 'type:', $type, $type_for{$type});
        }
        $sth->{types} = \@types;
    }
    else {
        # Retrieve previous param types if there are param vals (data).
        @types = @{$sth->{types}} if $data;
    }

    # $data should now be truncated up to the parameter values.

    my $arg = $sth->{statement};
    PTDEBUG && _d('Statement:', $arg);
    # Where to look for the next placeholder.  Searching from just behind
    # the value inserted last keeps a ? inside that value from being taken
    # for the next parameter's placeholder.
    my $search_from = 0;
    for my $i ( 0..($sth->{num_params}-1) ) {
        my $val;
        my $len;  # in bytes
        if ( $null_bitmap & (2**$i) ) {
            PTDEBUG && _d('Param', $i, 'is NULL (bitmap)');
            $val = 'NULL';
            $len = 0;
        }
        else {
            if ( $unpack_type{$types[$i]} ) {
                ($val, $len) = $unpack_type{$types[$i]}->($data);
            }
            else {
                # TODO: this is probably going to break parsing other param vals
                PTDEBUG && _d('No handler for param', $i, 'type', $types[$i]);
                $val = '?';
                $len = 0;
            }
        }

        # Replace ? in prepared statement with value.
        PTDEBUG && _d('Param', $i, 'val:', $val);
        my $placeholder_pos = index($arg, '?', $search_from);
        if ( $placeholder_pos >= 0 ) {
            substr($arg, $placeholder_pos, 1, $val);
            $search_from = $placeholder_pos + length $val;
        }

        # Remove this param val from the data, putting us at the next one.
        substr($data, 0, $len * 2, '') if $len;
    }

    my $pkt = {
        sth_id => $sth_id,
        arg    => "EXECUTE $arg",
    };
    PTDEBUG && _d('Execute packet:', Dumper($pkt));
    return $pkt;
}

# Returns the statement handler ID found in the 4 bytes that follow the
# command byte of $data (a hex string), or nothing if $data is false.
sub get_sth_id {
    my ( $data ) = @_;
    return unless $data;
    my $sth_id = to_num(substr($data, 2, 8));
    return $sth_id;
}

# Decodes a little-endian hex string of capability flags.  Returns a hashref
# with one entry per flag name in %flag_for: 1 if the flag is set, else 0.
# Dies if $flags is false.
sub parse_flags {
    my ( $flags ) = @_;
    die "I need flags" unless $flags;
    PTDEBUG && _d('Flag data:', $flags);
    my $flags_dec = to_num($flags);
    my %flags;
    foreach my $flag ( keys %flag_for ) {
        $flags{$flag} = ($flags_dec & $flag_for{$flag} ? 1 : 0);
    }
    return \%flags;
}

# Takes a scalarref to a hex string of compressed data and the number of
# uncompressed bytes to read.
# Returns a scalarref to a hex string of the uncompressed data.
# The given hex string of compressed data is not modified.
# Dies if an argument is missing or wrong, or if inflating fails.
sub uncompress_data {
    my ( $data, $len ) = @_;
    die "I need data" unless $data;
    die "I need a len argument" unless $len;
    die "I need a scalar reference to data" unless ref $data eq 'SCALAR';
    PTDEBUG && _d('Uncompressing data');
    our $InflateError;

    # Pack hex string into compressed binary data.
    my $comp_bin_data = pack('H*', $$data);

    # Uncompress the compressed binary data.
    my $uncomp_bin_data = '';
    my $z = IO::Uncompress::Inflate->new(\$comp_bin_data)
        or die "IO::Uncompress::Inflate failed: $InflateError";
    my $status = $z->read(\$uncomp_bin_data, $len)
        or die "IO::Uncompress::Inflate failed: $InflateError";

    # Unpack the uncompressed binary data back into a hex string.
    # This is the original MySQL packet(s).
    my $uncomp_data = unpack('H*', $uncomp_bin_data);

    return \$uncomp_data;
}

# Returns 1 on success or 0 on failure.  Failure is probably
# detecting compression but not being able to uncompress
# (uncompress_packet() returns 0).
# Sets $session->{compress} to 1 or 0 according to what was detected.
sub detect_compression {
    my ( $self, $packet, $session ) = @_;
    PTDEBUG && _d('Checking for client compression');
    # This is a necessary hack for detecting compression in-stream without
    # having seen the client handshake and CLIENT_COMPRESS flag.  If the
    # client is compressing packets, there will be an extra 7 bytes before
    # the regular MySQL header.  For short COM_QUERY commands, these 7 bytes
    # are usually zero where we'd expect to see 03 for COM_QUERY.  So if we
    # parse this packet and it looks like a COM_SLEEP (00) which is not a
    # command that the client can send, then chances are the client is using
    # compression.
    my $com = parse_com_packet($packet->{data}, $packet->{mysql_data_len});
    if ( $com && $com->{code} eq COM_SLEEP ) {
        PTDEBUG && _d('Client is using compression');
        $session->{compress} = 1;

        # Since parse_packet() didn't know the packet was compressed, it
        # called remove_mysql_header() which removed the first 4 of 7 bytes
        # of the compression header.  We must restore these 4 bytes, then
        # uncompress and remove the MySQL header.  We only do this once.
        $packet->{data} = $packet->{mysql_hdr} . $packet->{data};
        return 0 unless $self->uncompress_packet($packet, $session);
        remove_mysql_header($packet);
    }
    else {
        PTDEBUG && _d('Client is NOT using compression');
        $session->{compress} = 0;
    }
    return 1;
}

# Returns 1 if the packet was uncompressed (or turned out not to be
# compressed) or 0 if the compression header cannot be parsed.  Dies if the
# data cannot be uncompressed, which is usually due to IO::Uncompress not
# being available.  In both failure cases the error is stored in
# $session->{EVAL_ERROR} and the session is failed first.
sub uncompress_packet {
    my ( $self, $packet, $session ) = @_;
    die "I need a packet"  unless $packet;
    die "I need a session" unless $session;

    # From the doc: "A compressed packet header is:
    #    packet length (3 bytes),
    #    packet number (1 byte),
    #    and Uncompressed Packet Length (3 bytes).
    # The Uncompressed Packet Length is the number of bytes
    # in the original, uncompressed packet. If this is zero
    # then the data is not compressed."

    my $data;
    my $comp_hdr;
    my $comp_data_len;
    my $pkt_num;
    my $uncomp_data_len;
    eval {
        $data            = \$packet->{data};
        # The 7-byte header is 14 hex digits; it is removed from the data.
        $comp_hdr        = substr($$data, 0, 14, '');
        $comp_data_len   = to_num(substr($comp_hdr, 0, 6));
        $pkt_num         = to_num(substr($comp_hdr, 6, 2));
        $uncomp_data_len = to_num(substr($comp_hdr, 8, 6));
        PTDEBUG && _d('Compression header data:', $comp_hdr,
            'compressed data len (bytes)', $comp_data_len,
            'number', $pkt_num,
            'uncompressed data len (bytes)', $uncomp_data_len);
    };
    if ( $EVAL_ERROR ) {
        # Copy the error before calling anything else: code that runs an
        # eval of its own would overwrite $EVAL_ERROR.
        my $error = $EVAL_ERROR;
        $session->{EVAL_ERROR} = $error;
        $self->fail_session($session, 'failed to parse compression header');
        return 0;
    }

    if ( $uncomp_data_len ) {
        eval {
            $data = uncompress_data($data, $uncomp_data_len);
            $packet->{data} = $$data;
        };
        if ( $EVAL_ERROR ) {
            # Same as above; here the copy is also what gets reported, since
            # fail_session() may have changed $EVAL_ERROR by the time we die.
            my $error = $EVAL_ERROR;
            $session->{EVAL_ERROR} = $error;
            $self->fail_session($session, 'failed to uncompress data');
            die "Cannot uncompress packet.  Check that IO::Uncompress::Inflate "
                . "is installed.\nError: $error";
        }
    }
    else {
        PTDEBUG && _d('Packet is not really compressed');
        $packet->{data} = $$data;
    }

    return 1;
}

# Removes the first 4 bytes of the packet data which should be
# a MySQL header: 3 bytes packet length, 1 byte packet number.
# The header and its two decoded fields are saved in the packet as
# mysql_hdr, mysql_data_len and number.
sub remove_mysql_header {
    my ( $packet ) = @_;
    die "I need a packet" unless $packet;

    # NOTE: the data is modified by the inmost substr call here!  If we
    # had all the data in the TCP packets, we could change this to a while
    # loop; while get-a-packet-from-$data, do stuff, etc.  But we don't,
    # and we don't want to either.
    my $mysql_hdr      = substr($packet->{data}, 0, 8, '');
    my $mysql_data_len = to_num(substr($mysql_hdr, 0, 6));
    my $pkt_num        = to_num(substr($mysql_hdr, 6, 2));
    PTDEBUG && _d('MySQL packet: header data', $mysql_hdr,
        'data len (bytes)', $mysql_data_len, 'number', $pkt_num);

    $packet->{mysql_hdr}      = $mysql_hdr;
    $packet->{mysql_data_len} = $mysql_data_len;
    $packet->{number}         = $pkt_num;

    return;
}

# Delete anything we added to the session related to
# buffering a large query received in multiple packets.
sub _delete_buff {
    my ( $self, $session ) = @_;
    # Removes the buff, buff_left and mysql_data_len entries from the
    # session; a hash-slice delete does it in one statement.
    delete @{$session}{qw(buff buff_left mysql_data_len)};
    return;
}

# Debug output: prints the arguments to STDERR on one line, joined by spaces
# and prefixed with "# ", the caller's package and line number, and the
# process id.  Undefined arguments are printed as "undef" and each embedded
# newline is followed by "# " so that continuation lines stay commented.
sub _d {
    my ($package, undef, $line) = caller 0;
    my @parts = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
                map { defined $_ ? $_ : 'undef' }
                @_;
    print STDERR "# $package:$line $PID ", join(' ', @parts), "\n";
}

1;
}
# ###########################################################################
# End MySQLProtocolParser package
# ###########################################################################