Merge pt-agent branch.

This commit is contained in:
Daniel Nichter
2013-06-16 23:23:11 -07:00
64 changed files with 25198 additions and 682 deletions

9343
bin/pt-agent Executable file

File diff suppressed because it is too large Load Diff

View File

@@ -4897,6 +4897,7 @@ sub new {
my ( $class ) = @_;
my $self = {
pending => [],
last_event_offset => undef,
};
return bless $self, $class;
}
@@ -4933,6 +4934,7 @@ sub parse_event {
or defined($stmt = $next_event->())
) {
my @properties = ('cmd', 'Query', 'pos_in_log', $pos_in_log);
$self->{last_event_offset} = $pos_in_log;
$pos_in_log = $tell->();
if ( $stmt =~ s/$slow_log_hd_line//go ){ # Throw away header lines in log
@@ -5058,9 +5060,15 @@ sub parse_event {
PTDEBUG && _d('Properties of event:', Dumper(\@properties));
my $event = { @properties };
if ( $args{stats} ) {
$args{stats}->{events_read}++;
$args{stats}->{events_parsed}++;
if ( !$event->{arg} ) {
PTDEBUG && _d('Partial event, no arg');
}
else {
$self->{last_event_offset} = undef;
if ( $args{stats} ) {
$args{stats}->{events_read}++;
$args{stats}->{events_parsed}++;
}
}
return $event;
} # EVENT
@@ -6654,7 +6662,7 @@ sub hostname {
sub files {
    my ( $self, %args ) = @_;
    # Build the "# Files: name1, name2, ...\n" report header line.
    # Each element of $args{files} is a hashref with at least a "name"
    # key (callers push { name => ..., size => ... } entries); only the
    # names are listed.  Returns nothing when no files were supplied.
    #
    # Fix: the stripped diff left both the old return (joining the raw
    # list) and the new return in place, so the second was unreachable;
    # keep only the post-image, which matches the hashref file entries.
    if ( $args{files} ) {
        return "# Files: " . join(', ', map { $_->{name} } @{$args{files}}) . "\n";
    }
    return;
}
@@ -7658,17 +7666,50 @@ use constant PTDEBUG => $ENV{PTDEBUG} || 0;
my $have_json = eval { require JSON };
our $pretty_json = 0;
our $sorted_json = 0;
our $pretty_json = $ENV{PTTEST_PRETTY_JSON} || 0;
our $sorted_json = $ENV{PTTEST_PRETTY_JSON} || 0;
extends qw(QueryReportFormatter);
has 'QueryRewriter' => (
is => 'ro',
isa => 'Object',
required => 1,
);
has 'QueryParser' => (
is => 'ro',
isa => 'Object',
required => 1,
);
has 'Quoter' => (
is => 'ro',
isa => 'Object',
required => 1,
);
has _json => (
is => 'ro',
init_arg => undef,
builder => '_build_json',
);
has 'max_query_length' => (
is => 'rw',
isa => 'Int',
required => 0,
default => sub { return 10_000; }, # characters, not bytes
);
has 'max_fingerprint_length' => (
is => 'rw',
isa => 'Int',
required => 0,
default => sub { return 5_000; }, # characters, not bytes
);
sub _build_json {
return unless $have_json;
return JSON->new->utf8
@@ -7700,15 +7741,102 @@ override query_report => sub {
foreach my $arg ( qw(ea worst orderby groupby) ) {
die "I need a $arg argument" unless defined $arg;
}
my $ea = $args{ea};
my $worst = $args{worst};
my $orderby = $args{orderby};
my $groupby = $args{groupby};
my $ea = $args{ea};
my $worst = $args{worst};
my $results = $ea->results();
my @attribs = @{$ea->get_attributes()};
my %string_args = map { $_ => 1 } qw( db host arg user bytes pos_in_log );
my @queries;
my $q = $self->Quoter;
my $qr = $self->QueryRewriter;
my $global_data = {
metrics => {},
files => $args{files},
($args{resume} && scalar keys %{$args{resume}} ? (resume => $args{resume}) : ()),
};
my $global_cnt = $results->{globals}->{$orderby}->{cnt} || 0;
my $global_unq = scalar keys %{$results->{classes}};
my ($qps, $conc) = (0, 0);
if ( $global_cnt && $results->{globals}->{ts}
&& ($results->{globals}->{ts}->{max} || '')
gt ($results->{globals}->{ts}->{min} || '') )
{
eval {
my $min = parse_timestamp($results->{globals}->{ts}->{min});
my $max = parse_timestamp($results->{globals}->{ts}->{max});
my $diff = unix_timestamp($max) - unix_timestamp($min);
$qps = $global_cnt / ($diff || 1);
$conc = $results->{globals}->{$orderby}->{sum} / $diff;
};
}
$global_data->{query_count} = $global_cnt;
$global_data->{unique_query_count} = $global_unq;
$global_data->{queries_per_second} = $qps if $qps;
$global_data->{concurrency} = $conc if $conc;
my %hidden_attrib = (
arg => 1,
fingerprint => 1,
pos_in_log => 1,
ts => 1,
);
foreach my $attrib ( grep { !$hidden_attrib{$_} } @attribs ) {
my $type = $ea->type_for($attrib) || 'string';
next if $type eq 'string';
next unless exists $results->{globals}->{$attrib};
my $store = $results->{globals}->{$attrib};
my $metrics = $ea->stats()->{globals}->{$attrib};
my $int = $attrib =~ m/(?:time|wait)$/ ? 0 : 1;
my $real_attrib = $attrib eq 'bytes' ? 'Query_length' : $attrib;
if ( $type eq 'num' ) {
foreach my $m ( qw(sum min max) ) {
if ( $int ) {
$global_data->{metrics}->{$real_attrib}->{$m}
= sprintf('%d', $store->{$m} || 0);
}
else { # microsecond
$global_data->{metrics}->{$real_attrib}->{$m}
= sprintf('%.6f', $store->{$m} || 0);
}
}
foreach my $m ( qw(pct_95 stddev median) ) {
if ( $int ) {
$global_data->{metrics}->{$real_attrib}->{$m}
= sprintf('%d', $metrics->{$m} || 0);
}
else { # microsecond
$global_data->{metrics}->{$real_attrib}->{$m}
= sprintf('%.6f', $metrics->{$m} || 0);
}
}
if ( $int ) {
$global_data->{metrics}->{$real_attrib}->{avg}
= sprintf('%d', $store->{sum} / $store->{cnt});
}
else {
$global_data->{metrics}->{$real_attrib}->{avg}
= sprintf('%.6f', $store->{sum} / $store->{cnt});
}
}
elsif ( $type eq 'bool' ) {
my $store = $results->{globals}->{$real_attrib};
$global_data->{metrics}->{$real_attrib}->{cnt}
= sprintf('%d', $store->{sum});
}
}
my @classes;
foreach my $worst_info ( @$worst ) {
my $item = $worst_info->[0];
my $stats = $ea->results->{classes}->{$item};
@@ -7716,17 +7844,29 @@ override query_report => sub {
my $all_log_pos = $ea->{result_classes}->{$item}->{pos_in_log}->{all};
my $times_seen = sum values %$all_log_pos;
my %class = (
sample => $sample->{arg},
fingerprint => $item,
checksum => make_checksum($item),
cnt => $times_seen,
);
my $distill = $groupby eq 'fingerprint' ? $qr->distill($sample->{arg})
: undef;
my $fingerprint = substr($item, 0, $self->max_fingerprint_length);
my $checksum = make_checksum($item);
my $class = {
checksum => $checksum,
fingerprint => $fingerprint,
distillate => $distill,
attribute => $groupby,
query_count => $times_seen,
example => {
query => substr($sample->{arg}, 0, $self->max_query_length),
ts => $sample->{ts} ? parse_timestamp($sample->{ts}) : undef,
},
};
my %metrics;
foreach my $attrib ( @attribs ) {
$metrics{$attrib} = $ea->metrics(
my $real_attrib = $attrib eq 'bytes' ? 'Query_length' : $attrib;
next if $real_attrib eq 'Rows_affected'
&& $distill && $distill =~ m/^(?:SELECT|SHOW|SET|ADMIN)/;
$metrics{$real_attrib} = $ea->metrics(
attrib => $attrib,
where => $item,
);
@@ -7737,6 +7877,8 @@ override query_report => sub {
delete $metrics{$attrib};
next;
}
delete $metrics{pos_in_log};
delete $metrics{$attrib}->{cnt};
if ($attrib eq 'ts') {
my $ts = delete $metrics{ts};
@@ -7744,31 +7886,87 @@ override query_report => sub {
next unless defined $ts && defined $ts->{$thing};
$ts->{$thing} = parse_timestamp($ts->{$thing});
}
$class{ts_min} = $ts->{min};
$class{ts_max} = $ts->{max};
$class->{ts_min} = $ts->{min};
$class->{ts_max} = $ts->{max};
}
elsif ( $string_args{$attrib} ) {
$metrics{$attrib} = { value => $metrics{$attrib}{max} };
}
elsif ( ($ea->{type_for}->{$attrib} || '') eq 'num' ) {
for my $value ( values %{$metrics{$attrib}} ) {
next unless $value;
$value = sprintf '%.6f', $value;
else {
my $type = $attrib eq 'Query_length' ? 'num' : $ea->type_for($attrib) || 'string';
if ( $type eq 'string' ) {
$metrics{$attrib} = { value => $metrics{$attrib}{max} };
}
if ( my $pct = $metrics{$attrib}->{pct} ) {
$metrics{$attrib}->{pct} = sprintf('%.2f', $pct);
elsif ( $type eq 'num' ) {
foreach my $value ( values %{$metrics{$attrib}} ) {
next unless defined $value;
if ( $attrib =~ m/_(?:time|wait)$/ ) {
$value = sprintf('%.6f', $value);
}
else {
$value = sprintf('%d', $value);
}
}
}
elsif ( $type eq 'bool' ) {
$metrics{$attrib} = {
yes => sprintf('%d', $metrics{$attrib}->{sum}),
};
}
}
}
push @queries, {
class => \%class,
attributes => \%metrics,
};
my @tables;
if ( $groupby eq 'fingerprint' ) {
my $default_db = $sample->{db} ? $sample->{db}
: $stats->{db}->{unq} ? keys %{$stats->{db}->{unq}}
: undef;
my @table_names = $self->QueryParser->extract_tables(
query => $sample->{arg} || '',
default_db => $default_db,
Quoter => $q,
);
foreach my $db_tbl ( @table_names ) {
my ( $db, $tbl ) = @$db_tbl;
my $status
= 'SHOW TABLE STATUS'
. ($db ? " FROM `$db`" : '')
. " LIKE '$tbl'\\G";
my $create
= "SHOW CREATE TABLE "
. $q->quote(grep { $_ } @$db_tbl)
. "\\G";
push @tables, { status => $status, create => $create };
}
if ( $item =~ m/^(?:[\(\s]*select|insert|replace)/ ) {
if ( $item =~ m/^(?:insert|replace)/ ) {
}
else {
}
}
else {
my $converted = $qr->convert_to_select(
$sample->{arg} || '',
);
if ( $converted && $converted =~ m/^[\(\s]*select/i ) {
$class->{example}->{as_select} = $converted;
}
}
}
$class->{metrics} = \%metrics;
if ( @tables ) {
$class->{tables} = \@tables;
}
push @classes, $class;
}
my $json = $self->encode_json(\@queries);
$json .= "\n" if $json !~ /\n\Z/;
return $json . "\n";
my $data = {
global => $global_data,
classes => \@classes,
};
my $json = $self->encode_json($data);
$json .= "\n" unless $json =~ /\n\Z/;
return $json;
};
no Lmo;
@@ -12523,7 +12721,9 @@ my $ps_dbh; # For Processlist
my $aux_dbh; # For --aux-dsn (--since/--until "MySQL expression")
my $resume_file;
my $resume = {};
my $offset;
my $exit_status = 0;
(my $tool = __PACKAGE__) =~ tr/_/-/;
@@ -12531,8 +12731,9 @@ sub main {
# Reset global vars, else tests will fail.
local @ARGV = @_;
$oktorun = 1;
$resume_file = undef;
$resume = {};
$offset = undef;
$exit_status = 0;
# ##########################################################################
# Get configuration information.
@@ -12816,30 +13017,79 @@ sub main {
if ( $fh ) {
PTDEBUG && _d('Reading', $filename);
PTDEBUG && _d('File size:', $filesize);
push @read_files, $filename || "STDIN";
push @read_files, { name => ($filename || "STDIN"), size => $filesize };
# Read the file offset for --resume.
if ( ($resume_file = $o->get('resume')) && $filename ) {
if ( -s $resume_file ) {
open my $resume_fh, '<', $resume_file
or die "Error opening $resume_file: $OS_ERROR";
chomp(my $resume_offset = <$resume_fh>);
open my $resume_fh, "<", $resume_file
or die "Cannot open $resume_file: $OS_ERROR";
my $resume_offset = do { local $/; <$resume_fh> };
close $resume_fh
or die "Error close $resume_file: $OS_ERROR";
if ( !looks_like_number($resume_offset) ) {
die "Offset $resume_offset in $resume_file "
. "does not look like a number.\n";
chomp($resume_offset) if $resume_offset;
if ( looks_like_number($resume_offset) ) {
PTDEBUG && _d('Resuming at offset', $resume_offset);
$resume->{simple} = 1;
seek $fh, $resume_offset, 0
or die "Error seeking to $resume_offset in "
. "$resume_file: $OS_ERROR";
warn "# Resuming $filename from offset "
. "$resume_offset (file size: $filesize)...\n";
}
else {
$resume->{simple} = 0; # enhanced resume file
map {
my $line = $_;
chomp $line;
my ($key, $value) = split('=', $line);
if ( !$key
|| !defined $value
|| !looks_like_number($value)
|| $value < 0 )
{
$exit_status = 1;
warn "Invalid line in --resume $resume_file: $line\n";
$oktorun = 0;
return;
}
$resume->{$key} = $value;
} split("\n", $resume_offset);
if ( $resume->{end_offset} &&
$resume->{end_offset} <=
($resume->{stop_offset} || 0) )
{
close $args->{input_fh} if $args->{input_fh};
$args->{input_fh} = undef;
$args->{more_events} = 0;
$oktorun = 0;
$resume_file = '';
warn "# Not resuming $filename because "
. "end_offset $resume->{end_offset} is "
. "less than or equal to stop_offset "
. ($resume->{stop_offset} || 0) . "\n";
}
else {
$resume_offset = $resume->{stop_offset}
|| $resume->{start_offset}
|| 0;
seek $fh, $resume_offset, 0
or die "Error seeking to $resume_offset in "
. "$resume_file: $OS_ERROR";
warn "# Resuming $filename from offset "
. "$resume_offset to "
. ($resume->{end_offset} ? $resume->{end_offset}
: "end of file")
. " (file size: $filesize)...\n";
}
}
PTDEBUG && _d('Resuming at offset', $resume_offset);
seek $fh, $resume_offset, 0
or die "Error seeking to $resume_offset in "
. "$resume_file: $OS_ERROR";
warn "Resuming $filename from offset $resume_offset "
. "(file size: $filesize)...\n";
}
else {
PTDEBUG && _d('Not resuming', $filename, 'because',
$resume_file, 'does not exist');
warn "# Resuming $filename from offset 0 because "
. "resume file $filename does not exist "
. "(file size: $filesize)...\n";
$resume->{simple} = 0;
$resume->{start_offset} = 0;
}
}
@@ -12883,13 +13133,24 @@ sub main {
$args->{more_events} = 0;
}
}
$pr->update($args->{tell}) if $pr;
elsif ( $resume->{end_offset}
&& $offset >= $resume->{end_offset} ) {
PTDEBUG && _d('Offset', $offset, 'at end_offset',
$resume->{end_offset});
close $args->{input_fh} if $args->{input_fh};
$args->{input_fh} = undef;
$args->{more_events} = 0;
}
else {
$pr->update($args->{tell}) if $pr;
}
return $args;
},
);
} # input
my $ps_dsn;
my @parsers;
{ # event
my $misc;
if ( $ps_dsn = $o->get('processlist') ) {
@@ -12991,7 +13252,8 @@ sub main {
}
die "Failed to load $module module: $EVAL_ERROR";
}
push @parsers, $parser;
$pipeline->add(
name => ref $parser,
process => sub {
@@ -13244,6 +13506,10 @@ sub main {
if ( $report ) {
PTDEBUG && _d("Iteration", $args->{iter}, "stopped at",ts(time));
save_resume_offset(
last_event_offset => $parsers[0]->{last_event_offset},
);
# Get this before calling print_reports() because that sub
# resets each ea and we may need this later for stats.
my $n_events_aggregated = $ea[0]->events_processed();
@@ -13262,7 +13528,9 @@ sub main {
);
}
else {
print "\n# No events processed.\n";
if ( $o->get('output') eq 'report' ) {
print "\n# No events processed.\n";
}
}
if ( PTDEBUG ) {
@@ -13596,6 +13864,9 @@ sub main {
Last_errno => 'string',
Thread_id => 'string',
InnoDB_trx_id => 'string',
host => 'string',
ip => 'string',
port => 'string',
Killed => 'bool',
};
@@ -13686,7 +13957,9 @@ sub main {
}
PTDEBUG && _d("Pipeline data:", Dumper($pipeline_data));
save_resume_offset();
save_resume_offset(
last_event_offset => $parsers[0]->{last_event_offset},
);
# Disconnect all open $dbh's
map {
@@ -13696,7 +13969,7 @@ sub main {
grep { $_ }
($qv_dbh, $qh_dbh, $ps_dbh, $ep_dbh, $aux_dbh);
return 0;
return $exit_status;
} # End main()
# ############################################################################
@@ -13858,7 +14131,8 @@ sub print_reports {
files => $args{files},
log_type => $o->get('type')->[0],
variations => $o->get('variations'),
group => { map { $_=>1 } qw(rusage date hostname files header) }
group => { map { $_=>1 } qw(rusage date hostname files header) },
resume => $resume,
);
}
@@ -14213,6 +14487,9 @@ sub verify_run_time {
}
sub save_resume_offset {
my (%args) = @_;
my $last_event_offset = $args{last_event_offset};
if ( !$resume_file || !$offset ) {
PTDEBUG && _d('Not saving resume offset because there is no '
. 'resume file or offset:', $resume_file, $offset);
@@ -14222,10 +14499,26 @@ sub save_resume_offset {
PTDEBUG && _d('Saving resume at offset', $offset, 'to', $resume_file);
open my $resume_fh, '>', $resume_file
or die "Error opening $resume_file: $OS_ERROR";
print { $resume_fh } $offset, "\n";
if ( $resume->{simple} ) {
print { $resume_fh } $offset, "\n";
warn "\n# Saved resume file offset $offset to $resume_file\n";
}
else {
# 2.2.3+ enhanced resume file
$resume->{stop_offset} = defined $last_event_offset ? $last_event_offset
: $offset;
foreach my $key ( sort keys %$resume ) {
next if $key eq 'simple';
print { $resume_fh } "$key=$resume->{$key}\n";
}
warn "\n# Saved resume file stop_offset $resume->{stop_offset} to "
. "$resume_file\n";
}
close $resume_fh
or die "Error close $resume_file: $OS_ERROR";
warn "\n# Saved resume file offset $offset to $resume_file\n";
return;
}