mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-01 18:25:59 +00:00

* Remove trailing spaces * PR-665 - Remove trailing spaces - Updated not stable test t/pt-online-schema-change/preserve_triggers.t - Updated utilities in bin directory * PR-665 - Remove trailing spaces - Fixed typos * PR-665 - Remove trailing spaces - Fixed typos --------- Co-authored-by: Sveta Smirnova <sveta.smirnova@percona.com>
1547 lines
55 KiB
Perl
1547 lines
55 KiB
Perl
# This program is copyright 2007-2011 Percona Ireland Ltd.
|
|
# Feedback and improvements are welcome.
|
|
#
|
|
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
|
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
|
|
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
|
|
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
|
|
# licenses.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
|
|
# Place, Suite 330, Boston, MA 02111-1307 USA.
|
|
# ###########################################################################
|
|
# MySQLProtocolParser package
|
|
# ###########################################################################
|
|
{
|
|
# Package: MySQLProtocolParser
|
|
# MySQLProtocolParser parses MySQL events from tcpdump files.
|
|
# The packets come from TcpdumpParser. MySQLProtocolParser::parse_packet()
|
|
# should be first in the callback chain because it creates events for
|
|
# subsequent callbacks. So the sequence is:
|
|
# 1. mk-query-digest calls TcpdumpParser::parse_event($fh, ..., @callbacks)
|
|
# 2. TcpdumpParser::parse_event() extracts raw MySQL packets from $fh and
|
|
# passes them to the callbacks, the first of which is
|
|
# MySQLProtocolParser::parse_packet().
|
|
# 3. MySQLProtocolParser::parse_packet() makes events from the packets
|
|
# and returns them to TcpdumpParser::parse_event().
|
|
# 4. TcpdumpParser::parse_event() passes the newly created events to
|
|
# the subsequent callbacks.
|
|
# At times MySQLProtocolParser::parse_packet() will not return an event
|
|
# because it usually takes a few packets to create one event. In such
|
|
# cases, TcpdumpParser::parse_event() will not call the other callbacks.
|
|
package MySQLProtocolParser;
|
|
|
|
use strict;
|
|
use warnings FATAL => 'all';
|
|
use English qw(-no_match_vars);
|
|
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
|
|
|
|
eval {
|
|
require IO::Uncompress::Inflate; # yum: perl-IO-Compress-Zlib
|
|
IO::Uncompress::Inflate->import(qw(inflate $InflateError));
|
|
};
|
|
|
|
use Data::Dumper;
|
|
$Data::Dumper::Indent = 1;
|
|
$Data::Dumper::Sortkeys = 1;
|
|
$Data::Dumper::Quotekeys = 0;
|
|
|
|
BEGIN { our @ISA = 'ProtocolParser'; }
|
|
|
|
# Command codes, written as two-character lowercase hex strings, followed by
# two server status flag bits.  The command codes are compared (with eq)
# against the code of a parsed client command; the status bits are tested
# (with &) against the status flags of a trailing EOF packet.
use constant {
   COM_SLEEP               => '00',
   COM_QUIT                => '01',
   COM_INIT_DB             => '02',
   COM_QUERY               => '03',
   COM_FIELD_LIST          => '04',
   COM_CREATE_DB           => '05',
   COM_DROP_DB             => '06',
   COM_REFRESH             => '07',
   COM_SHUTDOWN            => '08',
   COM_STATISTICS          => '09',
   COM_PROCESS_INFO        => '0a',
   COM_CONNECT             => '0b',
   COM_PROCESS_KILL        => '0c',
   COM_DEBUG               => '0d',
   COM_PING                => '0e',
   COM_TIME                => '0f',
   COM_DELAYED_INSERT      => '10',
   COM_CHANGE_USER         => '11',
   COM_BINLOG_DUMP         => '12',
   COM_TABLE_DUMP          => '13',
   COM_CONNECT_OUT         => '14',
   COM_REGISTER_SLAVE      => '15',
   COM_STMT_PREPARE        => '16',
   COM_STMT_EXECUTE        => '17',
   COM_STMT_SEND_LONG_DATA => '18',
   COM_STMT_CLOSE          => '19',
   COM_STMT_RESET          => '1a',
   COM_SET_OPTION          => '1b',
   COM_STMT_FETCH          => '1c',

   # Status flag bit masks (16 and 32).
   SERVER_QUERY_NO_GOOD_INDEX_USED => 1 << 4,
   SERVER_QUERY_NO_INDEX_USED      => 1 << 5,
};
|
|
|
|
# Reverse lookup: two-character lowercase hex command code => constant name.
# The names are listed in code order, so each name's index in the list is
# its command code ('00' for COM_SLEEP through '1c' for COM_STMT_FETCH).
my %com_for = do {
   my @com_names = qw(
      COM_SLEEP
      COM_QUIT
      COM_INIT_DB
      COM_QUERY
      COM_FIELD_LIST
      COM_CREATE_DB
      COM_DROP_DB
      COM_REFRESH
      COM_SHUTDOWN
      COM_STATISTICS
      COM_PROCESS_INFO
      COM_CONNECT
      COM_PROCESS_KILL
      COM_DEBUG
      COM_PING
      COM_TIME
      COM_DELAYED_INSERT
      COM_CHANGE_USER
      COM_BINLOG_DUMP
      COM_TABLE_DUMP
      COM_CONNECT_OUT
      COM_REGISTER_SLAVE
      COM_STMT_PREPARE
      COM_STMT_EXECUTE
      COM_STMT_SEND_LONG_DATA
      COM_STMT_CLOSE
      COM_STMT_RESET
      COM_SET_OPTION
      COM_STMT_FETCH
   );
   map { ( sprintf('%02x', $_) => $com_names[$_] ) } 0 .. $#com_names;
};
|
|
|
|
# Client capability flag name => bit mask.  Each flag is a single bit, so
# the masks are written as shifts (bit 0 through bit 17).
my %flag_for = (
   CLIENT_LONG_PASSWORD     => 1 << 0,   # new more secure passwords
   CLIENT_FOUND_ROWS        => 1 << 1,   # Found instead of affected rows
   CLIENT_LONG_FLAG         => 1 << 2,   # Get all column flags
   CLIENT_CONNECT_WITH_DB   => 1 << 3,   # One can specify db on connect
   CLIENT_NO_SCHEMA         => 1 << 4,   # Don't allow database.table.column
   CLIENT_COMPRESS          => 1 << 5,   # Can use compression protocol
   CLIENT_ODBC              => 1 << 6,   # Odbc client
   CLIENT_LOCAL_FILES       => 1 << 7,   # Can use LOAD DATA LOCAL
   CLIENT_IGNORE_SPACE      => 1 << 8,   # Ignore spaces before '('
   CLIENT_PROTOCOL_41       => 1 << 9,   # New 4.1 protocol
   CLIENT_INTERACTIVE       => 1 << 10,  # This is an interactive client
   CLIENT_SSL               => 1 << 11,  # Switch to SSL after handshake
   CLIENT_IGNORE_SIGPIPE    => 1 << 12,  # IGNORE sigpipes
   CLIENT_TRANSACTIONS      => 1 << 13,  # Client knows about transactions
   CLIENT_RESERVED          => 1 << 14,  # Old flag for 4.1 protocol
   CLIENT_SECURE_CONNECTION => 1 << 15,  # New 4.1 authentication
   CLIENT_MULTI_STATEMENTS  => 1 << 16,  # Enable/disable multi-stmt support
   CLIENT_MULTI_RESULTS     => 1 << 17,  # Enable/disable multi-results
);
|
|
|
|
# Numeric MySQL column/parameter type codes.  The codes are contiguous from
# 0 to 16 and then resume at 246.
use constant {
   # 0 .. 16
   MYSQL_TYPE_DECIMAL     => 0,
   MYSQL_TYPE_TINY        => 1,
   MYSQL_TYPE_SHORT       => 2,
   MYSQL_TYPE_LONG        => 3,
   MYSQL_TYPE_FLOAT       => 4,
   MYSQL_TYPE_DOUBLE      => 5,
   MYSQL_TYPE_NULL        => 6,
   MYSQL_TYPE_TIMESTAMP   => 7,
   MYSQL_TYPE_LONGLONG    => 8,
   MYSQL_TYPE_INT24       => 9,
   MYSQL_TYPE_DATE        => 10,
   MYSQL_TYPE_TIME        => 11,
   MYSQL_TYPE_DATETIME    => 12,
   MYSQL_TYPE_YEAR        => 13,
   MYSQL_TYPE_NEWDATE     => 14,
   MYSQL_TYPE_VARCHAR     => 15,
   MYSQL_TYPE_BIT         => 16,

   # 246 .. 255
   MYSQL_TYPE_NEWDECIMAL  => 246,
   MYSQL_TYPE_ENUM        => 247,
   MYSQL_TYPE_SET         => 248,
   MYSQL_TYPE_TINY_BLOB   => 249,
   MYSQL_TYPE_MEDIUM_BLOB => 250,
   MYSQL_TYPE_LONG_BLOB   => 251,
   MYSQL_TYPE_BLOB        => 252,
   MYSQL_TYPE_VAR_STRING  => 253,
   MYSQL_TYPE_STRING      => 254,
   MYSQL_TYPE_GEOMETRY    => 255,
};
|
|
|
|
# Reverse lookup: numeric MySQL type code => type constant name.
my %type_for = (
   # 0 .. 16
   0   => 'MYSQL_TYPE_DECIMAL',
   1   => 'MYSQL_TYPE_TINY',
   2   => 'MYSQL_TYPE_SHORT',
   3   => 'MYSQL_TYPE_LONG',
   4   => 'MYSQL_TYPE_FLOAT',
   5   => 'MYSQL_TYPE_DOUBLE',
   6   => 'MYSQL_TYPE_NULL',
   7   => 'MYSQL_TYPE_TIMESTAMP',
   8   => 'MYSQL_TYPE_LONGLONG',
   9   => 'MYSQL_TYPE_INT24',
   10  => 'MYSQL_TYPE_DATE',
   11  => 'MYSQL_TYPE_TIME',
   12  => 'MYSQL_TYPE_DATETIME',
   13  => 'MYSQL_TYPE_YEAR',
   14  => 'MYSQL_TYPE_NEWDATE',
   15  => 'MYSQL_TYPE_VARCHAR',
   16  => 'MYSQL_TYPE_BIT',

   # 246 .. 255
   246 => 'MYSQL_TYPE_NEWDECIMAL',
   247 => 'MYSQL_TYPE_ENUM',
   248 => 'MYSQL_TYPE_SET',
   249 => 'MYSQL_TYPE_TINY_BLOB',
   250 => 'MYSQL_TYPE_MEDIUM_BLOB',
   251 => 'MYSQL_TYPE_LONG_BLOB',
   252 => 'MYSQL_TYPE_BLOB',
   253 => 'MYSQL_TYPE_VAR_STRING',
   254 => 'MYSQL_TYPE_STRING',
   255 => 'MYSQL_TYPE_GEOMETRY',
);
|
|
|
|
# Dispatch table: MySQL type name (as found in the values of %type_for) =>
# code ref that decodes a value of that type.  The inline handlers return a
# two-element list: the decoded value followed by a fixed number (0, 1, 2,
# 4 or 8) -- presumably the number of bytes the value occupies; confirm
# against the caller.
#
# The key for 2-byte integers used to be spelled "MySQL_TYPE_SHORT".  Hash
# keys are case-sensitive, so a lookup by the name %type_for yields
# ("MYSQL_TYPE_SHORT") never found that handler.  The key now matches.
my %unpack_type = (
   MYSQL_TYPE_NULL       => sub { return 'NULL', 0; },
   MYSQL_TYPE_TINY       => sub { return to_num(@_, 1), 1; },
   MYSQL_TYPE_SHORT      => sub { return to_num(@_, 2), 2; },
   MYSQL_TYPE_LONG       => sub { return to_num(@_, 4), 4; },
   MYSQL_TYPE_LONGLONG   => sub { return to_num(@_, 8), 8; },
   MYSQL_TYPE_DOUBLE     => sub { return to_double(@_), 8; },
   MYSQL_TYPE_VARCHAR    => \&unpack_string,
   MYSQL_TYPE_VAR_STRING => \&unpack_string,
   MYSQL_TYPE_STRING     => \&unpack_string,
);
|
|
|
|
# Constructor.  Named arguments, all optional:
#   server     - when set, parse_event() only handles packets whose source
#                or destination equals "<server>:<port>"
#   port       - server port; defaults to '3306'
#   o          - stored as-is in $self->{o}
#   null_event - value parse_event() returns when it has no event
# version is a placeholder for handling differences between MySQL v4.0 and
# older and v4.1 and newer.  Currently, only v4.1 is handled.
sub new {
   my ( $class, %args ) = @_;

   my %attr = (
      server         => $args{server},
      port           => $args{port} || '3306',
      version        => '41',    # MySQL proto version; not used yet
      sessions       => {},      # per-client session state
      o              => $args{o},
      fake_thread_id => 2**32,   # see _make_event()
      null_event     => $args{null_event},
   );

   if ( PTDEBUG && $attr{server} ) {
      _d('Watching only server', $attr{server});
   }

   return bless \%attr, $class;
}
|
|
|
|
# Turns one packet into at most one event.
#
# Named arguments:
#   event - required; the packet hashref.  This sub reads its src_host,
#           src_port, dst_host, dst_port, data, data_len, syn, fin, ts and
#           raw_packet keys (the per-direction handlers read more).
#   misc  - optional; passed through to the per-direction handlers.
#   stats - optional hashref; its events_parsed counter is incremented
#           whenever a packet is processed to the end of this sub.
#
# Returns the event made by _packet_from_server() or _packet_from_client(),
# or $self->{null_event} when the packet is ignored or no event is ready
# yet.  The one exception: when a compressed packet cannot be uncompressed
# this sub does a bare return.
sub parse_event {
   my ( $self, %args ) = @_;
   for my $required ( qw(event) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $packet = $args{event};

   my $src = "$packet->{src_host}:$packet->{src_port}";
   my $dst = "$packet->{dst_host}:$packet->{dst_port}";

   # When a server was given, ignore packets that are neither from it nor
   # to it.
   if ( $self->{server} ) {
      my $watched = "$self->{server}:$self->{port}";
      if ( $src ne $watched && $dst ne $watched ) {
         PTDEBUG && _d('Packet is not to or from', $watched);
         return $self->{null_event};
      }
   }

   # The endpoint that ends with ":<port>" is the server; the other one is
   # the client, and the client's "host:port" keys the session.  The source
   # is checked first.
   my $port_re = qr/:$self->{port}$/;
   my ( $packet_from, $client )
      = $src =~ $port_re ? ( 'server', $dst )
      : $dst =~ $port_re ? ( 'client', $src )
      :                    ();
   if ( !$packet_from ) {
      PTDEBUG && _d('Packet is not to or from a MySQL server');
      return $self->{null_event};
   }
   PTDEBUG && _d('Client', $client);

   # Peek at the MySQL packet number.  5 bytes is the minimum length of any
   # valid MySQL packet; with less it's probably some TCP control packet
   # with other data, and packetno stays -1.  The only time a server sends
   # packetno 0 is for its handshake; client packetno 0 marks the start of
   # a new query.
   my $packetno = $packet->{data_len} >= 5
                ? to_num(substr($packet->{data}, 6, 2))
                : -1;

   # A session is only created on a TCP SYN or on packetno 0; anything else
   # for an unknown client is mid-stream data and is ignored.
   if ( !exists $self->{sessions}->{$client} ) {
      if ( !$packet->{syn} && $packetno != 0 ) {
         PTDEBUG && _d('Ignoring mid-stream', $packet_from, 'data,',
            'packetno', $packetno);
         return $self->{null_event};
      }
      PTDEBUG && _d($packet->{syn} ? 'New session (SYN)'
                                   : 'New session (packetno 0)');

      $self->{sessions}->{$client} = {
         client      => $client,
         ts          => $packet->{ts},
         state       => undef,
         compress    => undef,
         raw_packets => [],
         buff        => '',
         sths        => {},
         attribs     => {},
         n_queries   => 0,
      };
   }
   my $session = $self->{sessions}->{$client};
   PTDEBUG && _d('Client state:', $session->{state});

   # Keep the raw packets so they can be dumped later if something fails.
   push @{ $session->{raw_packets} }, $packet->{raw_packet};

   # Client port reuse: a SYN for a session that already made events or
   # still has a state means the previous session never quit.
   # http://code.google.com/p/maatkit/issues/detail?id=794
   if ( $packet->{syn} && ($session->{n_queries} > 0 || $session->{state}) ) {
      PTDEBUG && _d('Client port reuse and last session did not quit');
      # Fail the old session so we can see the last thing it was doing,
      # then recurse to create a new session for this packet.
      $self->fail_session($session,
         'client port reuse and last session did not quit');
      return $self->parse_event(%args);
   }

   # Packets without TCP/MySQL data are usually TCP control packets (SYN,
   # ACK, FIN, etc.).  The only one that makes an event is a FIN while the
   # session is still at the server handshake.
   if ( $packet->{data_len} == 0 ) {
      PTDEBUG && _d('TCP control:',
         map { uc $_ } grep { $packet->{$_} } qw(syn ack fin rst));

      my $aborted_handshake
         = $packet->{fin}
         && ($session->{state} || '') eq 'server_handshake';
      return $self->{null_event} unless $aborted_handshake;

      PTDEBUG && _d('Client aborted connection');
      $session->{attribs}->{Error_msg}
         = 'Client closed connection during handshake';
      my $abort_event = $self->_make_event(
         {  cmd => 'Admin',
            arg => 'administrator command: Connect',
            ts  => $packet->{ts},
         },
         $packet, $session
      );
      delete $self->{sessions}->{ $session->{client} };
      return $abort_event;
   }

   # A compressed packet that cannot be uncompressed is useless to us.
   # Note the bare return here (not null_event).
   return if $session->{compress}
          && !$self->uncompress_packet($packet, $session);

   if ( $packet_from eq 'client' && $session->{buff} ) {
      # Earlier client packets were incomplete, so append this data to the
      # buffer.  Do *not* remove_mysql_header() again; that was already
      # done for the first packet.
      $session->{buff}      .= $packet->{data};
      $packet->{data}        = $session->{buff};
      $session->{buff_left} -= $packet->{data_len};

      # Since remove_mysql_header() was skipped, restore the values saved
      # from the first packet's MySQL header.
      $packet->{mysql_data_len} = $session->{mysql_data_len};
      $packet->{number}         = $session->{number};

      PTDEBUG && _d('Appending data to buff; expecting',
         $session->{buff_left}, 'more bytes');
   }
   else {
      # Remove the first MySQL header.  A single TCP packet can contain
      # many MySQL packets, but only the first is looked at; the rest are
      # usually parts of a result set, which is of no interest here.
      eval {
         remove_mysql_header($packet);
      };
      if ( $EVAL_ERROR ) {
         PTDEBUG && _d('remove_mysql_header() failed; failing session');
         $session->{EVAL_ERROR} = $EVAL_ERROR;
         $self->fail_session($session, 'remove_mysql_header() failed');
         return $self->{null_event};
      }
   }

   # Finally, hand the packet to the handler for its direction.  The
   # handler returns nothing when no event is ready.
   my $event;
   if ( $packet_from eq 'server' ) {
      $event = $self->_packet_from_server($packet, $session, $args{misc});
   }
   elsif ( $packet_from eq 'client' ) {
      if ( $session->{buff} ) {
         if ( $session->{buff_left} > 0 ) {
            # Still waiting for more data; buff_left was reported earlier.
            return $self->{null_event};
         }
         PTDEBUG && _d('Data is complete');
         $self->_delete_buff($session);
      }
      elsif ( $packet->{mysql_data_len} > ($packet->{data_len} - 4) ) {

         # http://code.google.com/p/maatkit/issues/detail?id=832
         if ( $session->{cmd}
              && ($session->{state} || '') eq 'awaiting_reply' ) {
            PTDEBUG && _d('No server OK to previous command (frag)');
            $self->fail_session($session, 'no server OK to previous command');
            # The MySQL header was removed above, so put it back before
            # re-parsing.
            $packet->{data} = $packet->{mysql_hdr} . $packet->{data};
            return $self->parse_event(%args);
         }

         # The MySQL packet is longer than this TCP packet.  Save the data
         # and the original MySQL header values, then wait for the rest.
         $session->{buff}           = $packet->{data};
         $session->{mysql_data_len} = $packet->{mysql_data_len};
         $session->{number}         = $packet->{number};

         # Set only once; later packets decrement buff_left above.
         $session->{buff_left}
            ||= $packet->{mysql_data_len} - ($packet->{data_len} - 4);

         PTDEBUG && _d('Data not complete; expecting',
            $session->{buff_left}, 'more bytes');
         return $self->{null_event};
      }

      if ( $session->{cmd}
           && ($session->{state} || '') eq 'awaiting_reply' ) {
         # By now the full client query should be in hand.  A new client
         # query while still awaiting a reply to the previous one most
         # likely means the server's response was missed, so fail the
         # previous query and re-parse this one.
         PTDEBUG && _d('No server OK to previous command');
         $self->fail_session($session, 'no server OK to previous command');
         # The MySQL header was removed above, so put it back.
         $packet->{data} = $packet->{mysql_hdr} . $packet->{data};
         return $self->parse_event(%args);
      }

      $event = $self->_packet_from_client($packet, $session, $args{misc});
   }
   else {
      # Should not get here.
      die 'Packet origin unknown';
   }

   PTDEBUG && _d('Done parsing packet; client state:', $session->{state});
   if ( $session->{closed} ) {
      delete $self->{sessions}->{ $session->{client} };
      PTDEBUG && _d('Session deleted');
   }

   $args{stats}->{events_parsed}++ if $args{stats};
   return $event || $self->{null_event};
}
|
|
|
|
# Handles a packet from the server given the state of the session.
# The server can send back a lot of different stuff, but luckily
# we're only interested in
#   * Connection handshake packets for the thread_id
#   * OK and Error packets for errors, warnings, etc.
# Anything else is ignored.
#
# Arguments: the packet hashref (its data has the MySQL header already
# removed by parse_event()), the session hashref, and misc (unused here).
# Returns the event made by _make_event() if one was ready to be created,
# otherwise returns nothing.  Dies if packet or session is missing.
sub _packet_from_server {
   my ( $self, $packet, $session, $misc ) = @_;
   die "I need a packet"  unless $packet;
   die "I need a session" unless $session;

   PTDEBUG && _d('Packet is from server; client state:', $session->{state});

   # The same TCP sequence number as the previous server packet means a
   # retransmission; record it and ignore the packet.
   if ( ($session->{server_seq} || '') eq $packet->{seq} ) {
      push @{ $session->{server_retransmissions} }, $packet->{seq};
      PTDEBUG && _d('TCP retransmission');
      return;
   }
   $session->{server_seq} = $packet->{seq};

   my $data = $packet->{data};

   # The first byte in the packet indicates whether it's an OK,
   # ERROR, EOF packet.  If it's not one of those, we test
   # whether it's an initialization packet (the first thing the
   # server ever sends the client).  If it's not that, it could
   # be a result set header, field, row data, etc.

   # $data is a hex string, so one byte is two characters.  The 4-arg
   # substr also removes that byte from $data.
   my ( $first_byte ) = substr($data, 0, 2, '');
   PTDEBUG && _d('First byte of packet:', $first_byte);
   if ( !$first_byte ) {
      $self->fail_session($session, 'no first byte');
      return;
   }

   # If there's no session state, then we're catching a server response
   # mid-stream.  It's only safe to wait until the client sends a command
   # or to look for the server handshake.
   if ( !$session->{state} ) {
      # NOTE(review): as written, 00{13} quantifies only the second "0",
      # so this matches a run of 14 "0" hex digits rather than 13 "00"
      # bytes.  Left unchanged: a stricter pattern could stop recognizing
      # real handshakes -- confirm against the handshake layout before
      # touching it.
      if ( $first_byte eq '0a' && length $data >= 33 && $data =~ m/00{13}/ ) {
         # It's the handshake packet from the server to the client.
         # 0a is protocol v10 which is essentially the only version used
         # today.  33 is the minimum possible length for a valid server
         # handshake packet.  It's probably a lot longer.  Other packets
         # may start with 0a, but none that can would be >= 33.  The 13-byte
         # 00 scramble buffer is another indicator.
         my $handshake = parse_server_handshake_packet($data);
         if ( !$handshake ) {
            $self->fail_session($session, 'failed to parse server handshake');
            return;
         }
         $session->{state}     = 'server_handshake';
         $session->{thread_id} = $handshake->{thread_id};

         # See http://code.google.com/p/maatkit/issues/detail?id=794
         $session->{ts} = $packet->{ts} unless $session->{ts};
      }
      elsif ( $session->{buff} ) {
         $self->fail_session($session,
            'got server response before full buffer');
         return;
      }
      else {
         PTDEBUG && _d('Ignoring mid-stream server response');
         return;
      }
   }
   else {
      if ( $first_byte eq '00' ) {
         if ( ($session->{state} || '') eq 'client_auth' ) {
            # We logged in OK!  Trigger an admin Connect command.

            # Compression requested in the client handshake only takes
            # effect now that the handshake is complete.
            $session->{compress} = $session->{will_compress};
            delete $session->{will_compress};
            PTDEBUG && $session->{compress} && _d('Packets will be compressed');

            PTDEBUG && _d('Admin command: Connect');
            return $self->_make_event(
               {  cmd => 'Admin',
                  arg => 'administrator command: Connect',
                  ts  => $packet->{ts}, # Events are timestamped when they end
               },
               $packet, $session
            );
         }
         elsif ( $session->{cmd} ) {
            # This OK should be ack'ing a query or something sent earlier
            # by the client.  OK for prepared statement are special.
            my $com = $session->{cmd}->{cmd};
            my $ok;
            if ( $com eq COM_STMT_PREPARE ) {
               PTDEBUG && _d('OK for prepared statement');
               $ok = parse_ok_prepared_statement_packet($data);
               if ( !$ok ) {
                  $self->fail_session($session,
                     'failed to parse OK prepared statement packet');
                  return;
               }
               my $sth_id = $ok->{sth_id};
               $session->{attribs}->{Statement_id} = $sth_id;

               # Save all sth info, used in parse_execute_packet().
               $session->{sths}->{$sth_id} = $ok;
               $session->{sths}->{$sth_id}->{statement}
                  = $session->{cmd}->{arg};
            }
            else {
               $ok = parse_ok_packet($data);
               if ( !$ok ) {
                  $self->fail_session($session, 'failed to parse OK packet');
                  return;
               }
            }

            # Map the command code to the event's cmd/arg.  From here on
            # $com holds the event cmd ('Query' or 'Admin'), not the code.
            my $arg;
            if ( $com eq COM_QUERY
                 || $com eq COM_STMT_EXECUTE || $com eq COM_STMT_RESET ) {
               $com = 'Query';
               $arg = $session->{cmd}->{arg};
            }
            elsif ( $com eq COM_STMT_PREPARE ) {
               $com = 'Query';
               $arg = "PREPARE $session->{cmd}->{arg}";
            }
            else {
               # E.g. COM_PING becomes "administrator command: Ping".
               $arg = 'administrator command: '
                    . ucfirst(lc(substr($com_for{$com}, 4)));
               $com = 'Admin';
            }

            return $self->_make_event(
               {  cmd           => $com,
                  arg           => $arg,
                  ts            => $packet->{ts},
                  Insert_id     => $ok->{insert_id},
                  Warning_count => $ok->{warnings},
                  Rows_affected => $ok->{affected_rows},
               },
               $packet, $session
            );
         }
         else {
            PTDEBUG && _d('Looks like an OK packet but session has no cmd');
         }
      }
      elsif ( $first_byte eq 'ff' ) {
         my $error = parse_error_packet($data);
         if ( !$error ) {
            $self->fail_session($session, 'failed to parse error packet');
            return;
         }
         my $event;

         if (    $session->{state} eq 'client_auth'
              || $session->{state} eq 'server_handshake' ) {
            PTDEBUG && _d('Connection failed');
            $event = {
               cmd      => 'Admin',
               arg      => 'administrator command: Connect',
               ts       => $packet->{ts},
               Error_no => $error->{errno},
            };
            $session->{attribs}->{Error_msg} = $error->{message};
            $session->{closed} = 1;  # delete session when done
            return $self->_make_event($event, $packet, $session);
         }
         elsif ( $session->{cmd} ) {
            # This error should be in response to a query or something
            # sent earlier by the client.
            my $com = $session->{cmd}->{cmd};
            my $arg;

            if ( $com eq COM_QUERY || $com eq COM_STMT_EXECUTE ) {
               $com = 'Query';
               $arg = $session->{cmd}->{arg};
            }
            else {
               $arg = 'administrator command: '
                    . ucfirst(lc(substr($com_for{$com}, 4)));
               $com = 'Admin';
            }

            $event = {
               cmd => $com,
               arg => $arg,
               ts  => $packet->{ts},
            };
            if ( $error->{errno} ) {
               # https://bugs.launchpad.net/percona-toolkit/+bug/823411
               $event->{Error_no} = $error->{errno};
            }
            $session->{attribs}->{Error_msg} = $error->{message};
            return $self->_make_event($event, $packet, $session);
         }
         else {
            PTDEBUG && _d('Looks like an error packet but client is not '
               . 'authenticating and session has no cmd');
         }
      }
      elsif ( $first_byte eq 'fe' && $packet->{mysql_data_len} < 9 ) {
         # EOF packet
         if (    $packet->{mysql_data_len} == 1
              && $session->{state} eq 'client_auth'
              && $packet->{number} == 2 )
         {
            PTDEBUG && _d('Server has old password table;',
               'client will resend password using old algorithm');
            $session->{state} = 'client_auth_resend';
         }
         else {
            PTDEBUG && _d('Got an EOF packet');
            $self->fail_session($session, 'got an unexpected EOF packet');
            # ^^^ We shouldn't reach this because EOF should come after a
            # header, field, or row data packet; and we should be firing the
            # event and returning when we see that.  See SVN history for some
            # good stuff we could do if we wanted to handle EOF packets.
         }
      }
      else {
         # Since we do NOT always have all the data the server sent to the
         # client, we can't always do any processing of results.  So when
         # we get one of these, we just fire the event even if the query
         # is not done.  This means we will NOT process EOF packets
         # themselves (see above).
         if ( $session->{cmd} ) {
            PTDEBUG && _d('Got a row/field/result packet');
            my $com = $session->{cmd}->{cmd};
            PTDEBUG && _d('Responding to client', $com_for{$com});
            my $event = { ts => $packet->{ts} };
            if ( $com eq COM_QUERY || $com eq COM_STMT_EXECUTE ) {
               $event->{cmd} = 'Query';
               $event->{arg} = $session->{cmd}->{arg};
            }
            else {
               $event->{arg} = 'administrator command: '
                    . ucfirst(lc(substr($com_for{$com}, 4)));
               $event->{cmd} = 'Admin';
            }

            # We DID get all the data in the packet.
            if ( $packet->{complete} ) {
               # Look to see if the end of the data appears to be an EOF
               # packet: fe, then 2 bytes (4 hex chars) of warning count,
               # then 2 bytes of status flags.
               my ( $warning_count, $status_flags )
                  = $data =~ m/fe(.{4})(.{4})\Z/;
               if ( $warning_count ) {
                  # Stored as Warning_count because that is the only
                  # warning key _make_event() copies into the event.  It
                  # was stored as "Warnings" before, which _make_event()
                  # never reads, so the count was lost.
                  $event->{Warning_count} = to_num($warning_count);
                  my $flags = to_num($status_flags); # TODO set all flags?
                  $event->{No_good_index_used}
                     = $flags & SERVER_QUERY_NO_GOOD_INDEX_USED ? 1 : 0;
                  $event->{No_index_used}
                     = $flags & SERVER_QUERY_NO_INDEX_USED ? 1 : 0;
               }
            }

            return $self->_make_event($event, $packet, $session);
         }
         else {
            PTDEBUG && _d('Unknown in-stream server response');
         }
      }
   }

   return;
}
|
|
|
|
# Handles a packet from the client according to the session's state:
#   server_handshake   - the packet is parsed as the client's
#                        authentication packet (user, db, flags)
#   client_auth_resend - the client is resending its password; the packet
#                        is not parsed
#   awaiting_reply     - more data for the previous command; ignored
#   anything else      - the packet is parsed as a command
#
# Arguments: the packet hashref, the session hashref, and misc (unused
# here).  Returns an event only for COM_QUIT and COM_STMT_CLOSE, which are
# fired right away; every other command waits for the server's reply, so
# nothing is returned.  Dies if packet or session is missing.
sub _packet_from_client {
   my ( $self, $packet, $session, $misc ) = @_;
   die "I need a packet"  unless $packet;
   die "I need a session" unless $session;

   PTDEBUG && _d('Packet is from client; state:', $session->{state});

   # A repeat of the previous client TCP sequence number is a
   # retransmission; note it and drop the packet.
   if ( ($session->{client_seq} || '') eq $packet->{seq} ) {
      push @{ $session->{client_retransmissions} }, $packet->{seq};
      PTDEBUG && _d('TCP retransmission');
      return;
   }
   $session->{client_seq} = $packet->{seq};

   my $data  = $packet->{data};
   my $ts    = $packet->{ts};
   my $state = $session->{state} || '';

   if ( $state eq 'server_handshake' ) {
      PTDEBUG && _d('Expecting client authentication packet');
      # The connection is a 3-way handshake:
      #    server > client  (protocol version, thread id, etc.)
      #    client > server  (user, pass, default db, etc.)
      #    server > client  OK if login succeeds
      # pos_in_log refers to 2nd handshake from the client.
      # A connection is logged even if the client fails to
      # login (bad password, etc.).
      my $handshake = parse_client_handshake_packet($data);
      if ( !$handshake ) {
         $self->fail_session($session, 'failed to parse client handshake');
         return;
      }
      $session->{state}      = 'client_auth';
      $session->{pos_in_log} = $packet->{pos_in_log};
      $session->{user}       = $handshake->{user};
      $session->{db}         = $handshake->{db};

      # will_compress becomes compress only when the server's final
      # handshake packet arrives, so that final packet is not treated as
      # compressed.  Compressed packets can only begin after the full
      # handshake is done.
      $session->{will_compress} = $handshake->{flags}->{CLIENT_COMPRESS};
      return;
   }

   if ( $state eq 'client_auth_resend' ) {
      # Don't know how to parse this packet.
      PTDEBUG && _d('Client resending password using old algorithm');
      $session->{state} = 'client_auth';
      return;
   }

   if ( $state eq 'awaiting_reply' ) {
      my $arg = $session->{cmd}->{arg} ? substr($session->{cmd}->{arg}, 0, 50)
              : 'unknown';
      PTDEBUG && _d('More data for previous command:', $arg, '...');
      return;
   }

   # Otherwise, it should be a command if it's the first packet (number 0).
   # We ignore the commands that take arguments (COM_CHANGE_USER,
   # COM_PROCESS_KILL).
   if ( $packet->{number} != 0 ) {
      $self->fail_session($session, 'client cmd not packet 0');
      return;
   }

   # Detect compression in-stream only if compress is not defined, which
   # means the client handshake was not seen.  Had it been seen, compress
   # would be defined as 0 or 1.
   if ( !defined $session->{compress} ) {
      return unless $self->detect_compression($packet, $session);
      $data = $packet->{data};
   }

   my $com = parse_com_packet($data, $packet->{mysql_data_len});
   if ( !$com ) {
      $self->fail_session($session, 'failed to parse COM packet');
      return;
   }
   my $code = $com->{code};

   if ( $code eq COM_STMT_EXECUTE ) {
      PTDEBUG && _d('Execute prepared statement');
      my $exec = parse_execute_packet($com->{data}, $session->{sths});
      if ( !$exec ) {
         # This does not signal a failure, it could just be that
         # the statement handle ID is unknown.
         PTDEBUG && _d('Failed to parse execute packet');
         $session->{state} = undef;
         return;
      }
      $com->{data} = $exec->{arg};
      $session->{attribs}->{Statement_id} = $exec->{sth_id};
   }
   elsif ( $code eq COM_STMT_RESET ) {
      my $sth_id = get_sth_id($com->{data});
      if ( !$sth_id ) {
         $self->fail_session($session,
            'failed to parse prepared statement reset packet');
         return;
      }
      $com->{data} = "RESET $sth_id";
      $session->{attribs}->{Statement_id} = $sth_id;
   }

   # Remember the command; the server's reply will turn it into an event.
   $session->{state}      = 'awaiting_reply';
   $session->{pos_in_log} = $packet->{pos_in_log};
   $session->{ts}         = $ts;
   $session->{cmd}        = {
      cmd => $code,
      arg => $com->{data},
   };

   if ( $code eq COM_QUIT ) {  # Fire right away; will cleanup later.
      PTDEBUG && _d('Got a COM_QUIT');

      # See http://code.google.com/p/maatkit/issues/detail?id=794
      $session->{closed} = 1;  # delete session when done

      return $self->_make_event(
         {  cmd => 'Admin',
            arg => 'administrator command: Quit',
            ts  => $ts,
         },
         $packet, $session
      );
   }

   if ( $code eq COM_STMT_CLOSE ) {
      # Apparently, these are not acknowledged by the server.
      my $sth_id = get_sth_id($com->{data});
      if ( !$sth_id ) {
         $self->fail_session($session,
            'failed to parse prepared statement close packet');
         return;
      }
      delete $session->{sths}->{$sth_id};
      return $self->_make_event(
         {  cmd => 'Query',
            arg => "DEALLOCATE PREPARE $sth_id",
            ts  => $ts,
         },
         $packet, $session
      );
   }

   return;
}
|
|
|
|
# Builds the final event hashref from a partial event (cmd, arg, ts and
# optionally Rows_affected, Warning_count, No_good_index_used,
# No_index_used, Error_no), the packet that completed it, and the session.
# The session's attribs are merged in and override the base keys; Error_no
# is copied last, only if defined.  Afterwards the session's per-command
# bookkeeping is reset (cmd, state, attribs, raw packets, buffer,
# retransmission lists) and its n_queries counter is incremented.
# Returns the new event hashref.
sub _make_event {
   my ( $self, $event, $packet, $session ) = @_;
   PTDEBUG && _d('Making event');

   # Clear packets that preceded this event.
   $session->{raw_packets} = [];
   $self->_delete_buff($session);

   if ( !$session->{thread_id} ) {
      # Only the server handshake packet gives the thread id, so a session
      # caught mid-stream gets a fake one.
      PTDEBUG && _d('Giving session fake thread id', $self->{fake_thread_id});
      $session->{thread_id} = $self->{fake_thread_id}++;
   }

   # The session's client is "ip:port"; split it for the event.
   my ( $client_ip, $client_port )
      = $session->{client} =~ m/((?:\d+\.){3}\d+)\:(\w+)/;

   my %made = (
      cmd        => $event->{cmd},
      arg        => $event->{arg},
      bytes      => length( $event->{arg} ),
      ts         => tcp_timestamp( $event->{ts} ),
      host       => $client_ip,
      ip         => $client_ip,
      port       => $client_port,
      db         => $session->{db},
      user       => $session->{user},
      Thread_id  => $session->{thread_id},
      pos_in_log => $session->{pos_in_log},
      Query_time => timestamp_diff($session->{ts}, $packet->{ts}),
      Rows_affected      => ($event->{Rows_affected} || 0),
      Warning_count      => ($event->{Warning_count} || 0),
      No_good_index_used => ($event->{No_good_index_used} ? 'Yes' : 'No'),
      No_index_used      => ($event->{No_index_used}      ? 'Yes' : 'No'),
   );

   # Session attributes (Error_msg, Statement_id, ...) go in on top.
   @made{ keys %{ $session->{attribs} } } = values %{ $session->{attribs} };

   # https://bugs.launchpad.net/percona-toolkit/+bug/823411
   if ( defined $event->{Error_no} ) {
      $made{Error_no} = $event->{Error_no};
   }
   PTDEBUG && _d('Properties of event:', Dumper(\%made));

   # Drop cmd so the same event is not made again if the server sends
   # extra stuff that looks like a result set, etc.
   delete $session->{cmd};

   # With no state, everything from the server is ignored until the client
   # says something again.
   $session->{state} = undef;

   # Attribs belong to this event only.
   $session->{attribs} = {};

   $session->{n_queries}++;
   $session->{server_retransmissions} = [];
   $session->{client_retransmissions} = [];

   return \%made;
}
|
|
|
|
# Converts a tcpdump timestamp into slow-log form by rewriting a leading
# "CCYY-MM-DD" date as "YYMMDD".  Anything after the date is kept as is,
# and a value that does not start with such a date is returned unchanged.
sub tcp_timestamp {
   my ( $ts ) = @_;
   ( my $slow_log_ts = $ts ) =~ s{^\d\d(\d\d)-(\d\d)-(\d\d)}{$1$2$3};
   return $slow_log_ts;
}
|
|
|
|
# Returns the number of seconds between two tcpdump timestamps, formatted
# with six decimal places.  The first 11 characters of each timestamp are
# taken as the date part and the rest as "hh:mm:ss[.fraction]".
sub timestamp_diff {
   my ( $start, $end ) = @_;

   # Split the date off the front; what remains is the time of day.
   my $start_date = substr($start, 0, 11, '');
   my $end_date   = substr($end,   0, 11, '');

   my ( $start_h, $start_m, $start_s ) = split(/:/, $start);
   my ( $end_h,   $end_m,   $end_s   ) = split(/:/, $end);

   my $end_secs   = ($end_h * 3600 + $end_m * 60 + $end_s);
   my $start_secs = ($start_h * 3600 + $start_m * 60 + $start_s);

   # Different dates: assume only one day boundary has been crossed,
   # no DST, etc.
   my $elapsed = $start_date eq $end_date
               ? $end_secs - $start_secs
               : ( 86_400 - $start_secs ) + $end_secs;

   return sprintf '%.6f', $elapsed;
}
|
|
|
|
# Decodes a string of hex digits into the bytes it represents.
sub to_string {
   my ( $hex ) = @_;
   my $bytes = pack('H*', $hex);
   return $bytes;
}
|
|
|
|
# Unpacks a length-coded string from the front of the hex string $data.
# Returns a two-element list: the decoded string wrapped in double quotes,
# and the sum of the encode length and data length reported by decode_len()
# (presumably the number of bytes the value occupies -- confirm with callers).
sub unpack_string {
   my ( $data ) = @_;

   my ( $hex, $len, $encode_len ) = decode_len($data);

   # A zero or unknown length means "take whatever is left".
   my $template = 'H' . ($len ? $len * 2 : '*');
   my $str      = pack($template, $hex);

   return qq{"$str"}, $encode_len + $len;
}
|
|
|
|
# Decodes the length prefix of a length-coded value at the front of the hex
# string $data.  Returns a three-element list:
#   * the remaining hex data with the length prefix removed,
#   * the length of the value in bytes,
#   * the size of the length prefix itself in bytes (marker byte included).
# Returns nothing if $data is false and dies on an invalid first byte.
sub decode_len {
   my ( $data ) = @_;
   return unless $data;

   # first byte   hex   len
   # ==========   ====  ==================
   # 0-251        0-FB  Same
   # 252          FC    Len in next 2 bytes
   # 253          FD    Len in next 3 bytes
   # 254          FE    Len in next 8 bytes
   my $first_byte = to_num(substr($data, 0, 2, ''));

   my $len;
   my $encode_len;
   if ( $first_byte <= 251 ) {
      $len        = $first_byte;
      $encode_len = 1;
   }
   elsif ( $first_byte == 252 ) {
      # The 4-arg substr both reads the length bytes and removes them from
      # $data, so that $data is left pointing at the value itself.
      $len        = to_num(substr($data, 0, 4, ''));
      $encode_len = 3;  # marker + 2 length bytes
   }
   elsif ( $first_byte == 253 ) {
      $len        = to_num(substr($data, 0, 6, ''));
      $encode_len = 4;  # marker + 3 length bytes
   }
   elsif ( $first_byte == 254 ) {
      $len        = to_num(substr($data, 0, 16, ''));
      $encode_len = 9;  # marker + 8 length bytes
   }
   else {
      # This shouldn't happen, but it may if we're passed data
      # that isn't length encoded.
      PTDEBUG && _d('data:', $data, 'first byte:', $first_byte);
      die "Invalid length encoded byte: $first_byte";
   }

   PTDEBUG && _d('len:', $len, 'encode len', $encode_len);
   return $data, $len, $encode_len;
}
|
|
|
|
# Converts a little-endian hex string to a number.  The MySQL protocol
# stores all numbers least significant byte first, so the first pair of hex
# digits is the lowest-order byte.  If $len is given (and true), only the
# first $len bytes (2 * $len hex digits) of $str are used.
sub to_num {
   my ( $str, $len ) = @_;
   $str = substr($str, 0, $len * 2) if $len;

   my $result   = 0;
   my $byte_idx = 0;
   while ( $str =~ m/(..)/g ) {
      # Each successive byte is worth 256 (16 ** 2) times the previous one.
      $result += hex($1) * (16 ** ($byte_idx * 2));
      $byte_idx++;
   }
   return $result;
}
|
|
|
|
# Decodes a hex string into bytes and unpacks them as a double-precision
# float in the machine's native format (unpack template 'd').
sub to_double {
   my ( $str ) = @_;
   my $raw_bytes = pack('H*', $str);
   return unpack('d', $raw_bytes);
}
|
|
|
|
# Accepts a reference to a hex string, which it will modify.  Extracts a
# length-coded binary off the front of the string and returns that value as
# an integer.  The first byte is either the value itself (0-250) or a marker
# telling how many following bytes hold the value:
#   252 => 2 bytes, 253 => 3 bytes, 254 => 8 bytes
# For any other first byte (251 or 255) only that byte is consumed and
# nothing is returned (undef in scalar context).
sub get_lcb {
   my ( $string ) = @_;
   my $first_byte = hex(substr($$string, 0, 2, ''));
   return $first_byte if $first_byte < 251;

   # Number of hex digits (2 per byte) that follow each marker byte.
   my %hex_digits_after = (
      252 => 4,
      253 => 6,
      254 => 16,
   );
   my $n_digits = $hex_digits_after{$first_byte};

   # Be explicit about markers that carry no value instead of falling off
   # the end of the sub with an unspecified return value.
   return unless $n_digits;

   return to_num(substr($$string, 0, $n_digits, ''));
}
|
|
|
|
# Error packet structure (offsets are in hex digits of $data):
# Offset  Bytes               Field
# ======  =================   ====================================
#         00 00 00 01         MySQL proto header (already removed)
#         ff                  Error  (already removed)
# 0       00 00               Error number
# 4       23                  SQL state marker, always '#'
# 6       00 00 00 00 00      SQL state
# 16      00 ...              Error message
# Returns a hashref with errno, sqlstate and message; the sqlstate marker
# and actual sqlstate are combined into one value.  When the marker is not
# '#', sqlstate is empty and the message starts right after the error
# number.  Returns nothing if the data is empty, shorter than 16 hex
# digits, or the message is false.
sub parse_error_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('ERROR data:', $data);
   if ( length($data) < 16 ) {
      PTDEBUG && _d('Error packet is too short:', $data);
      return;
   }

   my $errno        = to_num(substr($data, 0, 4));
   my $has_sqlstate = to_string(substr($data, 4, 2)) eq '#';

   my $sqlstate = $has_sqlstate ? '#' . to_string(substr($data, 6, 10)) : '';
   my $message  = to_string(substr($data, $has_sqlstate ? 16 : 4));
   return unless $message;

   my $pkt = {
      errno    => $errno,
      sqlstate => $sqlstate,
      message  => $message,
   };
   PTDEBUG && _d('Error packet:', Dumper($pkt));
   return $pkt;
}
|
|
|
|
# OK packet structure:
# Bytes         Field
# ===========   ====================================
# 00 00 00 01   MySQL proto header (already removed)
# 00            OK/Field count (already removed)
# 1-9           Affected rows (LCB)
# 1-9           Insert ID (LCB)
# 00 00         Server status
# 00 00         Warning count
# 00 ...        Message (optional)
# Returns a hashref with affected_rows, insert_id, status, warnings and
# message, or nothing if the data is empty or shorter than 12 hex digits.
sub parse_ok_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('OK data:', $data);
   if ( length($data) < 12 ) {
      PTDEBUG && _d('OK packet is too short:', $data);
      return;
   }

   # Each field is consumed off the front of $data in packet order, so
   # the order of these statements matters.
   my %pkt;
   $pkt{affected_rows} = get_lcb(\$data);
   $pkt{insert_id}     = get_lcb(\$data);
   $pkt{status}        = to_num(substr($data, 0, 4, ''));
   $pkt{warnings}      = to_num(substr($data, 0, 4, ''));

   # Whatever is left is the optional message.  It might be something like
   #   Records: 2  Duplicates: 0  Warnings: 0
   $pkt{message} = to_string($data);

   PTDEBUG && _d('OK packet:', Dumper(\%pkt));
   return \%pkt;
}
|
|
|
|
# OK prepared statement packet structure:
# Bytes         Field
# ===========   ====================================
# 00            OK  (already removed)
# 00 00 00 00   Statement handler ID
# 00 00         Number of columns in result set
# 00 00         Number of parameters (?) in query
# Returns a hashref with sth_id, num_cols and num_params, or nothing if the
# data is empty or too short to hold all three fields.
sub parse_ok_prepared_statement_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('OK prepared statement data:', $data);

   # $data is a hex string, so the 8 bytes read below are 16 hex digits.
   if ( length($data) < 16 ) {
      PTDEBUG && _d('OK prepared statement packet is too short:', $data);
      return;
   }
   my $sth_id     = to_num(substr($data, 0, 8, ''));
   my $num_cols   = to_num(substr($data, 0, 4, ''));
   my $num_params = to_num(substr($data, 0, 4, ''));
   my $pkt = {
      sth_id     => $sth_id,
      num_cols   => $num_cols,
      num_params => $num_params,
   };
   PTDEBUG && _d('OK prepared packet:', Dumper($pkt));
   return $pkt;
}
|
|
|
|
# Parses the hex data of a server handshake packet.  Currently we only
# capture and return the server version, the thread id and the server
# capability flags, as a hashref with keys server_version, thread_id and
# flags.  Returns nothing if the data is empty or does not look like a
# server handshake.
sub parse_server_handshake_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('Server handshake data:', $data);
   my $handshake_pattern = qr{
                        # Bytes                Name
      ^                 # -----                ----
      ((?:..)+?)00      # n Null-Term String   server_version
      (.{8})            # 4                    thread_id
      .{16}             # 8                    scramble_buff
      .{2}              # 1                    filler: always 0x00
      (.{4})            # 2                    server_capabilities
      .{2}              # 1                    server_language
      .{4}              # 2                    server_status
      .{26}             # 13                   filler: always 0x00
                        # 13                   rest of scramble_buff
   }x;
   # The server version is matched two hex digits (one byte) at a time so
   # that its terminating 00 can only be found on a byte boundary.
   # Otherwise a version ending in a byte like 0x30 ("0") makes the
   # terminator match one hex digit early and shifts every later field.
   my ( $server_version, $thread_id, $flags ) = $data =~ m/$handshake_pattern/;
   if ( !defined $flags ) {
      PTDEBUG && _d('Did not match server handshake packet');
      return;
   }
   my $pkt = {
      server_version => to_string($server_version),
      thread_id      => to_num($thread_id),
      flags          => parse_flags($flags),
   };
   PTDEBUG && _d('Server handshake packet:', Dumper($pkt));
   return $pkt;
}
|
|
|
|
# Parses the hex data of a client handshake packet.  Currently we only
# capture and return the user, the default database and the client flags,
# as a hashref with keys user, db and flags.  Returns nothing if the data
# is empty or does not look like a client handshake.
sub parse_client_handshake_packet {
   my ( $data ) = @_;
   return unless $data;
   PTDEBUG && _d('Client handshake data:', $data);
   my ( $flags, $user, $buff_len ) = $data =~ m{
      ^
      (.{8})         # Client flags
      .{10}          # Max packet size, charset
      (?:00){23}     # Filler
      ((?:..)+?)00   # Null-terminated user name
      (..)           # Length-coding byte for scramble buff
   }x;

   # This packet is easy to detect because it's the only case where
   # the server sends the client a packet first (its handshake) and
   # then the client only and ever sends back its handshake.
   unless ( $buff_len ) {
      PTDEBUG && _d('Did not match client handshake packet');
      return;
   }

   my $code_len = hex($buff_len);

   # Only try to get the db if CLIENT_CONNECT_WITH_DB flag is set
   # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse41
   # $flags is stored as little endian, hence to_num().
   my $connects_with_db = to_num($flags) & $flag_for{CLIENT_CONNECT_WITH_DB};

   my $db;
   if ( $connects_with_db ) {
      ( $db ) = $data =~ m!
         ^.{64}${user}00..   # Everything matched before
         (?:..){$code_len}   # The scramble buffer
         (.*?)00.*\Z         # The database name
      !x;
   }
   my $db_name = $db ? to_string($db) : '';

   my $pkt = {
      user  => to_string($user),
      db    => $db_name,
      flags => parse_flags($flags),
   };
   PTDEBUG && _d('Client handshake packet:', Dumper($pkt));
   return $pkt;
}
|
|
|
|
# Parses a command (COM) packet from hex $data whose MySQL data length is
# $len bytes.  COM data is not 00-terminated, but the MySQL client appends
# \0, so we have to use the packet length to know where the data ends.
# Returns a hashref with code (first byte, as hex), com (its name from
# %com_for) and data, or nothing if an argument is false or the code is
# not a known command.
sub parse_com_packet {
   my ( $data, $len ) = @_;
   return unless $data && $len;
   PTDEBUG && _d('COM data:',
      (substr($data, 0, 100).(length($data) > 100 ? '...' : '')),
      'len:', $len);

   my $code = substr($data, 0, 2);
   my $com  = $com_for{$code};
   unless ( $com ) {
      PTDEBUG && _d('Did not match COM packet');
      return;
   }

   # Data for the most common COM, e.g. COM_QUERY, is text.
   # COM_STMT_EXECUTE is not, so we leave it binary; it can
   # be parsed by parse_execute_packet().  The same goes for
   # COM_STMT_CLOSE and COM_STMT_RESET.
   my $is_binary_com = grep { $code eq $_ }
      ( COM_STMT_EXECUTE(), COM_STMT_CLOSE(), COM_STMT_RESET() );
   if ( !$is_binary_com ) {
      # Skip the code byte and take the remaining $len - 1 bytes.
      $data = to_string(substr($data, 2, ($len - 1) * 2));
   }

   my $pkt = {
      code => $code,
      com  => $com,
      data => $data,
   };
   PTDEBUG && _d('COM packet:', Dumper($pkt));
   return $pkt;
}
|
|
|
|
# Execute prepared statement packet structure:
# Bytes              Field
# ===========        ========================================
# 00                 Code 17, COM_STMT_EXECUTE
# 00 00 00 00        Statement handler ID
# 00                 flags
# 00 00 00 00        Iteration count (reserved, always 1)
# (param_count+7)/8  NULL bitmap
# 00                 1 if new parameters, else 0
# n*2                Parameter types (only if new parameters)
#
# $data is the hex string of the packet and $sths is a hashref of known
# statement handles keyed on statement id; each handle provides statement
# and num_params, and its types are saved here when the packet carries new
# parameter types.  Returns a hashref with sth_id and arg ("EXECUTE "
# followed by the statement with its ? placeholders replaced by the
# parameter values), or nothing if an argument is false or the statement
# handle is unknown.
sub parse_execute_packet {
   my ( $data, $sths ) = @_;
   return unless $data && $sths;

   my $sth_id = to_num(substr($data, 2, 8));
   return unless defined $sth_id;

   my $sth = $sths->{$sth_id};
   if ( !$sth ) {
      PTDEBUG && _d('Skipping unknown statement handle', $sth_id);
      return;
   }
   my $null_count  = int(($sth->{num_params} + 7) / 8) || 1;
   my $null_bitmap = to_num(substr($data, 20, $null_count * 2));
   PTDEBUG && _d('NULL bitmap:', $null_bitmap, 'count:', $null_count);

   # This chops off everything up to the byte for new params.
   substr($data, 0, 20 + ($null_count * 2), '');

   my $new_params = to_num(substr($data, 0, 2, ''));
   my @types;
   if ( $new_params ) {
      PTDEBUG && _d('New param types');
      # It seems all params are type 254, MYSQL_TYPE_STRING.  Perhaps
      # this depends on the client.  If we ever need these types, they
      # can be saved here.  Otherwise for now I just want to see the
      # types in debug output.
      for my $i ( 0..($sth->{num_params}-1) ) {
         my $type = to_num(substr($data, 0, 4, ''));
         push @types, $type_for{$type};
         PTDEBUG && _d('Param', $i, 'type:', $type, $type_for{$type});
      }
      $sth->{types} = \@types;
   }
   else {
      # Retrieve previous param types if there are param vals (data).
      # There may be none saved if the packet that carried them was never
      # seen; then every param falls through to the '?' case below.
      @types = @{ $sth->{types} || [] } if $data;
   }

   # $data should now be truncated up to the parameter values.

   my $arg = $sth->{statement};
   PTDEBUG && _d('Statement:', $arg);

   # Offset in $arg from which to look for the next ? placeholder.  It is
   # moved past each inserted value so that a ? inside a value is never
   # mistaken for the next placeholder.
   my $search_from = 0;

   for my $i ( 0..($sth->{num_params}-1) ) {
      my $val;
      my $len;  # in bytes
      if ( $null_bitmap & (2**$i) ) {
         PTDEBUG && _d('Param', $i, 'is NULL (bitmap)');
         $val = 'NULL';
         $len = 0;
      }
      else {
         if ( defined $types[$i] && $unpack_type{$types[$i]} ) {
            ($val, $len) = $unpack_type{$types[$i]}->($data);
         }
         else {
            # TODO: this is probably going to break parsing other param vals
            PTDEBUG && _d('No handler for param', $i, 'type', $types[$i]);
            $val = '?';
            $len = 0;
         }
      }

      # Replace the next ? in the prepared statement with the value.
      PTDEBUG && _d('Param', $i, 'val:', $val);
      my $replacement = defined $val ? $val : '';
      my $placeholder = index($arg, '?', $search_from);
      if ( $placeholder >= 0 ) {
         substr($arg, $placeholder, 1, $replacement);
         $search_from = $placeholder + length($replacement);
      }

      # Remove this param val from the data, putting us at the next one.
      substr($data, 0, $len * 2, '') if $len;
   }

   my $pkt = {
      sth_id => $sth_id,
      arg    => "EXECUTE $arg",
   };
   PTDEBUG && _d('Execute packet:', Dumper($pkt));
   return $pkt;
}
|
|
|
|
# Returns the statement handle id stored in the 4 bytes that follow the
# first byte of the hex string $data, or nothing if $data is false.
sub get_sth_id {
   my ( $data ) = @_;
   return unless $data;
   return scalar to_num(substr($data, 2, 8));
}
|
|
|
|
# Decodes the little-endian hex string $flags into a hashref that has one
# key for every flag name in %flag_for, set to 1 if that flag's bit is set
# in $flags and 0 if not.  Dies if $flags is false.
sub parse_flags {
   my ( $flags ) = @_;
   die "I need flags" unless $flags;
   PTDEBUG && _d('Flag data:', $flags);

   my $flags_dec = to_num($flags);
   my %is_set    = map {
      ( $_ => ( $flags_dec & $flag_for{$_} ? 1 : 0 ) )
   } keys %flag_for;

   return \%is_set;
}
|
|
|
|
# Takes a scalarref to a hex string of compressed data and the length, in
# bytes, of the uncompressed data.
# Returns a scalarref to a hex string of the uncompressed data.
# The given hex string of compressed data is not modified.
# Dies if an argument is missing, if $data is not a scalar reference, or if
# the data cannot be inflated.
sub uncompress_data {
   my ( $data, $len ) = @_;
   die "I need data" unless $data;
   die "I need a len argument" unless $len;
   die "I need a scalar reference to data" unless ref $data eq 'SCALAR';
   PTDEBUG && _d('Uncompressing data');
   our $InflateError;

   # Pack hex string into compressed binary data.
   my $comp_bin_data = pack('H*', $$data);

   # Uncompress the compressed binary data.
   my $uncomp_bin_data = '';
   my $z = IO::Uncompress::Inflate->new(\$comp_bin_data)
      or die "IO::Uncompress::Inflate failed: $InflateError";

   # read() returns the number of bytes read, 0 at end of file and a
   # negative number on error.  A negative number is true, so the status
   # has to be checked explicitly.  Reading nothing is still treated as
   # a failure because $len is never zero here.
   my $status = $z->read(\$uncomp_bin_data, $len);
   die "IO::Uncompress::Inflate failed: $InflateError"
      unless defined $status && $status > 0;

   # Unpack the uncompressed binary data back into a hex string.
   # This is the original MySQL packet(s).
   my $uncomp_data = unpack('H*', $uncomp_bin_data);

   return \$uncomp_data;
}
|
|
|
|
# Decides whether the client is compressing its packets and records the
# answer in $session->{compress}.  Returns 1 on success or 0 on failure.
# Failure is probably detecting compression but not being able to
# uncompress (uncompress_packet() returns 0).
sub detect_compression {
   my ( $self, $packet, $session ) = @_;
   PTDEBUG && _d('Checking for client compression');

   # This is a necessary hack for detecting compression in-stream without
   # having seen the client handshake and CLIENT_COMPRESS flag.  If the
   # client is compressing packets, there will be an extra 7 bytes before
   # the regular MySQL header.  For short COM_QUERY commands, these 7 bytes
   # are usually zero where we'd expect to see 03 for COM_QUERY.  So if we
   # parse this packet and it looks like a COM_SLEEP (00) which is not a
   # command that the client can send, then chances are the client is using
   # compression.
   my $com = parse_com_packet($packet->{data}, $packet->{mysql_data_len});
   my $looks_compressed = $com && $com->{code} eq COM_SLEEP;

   if ( !$looks_compressed ) {
      PTDEBUG && _d('Client is NOT using compression');
      $session->{compress} = 0;
      return 1;
   }

   PTDEBUG && _d('Client is using compression');
   $session->{compress} = 1;

   # Since parse_packet() didn't know the packet was compressed, it
   # called remove_mysql_header() which removed the first 4 of 7 bytes
   # of the compression header.  We must restore these 4 bytes, then
   # uncompress and remove the MySQL header.  We only do this once.
   $packet->{data} = $packet->{mysql_hdr} . $packet->{data};
   if ( !$self->uncompress_packet($packet, $session) ) {
      return 0;
   }
   remove_mysql_header($packet);

   return 1;
}
|
|
|
|
# Removes the compression header from $packet->{data} and, if the header
# says the data is compressed, replaces the data with its uncompressed form.
# Returns 1 if the packet was handled or 0 if the compression header could
# not be parsed (the session is failed in that case).  If the data itself
# cannot be uncompressed, the session is failed and this sub dies; that is
# usually due to IO::Uncompress::Inflate not being available.
sub uncompress_packet {
   my ( $self, $packet, $session ) = @_;
   die "I need a packet"  unless $packet;
   die "I need a session" unless $session;

   # From the doc: "A compressed packet header is:
   #    packet length (3 bytes),
   #    packet number (1 byte),
   #    and Uncompressed Packet Length (3 bytes).
   # The Uncompressed Packet Length is the number of bytes
   # in the original, uncompressed packet. If this is zero
   # then the data is not compressed."

   my $data;
   my $comp_hdr;
   my $comp_data_len;
   my $pkt_num;
   my $uncomp_data_len;
   eval {
      $data            = \$packet->{data};
      $comp_hdr        = substr($$data, 0, 14, '');
      $comp_data_len   = to_num(substr($comp_hdr, 0, 6));
      $pkt_num         = to_num(substr($comp_hdr, 6, 2));
      $uncomp_data_len = to_num(substr($comp_hdr, 8, 6));
      PTDEBUG && _d('Compression header data:', $comp_hdr,
         'compressed data len (bytes)', $comp_data_len,
         'number', $pkt_num,
         'uncompressed data len (bytes)', $uncomp_data_len);
   };
   # The eval error is copied right away because any code called
   # afterwards may run its own eval and overwrite it.
   if ( my $header_error = $EVAL_ERROR ) {
      $session->{EVAL_ERROR} = $header_error;
      $self->fail_session($session, 'failed to parse compression header');
      return 0;
   }

   if ( $uncomp_data_len ) {
      eval {
         $data = uncompress_data($data, $uncomp_data_len);
         $packet->{data} = $$data;
      };
      if ( my $inflate_error = $EVAL_ERROR ) {
         $session->{EVAL_ERROR} = $inflate_error;
         $self->fail_session($session, 'failed to uncompress data');
         die "Cannot uncompress packet. Check that IO::Uncompress::Inflate "
            . "is installed.\nError: $inflate_error";
      }
   }
   else {
      PTDEBUG && _d('Packet is not really compressed');
      $packet->{data} = $$data;
   }

   return 1;
}
|
|
|
|
# Removes the first 4 bytes (8 hex digits) of the packet data which should
# be a MySQL header: 3 bytes packet length, 1 byte packet number.  The
# removed header, the data length and the packet number are saved in
# $packet as mysql_hdr, mysql_data_len and number.  Dies without a packet.
sub remove_mysql_header {
   my ( $packet ) = @_;
   die "I need a packet" unless $packet;

   # NOTE: the packet data is modified by this substr call!  If we
   # had all the data in the TCP packets, we could change this to a while
   # loop; while get-a-packet-from-$data, do stuff, etc.  But we don't,
   # and we don't want to either.
   my $header = substr($packet->{data}, 0, 8, '');

   my $data_len = to_num(substr($header, 0, 6));
   my $number   = to_num(substr($header, 6, 2));
   PTDEBUG && _d('MySQL packet: header data', $header,
      'data len (bytes)', $data_len, 'number', $number);

   @{$packet}{qw(mysql_hdr mysql_data_len number)}
      = ( $header, $data_len, $number );

   return;
}
|
|
|
|
# Delete anything we added to the session related to
# buffering a large query received in multiple packets:
# the buff, buff_left and mysql_data_len keys.  Returns nothing.
sub _delete_buff {
   my ( $self, $session ) = @_;
   # A hash-slice delete removes all three keys at once; no need to
   # build and throw away a list with map.
   delete @{$session}{qw(buff buff_left mysql_data_len)};
   return;
}
|
|
|
|
# Prints a debug line to STDERR made of "# ", the calling package and line
# number, the process id, and the arguments joined by spaces.  Undefined
# arguments are printed as "undef", and every newline inside an argument is
# followed by "# " so that continuation lines stay commented.
sub _d {
   my ( $package, undef, $line ) = caller 0;

   my @parts;
   foreach my $arg ( @_ ) {
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @parts, $text;
   }

   print STDERR "# $package:$line $PID ", join(' ', @parts), "\n";
}
|
|
|
|
1;
|
|
}
|
|
# ###########################################################################
|
|
# End MySQLProtocolParser package
|
|
# ###########################################################################
|