mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-01 18:25:59 +00:00

* Remove trailing spaces * PR-665 - Remove trailing spaces - Updated not stable test t/pt-online-schema-change/preserve_triggers.t - Updated utilities in bin directory * PR-665 - Remove trailing spaces - Fixed typos * PR-665 - Remove trailing spaces - Fixed typos --------- Co-authored-by: Sveta Smirnova <sveta.smirnova@percona.com>
287 lines
11 KiB
Perl
287 lines
11 KiB
Perl
# This program is copyright 2011 Baron Schwartz, 2011 Percona Ireland Ltd.
|
|
# Feedback and improvements are welcome.
|
|
#
|
|
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
|
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
|
|
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
|
|
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
|
|
# licenses.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
|
|
# Place, Suite 330, Boston, MA 02111-1307 USA.
|
|
# ###########################################################################
|
|
# TCPRequestAggregator package
|
|
# ###########################################################################
|
|
{
|
|
# Package: TCPRequestAggregator
|
|
# TCPRequestAggregator aggregates TCP requests from tcpdump files.
|
|
package TCPRequestAggregator;
|
|
|
|
use strict;
|
|
use warnings FATAL => 'all';
|
|
use English qw(-no_match_vars);
|
|
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
|
|
|
|
use List::Util qw(sum);
|
|
use Data::Dumper;
|
|
|
|
# Creates a new TCPRequestAggregator object.
#
# Required arguments:
#   interval -- size of the aggregation interval; parse_event() truncates
#               each timestamp to a multiple of this value
#   quantile -- fraction used by make_event() to pick the quantile
#               response time
#
# Dies if either required argument is missing or false. Any other key in
# %args is copied into the object and overrides the defaults below.
sub new {
   my ( $class, %args ) = @_;
   my @required_args = qw(interval quantile);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my $self = {
      buffer             => [],   # Sorted completion (end) timestamps
      response_times     => [],   # Elapsed times seen in current interval
      in_prg             => 0,    # Requests currently in progress
      # Running totals.  make_event() subtracts the last_* snapshots from
      # these, and the package enables FATAL warnings, so they must start
      # out defined or an interval that closes before the first completion
      # dies with an "uninitialized value" error.
      weighted_time      => 0,
      busy_time          => 0,
      completions        => 0,
      last_weighted_time => 0,
      last_busy_time     => 0,
      last_completions   => 0,
      current_ts         => 0,
      %args,
   };
   return bless $self, $class;
}
|
|
|
|
# Reads request lines through the next_event callback, merges their arrivals
# and completions into one time-ordered stream, and returns an aggregated
# event (see make_event) each time that stream crosses an interval boundary.
# Returns nothing once the input is exhausted and no further event can be
# made.
#
# Required arguments:
#   next_event -- coderef that returns the next input line, or undef at EOF
#   tell       -- coderef that returns the current position in the input
#
# NOTE(review): an oktorun callback used to be referenced after the main
# loop, but that loop can only be left by return or die, so the callback was
# never invoked.  The unreachable code has been removed; confirm whether
# callers expect oktorun->(0) at EOF.
#
# The input is the output of mk-tcp-model, like so:
#
#   21 1301957863.820001 1301957863.820169  0.000168 10.10.18.253:58297
#   22 1301957863.821677 1301957863.821839  0.000162 10.10.18.253:43608
#   23 1301957863.822890 1301957863.823074  0.000184 10.10.18.253:52726
#   24 1301957863.822895 1301957863.823160  0.000265 10.10.18.253:58297
#
# The whitespace-separated fields are ID, start, end, elapsed and host:port.
# Lines with fewer than four fields (blank or truncated lines) are skipped.
#
# Each event is a hashref of attribute => value pairs as defined in
# mk-tcp-model's documentation.
#
# Dies if a required argument is missing, if the input ends while requests
# are still counted as in progress, or if a completion that belongs to a
# finished interval is still buffered.
sub parse_event {
   my ( $self, %args ) = @_;
   my @required_args = qw(next_event tell);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($next_event, $tell) = @args{@required_args};

   my $pos_in_log = $tell->();
   my $buffer     = $self->{buffer};
   $self->{last_pos_in_log} ||= $pos_in_log;

   EVENT:
   while ( 1 ) {
      PTDEBUG && _d("Beginning a loop at pos", $pos_in_log);
      my ( $id, $start, $elapsed );

      my ($timestamp, $direction);
      if ( $self->{pending} ) {
         ( $id, $start, $elapsed ) = @{$self->{pending}};
         PTDEBUG && _d("Pulled from pending", @{$self->{pending}});
      }
      else {
         # Read until we get a usable line or hit EOF.  A blank or truncated
         # line must not reach the code below: its undefined end time would
         # be pushed into the completions buffer and later be treated as a
         # real completion timestamp.
         LINE:
         while ( defined(my $line = $next_event->()) ) {
            # Split the line into ID, start, end, elapsed, and host:port
            my @fields = $line =~ m/(\S+)/g;
            if ( @fields < 4 ) {
               PTDEBUG && _d("Skipping malformed line", $line);
               next LINE;
            }
            my ($end, $host_port);
            ( $id, $start, $end, $elapsed, $host_port ) = @fields;
            @$buffer = sort { $a <=> $b } ( @$buffer, $end );
            PTDEBUG && _d("Read from the file", $id, $start, $end, $elapsed, $host_port);
            PTDEBUG && _d("Buffer is now", @$buffer);
            last LINE;
         }
      }

      if ( $start ) { # Test that we got a line; $id can be 0.
         # We have a line to work on.  The next event we need to process is the
         # smaller of a) the arrival recorded in the $start of the line we just
         # read, or b) the first completion recorded in the completions buffer.
         if ( @$buffer && $buffer->[0] < $start ) {
            $direction       = 'C'; # Completion
            $timestamp       = shift @$buffer;
            $self->{pending} = [ $id, $start, $elapsed ];
            $id = $start = $elapsed = undef;
            PTDEBUG && _d("Completion: using buffered end value", $timestamp);
            PTDEBUG && _d("Saving line to pending", @{$self->{pending}});
         }
         else {
            $direction       = 'A'; # Arrival
            $timestamp       = $start;
            $self->{pending} = undef;
            PTDEBUG && _d("Deleting pending line");
            PTDEBUG && _d("Arrival: using the line");
         }
      }
      elsif ( @$buffer ) {
         $direction = 'C';
         $timestamp = shift @$buffer;
         PTDEBUG && _d("No more lines, reading from buffer", $timestamp);
      }
      else { # We hit EOF.
         PTDEBUG && _d("No more lines, no more buffered end times");
         if ( $self->{in_prg} ) {
            die "Error: no more lines, but in_prg = $self->{in_prg}";
         }
         if (    defined $self->{t_start}
              && defined $self->{current_ts}
              && $self->{t_start} < $self->{current_ts} )
         {
            PTDEBUG && _d("Returning event based on what's been seen");
            return $self->make_event($self->{t_start}, $self->{current_ts});
         }
         else {
            PTDEBUG && _d("No further events to make");
            return;
         }
      }

      # The notation used here is T_start for start of observation time (T).
      # The divide, int(), and multiply effectively truncates the value to
      # $interval precision.
      my $t_start = int($timestamp / $self->{interval}) * $self->{interval};
      $self->{t_start} ||= $timestamp; # Not $t_start; that'd skew 1st interval.
      PTDEBUG && _d("Timestamp", $timestamp, "interval start time", $t_start);

      # If $timestamp is not within the current interval, then we need to save
      # everything for later, compute stats for the rest of this interval, and
      # return an event.  The next time we are called, we'll begin the next
      # interval.
      if ( $t_start > $self->{t_start} ) {
         PTDEBUG && _d("Timestamp doesn't belong to this interval");
         # We need to compute how much time is left in this interval, and add
         # that much busy_time and weighted_time to the running totals, but only
         # if there is some request in progress.
         if ( $self->{in_prg} ) {
            PTDEBUG && _d("Computing from", $self->{current_ts}, "to", $t_start);
            $self->{busy_time}     += $t_start - $self->{current_ts};
            $self->{weighted_time} += ($t_start - $self->{current_ts}) * $self->{in_prg};
         }

         if ( @$buffer && $buffer->[0] < $t_start ) {
            die "Error: completions for interval remain unprocessed";
         }

         # Reset running totals and last-time-seen stuff for next iteration,
         # re-buffer the completion or replace the line onto pending, then
         # return the event.
         my $event                = $self->make_event($self->{t_start}, $t_start);
         $self->{last_pos_in_log} = $pos_in_log;
         if ( $start ) {
            $self->{pending} = [ $id, $start, $elapsed ];
         }
         else {
            unshift @$buffer, $timestamp;
         }
         return $event;
      }

      # Otherwise, we need to compute the running sums and keep looping.
      else {
         if ( $self->{in_prg} ) {
            # $self->{current_ts} is initially 0, which would seem likely to
            # skew this computation.  But $self->{in_prg} will be 0 also, and
            # $self->{current_ts} will get set immediately after this, so
            # anytime this if() block runs, it'll be OK.
            PTDEBUG && _d("Computing from", $self->{current_ts}, "to", $timestamp);
            $self->{busy_time}     += $timestamp - $self->{current_ts};
            $self->{weighted_time} += ($timestamp - $self->{current_ts}) * $self->{in_prg};
         }
         $self->{current_ts} = $timestamp;
         if ( $direction eq 'A' ) {
            PTDEBUG && _d("Direction A", $timestamp);
            ++$self->{in_prg};
            if ( defined $elapsed ) {
               push @{$self->{response_times}}, $elapsed;
            }
         }
         else {
            PTDEBUG && _d("Direction C", $timestamp);
            --$self->{in_prg};
            ++$self->{completions};
         }
      }

      $pos_in_log = $tell->();
   } # EVENT

   # Not reached: every path out of the loop above returns or dies.
}
|
|
|
|
# Makes an event for one observation period, returns it, and resets the
# per-interval state so the next period starts at $t_end.  Arguments:
#   $t_start -- the start of the observation period for this event.
#   $t_end   -- the end of the observation period for this event.
# Returns a hashref with the keys ts, concurrency, throughput, arrivals,
# completions, busy_time, weighted_time, sum_time, variance_mean,
# quantile_time, pos_in_log and obs_time.  The caller must ensure that
# $t_end differs from $t_start, because the period length is a divisor.
sub make_event {
   my ( $self, $t_start, $t_end ) = @_;

   # The running totals only come into existence once parse_event() has seen
   # the matching kind of activity, so an interval can end while some of them
   # are still unset.  Treat those as zero instead of letting the arithmetic
   # below raise (fatal) uninitialized-value warnings.
   my $weighted_time      = $self->{weighted_time}      || 0;
   my $busy_time          = $self->{busy_time}          || 0;
   my $completions        = $self->{completions}        || 0;
   my $last_weighted_time = $self->{last_weighted_time} || 0;
   my $last_busy_time     = $self->{last_busy_time}     || 0;
   my $last_completions   = $self->{last_completions}   || 0;

   # Prep a couple of things...
   my @times      = sort { $a <=> $b } @{ $self->{response_times} || [] };
   my $arrivals   = scalar(@times);
   my $sum_times  = sum( @times );   # undef when there were no arrivals
   my $mean_times = ($sum_times || 0) / ($arrivals || 1);
   my $var_times  = 0;
   if ( @times ) {
      $var_times = sum( map { ($_ - $mean_times) **2 } @times ) / $arrivals;
   }

   # 1-based rank of the quantile sample, rounded to the nearest int.  It is
   # clamped to 1 because a rank of 0 would turn into index -1 below, which
   # selects the *largest* response time instead of the smallest.
   my $quantile_cutoff = sprintf( "%.0f", $arrivals * $self->{quantile} );
   $quantile_cutoff    = 1 if $quantile_cutoff < 1;

   # Compute the parts of the event we'll return.
   my $obs_time = $t_end - $t_start;
   my $e_ts
      = int( $self->{current_ts} / $self->{interval} ) * $self->{interval};
   my $e_concurrency = sprintf( "%.6f",
      ( $weighted_time - $last_weighted_time ) / $obs_time );
   my $e_arrivals      = $arrivals;
   my $e_throughput    = sprintf( "%.6f", $e_arrivals / $obs_time );
   my $e_completions   = $completions - $last_completions;
   my $e_busy_time     = sprintf( "%.6f", $busy_time - $last_busy_time );
   my $e_weighted_time = sprintf( "%.6f",
      $weighted_time - $last_weighted_time );
   my $e_sum_time      = sprintf("%.6f", $sum_times || 0);
   my $e_variance_mean = sprintf("%.6f", $var_times / ($mean_times || 1));
   my $e_quantile_time = sprintf("%.6f", $times[ $quantile_cutoff - 1 ] || 0);

   # Construct the event
   my $event = {
      ts            => $e_ts,
      concurrency   => $e_concurrency,
      throughput    => $e_throughput,
      arrivals      => $e_arrivals,
      completions   => $e_completions,
      busy_time     => $e_busy_time,
      weighted_time => $e_weighted_time,
      sum_time      => $e_sum_time,
      variance_mean => $e_variance_mean,
      quantile_time => $e_quantile_time,
      pos_in_log    => $self->{last_pos_in_log},
      obs_time      => sprintf("%.6f", $obs_time),
   };

   $self->{t_start}            = $t_end; # Not current_timestamp!
   $self->{current_ts}         = $t_end; # Next iteration will begin at boundary
   $self->{last_weighted_time} = $weighted_time;
   $self->{last_busy_time}     = $busy_time;
   $self->{last_completions}   = $completions;
   $self->{response_times}     = [];

   PTDEBUG && _d("Event is", Dumper($event));
   return $event;
}
|
|
|
|
# Debug printer: writes its arguments to STDERR on one logical line, prefixed
# with "# ", the calling package, the calling line number and the process ID.
# Undefined arguments are shown as the string "undef", and every newline
# embedded in an argument is followed by "# " so continuation lines stay
# commented.
sub _d {
   my ( $caller_pkg, undef, $caller_line ) = caller 0;
   my @pieces;
   foreach my $arg ( @_ ) {
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @pieces, $text;
   }
   print STDERR "# $caller_pkg:$caller_line $PID ", join(' ', @pieces), "\n";
}
|
|
|
|
1;
|
|
}
|
|
# ###########################################################################
|
|
# End TCPRequestAggregator package
|
|
# ###########################################################################
|