Files
percona-toolkit/lib/CompareResults.pm
Viktor Szépe 2bd40d8c39 Remove trailing spaces (#665)
* Remove trailing spaces

* PR-665 -  Remove trailing spaces

- Updated not stable test t/pt-online-schema-change/preserve_triggers.t
- Updated utilities in bin directory

* PR-665 -  Remove trailing spaces

- Fixed typos

* PR-665 -  Remove trailing spaces

- Fixed typos

---------

Co-authored-by: Sveta Smirnova <sveta.smirnova@percona.com>
2023-09-06 01:15:12 +03:00

1015 lines
32 KiB
Perl

# This program is copyright 2009-2011 Percona Ireland Ltd.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# CompareResults package
# ###########################################################################
{
# Package: CompareResults
# CompareResults compares the query result sets.
package CompareResults;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use Time::HiRes qw(time);
use Data::Dumper;
$Data::Dumper::Indent = 1;
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Quotekeys = 0;
# Constructor.
# Required args:
#   * method    scalar: "checksum" or "rows"
#   * base-dir  scalar: dir used by rows method to write outfiles
#   * plugins   arrayref: TableSync* plugins used by rows method
#   * get_id    coderef: used by report() to trf query to its ID
#   * common modules (QueryParser, TableParser, TableSyncer, Quoter)
# Returns: blessed hashref
# Can die: yes, if a required arg is missing or false
sub new {
   my ( $class, %args ) = @_;
   for my $required ( qw(method base-dir plugins get_id
                         QueryParser TableParser TableSyncer Quoter) ) {
      die "I need a $required argument" unless $args{$required};
   }
   # diffs and samples always start out empty, even if the caller
   # passed args with those names.
   my %object = (
      %args,
      diffs   => {},
      samples => {},
   );
   return bless \%object, $class;
}
# Required args:
#   * event  hashref: an event
#   * dbh    scalar: active dbh
# Optional args:
#   * db             scalar: database name to create temp table in unless...
#   * temp-database  scalar: ...temp db name is given
#   * temp-table     scalar: temp table name
# Returns: hashref (the same event)
# Can die: yes
# before_execute() prepares an event for execute().  For the rows method
# nothing is done.  For the checksum method a database (db or temp-database)
# and temp-table are required; the temp table is dropped if it exists, its
# quoted name is saved in the event as tmp_tbl, and the event's query is
# wrapped in a CREATE TEMPORARY TABLE ... AS statement saved as wrapped_query.
sub before_execute {
   my ( $self, %args ) = @_;
   my @required_args = qw(event dbh);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($event, $dbh) = @args{@required_args};

   if ( $self->{method} eq 'checksum' ) {
      my ($db, $tmp_tbl) = @args{qw(db temp-table)};
      $db = $args{'temp-database'} if $args{'temp-database'};
      die "Cannot checksum results without a database"
         unless $db;
      # Fail with a clear message instead of quoting an undefined name.
      die "Cannot checksum results without a temp table"
         unless defined $tmp_tbl && length $tmp_tbl;

      $tmp_tbl = $self->{Quoter}->quote($db, $tmp_tbl);
      eval {
         # The drop is issued exactly once.  A "SET storage_engine=MyISAM"
         # used to follow it but was removed because that variable is
         # deprecated since MySQL 5.7.
         my $sql = "DROP TABLE IF EXISTS $tmp_tbl";
         PTDEBUG && _d($sql);
         $dbh->do($sql);
      };
      die "Failed to drop temporary table $tmp_tbl: $EVAL_ERROR"
         if $EVAL_ERROR;

      # Save the tmp tbl; it's used later in _compare_checksums().
      $event->{tmp_tbl} = $tmp_tbl;

      # Wrap the original query so when it's executed its results get
      # put in tmp table.
      $event->{wrapped_query}
         = "CREATE TEMPORARY TABLE $tmp_tbl AS $event->{arg}";
      PTDEBUG && _d('Wrapped query:', $event->{wrapped_query});
   }

   return $event;
}
# Required args:
#   * event  hashref: an event
#   * dbh    scalar: active dbh
# Returns: hashref (the same event)
# Can die: yes
# execute() runs the event's query; any prep work should already have been
# done in before_execute().  The rows method prepares and executes the
# original query and saves the statement handle in the event as results_sth
# for compare().  Any other method executes the event's wrapped_query; if
# that fails, wrapped_query and tmp_tbl are removed from the event before
# dying.  Query_time is reset to 0 first and set to the measured time on
# success.
sub execute {
   my ( $self, %args ) = @_;
   for my $required ( qw(event dbh) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my ( $event, $dbh ) = @args{qw(event dbh)};

   # Other modules should only execute the query if Query_time does not
   # already exist.  This module requires special execution so we always
   # execute.
   PTDEBUG && _d('Executing query');
   $event->{Query_time} = 0;

   my $elapsed;
   if ( $self->{method} ne 'rows' ) {
      die "No wrapped query" unless $event->{wrapped_query};
      my $wrapped = $event->{wrapped_query};
      eval {
         my $t0 = time();
         $dbh->do($wrapped);
         $elapsed = sprintf '%.6f', time() - $t0;
      };
      if ( $EVAL_ERROR ) {
         delete @{$event}{qw(wrapped_query tmp_tbl)};
         die "Failed to execute query: $EVAL_ERROR";
      }
   }
   else {
      my $sth = eval { $dbh->prepare($event->{arg}) };
      die "Failed to prepare query: $EVAL_ERROR" if $EVAL_ERROR;

      eval {
         my $t0 = time();
         $sth->execute();
         $elapsed = sprintf '%.6f', time() - $t0;
      };
      die "Failed to execute query: $EVAL_ERROR" if $EVAL_ERROR;

      $event->{results_sth} = $sth;
   }

   $event->{Query_time} = $elapsed;
   return $event;
}
# Required args:
#   * event  hashref: an event
# Optional args:
#   * dbh    scalar: active dbh
# Returns: hashref (the same event, unchanged)
# Can die: yes
# after_execute() is the hook for post-execution cleanup.  The results
# should not be compared here; no analytics here, save that for compare().
# Currently it only validates its args and returns the event.
sub after_execute {
   my ( $self, %args ) = @_;
   die "I need a event argument" unless $args{event};
   return $args{event};
}
# Required args:
#   * events  arrayref: events
#   * hosts   arrayref: hosts hashrefs with at least a dbh key
# Optional args:
#   * temp-database  scalar: temp db name
# Returns: array
# Can die: yes
# compare() compares events that have been run through before_execute(),
# execute() and after_execute().  It dispatches on the object's method:
# "rows" goes to _compare_rows(), anything else to _compare_checksums().
# Either one returns an array of key => value pairs which the caller should
# aggregate into a meta-event that represents the differences found in
# these events.  Only a "summary" of differences is returned; specific
# differences are saved internally and are reported by calling report()
# later.
sub compare {
   my ( $self, %args ) = @_;
   for my $required ( qw(events hosts) ) {
      die "I need a $required argument" unless $args{$required};
   }
   if ( $self->{method} eq 'rows' ) {
      return $self->_compare_rows(%args);
   }
   return $self->_compare_checksums(%args);
}
# Required args:
#   * events  arrayref: events
#   * hosts   arrayref: hosts hashrefs with at least a dbh key
# Returns: array of key => value pairs
# Can die: yes
# _compare_checksums() runs each event through _checksum_results() using
# the dbh of the host at the same index, then compares the checksum and
# row_count of every event after the first against event 0 (undefined
# values compare as 0).  If any checksum or row count differs, the values
# from all events are saved in the object's diffs, and event 0's query is
# saved as the sample.  wrapped_query is removed from every event.
sub _compare_checksums {
   my ( $self, %args ) = @_;
   for my $required ( qw(events hosts) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my ( $events, $hosts ) = @args{qw(events hosts)};

   my $n_diff_row_counts = 0;
   my $n_diff_col_counts = 0;  # TODO
   my $n_diff_col_types  = 0;  # TODO
   my $n_diff_checksums  = 0;

   EVENT:
   for my $i ( 0 .. $#{$events} ) {
      $events->[$i] = $self->_checksum_results(
         event => $events->[$i],
         dbh   => $hosts->[$i]->{dbh},
      );
      next EVENT if $i == 0;  # event 0 is what the others are compared to

      $n_diff_checksums++
         if ($events->[0]->{checksum} || 0) != ($events->[$i]->{checksum} || 0);
      $n_diff_row_counts++
         if ($events->[0]->{row_count} || 0) != ($events->[$i]->{row_count} || 0);

      delete $events->[$i]->{wrapped_query};
   }
   delete $events->[0]->{wrapped_query};

   # Save differences.
   my $first    = $events->[0];
   my $item     = $first->{fingerprint} || $first->{arg};
   my $sampleno = $first->{sampleno} || 0;
   if ( $n_diff_checksums ) {
      $self->{diffs}->{checksums}->{$item}->{$sampleno}
         = [ map { $_->{checksum} } @$events ];
      $self->{samples}->{$item}->{$sampleno} = $first->{arg};
   }
   if ( $n_diff_row_counts ) {
      $self->{diffs}->{row_counts}->{$item}->{$sampleno}
         = [ map { $_->{row_count} } @$events ];
      $self->{samples}->{$item}->{$sampleno} = $first->{arg};
   }

   return (
      different_row_counts    => $n_diff_row_counts,
      different_checksums     => $n_diff_checksums,
      different_column_counts => $n_diff_col_counts,
      different_column_types  => $n_diff_col_types,
   );
}
# Required args:
#   * event  hashref: an event
#   * dbh    scalar: active dbh
# Returns: hashref (the same event)
# Can die: yes
# _checksum_results() sets the event's row_count and checksum.  If the
# event has both a wrapped_query and a tmp_tbl, they come from
# SELECT COUNT(*) and CHECKSUM TABLE on the tmp table, which is then
# dropped (a failed drop is not fatal).  Otherwise both are set to 0.
sub _checksum_results {
   my ( $self, %args ) = @_;
   for my $required ( qw(event dbh) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my ( $event, $dbh ) = @args{qw(event dbh)};

   my $row_count = 0;
   my $checksum  = 0;
   my $tmp_tbl   = $event->{tmp_tbl};

   if ( !$event->{wrapped_query} || !$tmp_tbl ) {
      PTDEBUG && _d("Event doesn't have wrapped query or tmp tbl");
   }
   else {
      eval {
         my $count_sql = "SELECT COUNT(*) FROM $tmp_tbl";
         PTDEBUG && _d($count_sql);
         ($row_count) = @{ $dbh->selectcol_arrayref($count_sql) };

         my $checksum_sql = "CHECKSUM TABLE $tmp_tbl";
         PTDEBUG && _d($checksum_sql);
         # The checksum is the second column of the result row.
         $checksum = $dbh->selectrow_arrayref($checksum_sql)->[1];
      };
      die "Failed to checksum table: $EVAL_ERROR"
         if $EVAL_ERROR;

      my $drop_sql = "DROP TABLE IF EXISTS $tmp_tbl";
      PTDEBUG && _d($drop_sql);
      eval { $dbh->do($drop_sql) };
      # This isn't critical; we don't need to die.
      PTDEBUG && $EVAL_ERROR && _d('Error:', $EVAL_ERROR);
   }

   $event->{row_count} = $row_count;
   $event->{checksum}  = $checksum;
   PTDEBUG && _d('row count:', $row_count, 'checksum:', $checksum);

   return $event;
}
# Required args:
#   * events  arrayref: events; event 0 is the "left" side that every
#                       other event is compared to
#   * hosts   arrayref: hosts hashrefs with at least a dbh key
# Optional args:
#   * any other args are passed through to diff_rows(), e.g.
#     max-different-rows and temp-database
# Returns: array of key => value pairs
# Can die: yes
# _compare_rows() compares the result statement handles (results_sth)
# saved in the events by execute().  Event 0's rows are fetched once into a
# MockSth so they can be replayed against each of the other events.  Row
# counts are kept per event; a row count difference is recorded only for an
# event whose count actually differs from event 0's.  When the rows
# themselves differ, the remaining rows of each side are written to
# outfiles and diff_rows() determines the column value differences.
# results_sth is removed from every event that is processed.
sub _compare_rows {
   my ( $self, %args ) = @_;
   my @required_args = qw(events hosts);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($events, $hosts) = @args{@required_args};

   my $different_row_counts    = 0;
   my $different_column_counts = 0; # TODO
   my $different_column_types  = 0; # TODO
   my $different_column_values = 0;

   my $n_events = scalar @$events;
   my $event0   = $events->[0];
   my $item     = $event0->{fingerprint} || $event0->{arg};
   my $sampleno = $event0->{sampleno} || 0;
   my $dbh      = $hosts->[0]->{dbh};  # doesn't matter which one

   if ( !$event0->{results_sth} ) {
      # This will happen if execute() or something fails.
      PTDEBUG && _d("Event 0 doesn't have a results sth");
      return (
         different_row_counts    => $different_row_counts,
         different_column_values => $different_column_values,
         different_column_counts => $different_column_counts,
         different_column_types  => $different_column_types,
      );
   }

   my $res_struct = MockSyncStream::get_result_set_struct($dbh,
      $event0->{results_sth});
   PTDEBUG && _d('Result set struct:', Dumper($res_struct));

   # Use a mock sth so we don't have to re-execute event0 sth to compare
   # it to the 3rd and subsequent events.
   my @event0_rows      = @{ $event0->{results_sth}->fetchall_arrayref({}) };
   $event0->{row_count} = scalar @event0_rows;
   # Class->new() is used instead of "new Class()" because this package
   # defines its own new(), which makes the indirect object syntax
   # ambiguous to the parser.
   my $left             = MockSth->new(@event0_rows);
   $left->{NAME}        = [ @{$event0->{results_sth}->{NAME}} ];

   EVENT:
   foreach my $i ( 1..($n_events-1) ) {
      my $event = $events->[$i];
      my $right = $event->{results_sth};

      $event->{row_count} = 0;

      if ( !$right ) {
         PTDEBUG && _d('No results sth on host', $i);
         delete $event->{results_sth};
         next EVENT;
      }

      # Identical rows are ignored.  Once a difference on either side is
      # found, we gobble the remaining rows in that sth and print them to
      # an outfile.  This short circuits RowDiff::compare_sets() which is
      # what we want to do.
      my $no_diff = 1;  # results are identical; this catches 0 row results
      my $outfile = Outfile->new();
      my ($left_outfile, $right_outfile, $n_rows);
      my $same_row = sub {
         $event->{row_count}++;  # Keep track of this event's row_count.
         return;
      };
      my $not_in_left = sub {
         my ( $rr ) = @_;
         $no_diff = 0;
         # $n_rows will be added later to this event's row_count.
         ($right_outfile, $n_rows) = $self->write_to_outfile(
            side    => 'right',
            sth     => $right,
            row     => $rr,
            Outfile => $outfile,
         );
         return;
      };
      my $not_in_right = sub {
         my ( $lr ) = @_;
         $no_diff = 0;
         # left is event0 so we don't need $n_rows back.
         ($left_outfile, undef) = $self->write_to_outfile(
            side    => 'left',
            sth     => $left,
            row     => $lr,
            Outfile => $outfile,
         );
         return;
      };

      my $rd       = RowDiff->new(dbh => $dbh);
      my $mocksync = MockSyncStream->new(
         query        => $event0->{arg},
         cols         => $res_struct->{cols},
         same_row     => $same_row,
         not_in_left  => $not_in_left,
         not_in_right => $not_in_right,
      );

      PTDEBUG && _d('Comparing result sets with MockSyncStream');
      $rd->compare_sets(
         left_sth   => $left,
         right_sth  => $right,
         syncer     => $mocksync,
         tbl_struct => $res_struct,
      );

      # Add number of rows written to outfile to this event's row_count.
      # $n_rows will be undef if there were no differences; row_count will
      # still be correct in this case because we kept track of it in
      # $same_row.
      $event->{row_count} += $n_rows || 0;

      PTDEBUG && _d('Left has', $event0->{row_count}, 'rows, right has',
         $event->{row_count});

      # Save differences.  Only record a row count difference for the event
      # that actually differs; testing the running total instead would let
      # a later, matching event overwrite an earlier difference.
      if ( $event0->{row_count} != $event->{row_count} ) {
         $different_row_counts++;
         $self->{diffs}->{row_counts}->{$item}->{$sampleno}
            = [ $event0->{row_count}, $event->{row_count} ];
         $self->{samples}->{$item}->{$sampleno} = $event0->{arg};
      }

      # Rewind event0's rows for the next event.
      $left->reset();
      if ( $no_diff ) {
         delete $event->{results_sth};
         next EVENT;
      }

      # The result sets differ, so now we must begin the difficult
      # work: finding and determining the nature of those differences.
      PTDEBUG && _d('Result sets are different');

      # Make sure both outfiles are created, else diff_rows() will die.
      if ( !$left_outfile ) {
         PTDEBUG && _d('Right has extra rows not in left');
         (undef, $left_outfile) = $self->open_outfile(side => 'left');
      }
      if ( !$right_outfile ) {
         PTDEBUG && _d('Left has extra rows not in right');
         (undef, $right_outfile) = $self->open_outfile(side => 'right');
      }

      my @diff_rows = $self->diff_rows(
         %args,  # for options like max-different-rows
         left_dbh      => $hosts->[0]->{dbh},
         left_outfile  => $left_outfile,
         right_dbh     => $hosts->[$i]->{dbh},
         right_outfile => $right_outfile,
         res_struct    => $res_struct,
         query         => $event0->{arg},
         db            => $args{'temp-database'} || $event0->{db},
      );

      # Save differences.
      if ( scalar @diff_rows ) {
         $different_column_values++;
         $self->{diffs}->{col_vals}->{$item}->{$sampleno} = \@diff_rows;
         $self->{samples}->{$item}->{$sampleno} = $event0->{arg};
      }

      delete $event->{results_sth};
   }
   delete $event0->{results_sth};

   return (
      different_row_counts    => $different_row_counts,
      different_column_values => $different_column_values,
      different_column_counts => $different_column_counts,
      different_column_types  => $different_column_types,
   );
}
# Required args:
# * left_dbh scalar: active dbh for left
# * left_outfile scalar: outfile name for left
# * right_dbh scalar: active dbh for right
# * right_outfile scalar: outfile name for right
# * res_struct hashref: result set structure
# * db scalar: database to use for creating temp tables
# * query scalar: query, parsed for indexes
# Optional args:
# * add-indexes scalar: add indexes from source tables to tmp tbl
# * max-different-rows scalar: stop after this many differences are found
# (default 1_000)
# * float-precision scalar: round float, double, decimal types to N places
# Returns: array of different rows (@different_rows); each element is an
# arrayref of the args that the key_cmp callback was called with
# Can die: yes
# diff_rows() loads the two outfiles into the tables mk_upgrade_left and
# mk_upgrade_right in the given db and has TableSyncer compare them. Rows
# whose column values differ are returned. Rows missing from one side are
# counted against max-different-rows but are not currently saved or returned.
sub diff_rows {
my ( $self, %args ) = @_;
my @required_args = qw(left_dbh left_outfile right_dbh right_outfile
res_struct db query);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($left_dbh, $left_outfile, $right_dbh, $right_outfile, $res_struct,
$db, $query)
= @args{@required_args};
# Switch to the given db. This may be different from the event's
# db if, for example, --temp-database was specified.
my $orig_left_db = $self->_use_db($left_dbh, $db);
my $orig_right_db = $self->_use_db($right_dbh, $db);
# First thing, make two temps tables into which the outfiles can
# be loaded. This requires that we make a CREATE TABLE statement
# for the result sets' columns.
my $left_tbl = "`$db`.`mk_upgrade_left`";
my $right_tbl = "`$db`.`mk_upgrade_right`";
my $table_ddl = $self->make_table_ddl($res_struct);
$left_dbh->do("DROP TABLE IF EXISTS $left_tbl");
$left_dbh->do("CREATE TABLE $left_tbl $table_ddl");
$left_dbh->do("LOAD DATA INFILE '$left_outfile' "
. "INTO TABLE $left_tbl");
$right_dbh->do("DROP TABLE IF EXISTS $right_tbl");
$right_dbh->do("CREATE TABLE $right_tbl $table_ddl");
$right_dbh->do("LOAD DATA INFILE '$right_outfile' "
. "INTO TABLE $right_tbl");
PTDEBUG && _d('Loaded', $left_outfile, 'into table', $left_tbl, 'and',
$right_outfile, 'into table', $right_tbl);
# Now we need to get all indexes from all tables used by the query
# and add them to the temp tbl. Some indexes may be invalid, dupes,
# or generally useless, but we'll let the sync algo decide that later.
if ( $args{'add-indexes'} ) {
$self->add_indexes(
%args,
dsts => [
{ dbh => $left_dbh, tbl => $left_tbl },
{ dbh => $right_dbh, tbl => $right_tbl },
],
);
}
# Create a RowDiff with callbacks that will do what we want when rows and
# columns differ. This RowDiff is passed to TableSyncer which calls it.
# TODO: explain how these callbacks work together.
# NOTE(review): when and in what order RowDiff invokes these callbacks is
# not visible in this file; the comments below describe only what each
# callback does with the state shared between them.
my $max_diff = $args{'max-different-rows'} || 1_000; # 1k=sanity/safety
my $n_diff = 0;
my @missing_rows; # not currently saved; row counts show missing rows
my @different_rows;
# Indexes into @l_r, the queue holding at most one pending left row and
# one pending right row.
use constant LEFT => 0;
use constant RIGHT => 1;
my @l_r = (undef, undef);
# @last_diff_col collects the args of every key_cmp call since the last
# flush; $last_diff is decremented once per call, so it indexes (from the
# end) the first entry collected since that flush.
my @last_diff_col;
my $last_diff = 0;
my $key_cmp = sub {
push @last_diff_col, [@_];
$last_diff--;
return;
};
# Despite its name, same_row is also called directly by the other callbacks
# (and after sync_table() returns) to flush whatever is queued in @l_r:
# a left+right pair is saved as a different row, a lone left or right row
# is only counted. It then clears the queue and the key_cmp state.
my $same_row = sub {
my ( %args ) = @_;
my ($lr, $rr) = @args{qw(lr rr)};
if ( $l_r[LEFT] && $l_r[RIGHT] ) {
PTDEBUG && _d('Saving different row');
push @different_rows, $last_diff_col[$last_diff];
$n_diff++;
}
elsif ( $l_r[LEFT] ) {
PTDEBUG && _d('Saving not in right row');
# push @missing_rows, [$l_r[LEFT], undef];
$n_diff++;
}
elsif ( $l_r[RIGHT] ) {
PTDEBUG && _d('Saving not in left row');
# push @missing_rows, [undef, $l_r[RIGHT]];
$n_diff++;
}
else {
PTDEBUG && _d('No missing or different rows in queue');
}
@l_r = (undef, undef);
@last_diff_col = ();
$last_diff = 0;
return;
};
# Queue a row that exists only on the right. If a right row is already
# queued it is flushed first; if a left row is queued, the two are flushed
# together as one different row.
my $not_in_left = sub {
my ( %args ) = @_;
my ($lr, $rr) = @args{qw(lr rr)};
$same_row->() if $l_r[RIGHT]; # last missing row
$l_r[RIGHT] = $rr;
$same_row->(@l_r) if $l_r[LEFT] && $l_r[RIGHT];
return;
};
# Mirror image of not_in_left for a row that exists only on the left.
my $not_in_right = sub {
my ( %args ) = @_;
my ($lr, $rr) = @args{qw(lr rr)};
$same_row->() if $l_r[LEFT]; # last missing row
$l_r[LEFT] = $lr;
$same_row->(@l_r) if $l_r[LEFT] && $l_r[RIGHT];
return;
};
# Returns 1, after finishing both statement handles, once $n_diff has
# reached $max_diff; returns 0 otherwise.
my $done = sub {
my ( %args ) = @_;
my ($left, $right) = @args{qw(left_sth right_sth)};
PTDEBUG && _d('Found', $n_diff, 'of', $max_diff, 'max differences');
if ( $n_diff >= $max_diff ) {
PTDEBUG && _d('Done comparing rows, got --max-differences', $max_diff);
$left->finish();
$right->finish();
return 1;
}
return 0;
};
# Optional value transformer: with float-precision N, both values of a
# column whose type matches float, double or decimal are formatted to N
# decimal places; values of other columns are returned unchanged.
my $trf;
if ( my $n = $args{'float-precision'} ) {
$trf = sub {
my ( $l, $r, $tbl, $col ) = @_;
return $l, $r
unless $tbl->{type_for}->{$col} =~ m/(?:float|double|decimal)/;
my $l_rounded = sprintf "%.${n}f", $l;
my $r_rounded = sprintf "%.${n}f", $r;
PTDEBUG && _d('Rounded', $l, 'to', $l_rounded,
'and', $r, 'to', $r_rounded);
return $l_rounded, $r_rounded;
};
};
my $rd = new RowDiff(
dbh => $left_dbh,
key_cmp => $key_cmp,
same_row => $same_row,
not_in_left => $not_in_left,
not_in_right => $not_in_right,
done => $done,
trf => $trf,
);
my $ch = new ChangeHandler(
left_db => $db,
left_tbl => 'mk_upgrade_left',
right_db => $db,
right_tbl => 'mk_upgrade_right',
tbl_struct => $res_struct,
queue => 0,
replace => 0,
actions => [],
Quoter => $self->{Quoter},
);
# With whatever index we may have, let TableSyncer choose an
# algorithm and find where rows differ. The ChangeHandler is given no
# actions; instead, the callbacks above count missing rows and save
# different rows in @different_rows.
# NOTE(review): presumably the empty actions list is what keeps the
# tables from actually being synced -- confirm against ChangeHandler.
$self->{TableSyncer}->sync_table(
plugins => $self->{plugins},
src => {
dbh => $left_dbh,
db => $db,
tbl => 'mk_upgrade_left',
},
dst => {
dbh => $right_dbh,
db => $db,
tbl => 'mk_upgrade_right',
},
tbl_struct => $res_struct,
cols => $res_struct->{cols},
chunk_size => 1_000,
RowDiff => $rd,
ChangeHandler => $ch,
);
# Flush a row still queued when the sync ended, unless the limit was
# already reached.
if ( $n_diff < $max_diff ) {
$same_row->() if $l_r[LEFT] || $l_r[RIGHT]; # save remaining rows
}
# Switch back to the original dbs.
$self->_use_db($left_dbh, $orig_left_db);
$self->_use_db($right_dbh, $orig_right_db);
return @different_rows;
}
# Required args:
#   * side     scalar: "left" or "right"; selects the outfile
#   * row      the current row
#   * sth      statement handle that the remaining rows are fetched from
#   * Outfile  object used to write rows to the file handle
# Returns: array: the outfile's name and the number of rows written
# Can die: yes
# write_to_outfile() writes the current row and all remaining rows from
# the sth to the outfile for the given side.
sub write_to_outfile {
   my ( $self, %args ) = @_;
   for my $required ( qw(side row sth Outfile) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $sth    = $args{sth};
   my $writer = $args{Outfile};

   my ( $fh, $file ) = $self->open_outfile(%args);

   # Write this one row.
   my $current = [ MockSyncStream::as_arrayref($sth, $args{row}) ];
   $writer->write($fh, $current);

   # Get and write all remaining rows.
   my $rest = $sth->fetchall_arrayref();
   $writer->write($fh, $rest);

   my $n_written = 1 + scalar @$rest;
   PTDEBUG && _d('Wrote', $n_written, 'rows');

   close $fh or warn "Cannot close $file: $OS_ERROR";
   return $file, $n_written;
}
# Args:
#   * side  scalar: used as the file name prefix, e.g. "left" or "right"
# Returns: array: write file handle and the outfile's name
# Can die: yes, if the file cannot be opened
# open_outfile() opens <base-dir>/<side>-outfile.txt for writing,
# truncating any existing file.
sub open_outfile {
   my ( $self, %args ) = @_;
   my $path = join('/', $self->{'base-dir'}, "$args{side}-outfile.txt");
   open(my $fh, '>', $path) or die "Cannot open $path: $OS_ERROR";
   PTDEBUG && _d('Opened outfile', $path);
   return $fh, $path;
}
# Args:
#   * struct  hashref: result set struct with cols (arrayref of column
#                      names), type_for and size (hashrefs keyed on column)
# Returns: scalar: just the column definitions for the given struct,
# one column per line inside parentheses.
# Example:
# (
#  `i` integer,
#  `f` float(10,8)
# )
sub make_table_ddl {
   my ( $self, $struct ) = @_;

   my @col_defs;
   foreach my $col ( @{$struct->{cols}} ) {
      my $type = $struct->{type_for}->{$col};
      my $size = $struct->{size}->{$col} || '';
      push @col_defs, " `$col` $type$size";
   }

   # Columns are separated by ",\n"; the last one gets no comma because
   # a trailing comma before the closing paren would be invalid.
   my $body = @col_defs ? join(",\n", @col_defs) . "\n" : '';
   my $sql  = "(\n" . $body . ')';

   PTDEBUG && _d('Table ddl:', $sql);
   return $sql;
}
# Required args:
#   * query  scalar: query whose tables' indexes are copied
#   * dsts   arrayref: one hashref per destination, each with a dbh and
#                      tbl key; e.g.:
#                        [ { dbh => $dbh, tbl => 'db.tbl' }, ]
#   * db     scalar: default database for tables in the query
# Returns: nothing
# Can die: yes, if a required arg is missing
# add_indexes() tries to add every BTREE index from every table used by
# the query to all the dest tables.  Table structs are read through the
# first dest's dbh.  Tables that cannot be parsed or whose database is
# unknown are skipped, and failed ALTER TABLE statements are ignored.
# For the moment, the sub returns nothing.  In the future, it should
# add to $args{struct}->{keys} the keys that it was able to add.
sub add_indexes {
   my ( $self, %args ) = @_;
   for my $required ( qw(query dsts db) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my ( $query, $dsts ) = @args{qw(query dsts)};
   my ( $qp, $tp, $q )  = @{$self}{qw(QueryParser TableParser Quoter)};

   # Each element is [ key definition for ALTER TABLE, key struct ].
   my @key_defs;
   my @src_tbls = $qp->get_tables($query);
   TABLE:
   foreach my $db_tbl ( @src_tbls ) {
      my ($db, $tbl) = $q->split_unquote($db_tbl, $args{db});
      if ( !$db ) {
         PTDEBUG && _d('Cannot get indexes from', $db_tbl, 'because its '
            . 'database is unknown');
         next TABLE;
      }

      my $tbl_struct;
      eval {
         my $ddl = $tp->get_create_table(
            dbh => $dsts->[0]->{dbh},
            db  => $db,
            tbl => $tbl,
         );
         $tbl_struct = $tp->parse($ddl);
      };
      if ( $EVAL_ERROR ) {
         PTDEBUG && _d('Error parsing', $db, '.', $tbl, ':', $EVAL_ERROR);
         next TABLE;
      }

      foreach my $key ( values %{$tbl_struct->{keys}} ) {
         next unless $key->{type} eq 'BTREE';
         my $unique = $key->{is_unique} ? 'UNIQUE ' : '';
         push @key_defs, [ $unique . "KEY ($key->{colnames})", $key ];
      }
   }
   PTDEBUG && _d('Source keys:', Dumper(\@key_defs));
   return unless @key_defs;

   foreach my $dst ( @$dsts ) {
      foreach my $key_def ( @key_defs ) {
         my $sql = "ALTER TABLE $dst->{tbl} ADD $key_def->[0]";
         PTDEBUG && _d($sql);
         eval { $dst->{dbh}->do($sql) };
         # A key that cannot be added is not fatal.
         PTDEBUG && $EVAL_ERROR && _d($EVAL_ERROR);
         # TODO: on success,
         # $args{res_struct}->{keys}->{$key->[1]->{name}} = $key->[1];
      }
   }

   # If the query uses only 1 table then return its struct.
   # TODO: $args{struct} = $struct if @src_tbls == 1;
   return;
}
# Required args:
#   * hosts  arrayref: hosts; one report column is made per host
# Returns: scalar: the checksum, column value and row count difference
# reports joined by newlines, or nothing if no differences were saved
# Can die: yes
# report() calls _report_diff_checksums(), _report_diff_col_vals() and
# _report_diff_row_counts(), in that order, passing each the shared
# query ID column, the host columns, and all of the caller's args.
sub report {
   my ( $self, %args ) = @_;
   for my $required ( qw(hosts) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $hosts = $args{hosts};

   return unless keys %{$self->{diffs}};

   # These columns are common to all the reports; make them just once.
   my $query_id_col = { name => 'Query ID' };
   my @host_cols    = map { +{ name => "host$_" } } 1 .. scalar @$hosts;

   my @reports;
   foreach my $diff_type ( qw(checksums col_vals row_counts) ) {
      my $method = "_report_diff_$diff_type";
      push @reports, $self->$method(
         query_id_col => $query_id_col,
         host_cols    => \@host_cols,
         %args
      );
   }

   return join("\n", @reports);
}
# Required args:
#   * query_id_col  hashref: column for the query ID
#   * host_cols     arrayref: one column hashref per host
# Returns: scalar: the formatted "Checksum differences" report, or nothing
# if no checksum differences were saved
# Can die: yes
# Each report line is "<query ID>-<sample number>" followed by the
# checksums saved for that sample.  Items are sorted by name and samples
# numerically.
sub _report_diff_checksums {
   my ( $self, %args ) = @_;
   for my $required ( qw(query_id_col host_cols) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $get_id = $self->{get_id};

   return unless keys %{$self->{diffs}->{checksums}};

   my $report = ReportFormatter->new();
   $report->title('Checksum differences');
   $report->set_columns( $args{query_id_col}, @{$args{host_cols}} );

   my $checksums_for = $self->{diffs}->{checksums};
   foreach my $item ( sort keys %$checksums_for ) {
      my $samples = $checksums_for->{$item};
      foreach my $sampleno ( sort { $a <=> $b } keys %$samples ) {
         $report->add_line(
            $get_id->($item) . '-' . $sampleno,
            @{ $samples->{$sampleno} },
         );
      }
   }

   return $report->get_report();
}
# Required args:
#   * query_id_col  hashref: column for the query ID
#   * host_cols     arrayref: one column hashref per host
# Returns: scalar: the formatted "Column value differences" report, or
# nothing if no column value differences were saved
# Can die: yes
# The report has an extra "Column" column between the query ID and the
# host columns.  One line is added per saved different row:
# "<query ID>-<sample number>" followed by that row's values.  Items are
# sorted by name and samples numerically.
sub _report_diff_col_vals {
   my ( $self, %args ) = @_;
   for my $required ( qw(query_id_col host_cols) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $get_id = $self->{get_id};

   return unless keys %{$self->{diffs}->{col_vals}};

   my $report = ReportFormatter->new();
   $report->title('Column value differences');
   $report->set_columns(
      $args{query_id_col},
      { name => 'Column' },
      @{$args{host_cols}},
   );

   my $col_vals_for = $self->{diffs}->{col_vals};
   foreach my $item ( sort keys %$col_vals_for ) {
      my $samples = $col_vals_for->{$item};
      foreach my $sampleno ( sort { $a <=> $b } keys %$samples ) {
         foreach my $diff_row ( @{ $samples->{$sampleno} } ) {
            $report->add_line(
               $get_id->($item) . '-' . $sampleno,
               @$diff_row,
            );
         }
      }
   }

   return $report->get_report();
}
# Required args:
#   * query_id_col  hashref: column for the query ID
#   * hosts         arrayref: hosts; one right-justified column is made
#                             per host
# Returns: scalar: the formatted "Row count differences" report, or
# nothing if no row count differences were saved
# Can die: yes
# Each report line is "<query ID>-<sample number>" followed by the row
# counts saved for that sample.  Items are sorted by name and samples
# numerically.
sub _report_diff_row_counts {
   my ( $self, %args ) = @_;
   for my $required ( qw(query_id_col hosts) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $get_id = $self->{get_id};

   return unless keys %{$self->{diffs}->{row_counts}};

   my $report = ReportFormatter->new();
   $report->title('Row count differences');
   my @count_cols = map { +{ name => "host$_", right_justify => 1 } }
                    1 .. scalar @{$args{hosts}};
   $report->set_columns( $args{query_id_col}, @count_cols );

   my $row_counts_for = $self->{diffs}->{row_counts};
   foreach my $item ( sort keys %$row_counts_for ) {
      my $samples = $row_counts_for->{$item};
      foreach my $sampleno ( sort { $a <=> $b } keys %$samples ) {
         $report->add_line(
            $get_id->($item) . '-' . $sampleno,
            @{ $samples->{$sampleno} },
         );
      }
   }

   return $report->get_report();
}
# Args:
#   * item  scalar: item (the query's fingerprint or arg) to get samples for
# Returns: array of sample number => sample query pairs saved for the
# item, in no particular order; nothing if no item is given
sub samples {
   my ( $self, $item ) = @_;
   return unless $item;
   my @pairs = map { ( $_, $self->{samples}->{$item}->{$_} ) }
               keys %{ $self->{samples}->{$item} };
   return @pairs;
}
# Discards all saved differences and samples.
# Returns: nothing
sub reset {
   my ( $self ) = @_;
   @{$self}{qw(diffs samples)} = ( {}, {} );
   return;
}
# Args:
#   * dbh     scalar: active dbh
#   * new_db  scalar: database to switch to
# Returns: scalar: the db that was current before the switch (may be
# undef), or nothing if no new db is given
# _use_db() issues USE for the new db unless it is already the current db.
sub _use_db {
   my ( $self, $dbh, $new_db ) = @_;
   return unless $new_db;

   my $select_sql = 'SELECT DATABASE()';
   PTDEBUG && _d($select_sql);
   my $current_db = $dbh->selectrow_array($select_sql);

   if ( !$current_db || $current_db ne $new_db ) {
      my $use_sql = "USE `$new_db`";
      PTDEBUG && _d($use_sql);
      $dbh->do($use_sql);
   }
   else {
      PTDEBUG && _d('Current and new DB are the same');
   }

   return $current_db;
}
# Debug printer: writes one line to STDERR made of "# ", the caller's
# package and line number, the process ID, and the args joined by spaces.
# Undefined args are printed as "undef", and "# " is inserted after every
# newline inside an arg.  The caller's values are not modified.
sub _d {
   my ($package, undef, $line) = caller 0;
   my @parts;
   foreach my $arg ( @_ ) {
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @parts, $text;
   }
   print STDERR "# $package:$line $PID ", join(' ', @parts), "\n";
}
1;
}
# ###########################################################################
# End CompareResults package
# ###########################################################################