diff --git a/bin/pt-query-digest b/bin/pt-query-digest index 896b1f95..8adde527 100755 --- a/bin/pt-query-digest +++ b/bin/pt-query-digest @@ -81,6 +81,7 @@ our $VERSION = '2.2.1'; # ########################################################################### { package Lmo::Utils; + use strict; use warnings qw( FATAL all ); require Exporter; @@ -88,7 +89,12 @@ our (@ISA, @EXPORT, @EXPORT_OK); BEGIN { @ISA = qw(Exporter); - @EXPORT = @EXPORT_OK = qw(_install_coderef _unimport_coderefs _glob_for _stash_for); + @EXPORT = @EXPORT_OK = qw( + _install_coderef + _unimport_coderefs + _glob_for + _stash_for + ); } { @@ -272,7 +278,6 @@ sub meta { return Lmo::Meta->new(class => $class); } - 1; } # ########################################################################### @@ -3210,10 +3215,25 @@ sub parse_event { $new_query = 1; } elsif ( $curr->[INFO] && defined $curr->[TIME] - && $query_start - $etime - $prev->[START] > $fudge ) { - PTDEBUG && _d('Query restarted; new query', - $query_start, $etime, $prev->[START], $fudge); - $new_query = 1; + && $query_start - $etime - $prev->[START] > $fudge) + { + my $ms = $self->{MasterSlave}; + + my $is_repl_thread = $ms->is_replication_thread({ + Command => $curr->[COMMAND], + User => $curr->[USER], + State => $curr->[STATE], + Id => $curr->[ID]}); + if ( !$is_repl_thread ) { + PTDEBUG && _d('Query restarted; new query', + $query_start, $etime, $prev->[START], $fudge); + $new_query = 1; + } + elsif ( PTDEBUG ) { + _d(q'Query has a start time beyond the fudge factor, ', + q'but not counting it as a new query because, ', + q{it's a replication thread}); + } } if ( $new_query ) { @@ -3789,6 +3809,7 @@ sub new { sessions => {}, o => $args{o}, fake_thread_id => 2**32, # see _make_event() + null_event => $args{null_event}, }; PTDEBUG && $self->{server} && _d('Watching only server', $self->{server}); return bless $self, $class; @@ -3809,7 +3830,7 @@ sub parse_event { $server .= ":$self->{port}"; if ( $src_host ne $server && $dst_host ne 
$server ) { PTDEBUG && _d('Packet is not to or from', $server); - return; + return $self->{null_event}; } } @@ -3825,7 +3846,7 @@ sub parse_event { } else { PTDEBUG && _d('Packet is not to or from a MySQL server'); - return; + return $self->{null_event}; } PTDEBUG && _d('Client', $client); @@ -3843,7 +3864,7 @@ sub parse_event { else { PTDEBUG && _d('Ignoring mid-stream', $packet_from, 'data,', 'packetno', $packetno); - return; + return $self->{null_event}; } $self->{sessions}->{$client} = { @@ -3886,7 +3907,7 @@ sub parse_event { delete $self->{sessions}->{$session->{client}}; return $event; } - return; + return $self->{null_event}; } if ( $session->{compress} ) { @@ -3912,7 +3933,7 @@ sub parse_event { PTDEBUG && _d('remove_mysql_header() failed; failing session'); $session->{EVAL_ERROR} = $EVAL_ERROR; $self->fail_session($session, 'remove_mysql_header() failed'); - return; + return $self->{null_event}; } } @@ -3927,7 +3948,7 @@ sub parse_event { $self->_delete_buff($session); } else { - return; # waiting for more data; buff_left was reported earlier + return $self->{null_event}; # waiting for more data; buff_left was reported earlier } } elsif ( $packet->{mysql_data_len} > ($packet->{data_len} - 4) ) { @@ -3948,7 +3969,7 @@ sub parse_event { PTDEBUG && _d('Data not complete; expecting', $session->{buff_left}, 'more bytes'); - return; + return $self->{null_event}; } if ( $session->{cmd} && ($session->{state} || '') eq 'awaiting_reply' ) { @@ -3971,7 +3992,7 @@ sub parse_event { } $args{stats}->{events_parsed}++ if $args{stats}; - return $event; + return $event || $self->{null_event}; } sub _packet_from_server { diff --git a/lib/Processlist.pm b/lib/Processlist.pm index e53eca3b..1780de1e 100644 --- a/lib/Processlist.pm +++ b/lib/Processlist.pm @@ -245,14 +245,29 @@ sub parse_event { $new_query = 1; } elsif ( $curr->[INFO] && defined $curr->[TIME] - && $query_start - $etime - $prev->[START] > $fudge ) { + && $query_start - $etime - $prev->[START] > $fudge) + { # 
If the query's recalculated start time minus its previously # calculated start time is greater than the fudge factor, then # the query has restarted. I.e. the new start time is after # the previous start time. - PTDEBUG && _d('Query restarted; new query', - $query_start, $etime, $prev->[START], $fudge); - $new_query = 1; + my $ms = $self->{MasterSlave}; + + my $is_repl_thread = $ms->is_replication_thread({ + Command => $curr->[COMMAND], + User => $curr->[USER], + State => $curr->[STATE], + Id => $curr->[ID]}); + if ( !$is_repl_thread ) { + PTDEBUG && _d('Query restarted; new query', + $query_start, $etime, $prev->[START], $fudge); + $new_query = 1; + } + elsif ( PTDEBUG ) { + _d(q'Query has a start time beyond the fudge factor, ', + q'but not counting it as a new query because, ', + q{it's a replication thread}); + } } if ( $new_query ) { diff --git a/t/lib/Processlist.t b/t/lib/Processlist.t index afe088ba..1d96a7ac 100644 --- a/t/lib/Processlist.t +++ b/t/lib/Processlist.t @@ -335,9 +335,118 @@ is_deeply( "New query2_2 is active, starting at 05:08" ); +# + +# This is basically the same thing as above, but we're pretending to +# be a repl thread, so it should behave differently. + +$pl = Processlist->new(MasterSlave=>$ms); + +parse_n_times( + 1, + code => sub { + return [ + [ 2, 'system user', 'localhost', 'test', 'Query', 0, 'executing', 'query2_2'], + ], + }, + time => Transformers::unix_timestamp('2001-01-01 00:05:03'), + etime => 3.14159, +); + +is_deeply( + $pl->_get_active_cxn(), + { + 2 => [ + 2, 'system user', 'localhost', 'test', 'Query', 0, 'executing', 'query2_2', + Transformers::unix_timestamp('2001-01-01 00:05:03'), # START + 3.14159, # ETIME + Transformers::unix_timestamp('2001-01-01 00:05:03'), # FSEEN + { executing => 0 }, + ], + }, + 'query2_2 just started', +); + +# And there is no event on cxn 2. 
+is( + scalar @events, + 0, + 'query2_2 has not fired yet', +); + +# In this sample, the "same" query is seen again two seconds later, at 05:05. +# No event should fire, and the connection should keep its original START of +# 05:03. +parse_n_times( + 1, + code => sub { + return [ + [ 2, 'system user', 'localhost', 'test', 'Query', 0, 'executing', 'query2_2'], + ], + }, + time => Transformers::unix_timestamp('2001-01-01 00:05:05'), + etime => 2.718, +); + +is( + scalar @events, + 0, + 'query2_2 has not fired yet', +); + +# And so as a result, query2_2 has NOT fired, but the query is still active. +is_deeply( + $pl->_get_active_cxn(), + { + 2 => [ + 2, 'system user', 'localhost', 'test', 'Query', 0, 'executing', 'query2_2', + Transformers::unix_timestamp('2001-01-01 00:05:03'), + 3.14159, + Transformers::unix_timestamp('2001-01-01 00:05:03'), + { executing => 2 }, + ], + }, + 'Cxn 2 still active with query starting at 05:03', +); + +# But wait! There's another! As a repl thread, this must NOT count as a restart. +parse_n_times( + 1, + code => sub { + return [ + [ 2, 'system user', 'localhost', 'test', 'Query', 0, 'executing', 'query2_2'], + ], + }, + time => Transformers::unix_timestamp('2001-01-01 00:05:08.500'), + etime => 0.123, +); + +is_deeply( + \@events, + [], + 'Original query2_2 not fired because we are a repl thread', +); + +# And so as a result, query2_2 has NOT fired and the prev array still contains +# the original query2_2. 
+is_deeply( + $pl->_get_active_cxn(), + { + 2 => [ + 2, 'system user', 'localhost', 'test', 'Query', 0, 'executing', 'query2_2', + Transformers::unix_timestamp('2001-01-01 00:05:03'), # START + 3.14159, # ETIME + Transformers::unix_timestamp('2001-01-01 00:05:03'), # FSEEN + { executing => 5.5 }, + ], + }, + "Old query2_2 is active" +); + # ########################################################################### # Issue 867: Make mk-query-digest detect Lock_time from processlist # ########################################################################### +$ms = MasterSlave->new(OptionParser=>1,DSNParser=>1,Quoter=>1); $pl = Processlist->new(MasterSlave=>$ms); # For 2/10ths of a second, the query is Locked. First time we see this