Files
percona-toolkit/lib/TCPRequestAggregator.pm
Viktor Szépe 2bd40d8c39 Remove trailing spaces (#665)
* Remove trailing spaces

* PR-665 -  Remove trailing spaces

- Updated not stable test t/pt-online-schema-change/preserve_triggers.t
- Updated utilities in bin directory

* PR-665 -  Remove trailing spaces

- Fixed typos

* PR-665 -  Remove trailing spaces

- Fixed typos

---------

Co-authored-by: Sveta Smirnova <sveta.smirnova@percona.com>
2023-09-06 01:15:12 +03:00

287 lines
11 KiB
Perl

# This program is copyright 2011 Baron Schwartz, 2011 Percona Ireland Ltd.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# TCPRequestAggregator package
# ###########################################################################
{
# Package: TCPRequestAggregator
# TCPRequestAggregator aggregates TCP requests from tcpdump files.
package TCPRequestAggregator;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use List::Util qw(sum);
use Data::Dumper;
# Required arguments: interval, quantile
sub new {
my ( $class, %args ) = @_;
my @required_args = qw(interval quantile);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my $self = {
buffer => [],
last_weighted_time => 0,
last_busy_time => 0,
last_completions => 0,
current_ts => 0,
%args,
};
return bless $self, $class;
}
# This method accepts an open filehandle and callback functions. It reads
# events from the filehandle and calls the callbacks with each event. $misc is
# some placeholder for the future and for compatibility with other query
# sources.
#
# The input is the output of mk-tcp-model, like so:
#
# 21 1301957863.820001 1301957863.820169 0.000168 10.10.18.253:58297
# 22 1301957863.821677 1301957863.821839 0.000162 10.10.18.253:43608
# 23 1301957863.822890 1301957863.823074 0.000184 10.10.18.253:52726
# 24 1301957863.822895 1301957863.823160 0.000265 10.10.18.253:58297
#
# Each event is a hashref of attribute => value pairs as defined in
# mk-tcp-model's documentation.
sub parse_event {
my ( $self, %args ) = @_;
my @required_args = qw(next_event tell);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($next_event, $tell) = @args{@required_args};
my $pos_in_log = $tell->();
my $buffer = $self->{buffer};
$self->{last_pos_in_log} ||= $pos_in_log;
EVENT:
while ( 1 ) {
PTDEBUG && _d("Beginning a loop at pos", $pos_in_log);
my ( $id, $start, $elapsed );
my ($timestamp, $direction);
if ( $self->{pending} ) {
( $id, $start, $elapsed ) = @{$self->{pending}};
PTDEBUG && _d("Pulled from pending", @{$self->{pending}});
}
elsif ( defined(my $line = $next_event->()) ) {
# Split the line into ID, start, end, elapsed, and host:port
my ($end, $host_port);
( $id, $start, $end, $elapsed, $host_port ) = $line =~ m/(\S+)/g;
@$buffer = sort { $a <=> $b } ( @$buffer, $end );
PTDEBUG && _d("Read from the file", $id, $start, $end, $elapsed, $host_port);
PTDEBUG && _d("Buffer is now", @$buffer);
}
if ( $start ) { # Test that we got a line; $id can be 0.
# We have a line to work on. The next event we need to process is the
# smaller of a) the arrival recorded in the $start of the line we just
# read, or b) the first completion recorded in the completions buffer.
if ( @$buffer && $buffer->[0] < $start ) {
$direction = 'C'; # Completion
$timestamp = shift @$buffer;
$self->{pending} = [ $id, $start, $elapsed ];
$id = $start = $elapsed = undef;
PTDEBUG && _d("Completion: using buffered end value", $timestamp);
PTDEBUG && _d("Saving line to pending", @{$self->{pending}});
}
else {
$direction = 'A'; # Arrival
$timestamp = $start;
$self->{pending} = undef;
PTDEBUG && _d("Deleting pending line");
PTDEBUG && _d("Arrival: using the line");
}
}
elsif ( @$buffer ) {
$direction = 'C';
$timestamp = shift @$buffer;
PTDEBUG && _d("No more lines, reading from buffer", $timestamp);
}
else { # We hit EOF.
PTDEBUG && _d("No more lines, no more buffered end times");
if ( $self->{in_prg} ) {
die "Error: no more lines, but in_prg = $self->{in_prg}";
}
if ( defined $self->{t_start}
&& defined $self->{current_ts}
&& $self->{t_start} < $self->{current_ts} )
{
PTDEBUG && _d("Returning event based on what's been seen");
return $self->make_event($self->{t_start}, $self->{current_ts});
}
else {
PTDEBUG && _d("No further events to make");
return;
}
}
# The notation used here is T_start for start of observation time (T).
# The divide, int(), and multiply effectively truncates the value to
# $interval precision.
my $t_start = int($timestamp / $self->{interval}) * $self->{interval};
$self->{t_start} ||= $timestamp; # Not $t_start; that'd skew 1st interval.
PTDEBUG && _d("Timestamp", $timestamp, "interval start time", $t_start);
# If $timestamp is not within the current interval, then we need to save
# everything for later, compute stats for the rest of this interval, and
# return an event. The next time we are called, we'll begin the next
# interval.
if ( $t_start > $self->{t_start} ) {
PTDEBUG && _d("Timestamp doesn't belong to this interval");
# We need to compute how much time is left in this interval, and add
# that much busy_time and weighted_time to the running totals, but only
# if there is some request in progress.
if ( $self->{in_prg} ) {
PTDEBUG && _d("Computing from", $self->{current_ts}, "to", $t_start);
$self->{busy_time} += $t_start - $self->{current_ts};
$self->{weighted_time} += ($t_start - $self->{current_ts}) * $self->{in_prg};
}
if ( @$buffer && $buffer->[0] < $t_start ) {
die "Error: completions for interval remain unprocessed";
}
# Reset running totals and last-time-seen stuff for next iteration,
# re-buffer the completion or replace the line onto pending, then
# return the event.
my $event = $self->make_event($self->{t_start}, $t_start);
$self->{last_pos_in_log} = $pos_in_log;
if ( $start ) {
$self->{pending} = [ $id, $start, $elapsed ];
}
else {
unshift @$buffer, $timestamp;
}
return $event;
}
# Otherwise, we need to compute the running sums and keep looping.
else {
if ( $self->{in_prg} ) {
# $self->{current_ts} is intitially 0, which would seem likely to
# skew this computation. But $self->{in_prg} will be 0 also, and
# $self->{current_ts} will get set immediately after this, so
# anytime this if() block runs, it'll be OK.
PTDEBUG && _d("Computing from", $self->{current_ts}, "to", $timestamp);
$self->{busy_time} += $timestamp - $self->{current_ts};
$self->{weighted_time} += ($timestamp - $self->{current_ts}) * $self->{in_prg};
}
$self->{current_ts} = $timestamp;
if ( $direction eq 'A' ) {
PTDEBUG && _d("Direction A", $timestamp);
++$self->{in_prg};
if ( defined $elapsed ) {
push @{$self->{response_times}}, $elapsed;
}
}
else {
PTDEBUG && _d("Direction C", $timestamp);
--$self->{in_prg};
++$self->{completions};
}
}
$pos_in_log = $tell->();
} # EVENT
$args{oktorun}->(0) if $args{oktorun};
return;
}
# Makes an event and returns it. Arguments:
# $t_start -- the start of the observation period for this event.
# $t_end -- the end of the observation period for this event.
sub make_event {
my ( $self, $t_start, $t_end ) = @_;
# Prep a couple of things...
my $quantile_cutoff = sprintf( "%.0f", # Round to nearest int
scalar( @{ $self->{response_times} } ) * $self->{quantile} );
my @times = sort { $a <=> $b } @{ $self->{response_times} };
my $arrivals = scalar(@times);
my $sum_times = sum( @times );
my $mean_times = ($sum_times || 0) / ($arrivals || 1);
my $var_times = 0;
if ( @times ) {
$var_times = sum( map { ($_ - $mean_times) **2 } @times ) / $arrivals;
}
# Compute the parts of the event we'll return.
my $e_ts
= int( $self->{current_ts} / $self->{interval} ) * $self->{interval};
my $e_concurrency = sprintf( "%.6f",
( $self->{weighted_time} - $self->{last_weighted_time} )
/ ( $t_end - $t_start ) );
my $e_arrivals = $arrivals;
my $e_throughput = sprintf( "%.6f", $e_arrivals / ( $t_end - $t_start ) );
my $e_completions
= ( $self->{completions} - $self->{last_completions} );
my $e_busy_time
= sprintf( "%.6f", $self->{busy_time} - $self->{last_busy_time} );
my $e_weighted_time = sprintf( "%.6f",
$self->{weighted_time} - $self->{last_weighted_time} );
my $e_sum_time = sprintf("%.6f", $sum_times || 0);
my $e_variance_mean = sprintf("%.6f", $var_times / ($mean_times || 1));
my $e_quantile_time = sprintf("%.6f", $times[ $quantile_cutoff - 1 ] || 0);
# Construct the event
my $event = {
ts => $e_ts,
concurrency => $e_concurrency,
throughput => $e_throughput,
arrivals => $e_arrivals,
completions => $e_completions,
busy_time => $e_busy_time,
weighted_time => $e_weighted_time,
sum_time => $e_sum_time,
variance_mean => $e_variance_mean,
quantile_time => $e_quantile_time,
pos_in_log => $self->{last_pos_in_log},
obs_time => sprintf("%.6f", $t_end - $t_start),
};
$self->{t_start} = $t_end; # Not current_timestamp!
$self->{current_ts} = $t_end; # Next iteration will begin at boundary
$self->{last_weighted_time} = $self->{weighted_time};
$self->{last_busy_time} = $self->{busy_time};
$self->{last_completions} = $self->{completions};
$self->{response_times} = [];
PTDEBUG && _d("Event is", Dumper($event));
return $event;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End TCPRequestAggregator package
# ###########################################################################