mirror of
https://github.com/percona/percona-toolkit.git
synced 2026-02-27 02:00:57 +08:00
Make pt-table-sync --replicate work with pt-table-checksum 2.0.
This commit is contained in:
@@ -2672,7 +2672,6 @@ sub new {
|
||||
|
||||
my $where = $o->get('where');
|
||||
my ($row_est, $mysql_index) = get_row_estimate(%args, where => $where);
|
||||
MKDEBUG && _d($row_est, 'estimated rows, MySQL chose index', $mysql_index);
|
||||
my $chunk_size_limit = $o->has('chunk-size-limit')
|
||||
? $o->get('chunk-size-limit')
|
||||
: 1;
|
||||
@@ -2901,9 +2900,16 @@ sub next {
|
||||
: $self->{nibble_sth}->fetchrow_arrayref();
|
||||
if ( $row ) {
|
||||
$self->{rowno}++;
|
||||
MKDEBUG && _d('Row', $self->{rowno}, 'in nibble',$self->{nibbleno});
|
||||
MKDEBUG && _d('Row', $self->{rowno}, 'in nibble',$self->{nibbleno},
|
||||
'from', $self->{Cxn}->name());
|
||||
return $self->{fetch_hashref} ? $row : [ @$row ];
|
||||
}
|
||||
else {
|
||||
MKDEBUG && _d('No row in nibble');
|
||||
if ( $self->{empty_results} ) {
|
||||
return $self->{fetch_hashref} ? {} : [];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MKDEBUG && _d('No rows in nibble or nibble skipped');
|
||||
@@ -3005,7 +3011,10 @@ sub more_boundaries {
|
||||
|
||||
sub no_more_rows {
|
||||
my ($self) = @_;
|
||||
$self->{nibble_sth}->finish() if $self->{nibble_sth};
|
||||
$self->{have_rows} = 0;
|
||||
$self->{rowno} = 0;
|
||||
MKDEBUG && _d('No more rows');
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3195,7 +3204,8 @@ sub _next_boundaries {
|
||||
return 1; # continue nibbling
|
||||
}
|
||||
|
||||
if ( $self->identical_boundaries($self->{lower}, $self->{next_lower}) ) {
|
||||
if ( !$self->{manual_nibble}
|
||||
&& $self->identical_boundaries($self->{lower}, $self->{next_lower}) ) {
|
||||
MKDEBUG && _d('Infinite loop detected');
|
||||
my $tbl = $self->{tbl};
|
||||
my $index = $tbl->{tbl_struct}->{keys}->{$self->{index}};
|
||||
@@ -3227,6 +3237,8 @@ sub _next_boundaries {
|
||||
}
|
||||
}
|
||||
|
||||
return 1 if $self->{manual_nibble};
|
||||
|
||||
MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:',
|
||||
join(', ', @{$self->{lower}}), $self->{limit});
|
||||
$self->{ub_sth}->execute(@{$self->{lower}}, $self->{limit});
|
||||
@@ -3655,6 +3667,8 @@ sub sync_table {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my ($src, $dst, $row_syncer, $changer) = @args{@required_args};
|
||||
|
||||
my $diffs = $args{diffs};
|
||||
my $changing_src = $args{changing_src};
|
||||
|
||||
my $o = $self->{OptionParser};
|
||||
@@ -3674,6 +3688,7 @@ sub sync_table {
|
||||
. " dst_host:" . $dst->{Cxn}->name()
|
||||
. " dst_tbl:" . join('.', @{$dst->{tbl}}{qw(db tbl)})
|
||||
. " changing_src:" . ($changing_src ? "yes" : "no")
|
||||
. " diffs:" . ($diffs ? scalar @$diffs : "0")
|
||||
. " " . join(" ", map { "$_:" . ($o->get($_) ? "yes" : "no") }
|
||||
qw(lock transaction replicate bidirectional))
|
||||
. " pid:$PID "
|
||||
@@ -3682,34 +3697,41 @@ sub sync_table {
|
||||
MKDEBUG && _d("Binlog trace message:", $trace);
|
||||
}
|
||||
|
||||
my %crc_args = $row_checksum->get_crc_args(dbh => $src->{Cxn}->dbh());
|
||||
my $chunk_cols = $row_checksum->make_chunk_checksum(
|
||||
dbh => $src->{Cxn}->dbh(),
|
||||
tbl => $src->{tbl},
|
||||
%crc_args
|
||||
);
|
||||
my %crc_args = $row_checksum->get_crc_args(dbh => $src->{Cxn}->dbh());
|
||||
my $chunk_cols;
|
||||
if ( $diffs ) {
|
||||
$chunk_cols = "0 AS cnt, '' AS crc";
|
||||
}
|
||||
else {
|
||||
|
||||
$chunk_cols = $row_checksum->make_chunk_checksum(
|
||||
dbh => $src->{Cxn}->dbh(),
|
||||
tbl => $src->{tbl},
|
||||
%crc_args
|
||||
);
|
||||
}
|
||||
|
||||
if ( !defined $src->{sql_lock} || !defined $dst->{dst_lock} ) {
|
||||
if ( !defined $src->{select_lock} || !defined $dst->{select_lock} ) {
|
||||
if ( $o->get('transaction') ) {
|
||||
if ( $o->get('bidirectional') ) {
|
||||
$src->{sql_lock} = 'FOR UPDATE';
|
||||
$dst->{sql_lock} = 'FOR UPDATE';
|
||||
$src->{select_lock} = 'FOR UPDATE';
|
||||
$dst->{select_lock} = 'FOR UPDATE';
|
||||
}
|
||||
elsif ( $changing_src ) {
|
||||
$src->{sql_lock} = 'FOR UPDATE';
|
||||
$dst->{sql_lock} = 'LOCK IN SHARE MODE';
|
||||
$src->{select_lock} = 'FOR UPDATE';
|
||||
$dst->{select_lock} = 'LOCK IN SHARE MODE';
|
||||
}
|
||||
else {
|
||||
$src->{sql_lock} = 'LOCK IN SHARE MODE';
|
||||
$dst->{sql_lock} = 'FOR UPDATE';
|
||||
$src->{select_lock} = 'LOCK IN SHARE MODE';
|
||||
$dst->{select_lock} = 'FOR UPDATE';
|
||||
}
|
||||
}
|
||||
else {
|
||||
$src->{sql_lock} = '';
|
||||
$dst->{sql_lock} = '';
|
||||
$src->{select_lock} = '';
|
||||
$dst->{select_lock} = '';
|
||||
}
|
||||
MKDEBUG && _d('src sql lock:', $src->{sql_lock});
|
||||
MKDEBUG && _d('dst sql lock:', $dst->{sql_lock});
|
||||
MKDEBUG && _d('SELECT lock:', $src->{select_lock});
|
||||
MKDEBUG && _d('SELECT lock:', $dst->{select_lock});
|
||||
}
|
||||
|
||||
my $user_where = $o->get('where');
|
||||
@@ -3756,11 +3778,25 @@ sub sync_table {
|
||||
changing_src => $changing_src,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
return $oktonibble;
|
||||
},
|
||||
next_boundaries => sub {
|
||||
my (%args) = @_;
|
||||
my $tbl = $args{tbl};
|
||||
if ( my $diff = $tbl->{diff} ) {
|
||||
my $nibble_iter = $args{NibbleIterator};
|
||||
my $boundary = $nibble_iter->boundaries();
|
||||
$nibble_iter->set_boundary(
|
||||
'upper', [ split ',', $diff->{upper_boundary} ]);
|
||||
$nibble_iter->set_boundary(
|
||||
'lower', [ split ',', $diff->{lower_boundary} ]);
|
||||
}
|
||||
return 1;
|
||||
},
|
||||
exec_nibble => sub {
|
||||
my (%args) = @_;
|
||||
my $tbl = $args{tbl};
|
||||
my $nibble_iter = $args{NibbleIterator};
|
||||
my $sths = $nibble_iter->statements();
|
||||
my $boundary = $nibble_iter->boundaries();
|
||||
@@ -3789,7 +3825,7 @@ sub sync_table {
|
||||
|
||||
MKDEBUG && _d('nibble', $args{Cxn}->name());
|
||||
$sths->{nibble}->execute(@{$boundary->{lower}}, @{$boundary->{upper}});
|
||||
return $sths->{nibble}->rows();
|
||||
return 1;
|
||||
},
|
||||
};
|
||||
|
||||
@@ -3797,8 +3833,12 @@ sub sync_table {
|
||||
Cxn => $host->{Cxn},
|
||||
tbl => $host->{tbl},
|
||||
chunk_size => $o->get('chunk-size'),
|
||||
chunk_index => $o->get('chunk-index'),
|
||||
chunk_index => $diffs ? $diffs->[0]->{chunk_index}
|
||||
: $o->get('chunk-index'),
|
||||
manual_nibble => $diffs ? 1 : 0,
|
||||
empty_results => 1,
|
||||
select => $chunk_cols,
|
||||
select_lock => $host->{select_lock},
|
||||
callbacks => $callbacks,
|
||||
fetch_hashref => 1,
|
||||
one_nibble => $args{one_nibble},
|
||||
@@ -3865,13 +3905,26 @@ sub sync_table {
|
||||
while ( $src_nibble_iter->more_boundaries()
|
||||
|| $dst_nibble_iter->more_boundaries() ) {
|
||||
|
||||
if ( $diffs ) {
|
||||
my $diff = shift @$diffs;
|
||||
if ( !$diff ) {
|
||||
MKDEBUG && _d('No more checksum diffs');
|
||||
last;
|
||||
}
|
||||
MKDEBUG && _d('Syncing checksum diff', Dumper($diff));
|
||||
$src->{tbl}->{diff} = $diff;
|
||||
$dst->{tbl}->{diff} = $diff;
|
||||
}
|
||||
|
||||
my $src_chunk = $src_nibble_iter->next();
|
||||
my $dst_chunk = $dst_nibble_iter->next();
|
||||
MKDEBUG && _d('Got chunk');
|
||||
|
||||
if ( ($src_chunk->{cnt} || 0) != ($dst_chunk->{cnt} || 0)
|
||||
if ( $diffs
|
||||
|| ($src_chunk->{cnt} || 0) != ($dst_chunk->{cnt} || 0)
|
||||
|| ($src_chunk->{crc} || '') ne ($dst_chunk->{crc} || '') ) {
|
||||
MKDEBUG && _d("Chunks differ");
|
||||
my $boundary = $src_nibble_iter->boundaries();
|
||||
my $boundary = $src_nibble_iter->boundaries();
|
||||
foreach my $host ($src, $dst) {
|
||||
MKDEBUG && _d($host->{Cxn}->name(), $host->{rows_sth}->{Statement},
|
||||
'params:', @{$boundary->{lower}}, @{$boundary->{upper}});
|
||||
@@ -6787,6 +6840,9 @@ use constant MKDEBUG => $ENV{MKDEBUG} || 0;
|
||||
|
||||
$OUTPUT_AUTOFLUSH = 1;
|
||||
|
||||
my $q = new Quoter();
|
||||
my $vp = new VersionParser();
|
||||
|
||||
sub main {
|
||||
@ARGV = @_; # set global ARGV for this package
|
||||
my $exit_status = 0; # 1: internal error, 2: tables differed, 3: both
|
||||
@@ -6892,8 +6948,6 @@ sub main {
|
||||
# ########################################################################
|
||||
# Connect to hosts.
|
||||
# ########################################################################
|
||||
my $q = new Quoter();
|
||||
my $vp = new VersionParser();
|
||||
my $ms = new MasterSlave(VersionParser => $vp);
|
||||
|
||||
my $set_on_connect = sub {
|
||||
@@ -7079,15 +7133,13 @@ sub main {
|
||||
# Exit status
|
||||
sub lock_and_rename {
|
||||
my ( %args ) = @_;
|
||||
my @required_args = qw(cxns OptionParser DSNParser Quoter
|
||||
VersionParser);
|
||||
my @required_args = qw(cxns OptionParser DSNParser);
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my $cxns = $args{cxns};
|
||||
my $o = $args{OptionParser};
|
||||
my $dp = $args{DSNParser};
|
||||
my $q = $args{Quoter};
|
||||
|
||||
MKDEBUG && _d('Locking and syncing ONE TABLE with rename');
|
||||
my $src = {
|
||||
@@ -7264,16 +7316,14 @@ sub sync_one_table {
|
||||
# <filter_diffs()>
|
||||
sub sync_via_replication {
|
||||
my ( %args ) = @_;
|
||||
my @required_args = qw(cxns OptionParser DSNParser Quoter
|
||||
VersionParser RowChecksum MasterSlave TableSyncer
|
||||
make_cxn);
|
||||
my @required_args = qw(cxns OptionParser DSNParser
|
||||
RowChecksum MasterSlave TableSyncer make_cxn);
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my $cxns = $args{cxns};
|
||||
my $o = $args{OptionParser};
|
||||
my $dp = $args{DSNParser};
|
||||
my $q = $args{Quoter};
|
||||
my $checksum = $args{RowChecksum};
|
||||
my $ms = $args{MasterSlave};
|
||||
my $table_syncer = $args{TableSyncer};
|
||||
@@ -7307,15 +7357,19 @@ sub sync_via_replication {
|
||||
my %skip_table;
|
||||
map { $skip_table{$_->{db}}->{$_->{tbl}}++ }
|
||||
$checksum->find_replication_differences(
|
||||
$src->{Cxn}->dbh(), $o->get('replicate'));
|
||||
dbh => $src->{Cxn}->dbh(),
|
||||
repl_table => $o->get('replicate'),
|
||||
);
|
||||
|
||||
# Now check the slave for differences and sync them if necessary.
|
||||
my @diffs = filter_diffs(
|
||||
my $diffs = filter_diffs(
|
||||
\%skip_table,
|
||||
$databases,
|
||||
$tables,
|
||||
$checksum->find_replication_differences(
|
||||
$dst->{Cxn}->dbh(), $o->get('replicate'))
|
||||
dbh => $dst->{Cxn}->dbh(),
|
||||
repl_table => $o->get('replicate'),
|
||||
),
|
||||
);
|
||||
|
||||
if ( $o->get('verbose') ) {
|
||||
@@ -7325,7 +7379,9 @@ sub sync_via_replication {
|
||||
. "\n";
|
||||
}
|
||||
|
||||
if ( @diffs ) {
|
||||
if ( @$diffs ) {
|
||||
my $diffs_for_table = group_diffs_by_table($diffs);
|
||||
|
||||
$table_syncer->lock_and_wait(
|
||||
lock_level => 3,
|
||||
host => $src,
|
||||
@@ -7337,16 +7393,20 @@ sub sync_via_replication {
|
||||
src => $src,
|
||||
);
|
||||
|
||||
foreach my $diff ( @diffs ) {
|
||||
print Dumper($diff);
|
||||
$src->{tbl} = $diff->{tbl};
|
||||
$dst->{tbl} = $diff->{tbl};
|
||||
foreach my $diffs ( @$diffs_for_table ) {
|
||||
my @db_tbl = $q->split_unquote($diffs->[0]->{table});
|
||||
my $tbl = {
|
||||
db => $db_tbl[0],
|
||||
tbl => $db_tbl[1],
|
||||
};
|
||||
$src->{tbl} = $tbl;
|
||||
$dst->{tbl} = $tbl;
|
||||
|
||||
$exit_status |= sync_a_table(
|
||||
%args,
|
||||
src => $src,
|
||||
dst => $dst,
|
||||
boundaries => $diff->{boundaries},
|
||||
src => $src,
|
||||
dst => $dst,
|
||||
diffs => $diffs,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -7371,20 +7431,23 @@ sub sync_via_replication {
|
||||
recurse => 1,
|
||||
callback => sub {
|
||||
my ( $dsn, $dbh, $level, $parent ) = @_;
|
||||
my @diffs = $checksum
|
||||
->find_replication_differences($dbh, $o->get('replicate'));
|
||||
my $diffs = $checksum->find_replication_differences(
|
||||
dbh => $dbh,
|
||||
repl_table => $o->get('replicate'),
|
||||
);
|
||||
|
||||
if ( !$level ) {
|
||||
# This is the master; don't sync any tables that are wrong
|
||||
# here, for obvious reasons.
|
||||
map { $skip_table{$_->{db}}->{$_->{tbl}}++ } @diffs;
|
||||
map { $skip_table{$_->{db}}->{$_->{tbl}}++ } @$diffs;
|
||||
}
|
||||
else {
|
||||
# This is a slave.
|
||||
@diffs = filter_diffs(
|
||||
$diffs = filter_diffs(
|
||||
\%skip_table,
|
||||
$databases,
|
||||
$tables,
|
||||
@diffs
|
||||
$diffs
|
||||
);
|
||||
|
||||
if ( $o->get('verbose') ) {
|
||||
@@ -7396,7 +7459,9 @@ sub sync_via_replication {
|
||||
: '') . "\n";
|
||||
}
|
||||
|
||||
if ( @diffs ) {
|
||||
if ( @$diffs ) {
|
||||
my $diffs_for_table = group_diffs_by_table($diffs);
|
||||
|
||||
my $dst = {
|
||||
Cxn => $make_cxn->(dsn => $dsn, dbh => $dbh),
|
||||
tbl => undef, # set later
|
||||
@@ -7413,16 +7478,20 @@ sub sync_via_replication {
|
||||
src => $src,
|
||||
);
|
||||
|
||||
foreach my $diff ( @diffs ) {
|
||||
print Dumper($diff);
|
||||
$src->{tbl} = $diff->{tbl};
|
||||
$dst->{tbl} = $diff->{tbl};
|
||||
foreach my $diffs ( @$diffs_for_table ) {
|
||||
my @db_tbl = $q->split_unquote($diffs->[0]->{table});
|
||||
my $tbl = {
|
||||
db => $db_tbl[0],
|
||||
tbl => $db_tbl[1],
|
||||
};
|
||||
$src->{tbl} = $tbl;
|
||||
$dst->{tbl} = $tbl;
|
||||
|
||||
$exit_status |= sync_a_table(
|
||||
%args,
|
||||
src => $src,
|
||||
dst => $dst,
|
||||
boundaries => $diff->{boundaries},
|
||||
src => $src,
|
||||
dst => $dst,
|
||||
diffs => $diffs,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -7466,15 +7535,14 @@ sub sync_via_replication {
|
||||
# Exit status
|
||||
sub sync_all {
|
||||
my ( %args ) = @_;
|
||||
my @required_args = qw(cxns OptionParser DSNParser Quoter
|
||||
TableSyncer VersionParser TableParser MySQLDump);
|
||||
my @required_args = qw(cxns OptionParser DSNParser
|
||||
TableSyncer TableParser MySQLDump);
|
||||
foreach my $arg ( @required_args ) {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my @cxns = @{$args{cxns}};
|
||||
my $o = $args{OptionParser};
|
||||
my $dp = $args{DSNParser};
|
||||
my $q = $args{Quoter};
|
||||
my $tp = $args{TableParser};
|
||||
my $table_syncer = $args{TableSyncer};
|
||||
|
||||
@@ -7653,9 +7721,9 @@ sub sync_a_table {
|
||||
%args,
|
||||
src => $src,
|
||||
dst => $dst,
|
||||
diff => $args{diff},
|
||||
ChangeHandler => $ch,
|
||||
RowSyncer => $row_syncer,
|
||||
boundaries => $args{boundaries},
|
||||
changing_src => $o->get('replicate')
|
||||
|| $o->get('sync-to-master')
|
||||
|| $o->get('bidirectional')
|
||||
@@ -7938,17 +8006,36 @@ sub ok_to_sync {
|
||||
# $skip_table - Hashref of databases and tables to skip
|
||||
# $databases - Hashref of databases to skip
|
||||
# $tables - Hashref of tables to skip
|
||||
# @diffs - Array of hashrefs, one for each different slave table
|
||||
# $diffs - Arrayref of hashrefs, one for each different slave table
|
||||
#
|
||||
# Returns:
|
||||
# Array of different slave tables that pass the filters
|
||||
sub filter_diffs {
|
||||
my ( $skip_table, $databases, $tables, @diffs ) = @_;
|
||||
return grep {
|
||||
!$skip_table->{$_->{db}}->{$_->{tbl}}
|
||||
&& (!$databases || $databases->{$_->{db}})
|
||||
&& (!$tables || ($tables->{$_->{tbl}} || $tables->{"$_->{db}.$_->{tbl}"}))
|
||||
} @diffs;
|
||||
my ( $skip_table, $databases, $tables, $diffs ) = @_;
|
||||
my @filtered_diffs = grep {
|
||||
my ($db, $tbl) = $q->split_unquote($_->{table});
|
||||
!$skip_table->{$db}->{$tbl}
|
||||
&& (!$databases || $databases->{$db})
|
||||
&& (!$tables || ($tables->{$tbl} || $tables->{$_->{table}}))
|
||||
} @$diffs;
|
||||
return \@filtered_diffs;
|
||||
}
|
||||
|
||||
sub group_diffs_by_table {
|
||||
my ($diffs) = @_;
|
||||
my @grouped_diffs;
|
||||
my @table_diffs;
|
||||
my $last_table = '';
|
||||
foreach my $diff ( @$diffs ) {
|
||||
if ( $diff->{table} ne $last_table && @table_diffs ) {
|
||||
push @grouped_diffs, [ @table_diffs ] if @table_diffs;
|
||||
@table_diffs = ();
|
||||
}
|
||||
push @table_diffs, $diff;
|
||||
$last_table = $diff->{table};
|
||||
}
|
||||
push @grouped_diffs, [ @table_diffs ] if @table_diffs;
|
||||
return \@grouped_diffs;
|
||||
}
|
||||
|
||||
# Sub: print_sql
|
||||
|
||||
@@ -324,10 +324,17 @@ sub next {
|
||||
: $self->{nibble_sth}->fetchrow_arrayref();
|
||||
if ( $row ) {
|
||||
$self->{rowno}++;
|
||||
MKDEBUG && _d('Row', $self->{rowno}, 'in nibble',$self->{nibbleno});
|
||||
MKDEBUG && _d('Row', $self->{rowno}, 'in nibble',$self->{nibbleno},
|
||||
'from', $self->{Cxn}->name());
|
||||
# fetchrow_arraryref re-uses an internal arrayref, so we must copy.
|
||||
return $self->{fetch_hashref} ? $row : [ @$row ];
|
||||
}
|
||||
else {
|
||||
MKDEBUG && _d('No row in nibble');
|
||||
if ( $self->{empty_results} ) {
|
||||
return $self->{fetch_hashref} ? {} : [];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MKDEBUG && _d('No rows in nibble or nibble skipped');
|
||||
@@ -429,7 +436,10 @@ sub more_boundaries {
|
||||
|
||||
sub no_more_rows {
|
||||
my ($self) = @_;
|
||||
$self->{nibble_sth}->finish() if $self->{nibble_sth};
|
||||
$self->{have_rows} = 0;
|
||||
$self->{rowno} = 0;
|
||||
MKDEBUG && _d('No more rows');
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -636,7 +646,8 @@ sub _next_boundaries {
|
||||
# which will cause us to nibble further ahead and maybe get a new lower
|
||||
# boundary that isn't identical, but we can't detect this, and in any
|
||||
# case, if there's one infinite loop there will probably be others.
|
||||
if ( $self->identical_boundaries($self->{lower}, $self->{next_lower}) ) {
|
||||
if ( !$self->{manual_nibble}
|
||||
&& $self->identical_boundaries($self->{lower}, $self->{next_lower}) ) {
|
||||
MKDEBUG && _d('Infinite loop detected');
|
||||
my $tbl = $self->{tbl};
|
||||
my $index = $tbl->{tbl_struct}->{keys}->{$self->{index}};
|
||||
@@ -668,6 +679,8 @@ sub _next_boundaries {
|
||||
}
|
||||
}
|
||||
|
||||
return 1 if $self->{manual_nibble};
|
||||
|
||||
MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:',
|
||||
join(', ', @{$self->{lower}}), $self->{limit});
|
||||
$self->{ub_sth}->execute(@{$self->{lower}}, $self->{limit});
|
||||
|
||||
@@ -77,6 +77,8 @@ sub sync_table {
|
||||
die "I need a $arg argument" unless $args{$arg};
|
||||
}
|
||||
my ($src, $dst, $row_syncer, $changer) = @args{@required_args};
|
||||
|
||||
my $diffs = $args{diffs};
|
||||
my $changing_src = $args{changing_src};
|
||||
|
||||
my $o = $self->{OptionParser};
|
||||
@@ -97,6 +99,7 @@ sub sync_table {
|
||||
. " dst_host:" . $dst->{Cxn}->name()
|
||||
. " dst_tbl:" . join('.', @{$dst->{tbl}}{qw(db tbl)})
|
||||
. " changing_src:" . ($changing_src ? "yes" : "no")
|
||||
. " diffs:" . ($diffs ? scalar @$diffs : "0")
|
||||
. " " . join(" ", map { "$_:" . ($o->get($_) ? "yes" : "no") }
|
||||
qw(lock transaction replicate bidirectional))
|
||||
. " pid:$PID "
|
||||
@@ -107,37 +110,44 @@ sub sync_table {
|
||||
|
||||
# Make NibbleIterator for checksumming chunks of rows to see if
|
||||
# there are any diffs.
|
||||
my %crc_args = $row_checksum->get_crc_args(dbh => $src->{Cxn}->dbh());
|
||||
my $chunk_cols = $row_checksum->make_chunk_checksum(
|
||||
dbh => $src->{Cxn}->dbh(),
|
||||
tbl => $src->{tbl},
|
||||
%crc_args
|
||||
);
|
||||
my %crc_args = $row_checksum->get_crc_args(dbh => $src->{Cxn}->dbh());
|
||||
my $chunk_cols;
|
||||
if ( $diffs ) {
|
||||
$chunk_cols = "0 AS cnt, '' AS crc";
|
||||
}
|
||||
else {
|
||||
|
||||
$chunk_cols = $row_checksum->make_chunk_checksum(
|
||||
dbh => $src->{Cxn}->dbh(),
|
||||
tbl => $src->{tbl},
|
||||
%crc_args
|
||||
);
|
||||
}
|
||||
|
||||
if ( !defined $src->{sql_lock} || !defined $dst->{dst_lock} ) {
|
||||
if ( !defined $src->{select_lock} || !defined $dst->{select_lock} ) {
|
||||
if ( $o->get('transaction') ) {
|
||||
if ( $o->get('bidirectional') ) {
|
||||
# Making changes on src and dst.
|
||||
$src->{sql_lock} = 'FOR UPDATE';
|
||||
$dst->{sql_lock} = 'FOR UPDATE';
|
||||
$src->{select_lock} = 'FOR UPDATE';
|
||||
$dst->{select_lock} = 'FOR UPDATE';
|
||||
}
|
||||
elsif ( $changing_src ) {
|
||||
# Making changes on master (src) which replicate to slave (dst).
|
||||
$src->{sql_lock} = 'FOR UPDATE';
|
||||
$dst->{sql_lock} = 'LOCK IN SHARE MODE';
|
||||
$src->{select_lock} = 'FOR UPDATE';
|
||||
$dst->{select_lock} = 'LOCK IN SHARE MODE';
|
||||
}
|
||||
else {
|
||||
# Making changes on slave (dst).
|
||||
$src->{sql_lock} = 'LOCK IN SHARE MODE';
|
||||
$dst->{sql_lock} = 'FOR UPDATE';
|
||||
$src->{select_lock} = 'LOCK IN SHARE MODE';
|
||||
$dst->{select_lock} = 'FOR UPDATE';
|
||||
}
|
||||
}
|
||||
else {
|
||||
$src->{sql_lock} = '';
|
||||
$dst->{sql_lock} = '';
|
||||
$src->{select_lock} = '';
|
||||
$dst->{select_lock} = '';
|
||||
}
|
||||
MKDEBUG && _d('src sql lock:', $src->{sql_lock});
|
||||
MKDEBUG && _d('dst sql lock:', $dst->{sql_lock});
|
||||
MKDEBUG && _d('SELECT lock:', $src->{select_lock});
|
||||
MKDEBUG && _d('SELECT lock:', $dst->{select_lock});
|
||||
}
|
||||
|
||||
my $user_where = $o->get('where');
|
||||
@@ -187,11 +197,25 @@ sub sync_table {
|
||||
changing_src => $changing_src,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
return $oktonibble;
|
||||
},
|
||||
next_boundaries => sub {
|
||||
my (%args) = @_;
|
||||
my $tbl = $args{tbl};
|
||||
if ( my $diff = $tbl->{diff} ) {
|
||||
my $nibble_iter = $args{NibbleIterator};
|
||||
my $boundary = $nibble_iter->boundaries();
|
||||
$nibble_iter->set_boundary(
|
||||
'upper', [ split ',', $diff->{upper_boundary} ]);
|
||||
$nibble_iter->set_boundary(
|
||||
'lower', [ split ',', $diff->{lower_boundary} ]);
|
||||
}
|
||||
return 1;
|
||||
},
|
||||
exec_nibble => sub {
|
||||
my (%args) = @_;
|
||||
my $tbl = $args{tbl};
|
||||
my $nibble_iter = $args{NibbleIterator};
|
||||
my $sths = $nibble_iter->statements();
|
||||
my $boundary = $nibble_iter->boundaries();
|
||||
@@ -225,7 +249,7 @@ sub sync_table {
|
||||
# The nibble iter will return the row.
|
||||
MKDEBUG && _d('nibble', $args{Cxn}->name());
|
||||
$sths->{nibble}->execute(@{$boundary->{lower}}, @{$boundary->{upper}});
|
||||
return $sths->{nibble}->rows();
|
||||
return 1;
|
||||
},
|
||||
};
|
||||
|
||||
@@ -233,8 +257,12 @@ sub sync_table {
|
||||
Cxn => $host->{Cxn},
|
||||
tbl => $host->{tbl},
|
||||
chunk_size => $o->get('chunk-size'),
|
||||
chunk_index => $o->get('chunk-index'),
|
||||
chunk_index => $diffs ? $diffs->[0]->{chunk_index}
|
||||
: $o->get('chunk-index'),
|
||||
manual_nibble => $diffs ? 1 : 0,
|
||||
empty_results => 1,
|
||||
select => $chunk_cols,
|
||||
select_lock => $host->{select_lock},
|
||||
callbacks => $callbacks,
|
||||
fetch_hashref => 1,
|
||||
one_nibble => $args{one_nibble},
|
||||
@@ -304,13 +332,26 @@ sub sync_table {
|
||||
while ( $src_nibble_iter->more_boundaries()
|
||||
|| $dst_nibble_iter->more_boundaries() ) {
|
||||
|
||||
if ( $diffs ) {
|
||||
my $diff = shift @$diffs;
|
||||
if ( !$diff ) {
|
||||
MKDEBUG && _d('No more checksum diffs');
|
||||
last;
|
||||
}
|
||||
MKDEBUG && _d('Syncing checksum diff', Dumper($diff));
|
||||
$src->{tbl}->{diff} = $diff;
|
||||
$dst->{tbl}->{diff} = $diff;
|
||||
}
|
||||
|
||||
my $src_chunk = $src_nibble_iter->next();
|
||||
my $dst_chunk = $dst_nibble_iter->next();
|
||||
MKDEBUG && _d('Got chunk');
|
||||
|
||||
if ( ($src_chunk->{cnt} || 0) != ($dst_chunk->{cnt} || 0)
|
||||
if ( $diffs
|
||||
|| ($src_chunk->{cnt} || 0) != ($dst_chunk->{cnt} || 0)
|
||||
|| ($src_chunk->{crc} || '') ne ($dst_chunk->{crc} || '') ) {
|
||||
MKDEBUG && _d("Chunks differ");
|
||||
my $boundary = $src_nibble_iter->boundaries();
|
||||
my $boundary = $src_nibble_iter->boundaries();
|
||||
foreach my $host ($src, $dst) {
|
||||
MKDEBUG && _d($host->{Cxn}->name(), $host->{rows_sth}->{Statement},
|
||||
'params:', @{$boundary->{lower}}, @{$boundary->{upper}});
|
||||
|
||||
@@ -36,6 +36,11 @@ use PerconaTest;
|
||||
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
|
||||
$ENV{PERCONA_TOOLKIT_TEST_USE_DSN_NAMES} = 1;
|
||||
|
||||
use Data::Dumper;
|
||||
$Data::Dumper::Indent = 1;
|
||||
$Data::Dumper::Sortkeys = 1;
|
||||
$Data::Dumper::Quotekeys = 0;
|
||||
|
||||
my $dp = new DSNParser(opts=>$dsn_opts);
|
||||
my $sb = new Sandbox(basedir => '/tmp', DSNParser => $dp);
|
||||
my $dbh = $sb->get_dbh_for('master');
|
||||
@@ -49,7 +54,7 @@ elsif ( !$dst_dbh ) {
|
||||
plan skip_all => 'Cannot connect to sandbox slave';
|
||||
}
|
||||
else {
|
||||
plan tests => 33;
|
||||
plan tests => 37;
|
||||
}
|
||||
|
||||
$sb->create_dbs($dbh, ['test']);
|
||||
@@ -183,6 +188,7 @@ sub sync_table {
|
||||
trace => 0,
|
||||
changing_src => $args{changing_src},
|
||||
one_nibble => $args{one_nibble},
|
||||
diffs => $args{diffs},
|
||||
);
|
||||
return \%actions;
|
||||
}
|
||||
@@ -793,6 +799,146 @@ $output = '';
|
||||
}
|
||||
diag(`$trunk/sandbox/test-env reset`);
|
||||
|
||||
# #############################################################################
|
||||
# Sync diffs from pt-table-checksum --replicate table.
|
||||
# #############################################################################
|
||||
|
||||
$sb->load_file('master', "t/pt-table-sync/samples/simple-tbls.sql");
|
||||
PerconaTest::wait_for_table($dst_cxn->dbh(), "test.mt1", "id=10");
|
||||
$dst_cxn->dbh()->do("delete from test.it1 where id>5 order by id limit 5");
|
||||
$dst_cxn->dbh()->do("delete from test.mt1 where id in (1, 9)");
|
||||
|
||||
is_deeply(
|
||||
$dst_cxn->dbh()->selectall_arrayref("select id from test.it1 order by id"),
|
||||
[ [1],[2],[3],[4],[5] ],
|
||||
"Replicate it1 missing rows"
|
||||
);
|
||||
|
||||
is_deeply(
|
||||
$dst_cxn->dbh()->selectall_arrayref("select id from test.mt1 order by id"),
|
||||
[ [2],[3],[4],[5],[6],[7],[8],[10] ],
|
||||
"Replicate mt1 missing rows"
|
||||
);
|
||||
|
||||
my $diffs = [
|
||||
[
|
||||
{
|
||||
chunk => '2',
|
||||
chunk_index => 'PRIMARY',
|
||||
cnt_diff => '-1',
|
||||
crc_diff => '1',
|
||||
lower_boundary => '4',
|
||||
master_cnt => '3',
|
||||
master_crc => '528a75c4',
|
||||
table => 'test.it1',
|
||||
this_cnt => '2',
|
||||
this_crc => 'f46ae868',
|
||||
upper_boundary => '6'
|
||||
},
|
||||
{
|
||||
chunk => '3',
|
||||
chunk_index => 'PRIMARY',
|
||||
cnt_diff => '-3',
|
||||
crc_diff => '1',
|
||||
lower_boundary => '7',
|
||||
master_cnt => '3',
|
||||
master_crc => '1ddd6c71',
|
||||
table => 'test.it1',
|
||||
this_cnt => '0',
|
||||
this_crc => '0',
|
||||
upper_boundary => '9'
|
||||
},
|
||||
{
|
||||
chunk => '4',
|
||||
chunk_index => 'PRIMARY',
|
||||
cnt_diff => '-1',
|
||||
crc_diff => '1',
|
||||
lower_boundary => '10',
|
||||
master_cnt => '1',
|
||||
master_crc => '7739449',
|
||||
table => 'test.it1',
|
||||
this_cnt => '0',
|
||||
this_crc => '0',
|
||||
upper_boundary => '10'
|
||||
}
|
||||
],
|
||||
[
|
||||
{
|
||||
chunk => '1',
|
||||
chunk_index => 'PRIMARY',
|
||||
cnt_diff => '-1',
|
||||
crc_diff => '1',
|
||||
lower_boundary => '1',
|
||||
master_cnt => '3',
|
||||
master_crc => 'a2170a20',
|
||||
table => 'test.mt1',
|
||||
this_cnt => '2',
|
||||
this_crc => '2e6ab8d1',
|
||||
upper_boundary => '3'
|
||||
},
|
||||
{
|
||||
chunk => '3',
|
||||
chunk_index => 'PRIMARY',
|
||||
cnt_diff => '-1',
|
||||
crc_diff => '1',
|
||||
lower_boundary => '7',
|
||||
master_cnt => '3',
|
||||
master_crc => '1ddd6c71',
|
||||
table => 'test.mt1',
|
||||
this_cnt => '2',
|
||||
this_crc => '4a57d814',
|
||||
upper_boundary => '9'
|
||||
}
|
||||
]
|
||||
];
|
||||
|
||||
my $correct_rows = [
|
||||
[qw( 1 1 1 one )],
|
||||
[qw( 2 2 2 two )],
|
||||
[qw( 3 3 3 three )],
|
||||
[qw( 4 4 4 four )],
|
||||
[qw( 5 5 5 file )],
|
||||
[qw( 6 6 6 six )],
|
||||
[qw( 7 7 7 seven )],
|
||||
[qw( 8 8 8 eight )],
|
||||
[qw( 9 9 9 nine )],
|
||||
[qw( 10 10 10 ten )],
|
||||
];
|
||||
|
||||
sync_table(
|
||||
src => "test.it1",
|
||||
dst => "test.it1",
|
||||
diffs => $diffs->[0],
|
||||
changing_src => 1,
|
||||
one_nibble => 0,
|
||||
argv => [qw(--replicate percona.checksums)],
|
||||
);
|
||||
|
||||
my $res = $dst_cxn->dbh()->selectall_arrayref("select * from test.it1 order by id");
|
||||
is_deeply(
|
||||
$res,
|
||||
$correct_rows,
|
||||
"Sync replicate it1 rows"
|
||||
) or print STDERR Dumper($res);
|
||||
|
||||
|
||||
sync_table(
|
||||
src => "test.mt1",
|
||||
dst => "test.mt1",
|
||||
diffs => $diffs->[1],
|
||||
changing_src => 1,
|
||||
one_nibble => 0,
|
||||
argv => [qw(--replicate percona.checksums)],
|
||||
);
|
||||
|
||||
|
||||
$res = $dst_cxn->dbh()->selectall_arrayref("select * from test.it1 order by id");
|
||||
is_deeply(
|
||||
$res,
|
||||
$correct_rows,
|
||||
"Sync replicate mt1 rows"
|
||||
) or print STDERR Dumper($res);
|
||||
|
||||
# #############################################################################
|
||||
# Done.
|
||||
# #############################################################################
|
||||
|
||||
62
t/pt-table-sync/samples/simple-tbls.sql
Normal file
62
t/pt-table-sync/samples/simple-tbls.sql
Normal file
@@ -0,0 +1,62 @@
|
||||
DROP DATABASE IF EXISTS test;
|
||||
CREATE DATABASE test;
|
||||
USE test;
|
||||
|
||||
-- InnoDB table 1
|
||||
CREATE TABLE it1 (
|
||||
id int not null auto_increment primary key,
|
||||
a int not null,
|
||||
b int not null,
|
||||
c varchar(16) not null,
|
||||
key (a),
|
||||
unique key (c),
|
||||
unique key (id, c)
|
||||
) ENGINE=InnoDB;
|
||||
|
||||
-- InnoDB table 2
|
||||
CREATE TABLE it2 LIKE it1;
|
||||
|
||||
-- Empty InnoDB table
|
||||
CREATE TABLE empty_it LIKE it1;
|
||||
|
||||
-- MyISAM table 1
|
||||
CREATE TABLE mt1 (
|
||||
id int not null auto_increment primary key,
|
||||
a int not null,
|
||||
b int not null,
|
||||
c varchar(16) not null,
|
||||
key (a),
|
||||
unique key (c),
|
||||
unique key (id, c)
|
||||
) ENGINE=MyISAM;
|
||||
|
||||
-- MyISAM table 2
|
||||
CREATE TABLE mt2 LIKE mt1;
|
||||
|
||||
-- Empty MyISAM table
|
||||
CREATE TABLE empty_mt LIKE mt1;
|
||||
|
||||
INSERT INTO it1 VALUES
|
||||
(null, 1, 1, 'one'),
|
||||
(null, 2, 2, 'two'),
|
||||
(null, 3, 3, 'three'),
|
||||
(null, 4, 4, 'four'),
|
||||
(null, 5, 5, 'file'),
|
||||
(null, 6, 6, 'six'),
|
||||
(null, 7, 7, 'seven'),
|
||||
(null, 8, 8, 'eight'),
|
||||
(null, 9, 9, 'nine'),
|
||||
(null,10,10, 'ten');
|
||||
|
||||
INSERT INTO mt1 VALUES
|
||||
(null, 1, 1, 'one'),
|
||||
(null, 2, 2, 'two'),
|
||||
(null, 3, 3, 'three'),
|
||||
(null, 4, 4, 'four'),
|
||||
(null, 5, 5, 'file'),
|
||||
(null, 6, 6, 'six'),
|
||||
(null, 7, 7, 'seven'),
|
||||
(null, 8, 8, 'eight'),
|
||||
(null, 9, 9, 'nine'),
|
||||
(null,10,10, 'ten');
|
||||
|
||||
Reference in New Issue
Block a user