mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-01 18:25:59 +00:00

* Remove trailing spaces * PR-665 - Remove trailing spaces - Updated not stable test t/pt-online-schema-change/preserve_triggers.t - Updated utilities in bin directory * PR-665 - Remove trailing spaces - Fixed typos * PR-665 - Remove trailing spaces - Fixed typos --------- Co-authored-by: Sveta Smirnova <sveta.smirnova@percona.com>
1015 lines
32 KiB
Perl
1015 lines
32 KiB
Perl
# This program is copyright 2009-2011 Percona Ireland Ltd.
|
|
# Feedback and improvements are welcome.
|
|
#
|
|
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
|
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
|
|
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
|
|
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
|
|
# licenses.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
|
|
# Place, Suite 330, Boston, MA 02111-1307 USA.
|
|
# ###########################################################################
|
|
# CompareResults package
|
|
# ###########################################################################
|
|
{
|
|
# Package: CompareResults
|
|
# CompareResults compares the query result sets.
|
|
package CompareResults;
|
|
|
|
use strict;
|
|
use warnings FATAL => 'all';
|
|
use English qw(-no_match_vars);
|
|
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
|
|
|
|
use Time::HiRes qw(time);
|
|
use Data::Dumper;
|
|
$Data::Dumper::Indent = 1;
|
|
$Data::Dumper::Sortkeys = 1;
|
|
$Data::Dumper::Quotekeys = 0;
|
|
|
|
# Required args:
|
|
# * method scalar: "checksum" or "rows"
|
|
# * base-dir scalar: dir used by rows method to write outfiles
|
|
# * plugins arrayref: TableSync* plugins used by rows method
|
|
# * get_id coderef: used by report() to trf query to its ID
|
|
# * common modules
|
|
sub new {
   my ( $class, %args ) = @_;

   # Every one of these must be given and true.
   for my $required ( qw(method base-dir plugins get_id
                         QueryParser TableParser TableSyncer Quoter) ) {
      die "I need a $required argument" unless $args{$required};
   }

   # The caller's args become object attributes; the per-run state
   # (saved differences and query samples) always starts out empty.
   my %object = (
      %args,
      diffs   => {},
      samples => {},
   );
   return bless \%object, $class;
}
|
|
|
|
# Required args:
|
|
# * event hashref: an event
|
|
# * dbh scalar: active dbh
|
|
# Optional args:
|
|
# * db scalar: database name to create temp table in unless...
|
|
# * temp-database scalar: ...temp db name is given
|
|
# * temp-table scalar: temp table name
|
|
# Returns: hashref
|
|
# Can die: yes
|
|
# before_execute() drops the temp table if the method is checksum.
|
|
# db and temp-table are required for the checksum method, but optional
|
|
# for the rows method.
|
|
sub before_execute {
   my ( $self, %args ) = @_;
   my @required_args = qw(event dbh);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($event, $dbh) = @args{@required_args};

   # Only the checksum method needs any preparation; for every other
   # method the event is handed back untouched.
   return $event unless $self->{method} eq 'checksum';

   # temp-database, when given, takes precedence over db.
   my ($db, $tmp_tbl) = @args{qw(db temp-table)};
   $db = $args{'temp-database'} if $args{'temp-database'};
   die "Cannot checksum results without a database"
      unless $db;

   $tmp_tbl = $self->{Quoter}->quote($db, $tmp_tbl);
   eval {
      # Drop any table left over from a previous run.  A second statement
      # ("SET storage_engine=MyISAM", deprecated since MySQL 5.7) used to
      # follow here; it is gone, so the DROP is issued exactly once.
      my $sql = "DROP TABLE IF EXISTS $tmp_tbl";
      PTDEBUG && _d($sql);
      $dbh->do($sql);
   };
   die "Failed to drop temporary table $tmp_tbl: $EVAL_ERROR"
      if $EVAL_ERROR;

   # Save the tmp tbl; it's used later in _compare_checksums().
   $event->{tmp_tbl} = $tmp_tbl;

   # Wrap the original query so when it's executed its results get
   # put in tmp table.
   $event->{wrapped_query}
      = "CREATE TEMPORARY TABLE $tmp_tbl AS $event->{arg}";
   PTDEBUG && _d('Wrapped query:', $event->{wrapped_query});

   return $event;
}
|
|
|
|
# Required args:
|
|
# * event hashref: an event
|
|
# * dbh scalar: active dbh
|
|
# Returns: hashref
|
|
# Can die: yes
|
|
# execute() executes the event's query. Any prep work should have
|
|
# been done in before_execute(). For the checksum method, this simply
|
|
# executes the wrapped query. For the rows method, this gets/saves
|
|
# a statement handle for the results in the event which is processed
|
|
# later in compare(). Both methods add the Query_time attrib to the
|
|
# event.
|
|
sub execute {
   my ( $self, %args ) = @_;
   for my $required ( qw(event dbh) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $event = $args{event};
   my $dbh   = $args{dbh};
   my $elapsed;

   # Other modules should only execute the query if Query_time does not
   # already exist.  This module requires special execution so we always
   # execute.
   PTDEBUG && _d('Executing query');
   $event->{Query_time} = 0;

   if ( $self->{method} ne 'rows' ) {
      # Checksum method: run the CREATE TEMPORARY TABLE ... AS wrapper
      # that before_execute() prepared.
      die "No wrapped query" unless $event->{wrapped_query};
      my $wrapped = $event->{wrapped_query};
      eval {
         my $t0 = time();
         $dbh->do($wrapped);
         my $t1 = time();
         $elapsed = sprintf '%.6f', $t1 - $t0;
      };
      if ( $EVAL_ERROR ) {
         # Without these the event cannot be checksummed later.
         delete $event->{wrapped_query};
         delete $event->{tmp_tbl};
         die "Failed to execute query: $EVAL_ERROR";
      }
   }
   else {
      # Rows method: keep the executed statement handle in the event so
      # that compare() can fetch and compare the rows later.
      my $sth;
      eval {
         $sth = $dbh->prepare($event->{arg});
      };
      die "Failed to prepare query: $EVAL_ERROR" if $EVAL_ERROR;

      eval {
         my $t0 = time();
         $sth->execute();
         my $t1 = time();
         $elapsed = sprintf '%.6f', $t1 - $t0;
      };
      die "Failed to execute query: $EVAL_ERROR" if $EVAL_ERROR;

      $event->{results_sth} = $sth;
   }

   $event->{Query_time} = $elapsed;

   return $event;
}
|
|
|
|
# Required args:
|
|
# * event hashref: an event
|
|
# Optional args:
|
|
# * dbh scalar: active dbh
|
|
# Returns: hashref
|
|
# Can die: yes
|
|
# after_execute() does any post-execution cleanup. The results should
|
|
# not be compared here; no analytics here, save that for compare().
|
|
sub after_execute {
   my ( $self, %args ) = @_;
   for my $required ( qw(event) ) {
      die "I need a $required argument" unless $args{$required};
   }
   # Nothing to clean up for this module: the event passes straight through.
   return $args{event};
}
|
|
|
|
# Required args:
|
|
# * events arrayref: events
|
|
# * hosts arrayref: hosts hashrefs with at least a dbh key
|
|
# Optional args:
|
|
# * temp-database scalar: temp db name
|
|
# Returns: array
|
|
# Can die: yes
|
|
# compare() compares events that have been run through before_execute(),
|
|
# execute() and after_execute(). The checksum method primarily compares
|
|
# the checksum attribs saved in the events. The rows method uses the
|
|
# result statement handles saved in the events to compare rows and column
|
|
# values. Each method returns an array of key => value pairs which the
|
|
# caller should aggregate into a meta-event that represents differences
|
|
# compare() has found in these events. Only a "summary" of differences is
|
|
# returned. Specific differences are saved internally and are reported
|
|
# by calling report() later.
|
|
sub compare {
   my ( $self, %args ) = @_;
   for my $required ( qw(events hosts) ) {
      die "I need a $required argument" unless $args{$required};
   }

   # Hand the untouched args to the comparison routine for this object's
   # method; anything other than "rows" is compared by checksum.
   my $compare_method = $self->{method} eq 'rows' ? '_compare_rows'
                      :                             '_compare_checksums';
   return $self->$compare_method(%args);
}
|
|
|
|
# Checksums every event's temp table (see _checksum_results()) and compares
# each event's checksum and row count against the first event's.  Differences
# are saved in $self->{diffs} for report(); the returned list holds only
# counters of how many events differed.
sub _compare_checksums {
   my ( $self, %args ) = @_;
   for my $required ( qw(events hosts) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my ( $events, $hosts ) = @args{qw(events hosts)};

   my %n_different = (
      checksums  => 0,
      row_counts => 0,
   );
   my $different_column_counts = 0; # TODO
   my $different_column_types  = 0; # TODO

   for my $i ( 0 .. $#{$events} ) {
      $events->[$i] = $self->_checksum_results(
         event => $events->[$i],
         dbh   => $hosts->[$i]->{dbh},
      );
      next if $i == 0;  # the first event is the baseline

      my ( $first, $this ) = ( $events->[0], $events->[$i] );
      $n_different{checksums}++
         if ($first->{checksum} || 0) != ($this->{checksum} || 0);
      $n_different{row_counts}++
         if ($first->{row_count} || 0) != ($this->{row_count} || 0);

      delete $this->{wrapped_query};
   }
   delete $events->[0]->{wrapped_query};

   # Save differences, keyed by query fingerprint (or the query itself)
   # and sample number, with one value per event.
   my $item     = $events->[0]->{fingerprint} || $events->[0]->{arg};
   my $sampleno = $events->[0]->{sampleno} || 0;
   if ( $n_different{checksums} ) {
      $self->{diffs}->{checksums}->{$item}->{$sampleno}
         = [ map { $_->{checksum} } @$events ];
      $self->{samples}->{$item}->{$sampleno} = $events->[0]->{arg};
   }
   if ( $n_different{row_counts} ) {
      $self->{diffs}->{row_counts}->{$item}->{$sampleno}
         = [ map { $_->{row_count} } @$events ];
      $self->{samples}->{$item}->{$sampleno} = $events->[0]->{arg};
   }

   return (
      different_row_counts    => $n_different{row_counts},
      different_checksums     => $n_different{checksums},
      different_column_counts => $different_column_counts,
      different_column_types  => $different_column_types,
   );
}
|
|
|
|
# Counts and checksums the temp table holding an event's results, saves both
# numbers in the event (row_count, checksum) and drops the table.  An event
# without a wrapped query or temp table gets 0 for both.  Dies if counting
# or checksumming fails; a failed drop is only logged.
sub _checksum_results {
   my ( $self, %args ) = @_;
   for my $required ( qw(event dbh) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my ( $event, $dbh ) = @args{qw(event dbh)};

   my ( $row_count, $checksum ) = ( 0, 0 );

   if ( !($event->{wrapped_query} && $event->{tmp_tbl}) ) {
      PTDEBUG && _d("Event doesn't have wrapped query or tmp tbl");
   }
   else {
      my $tmp_tbl = $event->{tmp_tbl};
      eval {
         my $count_sql = "SELECT COUNT(*) FROM $tmp_tbl";
         PTDEBUG && _d($count_sql);
         ($row_count) = @{ $dbh->selectcol_arrayref($count_sql) };

         # The checksum is the second column of the CHECKSUM TABLE row.
         my $checksum_sql = "CHECKSUM TABLE $tmp_tbl";
         PTDEBUG && _d($checksum_sql);
         $checksum = $dbh->selectrow_arrayref($checksum_sql)->[1];
      };
      die "Failed to checksum table: $EVAL_ERROR"
         if $EVAL_ERROR;

      my $drop_sql = "DROP TABLE IF EXISTS $tmp_tbl";
      PTDEBUG && _d($drop_sql);
      eval {
         $dbh->do($drop_sql);
      };
      # This isn't critical; we don't need to die.
      PTDEBUG && $EVAL_ERROR && _d('Error:', $EVAL_ERROR);
   }

   $event->{row_count} = $row_count;
   $event->{checksum}  = $checksum;
   PTDEBUG && _d('row count:', $row_count, 'checksum:', $checksum);

   return $event;
}
|
|
|
|
# Compares, row by row, the result sets of events executed with the "rows"
# method.  The first event is the baseline ("left"); every other event
# ("right") is compared against it.
#
# Required args:
#   * events  arrayref: events with a results_sth from execute()
#   * hosts   arrayref: hosts hashrefs with at least a dbh key
# Other args (e.g. temp-database, max-different-rows) are passed on to
# diff_rows().
# Returns: list of counter name => value pairs
# Can die: yes (missing args, or anything the comparison helpers throw)
#
# Row count and column value differences are saved in $self->{diffs} and
# the offending query in $self->{samples}.  Each event gets a row_count and
# loses its results_sth.
sub _compare_rows {
   my ( $self, %args ) = @_;
   my @required_args = qw(events hosts);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   my ($events, $hosts) = @args{@required_args};

   my $different_row_counts    = 0;
   my $different_column_counts = 0; # TODO
   my $different_column_types  = 0; # TODO
   my $different_column_values = 0;

   my $n_events = scalar @$events;
   my $event0   = $events->[0];
   my $item     = $event0->{fingerprint} || $event0->{arg};
   my $sampleno = $event0->{sampleno} || 0;
   my $dbh      = $hosts->[0]->{dbh};  # doesn't matter which one

   if ( !$event0->{results_sth} ) {
      # This will happen if execute() or something fails.
      PTDEBUG && _d("Event 0 doesn't have a results sth");
      return (
         different_row_counts    => $different_row_counts,
         different_column_values => $different_column_values,
         different_column_counts => $different_column_counts,
         different_column_types  => $different_column_types,
      );
   }

   my $res_struct = MockSyncStream::get_result_set_struct($dbh,
      $event0->{results_sth});
   PTDEBUG && _d('Result set struct:', Dumper($res_struct));

   # Use a mock sth so we don't have to re-execute event0 sth to compare
   # it to the 3rd and subsequent events.
   my @event0_rows      = @{ $event0->{results_sth}->fetchall_arrayref({}) };
   $event0->{row_count} = scalar @event0_rows;
   my $left = MockSth->new(@event0_rows);
   $left->{NAME} = [ @{$event0->{results_sth}->{NAME}} ];

   EVENT:
   foreach my $i ( 1..($n_events-1) ) {
      my $event = $events->[$i];
      my $right = $event->{results_sth};

      $event->{row_count} = 0;

      if ( !$right ) {
         PTDEBUG && _d('No results sth on host', $i);
         delete $event->{results_sth};
         next EVENT;
      }

      # Identical rows are ignored.  Once a difference on either side is
      # found, we gobble the remaining rows in that sth and print them to
      # an outfile.  This short circuits RowDiff::compare_sets() which is
      # what we want to do.
      my $no_diff = 1;  # results are identical; this catches 0 row results
      my $outfile = Outfile->new();
      my ($left_outfile, $right_outfile, $n_rows);
      my $same_row = sub {
         $event->{row_count}++;  # Keep track of this event's row_count.
         return;
      };
      my $not_in_left = sub {
         my ( $rr ) = @_;
         $no_diff = 0;
         # $n_rows will be added later to this event's row_count.
         ($right_outfile, $n_rows) = $self->write_to_outfile(
            side    => 'right',
            sth     => $right,
            row     => $rr,
            Outfile => $outfile,
         );
         return;
      };
      my $not_in_right = sub {
         my ( $lr ) = @_;
         $no_diff = 0;
         # left is event0 so we don't need $n_rows back.
         ($left_outfile, undef) = $self->write_to_outfile(
            side    => 'left',
            sth     => $left,
            row     => $lr,
            Outfile => $outfile,
         );
         return;
      };

      my $rd       = RowDiff->new(dbh => $dbh);
      my $mocksync = MockSyncStream->new(
         query        => $event0->{arg},
         cols         => $res_struct->{cols},
         same_row     => $same_row,
         not_in_left  => $not_in_left,
         not_in_right => $not_in_right,
      );

      PTDEBUG && _d('Comparing result sets with MockSyncStream');
      $rd->compare_sets(
         left_sth   => $left,
         right_sth  => $right,
         syncer     => $mocksync,
         tbl_struct => $res_struct,
      );

      # Add number of rows written to outfile to this event's row_count.
      # $n_rows will be undef if there were no differences; row_count will
      # still be correct in this case because we kept track of it in
      # $same_row.
      $event->{row_count} += $n_rows || 0;

      PTDEBUG && _d('Left has', $event0->{row_count}, 'rows, right has',
         $event->{row_count});

      # Save differences.  The test is on *this* event only: the counter
      # is cumulative over all events, so testing it instead would let a
      # later event whose row count matches overwrite a saved difference.
      if ( $event0->{row_count} != $event->{row_count} ) {
         $different_row_counts++;
         $self->{diffs}->{row_counts}->{$item}->{$sampleno}
            = [ $event0->{row_count}, $event->{row_count} ];
         $self->{samples}->{$item}->{$sampleno} = $event0->{arg};
      }

      # Rewind the baseline rows for the next event.
      $left->reset();
      if ( $no_diff ) {
         delete $event->{results_sth};
         next EVENT;
      }

      # The result sets differ, so now we must begin the difficult
      # work: finding and determining the nature of those differences.
      PTDEBUG && _d('Result sets are different');

      # Make sure both outfiles are created, else diff_rows() will die.
      # The filehandle is discarded; only the (empty) file is needed.
      if ( !$left_outfile ) {
         PTDEBUG && _d('Right has extra rows not in left');
         (undef, $left_outfile) = $self->open_outfile(side => 'left');
      }
      if ( !$right_outfile ) {
         PTDEBUG && _d('Left has extra rows not in right');
         (undef, $right_outfile) = $self->open_outfile(side => 'right');
      }

      my @diff_rows = $self->diff_rows(
         %args,  # for options like max-different-rows
         left_dbh      => $hosts->[0]->{dbh},
         left_outfile  => $left_outfile,
         right_dbh     => $hosts->[$i]->{dbh},
         right_outfile => $right_outfile,
         res_struct    => $res_struct,
         query         => $event0->{arg},
         db            => $args{'temp-database'} || $event0->{db},
      );

      # Save differences.
      if ( scalar @diff_rows ) {
         $different_column_values++;
         $self->{diffs}->{col_vals}->{$item}->{$sampleno} = \@diff_rows;
         $self->{samples}->{$item}->{$sampleno} = $event0->{arg};
      }

      delete $event->{results_sth};
   }
   delete $event0->{results_sth};

   return (
      different_row_counts    => $different_row_counts,
      different_column_values => $different_column_values,
      different_column_counts => $different_column_counts,
      different_column_types  => $different_column_types,
   );
}
|
|
|
|
# Required args:
|
|
# * left_dbh scalar: active dbh for left
|
|
# * left_outfile scalar: outfile name for left
|
|
# * right_dbh scalar: active dbh for right
|
|
# * right_outfile scalar: outfile name for right
|
|
# * res_struct hashref: result set structure
|
|
# * db scalar: database to use for creating temp tables
|
|
# * query scalar: query, parsed for indexes
|
|
# Optional args:
|
|
# * add-indexes scalar: add indexes from source tables to tmp tbl
|
|
# * max-different-rows scalar: stop after this many differences are found
|
|
# * float-precision scalar: round float, double, decimal types to N places
|
|
# Returns: array of the rows that differ (one arrayref per row)
|
|
# Can die: yes (missing required args)
|
|
# diff_rows() loads the two outfiles into temp tables, compares them and
# returns the rows whose column values differ. Missing rows are counted
# toward max-different-rows but are not returned.
|
|
sub diff_rows {
   my ( $self, %args ) = @_;
   my @required_args = qw(left_dbh left_outfile right_dbh right_outfile
                          res_struct db query);
   for my $required ( @required_args ) {
      die "I need a $required argument" unless $args{$required};
   }
   my ($left_dbh, $left_outfile, $right_dbh, $right_outfile, $res_struct,
       $db, $query)
      = @args{@required_args};

   # Switch to the given db.  This may be different from the event's
   # db if, for example, --temp-database was specified.
   my $orig_left_db  = $self->_use_db($left_dbh, $db);
   my $orig_right_db = $self->_use_db($right_dbh, $db);

   # First thing, make two temp tables into which the outfiles can be
   # loaded.  Both tables share one column list made from the result
   # set's struct.
   my $left_tbl  = "`$db`.`mk_upgrade_left`";
   my $right_tbl = "`$db`.`mk_upgrade_right`";
   my $table_ddl = $self->make_table_ddl($res_struct);

   foreach my $side (
      [ $left_dbh,  $left_tbl,  $left_outfile  ],
      [ $right_dbh, $right_tbl, $right_outfile ],
   ) {
      my ( $side_dbh, $tbl, $file ) = @$side;
      $side_dbh->do("DROP TABLE IF EXISTS $tbl");
      $side_dbh->do("CREATE TABLE $tbl $table_ddl");
      $side_dbh->do("LOAD DATA INFILE '$file' "
         . "INTO TABLE $tbl");
   }

   PTDEBUG && _d('Loaded', $left_outfile, 'into table', $left_tbl, 'and',
      $right_outfile, 'into table', $right_tbl);

   # Now we need to get all indexes from all tables used by the query
   # and add them to the temp tbl.  Some indexes may be invalid, dupes,
   # or generally useless, but we'll let the sync algo decide that later.
   if ( $args{'add-indexes'} ) {
      $self->add_indexes(
         %args,
         dsts => [
            { dbh => $left_dbh,  tbl => $left_tbl  },
            { dbh => $right_dbh, tbl => $right_tbl },
         ],
      );
   }

   # Create a RowDiff with callbacks that will do what we want when rows and
   # columns differ.  This RowDiff is passed to TableSyncer which calls it.
   #
   # State shared by the callbacks:
   #   @l_r            the pending left and/or right row, not yet counted
   #   @last_diff_col  args of every key_cmp call since the last flush
   #   $last_diff      minus the number of those calls, so that
   #                   $last_diff_col[$last_diff] is the first of them
   # TODO: explain how these callbacks work together.
   my $max_diff = $args{'max-different-rows'} || 1_000;  # 1k=sanity/safety
   my $n_diff   = 0;
   my @different_rows;
   use constant LEFT  => 0;
   use constant RIGHT => 1;
   my @l_r = (undef, undef);
   my @last_diff_col;
   my $last_diff = 0;

   my $key_cmp = sub {
      push @last_diff_col, [@_];
      $last_diff--;
      return;
   };

   # Counts whatever is pending in @l_r, then clears the shared state.
   # Only a pending left+right pair is saved as a different row; a row
   # pending on one side only is a missing row, which is counted but not
   # saved (row counts already show missing rows).
   my $same_row = sub {
      my ( %args ) = @_;  # unused; kept so argument handling is unchanged
      if ( $l_r[LEFT] && $l_r[RIGHT] ) {
         PTDEBUG && _d('Saving different row');
         push @different_rows, $last_diff_col[$last_diff];
         $n_diff++;
      }
      elsif ( $l_r[LEFT] ) {
         PTDEBUG && _d('Saving not in right row');
         $n_diff++;
      }
      elsif ( $l_r[RIGHT] ) {
         PTDEBUG && _d('Saving not in left row');
         $n_diff++;
      }
      else {
         PTDEBUG && _d('No missing or different rows in queue');
      }
      @l_r           = (undef, undef);
      @last_diff_col = ();
      $last_diff     = 0;
      return;
   };

   my $not_in_left = sub {
      my ( %args ) = @_;
      my $right_row = $args{rr};
      $same_row->() if $l_r[RIGHT];  # last missing row
      $l_r[RIGHT] = $right_row;
      $same_row->(@l_r) if $l_r[LEFT] && $l_r[RIGHT];
      return;
   };

   my $not_in_right = sub {
      my ( %args ) = @_;
      my $left_row = $args{lr};
      $same_row->() if $l_r[LEFT];  # last missing row
      $l_r[LEFT] = $left_row;
      $same_row->(@l_r) if $l_r[LEFT] && $l_r[RIGHT];
      return;
   };

   # Returns true, after finishing both sths, once enough differences
   # have been found.
   my $done = sub {
      my ( %args ) = @_;
      my ( $left_sth, $right_sth ) = @args{qw(left_sth right_sth)};
      PTDEBUG && _d('Found', $n_diff, 'of', $max_diff, 'max differences');
      return 0 if $n_diff < $max_diff;
      PTDEBUG && _d('Done comparing rows, got --max-differences', $max_diff);
      $left_sth->finish();
      $right_sth->finish();
      return 1;
   };

   # With float-precision, float, double and decimal values are rounded
   # to that many places before they are compared.
   my $trf;
   if ( my $n = $args{'float-precision'} ) {
      $trf = sub {
         my ( $l, $r, $tbl, $col ) = @_;
         return $l, $r
            unless $tbl->{type_for}->{$col} =~ m/(?:float|double|decimal)/;
         my $l_rounded = sprintf "%.${n}f", $l;
         my $r_rounded = sprintf "%.${n}f", $r;
         PTDEBUG && _d('Rounded', $l, 'to', $l_rounded,
            'and', $r, 'to', $r_rounded);
         return $l_rounded, $r_rounded;
      };
   }

   my $rd = RowDiff->new(
      dbh          => $left_dbh,
      key_cmp      => $key_cmp,
      same_row     => $same_row,
      not_in_left  => $not_in_left,
      not_in_right => $not_in_right,
      done         => $done,
      trf          => $trf,
   );
   my $ch = ChangeHandler->new(
      left_db    => $db,
      left_tbl   => 'mk_upgrade_left',
      right_db   => $db,
      right_tbl  => 'mk_upgrade_right',
      tbl_struct => $res_struct,
      queue      => 0,
      replace    => 0,
      actions    => [],
      Quoter     => $self->{Quoter},
   );

   # With whatever index we may have, let TableSyncer choose an
   # algorithm and find where rows differ.  The ChangeHandler has no
   # actions, and the callbacks above save rows in @different_rows.
   $self->{TableSyncer}->sync_table(
      plugins       => $self->{plugins},
      src           => {
         dbh => $left_dbh,
         db  => $db,
         tbl => 'mk_upgrade_left',
      },
      dst           => {
         dbh => $right_dbh,
         db  => $db,
         tbl => 'mk_upgrade_right',
      },
      tbl_struct    => $res_struct,
      cols          => $res_struct->{cols},
      chunk_size    => 1_000,
      RowDiff       => $rd,
      ChangeHandler => $ch,
   );

   # Save remaining rows.
   $same_row->()
      if $n_diff < $max_diff && ( $l_r[LEFT] || $l_r[RIGHT] );

   # Switch back to the original dbs.
   $self->_use_db($left_dbh, $orig_left_db);
   $self->_use_db($right_dbh, $orig_right_db);

   return @different_rows;
}
|
|
|
|
# Writes the current row and all remaining rows to an outfile.
|
|
# Returns the outfile's name and the number of rows written.
|
|
sub write_to_outfile {
   my ( $self, %args ) = @_;
   for my $required ( qw(side row sth Outfile) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $row    = $args{row};
   my $sth    = $args{sth};
   my $writer = $args{Outfile};

   my ( $fh, $file ) = $self->open_outfile(%args);

   # The row already fetched by the caller goes first...
   $writer->write($fh, [ MockSyncStream::as_arrayref($sth, $row) ]);

   # ...followed by everything still left in the sth.
   my $rest = $sth->fetchall_arrayref();
   $writer->write($fh, $rest);

   my $n_rows = scalar(@$rest) + 1;
   PTDEBUG && _d('Wrote', $n_rows, 'rows');

   close $fh or warn "Cannot close $file: $OS_ERROR";
   return $file, $n_rows;
}
|
|
|
|
# Opens (creating or truncating) <base-dir>/<side>-outfile.txt for writing.
# Returns the filehandle and the file's name; dies if the open fails.
sub open_outfile {
   my ( $self, %args ) = @_;
   my $path = "$self->{'base-dir'}/$args{side}-outfile.txt";
   open(my $fh, '>', $path)
      or die "Cannot open $path: $OS_ERROR";
   PTDEBUG && _d('Opened outfile', $path);
   return ( $fh, $path );
}
|
|
|
|
# Returns just the column definitions for the given struct.
|
|
# Example:
|
|
# (
|
|
# `i` integer,
|
|
# `f` float(10,8)
|
|
# )
|
|
sub make_table_ddl {
   my ( $self, $struct ) = @_;

   # One "`name` type[size]," line per column, in the struct's order.
   my @col_defs;
   foreach my $col ( @{$struct->{cols}} ) {
      my $type = $struct->{type_for}->{$col};
      my $size = $struct->{size}->{$col} || '';
      push @col_defs, " `$col` $type$size,";
   }

   my $ddl = "(\n" . join("\n", @col_defs) . ')';
   # The last column will be like "`i` integer,)" which is invalid.
   $ddl =~ s/,\)$/\n)/;
   PTDEBUG && _d('Table ddl:', $ddl);
   return $ddl;
}
|
|
|
|
# Adds every index from every table used by the query to all the
|
|
# dest tables. dest is an arrayref of hashes, one for each destination.
|
|
# Each hash needs a dbh and tbl key; e.g.:
|
|
# [
|
|
# {
|
|
# dbh => $dbh,
|
|
# tbl => 'db.tbl',
|
|
# },
|
|
# ],
|
|
# For the moment, the sub returns nothing. In the future, it should
|
|
# add to $args{struct}->{keys} the keys that it was able to add.
|
|
sub add_indexes {
   my ( $self, %args ) = @_;
   for my $required ( qw(query dsts db) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $query = $args{query};
   my $dsts  = $args{dsts};

   my ( $qp, $tp, $q ) = @{$self}{qw(QueryParser TableParser Quoter)};

   # Collect a [definition, key struct] pair for every BTREE index of
   # every table the query uses.  $args{db} is handed to split_unquote()
   # for table names that carry no database of their own.
   my @keys;
   TABLE:
   foreach my $db_tbl ( $qp->get_tables($query) ) {
      my ($db, $tbl) = $q->split_unquote($db_tbl, $args{db});
      if ( !$db ) {
         PTDEBUG && _d('Cannot get indexes from', $db_tbl, 'because its '
            . 'database is unknown');
         next TABLE;
      }

      # Table definitions are read through the first destination's dbh.
      my $tbl_struct;
      eval {
         $tbl_struct = $tp->parse(
            $tp->get_create_table(
               dbh => $dsts->[0]->{dbh},
               db  => $db,
               tbl => $tbl,
            ));
      };
      if ( $EVAL_ERROR ) {
         PTDEBUG && _d('Error parsing', $db, '.', $tbl, ':', $EVAL_ERROR);
         next TABLE;
      }

      foreach my $index ( values %{$tbl_struct->{keys}} ) {
         next unless $index->{type} eq 'BTREE';
         my $def = ($index->{is_unique} ? 'UNIQUE ' : '')
                 . "KEY ($index->{colnames})";
         push @keys, [ $def, $index ];
      }
   }
   PTDEBUG && _d('Source keys:', Dumper(\@keys));
   return unless @keys;

   # Try to add every key to every destination table.  A key that cannot
   # be added is skipped; the error is only logged.
   foreach my $dst ( @$dsts ) {
      foreach my $key ( @keys ) {
         my $def = $key->[0];
         my $sql = "ALTER TABLE $dst->{tbl} ADD $def";
         PTDEBUG && _d($sql);
         eval {
            $dst->{dbh}->do($sql);
         };
         if ( $EVAL_ERROR ) {
            PTDEBUG && _d($EVAL_ERROR);
         }
         # TODO: on success,
         # $args{res_struct}->{keys}->{$key->[1]->{name}} = $key->[1];
      }
   }

   # If the query uses only 1 table then return its struct.
   # TODO: $args{struct} = $struct if @src_tbls == 1;
   return;
}
|
|
|
|
# Builds the text report of every difference saved by compare().
#
# Required args:
#   * hosts  arrayref: one element per host (only their number matters here)
# Returns: the non-empty sub-reports (checksums, column values, row counts)
# joined by newlines, or nothing if no differences have been saved.
sub report {
   my ( $self, %args ) = @_;
   for my $required ( qw(hosts) ) {
      die "I need a $required argument" unless $args{$required};
   }
   my $hosts = $args{hosts};

   return unless keys %{$self->{diffs}};

   # These columns are common to all the reports; make them just once.
   my $query_id_col = { name => 'Query ID' };
   my @host_cols    = map { +{ name => "host$_" } } 1 .. scalar @$hosts;

   # Each reporter returns nothing when it has no differences to show.
   my @reports;
   foreach my $diff ( qw(checksums col_vals row_counts) ) {
      my $reporter = "_report_diff_$diff";
      push @reports, $self->$reporter(
         query_id_col => $query_id_col,
         host_cols    => \@host_cols,
         %args
      );
   }

   return join("\n", @reports);
}
|
|
|
|
# Formats the saved checksum differences: one line per query sample, with
# the query's ID and sample number followed by the saved checksums.
# Returns nothing if no checksum differences have been saved.
sub _report_diff_checksums {
   my ( $self, %args ) = @_;
   for my $required ( qw(query_id_col host_cols) ) {
      die "I need a $required argument" unless $args{$required};
   }

   my $get_id = $self->{get_id};

   return unless keys %{$self->{diffs}->{checksums}};

   my $report = ReportFormatter->new();
   $report->title('Checksum differences');
   $report->set_columns(
      $args{query_id_col},
      @{$args{host_cols}},
   );

   # Items in string order, each item's samples in numeric order.
   my $diff_checksums = $self->{diffs}->{checksums};
   foreach my $item ( sort keys %$diff_checksums ) {
      my $samples = $diff_checksums->{$item};
      foreach my $sampleno ( sort { $a <=> $b } keys %$samples ) {
         $report->add_line(
            $get_id->($item) . '-' . $sampleno,
            @{$samples->{$sampleno}},
         );
      }
   }

   return $report->get_report();
}
|
|
|
|
# Formats the saved column value differences: one line per saved row, with
# the query's ID and sample number followed by the saved row's values.
# Returns nothing if no column value differences have been saved.
sub _report_diff_col_vals {
   my ( $self, %args ) = @_;
   for my $required ( qw(query_id_col host_cols) ) {
      die "I need a $required argument" unless $args{$required};
   }

   my $get_id = $self->{get_id};

   return unless keys %{$self->{diffs}->{col_vals}};

   my $report = ReportFormatter->new();
   $report->title('Column value differences');
   $report->set_columns(
      $args{query_id_col},
      { name => 'Column' },
      @{$args{host_cols}},
   );

   # Items in string order, samples in numeric order, rows as saved.
   my $diff_col_vals = $self->{diffs}->{col_vals};
   foreach my $item ( sort keys %$diff_col_vals ) {
      my $samples = $diff_col_vals->{$item};
      foreach my $sampleno ( sort { $a <=> $b } keys %$samples ) {
         foreach my $diff_row ( @{$samples->{$sampleno}} ) {
            $report->add_line(
               $get_id->($item) . '-' . $sampleno,
               @$diff_row,
            );
         }
      }
   }

   return $report->get_report();
}
|
|
|
|
# Formats the saved row count differences: one line per query sample, with
# the query's ID and sample number followed by the saved row counts.  Unlike
# the other reports this one makes its own right-justified host columns,
# one per element of the hosts arg.
# Returns nothing if no row count differences have been saved.
sub _report_diff_row_counts {
   my ( $self, %args ) = @_;
   for my $required ( qw(query_id_col hosts) ) {
      die "I need a $required argument" unless $args{$required};
   }

   my $get_id = $self->{get_id};

   return unless keys %{$self->{diffs}->{row_counts}};

   my $report = ReportFormatter->new();
   $report->title('Row count differences');
   my @count_cols = map { +{ name => "host$_", right_justify => 1 } }
                    1 .. scalar @{$args{hosts}};
   $report->set_columns(
      $args{query_id_col},
      @count_cols,
   );

   # Items in string order, each item's samples in numeric order.
   my $diff_row_counts = $self->{diffs}->{row_counts};
   foreach my $item ( sort keys %$diff_row_counts ) {
      my $samples = $diff_row_counts->{$item};
      foreach my $sampleno ( sort { $a <=> $b } keys %$samples ) {
         $report->add_line(
            $get_id->($item) . '-' . $sampleno,
            @{$samples->{$sampleno}},
         );
      }
   }

   return $report->get_report();
}
|
|
|
|
# Returns the samples saved for the given item as a flat list of
# sample number, query pairs; nothing if no item is given.
sub samples {
   my ( $self, $item ) = @_;
   return unless $item;
   return map { ( $_, $self->{samples}->{$item}->{$_} ) }
          keys %{$self->{samples}->{$item}};
}
|
|
|
|
# Forgets every saved difference and sample.
sub reset {
   my ( $self ) = @_;
   @{$self}{qw(diffs samples)} = ( {}, {} );
   return;
}
|
|
|
|
# USE $new_db, return current db before the switch.
|
|
sub _use_db {
   my ( $self, $dbh, $new_db ) = @_;
   return unless $new_db;

   my $select_sql = 'SELECT DATABASE()';
   PTDEBUG && _d($select_sql);
   my $current_db = $dbh->selectrow_array($select_sql);

   if ( $current_db && $current_db eq $new_db ) {
      PTDEBUG && _d('Current and new DB are the same');
   }
   else {
      my $use_sql = "USE `$new_db`";
      PTDEBUG && _d($use_sql);
      $dbh->do($use_sql);
   }

   # Whichever branch ran, the caller gets the db that was in use before.
   return $current_db;
}
|
|
|
|
# Debug printer: writes its args to STDERR on one logical line prefixed with
# "# package:line pid".  Undefined args are printed as "undef" and newlines
# inside an arg are continued with "# ".
sub _d {
   my ( $pkg, undef, $lineno ) = caller 0;
   my @parts;
   for my $arg ( @_ ) {
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @parts, $text;
   }
   print STDERR "# $pkg:$lineno $PID ", join(' ', @parts), "\n";
}
|
|
|
|
1;
|
|
}
|
|
# ###########################################################################
|
|
# End CompareResults package
|
|
# ###########################################################################
|