Merge lp:~percona-toolkit-dev/percona-toolkit/pt-table-checksum-2.0.

This commit is contained in:
Daniel Nichter
2011-12-29 17:39:26 -07:00
177 changed files with 12244 additions and 10107 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1104,6 +1104,48 @@ sub join_quote {
return $db ? "$db.$tbl" : $tbl;
}
# Sub: serialize_list
#   Turn a list of values into a single comma-separated string.  Each value
#   is passed through quotemeta() first, so a comma inside a value comes out
#   backslash-escaped and cannot be confused with a separating comma.
#
# Parameters:
#   @args - Values to serialize
#
# Returns:
#   Nothing for an empty list; the lone value itself (undef) when the list
#   is a single undef; otherwise the escaped values joined with commas.
#   Dies if a list of two or more values contains an undef.
sub serialize_list {
   my ( $self, @args ) = @_;

   # Nothing to serialize.
   if ( !@args ) {
      return;
   }

   # A single undef/NULL is handed back untouched.
   if ( @args == 1 && !defined $args[0] ) {
      return $args[0];
   }

   # An undef cannot be represented alongside other values.
   foreach my $arg ( @args ) {
      die "Cannot serialize multiple values with undef/NULL"
         unless defined $arg;
   }

   my @escaped_args = map { quotemeta($_) } @args;
   return join(',', @escaped_args);
}
# Sub: deserialize_list
#   Split a comma-separated, backslash-escaped string (the format written
#   by <serialize_list()>) back into its list of values.
#
# Parameters:
#   $string - String to split
#
# Returns:
#   The list of unescaped values; $string itself when it is undefined.
sub deserialize_list {
   my ( $self, $string ) = @_;
   if ( !defined $string ) {
      return $string;
   }

   # Collect every element that is followed by a separating comma.  /c keeps
   # pos($string) at the end of the last successful match once the global
   # match finally fails.
   my @escaped_parts = $string =~ /
      \G             # Start of string, or end of previous match.
      (              # Each of these is an element in the original list.
         [^\\,]*     # Anything not a backslash or a comma
         (?:         # When we get here, we found one of the above.
            \\.      # A backslash followed by something so we can continue
            [^\\,]*  # Same as above.
         )*          # Repeat zero or more times.
      )
      ,              # Comma dividing elements
   /sxgc;

   # The last element has no trailing comma: it is whatever follows the
   # final match, or the whole string when nothing matched.
   my $tail_pos = pos($string);
   push @escaped_parts, $tail_pos ? substr( $string, $tail_pos ) : $string;

   # Characters whose escaping backslash is removed: ASCII non-word
   # characters and, for strings not flagged as UTF-8, also the bytes
   # 0x80-0xFF.
   my $ascii_nonword           = qr/(?=\p{ASCII})\W/;
   my $ascii_nonword_or_high   = qr/(?=\p{ASCII})\W|[\x{80}-\x{FF}]/;

   my @unescaped_parts;
   foreach my $escaped ( @escaped_parts ) {
      my $part       = $escaped;
      my $char_class = $ascii_nonword_or_high;
      if ( utf8::is_utf8($part) ) {
         $char_class = $ascii_nonword;
      }
      $part =~ s/\\($char_class)/$1/g;
      push @unescaped_parts, $part;
   }

   return @unescaped_parts;
}
1;
}
# ###########################################################################
@@ -4167,18 +4209,16 @@ sub make_checksum_query {
sub find_replication_differences {
my ( $self, $dbh, $table ) = @_;
(my $sql = <<" EOF") =~ s/\s+/ /gm;
SELECT db, tbl, chunk, boundaries,
COALESCE(this_cnt-master_cnt, 0) AS cnt_diff,
COALESCE(
this_crc <> master_crc OR ISNULL(master_crc) <> ISNULL(this_crc),
0
) AS crc_diff,
this_cnt, master_cnt, this_crc, master_crc
FROM $table
WHERE master_cnt <> this_cnt OR master_crc <> this_crc
OR ISNULL(master_crc) <> ISNULL(this_crc)
EOF
my $sql
= "SELECT db, tbl, CONCAT(db, '.', tbl) AS `table`, "
. "chunk, chunk_index, lower_boundary, upper_boundary, "
. "COALESCE(this_cnt-master_cnt, 0) AS cnt_diff, "
. "COALESCE("
. "this_crc <> master_crc OR ISNULL(master_crc) <> ISNULL(this_crc), 0"
. ") AS crc_diff, this_cnt, master_cnt, this_crc, master_crc "
. "FROM $table "
. "WHERE master_cnt <> this_cnt OR master_crc <> this_crc "
. "OR ISNULL(master_crc) <> ISNULL(this_crc)";
MKDEBUG && _d($sql);
my $diffs = $dbh->selectall_arrayref($sql, { Slice => {} });
@@ -7248,6 +7288,7 @@ package pt_table_sync;
use English qw(-no_match_vars);
use List::Util qw(sum max min);
use POSIX qw(ceil);
use Data::Dumper;
Transformers->import(qw(time_to_secs any_unix_timestamp));
@@ -7256,6 +7297,7 @@ use constant MKDEBUG => $ENV{MKDEBUG} || 0;
$OUTPUT_AUTOFLUSH = 1;
my %dsn_for;
my $q = new Quoter();
sub main {
@ARGV = @_; # set global ARGV for this package
@@ -7375,7 +7417,6 @@ sub main {
# ########################################################################
# Do the work.
# ########################################################################
my $q = new Quoter();
my $tp = new TableParser( Quoter => $q );
my $vp = new VersionParser();
my $ms = new MasterSlave(VersionParser => $vp);
@@ -7743,7 +7784,8 @@ sub sync_via_replication {
$exit_status |= sync_a_table(
src => $src,
dst => $dst,
where => $diff->{boundaries},
where => 1, # prevents --where from being used
diff => $diff,
%args,
);
}
@@ -7811,7 +7853,8 @@ sub sync_via_replication {
$exit_status |= sync_a_table(
src => $src,
dst => $dst,
where => $diff->{boundaries},
where => 1, # prevents --where from being used
diff => $diff,
%args,
);
}
@@ -8065,6 +8108,14 @@ sub sync_a_table {
# This will either die if there's a problem or return the tbl struct.
my $tbl_struct = ok_to_sync($src, $dst, %args);
if ( my $diff = $args{diff} ) {
MKDEBUG && _d('Converting checksum diff to WHERE:', Dumper($diff));
$args{where} = diff_where(
%args,
tbl_struct => $tbl_struct,
);
}
# If the table is InnoDB, prefer to sync it with transactions, unless
# the user explicitly said not to.
@@ -8510,13 +8561,13 @@ sub ok_to_sync {
# Sub: filter_diffs
#   Return the checksum diffs that are not listed in the skip table and
#   that pass the optional database and table filters.
#
# Parameters:
#   $skip_table - Hashref of db => tbl => true for tables to exclude
#   $databases  - Hashref of allowed databases; a false value allows all
#   $tables     - Hashref of allowed tables, keyed by table name or by the
#                 db-qualified name; a false value allows all
#   @diffs      - Diff hashrefs to filter
#
# Returns:
#   List of the diff hashrefs that passed every filter
#
# NOTE(review): this span appears to show both sides of the diff hunk.  The
# three conditions reading $_->{db} and $_->{tbl} look like the removed
# lines; the lines starting at split_unquote(), which read $_->{table},
# look like their replacement.  Confirm against the merged file.
sub filter_diffs {
my ( $skip_table, $databases, $tables, @diffs ) = @_;
return grep {
!$skip_table->{$_->{db}}->{$_->{tbl}}
&& (!$databases || $databases->{$_->{db}})
&& (!$tables || ($tables->{$_->{tbl}} || $tables->{"$_->{db}.$_->{tbl}"}))
# Split the db-qualified `table` value into its db and tbl parts.
my ($db, $tbl) = $q->split_unquote($_->{table});
!$skip_table->{$db}->{$tbl}
&& (!$databases || $databases->{$db})
&& (!$tables || ($tables->{$tbl} || $tables->{$_->{table}}))
} @diffs;
}
# Sub: disconnect
# Disconnect host dbhs created by <get_cxn()>. To make sure all dbh
# are closed, pt-table-sync keeps track of the dbh it opens and this
@@ -8804,6 +8855,56 @@ sub get_current_user {
return $user;
}
{
   # Ascend statements returned by TableNibbler::generate_asc_stmt(), cached
   # so each is generated once instead of once per diff.  The statement
   # depends on the index as well as the table, so both are in the cache key.
   my %asc_for_table;

   # Sub: diff_where
   #   Convert a checksum diff into a WHERE clause covering the chunk's
   #   lower and upper boundaries.
   #
   # Parameters:
   #   %args - Arguments; all of them are also passed to generate_asc_stmt()
   #
   # Required Arguments:
   #   diff         - Hashref; its chunk_index, table, lower_boundary and
   #                  upper_boundary values are read
   #   tbl_struct   - Hashref; keys->{<chunk_index>}->{cols} must list the
   #                  columns of the chunk index
   #   TableNibbler - Object providing generate_asc_stmt()
   #
   # Returns:
   #   String "<lower boundary SQL> AND <upper boundary SQL>", or nothing
   #   when the diff has no chunk_index (table checksummed in one nibble).
   #   Dies if a required argument is missing or the index has no columns
   #   in tbl_struct.
   sub diff_where {
      my (%args) = @_;
      my @required_args = qw(diff tbl_struct TableNibbler);
      foreach my $arg ( @required_args ) {
         die "I need a $arg argument" unless $args{$arg};
      }
      my ($diff, $tbl_struct, $tn) = @args{@required_args};

      my $key = $diff->{chunk_index};
      if ( !$key ) {
         MKDEBUG && _d('One nibble checksum');
         return;
      }

      my $cols      = $tbl_struct->{keys}->{$key}->{cols};
      my $cache_key = "$diff->{table}.$key";
      my $asc       = $asc_for_table{$cache_key};
      if ( !$asc ) {
         die "Index $key does not exist in table" unless $cols && @$cols;

         # NibbleIterator does this to make the boundary statements.
         $asc = $tn->generate_asc_stmt(
            %args,
            tbl_struct => $tbl_struct,
            index      => $key,
            cols       => $cols,
            asc_only   => 1,
         );
         $asc_for_table{$cache_key} = $asc;
         MKDEBUG && _d('Ascend params:', Dumper($asc));
      }

      my $lb_sql = _diff_boundary_sql(
         $asc->{boundaries}->{'>='}, $diff->{lower_boundary});
      my $ub_sql = _diff_boundary_sql(
         $asc->{boundaries}->{'<='}, $diff->{upper_boundary});

      return "$lb_sql AND $ub_sql";
   }

   # Fill the ? placeholders of a boundary statement, left to right, with
   # the quoted values deserialized from a stored boundary.  All
   # placeholders are filled in a single pass so that a ? inside an
   # already-inserted value is never mistaken for a placeholder.
   # Placeholders beyond the number of values are left as they are.
   sub _diff_boundary_sql {
      my ( $sql, $serialized_vals ) = @_;
      my @quoted_vals = map { scalar $q->quote_val($_) }
                        $q->deserialize_list($serialized_vals);
      $sql =~ s{\?}{ @quoted_vals ? shift @quoted_vals : '?' }eg;
      return $sql;
   }
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }