mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-16 08:17:20 +00:00
Rewrite TableSyncer to use only NibbleIterator. Add RowSyncer. Return hashref from NibbleIterator.
This commit is contained in:
@@ -64,8 +64,11 @@ sub new {
|
|||||||
|
|
||||||
my $where = $o->get('where');
|
my $where = $o->get('where');
|
||||||
my ($row_est, $mysql_index) = get_row_estimate(%args, where => $where);
|
my ($row_est, $mysql_index) = get_row_estimate(%args, where => $where);
|
||||||
|
my $chunk_size_limit = $o->has('chunk-size-limit')
|
||||||
|
? $o->get('chunk-size-limit')
|
||||||
|
: 1;
|
||||||
my $one_nibble = !defined $args{one_nibble} || $args{one_nibble}
|
my $one_nibble = !defined $args{one_nibble} || $args{one_nibble}
|
||||||
? $row_est <= $chunk_size * $o->get('chunk-size-limit')
|
? $row_est <= $chunk_size * $chunk_size_limit
|
||||||
: 0;
|
: 0;
|
||||||
MKDEBUG && _d('One nibble:', $one_nibble ? 'yes' : 'no');
|
MKDEBUG && _d('One nibble:', $one_nibble ? 'yes' : 'no');
|
||||||
|
|
||||||
@@ -246,11 +249,12 @@ sub new {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
$self->{row_est} = $row_est;
|
$self->{row_est} = $row_est;
|
||||||
$self->{nibbleno} = 0;
|
$self->{nibbleno} = 0;
|
||||||
$self->{have_rows} = 0;
|
$self->{have_rows} = 0;
|
||||||
$self->{rowno} = 0;
|
$self->{rowno} = 0;
|
||||||
$self->{oktonibble} = 1;
|
$self->{oktonibble} = 1;
|
||||||
|
$self->{no_more_boundaries} = 0;
|
||||||
|
|
||||||
return bless $self, $class;
|
return bless $self, $class;
|
||||||
}
|
}
|
||||||
@@ -307,12 +311,14 @@ sub next {
|
|||||||
if ( $self->{have_rows} ) {
|
if ( $self->{have_rows} ) {
|
||||||
# Return rows in nibble. sth->{Active} is always true with
|
# Return rows in nibble. sth->{Active} is always true with
|
||||||
# DBD::mysql v3, so we track the status manually.
|
# DBD::mysql v3, so we track the status manually.
|
||||||
my $row = $self->{nibble_sth}->fetchrow_arrayref();
|
my $row = $self->{fetch_hashref}
|
||||||
|
? $self->{nibble_sth}->fetchrow_hashref()
|
||||||
|
: $self->{nibble_sth}->fetchrow_arrayref();
|
||||||
if ( $row ) {
|
if ( $row ) {
|
||||||
$self->{rowno}++;
|
$self->{rowno}++;
|
||||||
MKDEBUG && _d('Row', $self->{rowno}, 'in nibble',$self->{nibbleno});
|
MKDEBUG && _d('Row', $self->{rowno}, 'in nibble',$self->{nibbleno});
|
||||||
# fetchrow_arraryref re-uses an internal arrayref, so we must copy.
|
# fetchrow_arraryref re-uses an internal arrayref, so we must copy.
|
||||||
return [ @$row ];
|
return $self->{fetch_hashref} ? $row : [ @$row ];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -413,6 +419,12 @@ sub more_boundaries {
|
|||||||
return !$self->{no_more_boundaries};
|
return !$self->{no_more_boundaries};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub no_more_rows {
|
||||||
|
my ($self) = @_;
|
||||||
|
$self->{have_rows} = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
sub row_estimate {
|
sub row_estimate {
|
||||||
my ($self) = @_;
|
my ($self) = @_;
|
||||||
return $self->{row_est};
|
return $self->{row_est};
|
||||||
|
@@ -93,7 +93,7 @@ sub make_row_checksum {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ( uc $func ne 'FNV_64' && uc $func ne 'FNV1A_64' ) {
|
if ( uc $func ne 'FNV_64' && uc $func ne 'FNV1A_64' ) {
|
||||||
my $sep = $o->get('separator') || '#';
|
my $sep = ($o->has('separator') && $o->get('separator')) || '#';
|
||||||
$sep =~ s/'//g;
|
$sep =~ s/'//g;
|
||||||
$sep ||= '#';
|
$sep ||= '#';
|
||||||
|
|
||||||
|
104
lib/RowSyncer.pm
Normal file
104
lib/RowSyncer.pm
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
# This program is copyright 2011 Percona Inc.
|
||||||
|
# Feedback and improvements are welcome.
|
||||||
|
#
|
||||||
|
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
||||||
|
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
|
||||||
|
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
#
|
||||||
|
# This program is free software; you can redistribute it and/or modify it under
|
||||||
|
# the terms of the GNU General Public License as published by the Free Software
|
||||||
|
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
|
||||||
|
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
|
||||||
|
# licenses.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License along with
|
||||||
|
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
|
||||||
|
# Place, Suite 330, Boston, MA 02111-1307 USA.
|
||||||
|
# ###########################################################################
|
||||||
|
# RowSyncer package
|
||||||
|
# ###########################################################################
|
||||||
|
{
|
||||||
|
# Package: RowSyncer
|
||||||
|
# RowSyncer syncs a destination row to a source row.
|
||||||
|
package RowSyncer;
|
||||||
|
|
||||||
|
use strict;
|
||||||
|
use warnings FATAL => 'all';
|
||||||
|
use English qw(-no_match_vars);
|
||||||
|
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
|
||||||
|
|
||||||
|
use Data::Dumper;
|
||||||
|
$Data::Dumper::Indent = 1;
|
||||||
|
$Data::Dumper::Sortkeys = 1;
|
||||||
|
$Data::Dumper::Quotekeys = 0;
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my ( $class, %args ) = @_;
|
||||||
|
my @required_args = qw(ChangeHandler);
|
||||||
|
foreach my $arg ( @required_args ) {
|
||||||
|
die "I need a $arg argument" unless defined $args{$arg};
|
||||||
|
}
|
||||||
|
my $self = {
|
||||||
|
crc_col => 'crc',
|
||||||
|
%args,
|
||||||
|
};
|
||||||
|
return bless $self, $class;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub set_crc_col {
|
||||||
|
my ($self, $crc_col) = @_;
|
||||||
|
$self->{crc_col} = $crc_col;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub set_key_cols {
|
||||||
|
my ($self, $key_cols) = @_;
|
||||||
|
$self->{key_cols} = $key_cols;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub key_cols {
|
||||||
|
my ($self) = @_;
|
||||||
|
return $self->{key_cols};
|
||||||
|
}
|
||||||
|
|
||||||
|
sub same_row {
|
||||||
|
my ($self, %args) = @_;
|
||||||
|
my ($lr, $rr) = @args{qw(lr rr)};
|
||||||
|
if ( $lr->{$self->{crc_col}} ne $rr->{$self->{crc_col}} ) {
|
||||||
|
$self->{ChangeHandler}->change('UPDATE', $lr, $self->key_cols());
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub not_in_right {
|
||||||
|
my ( $self, %args ) = @_;
|
||||||
|
# Row isn't in the dest, re-insert it in the source.
|
||||||
|
$self->{ChangeHandler}->change('INSERT', $args{lr}, $self->key_cols());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub not_in_left {
|
||||||
|
my ( $self, %args ) = @_;
|
||||||
|
# Row isn't in source, delete it from the dest.
|
||||||
|
$self->{ChangeHandler}->change('DELETE', $args{rr}, $self->key_cols());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub done_with_rows {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _d {
|
||||||
|
my ($package, undef, $line) = caller 0;
|
||||||
|
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
||||||
|
map { defined $_ ? $_ : 'undef' }
|
||||||
|
@_;
|
||||||
|
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
||||||
|
}
|
||||||
|
# ###########################################################################
|
||||||
|
# End RowSyncer package
|
||||||
|
# ###########################################################################
|
@@ -1,4 +1,4 @@
|
|||||||
# This program is copyright 2007-2011 Baron Schwartz, 2011 Percona Inc.
|
# This program is copyright 2011 Percona Inc.
|
||||||
# Feedback and improvements are welcome.
|
# Feedback and improvements are welcome.
|
||||||
#
|
#
|
||||||
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
||||||
@@ -32,16 +32,10 @@ $Data::Dumper::Indent = 1;
|
|||||||
$Data::Dumper::Sortkeys = 1;
|
$Data::Dumper::Sortkeys = 1;
|
||||||
$Data::Dumper::Quotekeys = 0;
|
$Data::Dumper::Quotekeys = 0;
|
||||||
|
|
||||||
# Arguments:
|
|
||||||
# * MasterSlave A MasterSlave module
|
|
||||||
# * Quoter A Quoter module
|
|
||||||
# * VersionParser A VersionParser module
|
|
||||||
# * TableChecksum A TableChecksum module
|
|
||||||
# * Retry A Retry module
|
|
||||||
# * DSNParser (optional)
|
|
||||||
sub new {
|
sub new {
|
||||||
my ( $class, %args ) = @_;
|
my ( $class, %args ) = @_;
|
||||||
my @required_args = qw(MasterSlave Quoter VersionParser TableChecksum Retry);
|
my @required_args = qw(MasterSlave OptionParser Quoter TableParser
|
||||||
|
TableNibbler RowChecksum RowDiff Retry);
|
||||||
foreach my $arg ( @required_args ) {
|
foreach my $arg ( @required_args ) {
|
||||||
die "I need a $arg argument" unless defined $args{$arg};
|
die "I need a $arg argument" unless defined $args{$arg};
|
||||||
}
|
}
|
||||||
@@ -49,29 +43,6 @@ sub new {
|
|||||||
return bless $self, $class;
|
return bless $self, $class;
|
||||||
}
|
}
|
||||||
|
|
||||||
# Return the first plugin from the arrayref of TableSync* plugins
|
|
||||||
# that can sync the given table struct. plugin->can_sync() usually
|
|
||||||
# returns a hashref that it wants back when plugin->prepare_to_sync()
|
|
||||||
# is called. Or, it may return nothing (false) to say that it can't
|
|
||||||
# sync the table.
|
|
||||||
sub get_best_plugin {
|
|
||||||
my ( $self, %args ) = @_;
|
|
||||||
foreach my $arg ( qw(plugins tbl_struct) ) {
|
|
||||||
die "I need a $arg argument" unless $args{$arg};
|
|
||||||
}
|
|
||||||
MKDEBUG && _d('Getting best plugin');
|
|
||||||
foreach my $plugin ( @{$args{plugins}} ) {
|
|
||||||
MKDEBUG && _d('Trying plugin', $plugin->name);
|
|
||||||
my ($can_sync, %plugin_args) = $plugin->can_sync(%args);
|
|
||||||
if ( $can_sync ) {
|
|
||||||
MKDEBUG && _d('Can sync with', $plugin->name, Dumper(\%plugin_args));
|
|
||||||
return $plugin, %plugin_args;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
MKDEBUG && _d('No plugin can sync the table');
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
# Required arguments:
|
# Required arguments:
|
||||||
# * plugins Arrayref of TableSync* modules, in order of preference
|
# * plugins Arrayref of TableSync* modules, in order of preference
|
||||||
# * src Hashref with source (aka left) dbh, db, tbl
|
# * src Hashref with source (aka left) dbh, db, tbl
|
||||||
@@ -101,340 +72,278 @@ sub get_best_plugin {
|
|||||||
# * wait locking
|
# * wait locking
|
||||||
sub sync_table {
|
sub sync_table {
|
||||||
my ( $self, %args ) = @_;
|
my ( $self, %args ) = @_;
|
||||||
my @required_args = qw(plugins src dst tbl_struct cols chunk_size
|
my @required_args = qw(src dst RowSyncer ChangeHandler);
|
||||||
RowDiff ChangeHandler);
|
|
||||||
foreach my $arg ( @required_args ) {
|
foreach my $arg ( @required_args ) {
|
||||||
die "I need a $arg argument" unless $args{$arg};
|
die "I need a $arg argument" unless $args{$arg};
|
||||||
}
|
}
|
||||||
MKDEBUG && _d('Syncing table with args:',
|
my ($src, $dst, $row_syncer, $changer) = @args{@required_args};
|
||||||
map { "$_: " . Dumper($args{$_}) }
|
|
||||||
qw(plugins src dst tbl_struct cols chunk_size));
|
|
||||||
|
|
||||||
my ($plugins, $src, $dst, $tbl_struct, $cols, $chunk_size, $rd, $ch)
|
my $o = $self->{OptionParser};
|
||||||
= @args{@required_args};
|
my $q = $self->{Quoter};
|
||||||
my $dp = $self->{DSNParser};
|
my $row_diff = $self->{RowDiff};
|
||||||
$args{trace} = 1 unless defined $args{trace};
|
my $row_checksum = $self->{RowChecksum};
|
||||||
|
|
||||||
if ( $args{bidirectional} && $args{ChangeHandler}->{queue} ) {
|
# USE db on src and dst for cases like when replicate-do-db is being used.
|
||||||
# This should be checked by the caller but just in case...
|
foreach my $host ( $src, $dst ) {
|
||||||
die "Queueing does not work with bidirectional syncing";
|
$host->{Cxn}->dbh()->do("USE " . $q->quote($host->{tbl}->{db}));
|
||||||
}
|
}
|
||||||
|
|
||||||
$args{index_hint} = 1 unless defined $args{index_hint};
|
return $changer->get_changes() if $o->get('dry-run');
|
||||||
$args{lock} ||= 0;
|
|
||||||
$args{wait} ||= 0;
|
|
||||||
$args{transaction} ||= 0;
|
|
||||||
$args{timeout_ok} ||= 0;
|
|
||||||
|
|
||||||
my $q = $self->{Quoter};
|
my $trace;
|
||||||
my $vp = $self->{VersionParser};
|
if ( !defined $args{trace} || $args{trace} ) {
|
||||||
|
chomp(my $hostname = `hostname`);
|
||||||
|
$trace = "src_host:" . $src->{Cxn}->name()
|
||||||
|
. " src_tbl:" . join('.', @{$src->{tbl}}{qw(db tbl)})
|
||||||
|
. "dst_host:" . $dst->{Cxn}->name()
|
||||||
|
. " dst_tbl:" . join('.', @{$dst->{tbl}}{qw(db tbl)})
|
||||||
|
. " changing_src: " . ($args{changing_src} ? "yes" : "no")
|
||||||
|
. " " . join(" ", map { "$_:" . ($o->get($_) ? "yes" : "no") }
|
||||||
|
qw(lock transaction replicate bidirectional))
|
||||||
|
. " pid:$PID "
|
||||||
|
. ($ENV{USER} ? "user:$ENV{USER} " : "")
|
||||||
|
. ($hostname ? "host:$hostname" : "");
|
||||||
|
MKDEBUG && _d("Binlog trace message:", $trace);
|
||||||
|
}
|
||||||
|
|
||||||
# ########################################################################
|
# Make NibbleIterator for checksumming chunks of rows to see if
|
||||||
# Get and prepare the first plugin that can sync this table.
|
# there are any diffs.
|
||||||
# ########################################################################
|
my %crc_args = $row_checksum->get_crc_args(dbh => $src->{Cxn}->dbh());
|
||||||
my ($plugin, %plugin_args) = $self->get_best_plugin(%args);
|
my $chunk_cols = $row_checksum->make_chunk_checksum(
|
||||||
die "No plugin can sync $src->{db}.$src->{tbl}" unless $plugin;
|
dbh => $src->{Cxn}->dbh(),
|
||||||
|
tbl => $src->{tbl},
|
||||||
|
%crc_args
|
||||||
|
);
|
||||||
|
|
||||||
# The row-level (state 2) checksums use __crc, so the table can't use that.
|
if ( !defined $src->{sql_lock} || !defined $dst->{dst_lock} ) {
|
||||||
my $crc_col = '__crc';
|
if ( $o->get('transaction') ) {
|
||||||
while ( $tbl_struct->{is_col}->{$crc_col} ) {
|
if ( $o->get('bidirectional') ) {
|
||||||
|
# Making changes on src and dst.
|
||||||
|
$src->{sql_lock} = 'FOR UPDATE';
|
||||||
|
$dst->{sql_lock} = 'FOR UPDATE';
|
||||||
|
}
|
||||||
|
elsif ( $args{changing_src} ) {
|
||||||
|
# Making changes on master (src) which replicate to slave (dst).
|
||||||
|
$src->{sql_lock} = 'FOR UPDATE';
|
||||||
|
$dst->{sql_lock} = 'LOCK IN SHARE MODE';
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
# Making changes on slave (dst).
|
||||||
|
$src->{sql_lock} = 'LOCK IN SHARE MODE';
|
||||||
|
$dst->{sql_lock} = 'FOR UPDATE';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
$src->{sql_lock} = '';
|
||||||
|
$dst->{sql_lock} = '';
|
||||||
|
}
|
||||||
|
MKDEBUG && _d('src sql lock:', $src->{sql_lock});
|
||||||
|
MKDEBUG && _d('dst sql lock:', $dst->{sql_lock});
|
||||||
|
}
|
||||||
|
|
||||||
|
my $user_where = $o->get('where');
|
||||||
|
|
||||||
|
foreach my $host ($src, $dst) {
|
||||||
|
my $callbacks = {
|
||||||
|
init => sub {
|
||||||
|
my (%args) = @_;
|
||||||
|
my $nibble_iter = $args{NibbleIterator};
|
||||||
|
my $sths = $nibble_iter->statements();
|
||||||
|
|
||||||
|
if ( $o->get('buffer-to-client') ) {
|
||||||
|
$host->{sth}->{mysql_use_result} = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
# Lock the table.
|
||||||
|
$self->lock_and_wait(
|
||||||
|
lock_level => 2,
|
||||||
|
host => $host,
|
||||||
|
src => $src,
|
||||||
|
OptionParser => $o,
|
||||||
|
);
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
},
|
||||||
|
exec_nibble => sub {
|
||||||
|
my (%args) = @_;
|
||||||
|
my $nibble_iter = $args{NibbleIterator};
|
||||||
|
my $sths = $nibble_iter->statements();
|
||||||
|
my $boundary = $nibble_iter->boundaries();
|
||||||
|
|
||||||
|
# Lock the chunk.
|
||||||
|
$self->lock_and_wait(
|
||||||
|
lock_level => 1,
|
||||||
|
host => $host,
|
||||||
|
src => $src,
|
||||||
|
OptionParser => $o,
|
||||||
|
);
|
||||||
|
|
||||||
|
# Execute the chunk checksum statement.
|
||||||
|
# The nibble iter will return the row.
|
||||||
|
MKDEBUG && _d('nibble', $args{Cxn}->name());
|
||||||
|
$sths->{nibble}->execute(@{$boundary->{lower}}, @{$boundary->{upper}});
|
||||||
|
return $sths->{nibble}->rows();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
$host->{nibble_iter} = new NibbleIterator(
|
||||||
|
Cxn => $host->{Cxn},
|
||||||
|
tbl => $host->{tbl},
|
||||||
|
chunk_size => $o->get('chunk-size'),
|
||||||
|
chunk_index => $o->get('chunk-index'),
|
||||||
|
select => $chunk_cols,
|
||||||
|
callbacks => $callbacks,
|
||||||
|
fetch_hashref => 1,
|
||||||
|
OptionParser => $self->{OptionParser},
|
||||||
|
Quoter => $self->{Quoter},
|
||||||
|
TableNibbler => $self->{TableNibbler},
|
||||||
|
TableParser => $self->{TableParser},
|
||||||
|
RowChecksum => $self->{RowChecksum},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
my $index = $src->{nibble_iter}->nibble_index();
|
||||||
|
my $key_cols = $index ? $src->{tbl}->{tbl_struct}->{keys}->{$index}->{cols}
|
||||||
|
: $src->{tbl}->{tbl_struct}->{cols};
|
||||||
|
$row_syncer->set_key_cols($key_cols);
|
||||||
|
|
||||||
|
my $crc_col = 'crc';
|
||||||
|
while ( $src->{tbl}->{tbl_struct}->{is_col}->{$crc_col} ) {
|
||||||
$crc_col = "_$crc_col"; # Prepend more _ until not a column.
|
$crc_col = "_$crc_col"; # Prepend more _ until not a column.
|
||||||
}
|
}
|
||||||
|
$row_syncer->set_crc_col($crc_col);
|
||||||
MKDEBUG && _d('CRC column:', $crc_col);
|
MKDEBUG && _d('CRC column:', $crc_col);
|
||||||
|
|
||||||
# Make an index hint for either the explicitly given chunk_index
|
foreach my $host ($src, $dst) {
|
||||||
# or the chunk_index chosen by the plugin if index_hint is true.
|
my $row_cols = $row_checksum->make_row_checksum(
|
||||||
my $index_hint;
|
dbh => $host->{Cxn}->dbh(),
|
||||||
my $hint = ($vp->version_ge($src->{dbh}, '4.0.9')
|
tbl => $host->{tbl},
|
||||||
&& $vp->version_ge($dst->{dbh}, '4.0.9') ? 'FORCE' : 'USE')
|
%crc_args,
|
||||||
. ' INDEX';
|
|
||||||
if ( $args{chunk_index} ) {
|
|
||||||
MKDEBUG && _d('Using given chunk index for index hint');
|
|
||||||
$index_hint = "$hint (" . $q->quote($args{chunk_index}) . ")";
|
|
||||||
}
|
|
||||||
elsif ( $plugin_args{chunk_index} && $args{index_hint} ) {
|
|
||||||
MKDEBUG && _d('Using chunk index chosen by plugin for index hint');
|
|
||||||
$index_hint = "$hint (" . $q->quote($plugin_args{chunk_index}) . ")";
|
|
||||||
}
|
|
||||||
MKDEBUG && _d('Index hint:', $index_hint);
|
|
||||||
|
|
||||||
eval {
|
|
||||||
$plugin->prepare_to_sync(
|
|
||||||
%args,
|
|
||||||
%plugin_args,
|
|
||||||
dbh => $src->{dbh},
|
|
||||||
db => $src->{db},
|
|
||||||
tbl => $src->{tbl},
|
|
||||||
crc_col => $crc_col,
|
|
||||||
index_hint => $index_hint,
|
|
||||||
);
|
);
|
||||||
};
|
my $nibble_iter = $host->{nibble_iter};
|
||||||
if ( $EVAL_ERROR ) {
|
|
||||||
# At present, no plugin should fail to prepare, but just in case...
|
|
||||||
die 'Failed to prepare TableSync', $plugin->name, ' plugin: ',
|
|
||||||
$EVAL_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
# Some plugins like TableSyncChunk use checksum queries, others like
|
if ( $nibble_iter->one_nibble() ) {
|
||||||
# TableSyncGroupBy do not. For those that do, make chunk (state 0)
|
my $rows_sql
|
||||||
# and row (state 2) checksum queries.
|
= 'SELECT /*rows in nibble*/ '
|
||||||
if ( $plugin->uses_checksum() ) {
|
. ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '')
|
||||||
eval {
|
. "$row_cols AS $crc_col"
|
||||||
my ($chunk_sql, $row_sql) = $self->make_checksum_queries(%args);
|
. " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)})
|
||||||
$plugin->set_checksum_queries($chunk_sql, $row_sql);
|
. " WHERE 1=1 "
|
||||||
};
|
. ($user_where ? " AND ($user_where)" : '');
|
||||||
if ( $EVAL_ERROR ) {
|
$host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql);
|
||||||
# This happens if src and dst are really different and the same
|
}
|
||||||
# checksum algo and hash func can't be used on both.
|
else {
|
||||||
die "Failed to make checksum queries: $EVAL_ERROR";
|
my $sql = $nibble_iter->sql();
|
||||||
|
my $rows_sql
|
||||||
|
= 'SELECT /*rows in nibble*/ '
|
||||||
|
. ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '')
|
||||||
|
. "$row_cols AS $crc_col"
|
||||||
|
. " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)})
|
||||||
|
. " WHERE " . $sql->{boundaries}->{'>='} # lower boundary
|
||||||
|
. " AND " . $sql->{boundaries}->{'<='} # upper boundary
|
||||||
|
. ($user_where ? " AND ($user_where)" : '')
|
||||||
|
. " ORDER BY " . $sql->{order_by};
|
||||||
|
$host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
# ########################################################################
|
|
||||||
# Plugin is ready, return now if this is a dry run.
|
|
||||||
# ########################################################################
|
|
||||||
if ( $args{dry_run} ) {
|
|
||||||
return $ch->get_changes(), ALGORITHM => $plugin->name;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# ########################################################################
|
# ########################################################################
|
||||||
# Start syncing the table.
|
# Start syncing the table.
|
||||||
# ########################################################################
|
# ########################################################################
|
||||||
|
while ( grep { $_->{nibble_iter}->more_boundaries() } ($src, $dst) ) {
|
||||||
|
my $src_chunk = $src->{nibble_iter}->next();
|
||||||
|
my $dst_chunk = $dst->{nibble_iter}->next();
|
||||||
|
|
||||||
# USE db on src and dst for cases like when replicate-do-db is being used.
|
if ( $src_chunk->{cnt} != $dst_chunk->{cnt}
|
||||||
eval {
|
|| $src_chunk->{crc} ne $dst_chunk->{crc} ) {
|
||||||
$src->{dbh}->do("USE `$src->{db}`");
|
MKDEBUG && _d("Chunks differ");
|
||||||
$dst->{dbh}->do("USE `$dst->{db}`");
|
foreach my $host ($src, $dst) {
|
||||||
};
|
my $nibble_iter = $host->{nibble_iter};
|
||||||
if ( $EVAL_ERROR ) {
|
my $boundary = $nibble_iter->boundaries();
|
||||||
# This shouldn't happen, but just in case. (The db and tbl on src
|
MKDEBUG && _d($host->{Cxn}->name(), $host->{rows_sth}->{Statement},
|
||||||
# and dst should be checked before calling this sub.)
|
'params:', @{$boundary->{lower}}, @{$boundary->{upper}});
|
||||||
die "Failed to USE database on source or destination: $EVAL_ERROR";
|
$host->{rows_sth}->execute(
|
||||||
}
|
@{$boundary->{lower}}, @{$boundary->{upper}});
|
||||||
|
|
||||||
# For bidirectional syncing it's important to know on which dbh
|
|
||||||
# changes are made or rows are fetched. This identifies the dbhs,
|
|
||||||
# then you can search for each one by its address like
|
|
||||||
# "dbh DBI::db=HASH(0x1028b38)".
|
|
||||||
MKDEBUG && _d('left dbh', $src->{dbh});
|
|
||||||
MKDEBUG && _d('right dbh', $dst->{dbh});
|
|
||||||
|
|
||||||
chomp(my $hostname = `hostname`);
|
|
||||||
my $trace_msg
|
|
||||||
= $args{trace} ? "src_db:$src->{db} src_tbl:$src->{tbl} "
|
|
||||||
. ($dp && $src->{dsn} ? "src_dsn:".$dp->as_string($src->{dsn}) : "")
|
|
||||||
. " dst_db:$dst->{db} dst_tbl:$dst->{tbl} "
|
|
||||||
. ($dp && $dst->{dsn} ? "dst_dsn:".$dp->as_string($dst->{dsn}) : "")
|
|
||||||
. " " . join(" ", map { "$_:" . ($args{$_} || 0) }
|
|
||||||
qw(lock transaction changing_src replicate bidirectional))
|
|
||||||
. " pid:$PID "
|
|
||||||
. ($ENV{USER} ? "user:$ENV{USER} " : "")
|
|
||||||
. ($hostname ? "host:$hostname" : "")
|
|
||||||
: "";
|
|
||||||
MKDEBUG && _d("Binlog trace message:", $trace_msg);
|
|
||||||
|
|
||||||
$self->lock_and_wait(%args, lock_level => 2); # per-table lock
|
|
||||||
|
|
||||||
my $callback = $args{callback};
|
|
||||||
my $cycle = 0;
|
|
||||||
while ( !$plugin->done() ) {
|
|
||||||
|
|
||||||
# Do as much of the work as possible before opening a transaction or
|
|
||||||
# locking the tables.
|
|
||||||
MKDEBUG && _d('Beginning sync cycle', $cycle);
|
|
||||||
my $src_sql = $plugin->get_sql(
|
|
||||||
database => $src->{db},
|
|
||||||
table => $src->{tbl},
|
|
||||||
where => $args{where},
|
|
||||||
);
|
|
||||||
my $dst_sql = $plugin->get_sql(
|
|
||||||
database => $dst->{db},
|
|
||||||
table => $dst->{tbl},
|
|
||||||
where => $args{where},
|
|
||||||
);
|
|
||||||
|
|
||||||
if ( $args{transaction} ) {
|
|
||||||
if ( $args{bidirectional} ) {
|
|
||||||
# Making changes on src and dst.
|
|
||||||
$src_sql .= ' FOR UPDATE';
|
|
||||||
$dst_sql .= ' FOR UPDATE';
|
|
||||||
}
|
}
|
||||||
elsif ( $args{changing_src} ) {
|
$row_diff->compare_sets(
|
||||||
# Making changes on master (src) which replicate to slave (dst).
|
left_sth => $src->{rows_sth},
|
||||||
$src_sql .= ' FOR UPDATE';
|
right_sth => $dst->{rows_sth},
|
||||||
$dst_sql .= ' LOCK IN SHARE MODE';
|
tbl_struct => $src->{tbl}->{tbl_struct},
|
||||||
}
|
syncer => $row_syncer,
|
||||||
else {
|
);
|
||||||
# Making changes on slave (dst).
|
$changer->process_rows(1, $trace);
|
||||||
$src_sql .= ' LOCK IN SHARE MODE';
|
foreach my $host ($src, $dst) {
|
||||||
$dst_sql .= ' FOR UPDATE';
|
$host->{rows_sth}->finish();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MKDEBUG && _d('src:', $src_sql);
|
|
||||||
MKDEBUG && _d('dst:', $dst_sql);
|
|
||||||
|
|
||||||
# Give callback a chance to do something with the SQL statements.
|
# Unlock the chunks.
|
||||||
$callback->($src_sql, $dst_sql) if $callback;
|
foreach my $host ($src, $dst) {
|
||||||
|
$self->unlock(
|
||||||
# Prepare each host for next sync cycle. This does stuff
|
lock_level => 1,
|
||||||
# like reset/init MySQL accumulator vars, etc.
|
host => $host,
|
||||||
$plugin->prepare_sync_cycle($src);
|
OptionParser => $o,
|
||||||
$plugin->prepare_sync_cycle($dst);
|
);
|
||||||
|
|
||||||
# Prepare SQL statements on host. These aren't real prepared
|
|
||||||
# statements (i.e. no ? placeholders); we just need sths to
|
|
||||||
# pass to compare_sets(). Also, we can control buffering
|
|
||||||
# (mysql_use_result) on the sths.
|
|
||||||
my $src_sth = $src->{dbh}->prepare($src_sql);
|
|
||||||
my $dst_sth = $dst->{dbh}->prepare($dst_sql);
|
|
||||||
if ( $args{buffer_to_client} ) {
|
|
||||||
$src_sth->{mysql_use_result} = 1;
|
|
||||||
$dst_sth->{mysql_use_result} = 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# The first cycle should lock to begin work; after that, unlock only if
|
# Get next chunks.
|
||||||
# the plugin says it's OK (it may want to dig deeper on the rows it
|
$src->{nibble_iter}->no_more_rows();
|
||||||
# currently has locked).
|
$dst->{nibble_iter}->no_more_rows();
|
||||||
my $executed_src = 0;
|
|
||||||
if ( !$cycle || !$plugin->pending_changes() ) {
|
|
||||||
# per-sync cycle lock
|
|
||||||
$executed_src
|
|
||||||
= $self->lock_and_wait(%args, src_sth => $src_sth, lock_level => 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
# The source sth might have already been executed by lock_and_wait().
|
|
||||||
$src_sth->execute() unless $executed_src;
|
|
||||||
$dst_sth->execute();
|
|
||||||
|
|
||||||
# Compare rows in the two sths. If any differences are found
|
|
||||||
# (same_row, not_in_left, not_in_right), the appropriate $syncer
|
|
||||||
# methods are called to handle them. Changes may be immediate, or...
|
|
||||||
$rd->compare_sets(
|
|
||||||
left_sth => $src_sth,
|
|
||||||
right_sth => $dst_sth,
|
|
||||||
left_dbh => $src->{dbh},
|
|
||||||
right_dbh => $dst->{dbh},
|
|
||||||
syncer => $plugin,
|
|
||||||
tbl_struct => $tbl_struct,
|
|
||||||
);
|
|
||||||
# ...changes may be queued and executed now.
|
|
||||||
$ch->process_rows(1, $trace_msg);
|
|
||||||
|
|
||||||
MKDEBUG && _d('Finished sync cycle', $cycle);
|
|
||||||
$cycle++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$ch->process_rows(0, $trace_msg);
|
$changer->process_rows(0, $trace);
|
||||||
|
|
||||||
$self->unlock(%args, lock_level => 2);
|
# Unlock the table.
|
||||||
|
foreach my $host ($src, $dst) {
|
||||||
return $ch->get_changes(), ALGORITHM => $plugin->name;
|
$self->unlock(
|
||||||
}
|
lock_level => 2,
|
||||||
|
host => $host,
|
||||||
sub make_checksum_queries {
|
OptionParser => $o,
|
||||||
my ( $self, %args ) = @_;
|
|
||||||
my @required_args = qw(src dst tbl_struct);
|
|
||||||
foreach my $arg ( @required_args ) {
|
|
||||||
die "I need a $arg argument" unless $args{$arg};
|
|
||||||
}
|
|
||||||
my ($src, $dst, $tbl_struct) = @args{@required_args};
|
|
||||||
my $checksum = $self->{TableChecksum};
|
|
||||||
|
|
||||||
# Decide on checksumming strategy and store checksum query prototypes for
|
|
||||||
# later.
|
|
||||||
my $src_algo = $checksum->best_algorithm(
|
|
||||||
algorithm => 'BIT_XOR',
|
|
||||||
dbh => $src->{dbh},
|
|
||||||
where => 1,
|
|
||||||
chunk => 1,
|
|
||||||
count => 1,
|
|
||||||
);
|
|
||||||
my $dst_algo = $checksum->best_algorithm(
|
|
||||||
algorithm => 'BIT_XOR',
|
|
||||||
dbh => $dst->{dbh},
|
|
||||||
where => 1,
|
|
||||||
chunk => 1,
|
|
||||||
count => 1,
|
|
||||||
);
|
|
||||||
if ( $src_algo ne $dst_algo ) {
|
|
||||||
die "Source and destination checksum algorithms are different: ",
|
|
||||||
"$src_algo on source, $dst_algo on destination"
|
|
||||||
}
|
|
||||||
MKDEBUG && _d('Chosen algo:', $src_algo);
|
|
||||||
|
|
||||||
my $src_func = $checksum->choose_hash_func(dbh => $src->{dbh}, %args);
|
|
||||||
my $dst_func = $checksum->choose_hash_func(dbh => $dst->{dbh}, %args);
|
|
||||||
if ( $src_func ne $dst_func ) {
|
|
||||||
die "Source and destination hash functions are different: ",
|
|
||||||
"$src_func on source, $dst_func on destination";
|
|
||||||
}
|
|
||||||
MKDEBUG && _d('Chosen hash func:', $src_func);
|
|
||||||
|
|
||||||
# Since the checksum algo and hash func are the same on src and dst
|
|
||||||
# it doesn't matter if we use src_algo/func or dst_algo/func.
|
|
||||||
|
|
||||||
my $crc_wid = $checksum->get_crc_wid($src->{dbh}, $src_func);
|
|
||||||
my ($crc_type) = $checksum->get_crc_type($src->{dbh}, $src_func);
|
|
||||||
my $opt_slice;
|
|
||||||
if ( $src_algo eq 'BIT_XOR' && $crc_type !~ m/int$/ ) {
|
|
||||||
$opt_slice = $checksum->optimize_xor(
|
|
||||||
dbh => $src->{dbh},
|
|
||||||
function => $src_func
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
my $chunk_sql = $checksum->make_checksum_query(
|
return $changer->get_changes();
|
||||||
%args,
|
|
||||||
db => $src->{db},
|
|
||||||
tbl => $src->{tbl},
|
|
||||||
algorithm => $src_algo,
|
|
||||||
function => $src_func,
|
|
||||||
crc_wid => $crc_wid,
|
|
||||||
crc_type => $crc_type,
|
|
||||||
opt_slice => $opt_slice,
|
|
||||||
replicate => undef, # replicate means something different to this sub
|
|
||||||
); # than what we use it for; do not pass it!
|
|
||||||
MKDEBUG && _d('Chunk sql:', $chunk_sql);
|
|
||||||
my $row_sql = $checksum->make_row_checksum(
|
|
||||||
%args,
|
|
||||||
function => $src_func,
|
|
||||||
);
|
|
||||||
MKDEBUG && _d('Row sql:', $row_sql);
|
|
||||||
return $chunk_sql, $row_sql;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sub lock_table {
|
sub lock_table {
|
||||||
my ( $self, $dbh, $where, $db_tbl, $mode ) = @_;
|
my ( $self, %args ) = @_;
|
||||||
my $query = "LOCK TABLES $db_tbl $mode";
|
my @required_args = qw(host mode);
|
||||||
MKDEBUG && _d($query);
|
foreach my $arg ( @required_args ) {
|
||||||
$dbh->do($query);
|
die "I need a $arg argument" unless $args{$arg};
|
||||||
MKDEBUG && _d('Acquired table lock on', $where, 'in', $mode, 'mode');
|
}
|
||||||
|
my ($host, $mode) = @args{@required_args};
|
||||||
|
my $q = $self->{Quoter};
|
||||||
|
my $sql = "LOCK TABLES "
|
||||||
|
. $q->quote(@{$host->{tbl}}{qw(db tbl)})
|
||||||
|
. " $mode";
|
||||||
|
MKDEBUG && _d($host->{Cxn}->name(), $sql);
|
||||||
|
$host->{Cxn}->dbh()->do($sql);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
# Doesn't work quite the same way as lock_and_wait. It will unlock any LOWER
|
# Doesn't work quite the same way as lock_and_wait. It will unlock any LOWER
|
||||||
# priority lock level, not just the exact same one.
|
# priority lock level, not just the exact same one.
|
||||||
sub unlock {
|
sub unlock {
|
||||||
my ( $self, %args ) = @_;
|
my ( $self, %args ) = @_;
|
||||||
|
my @required_args = qw(lock_level host);
|
||||||
foreach my $arg ( qw(src dst lock transaction lock_level) ) {
|
foreach my $arg ( @required_args ) {
|
||||||
die "I need a $arg argument" unless defined $args{$arg};
|
die "I need a $arg argument" unless defined $args{$arg};
|
||||||
}
|
}
|
||||||
my $src = $args{src};
|
my ($lock_level, $host) = @args{@required_args};
|
||||||
my $dst = $args{dst};
|
my $o = $self->{OptionParser};
|
||||||
|
|
||||||
return unless $args{lock} && $args{lock} <= $args{lock_level};
|
my $lock = $o->get('lock');
|
||||||
|
return unless $lock && $lock <= $lock_level;
|
||||||
|
MKDEBUG && _d('Unlocking level', $lock);
|
||||||
|
|
||||||
# First, unlock/commit.
|
if ( $o->get('transaction') ) {
|
||||||
foreach my $dbh ( $src->{dbh}, $dst->{dbh} ) {
|
MKDEBUG && _d('Committing', $host->name());
|
||||||
if ( $args{transaction} ) {
|
$host->{Cxn}->dbh()->commit();
|
||||||
MKDEBUG && _d('Committing', $dbh);
|
}
|
||||||
$dbh->commit();
|
else {
|
||||||
}
|
my $sql = 'UNLOCK TABLES';
|
||||||
else {
|
MKDEBUG && _d($host->name(), $sql);
|
||||||
my $sql = 'UNLOCK TABLES';
|
$host->{Cxn}->dbh()->do($sql);
|
||||||
MKDEBUG && _d($dbh, $sql);
|
|
||||||
$dbh->do($sql);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
@@ -458,73 +367,74 @@ sub unlock {
|
|||||||
# $src_sth was executed.
|
# $src_sth was executed.
|
||||||
sub lock_and_wait {
|
sub lock_and_wait {
|
||||||
my ( $self, %args ) = @_;
|
my ( $self, %args ) = @_;
|
||||||
my $result = 0;
|
my @required_args = qw(lock_level host src);
|
||||||
|
foreach my $arg ( @required_args ) {
|
||||||
foreach my $arg ( qw(src dst lock lock_level) ) {
|
|
||||||
die "I need a $arg argument" unless defined $args{$arg};
|
die "I need a $arg argument" unless defined $args{$arg};
|
||||||
}
|
}
|
||||||
my $src = $args{src};
|
my ($lock_level, $host, $src) = @args{@required_args};
|
||||||
my $dst = $args{dst};
|
my $o = $self->{OptionParser};
|
||||||
|
|
||||||
return unless $args{lock} && $args{lock} == $args{lock_level};
|
my $lock = $o->get('lock');
|
||||||
MKDEBUG && _d('lock and wait, lock level', $args{lock});
|
return unless $lock && $lock == $lock_level;
|
||||||
|
|
||||||
# First, commit/unlock the previous transaction/lock.
|
return $host->{is_source} ? $self->_lock_src(%args)
|
||||||
foreach my $dbh ( $src->{dbh}, $dst->{dbh} ) {
|
: $self->_lock_dst(%args);
|
||||||
if ( $args{transaction} ) {
|
}
|
||||||
MKDEBUG && _d('Committing', $dbh);
|
|
||||||
$dbh->commit();
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
my $sql = 'UNLOCK TABLES';
|
|
||||||
MKDEBUG && _d($dbh, $sql);
|
|
||||||
$dbh->do($sql);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# User wants us to lock for consistency. But lock only on source initially;
|
sub _lock_src {
|
||||||
# might have to wait for the slave to catch up before locking on the dest.
|
my ( $self, %args ) = @_;
|
||||||
if ( $args{lock} == 3 ) {
|
my @required_args = qw(lock_level host src);
|
||||||
|
my ($lock_level, $host, $src) = @args{@required_args};
|
||||||
|
|
||||||
|
my $o = $self->{OptionParser};
|
||||||
|
my $lock = $o->get('lock');
|
||||||
|
MKDEBUG && _d('Locking', $host->{Cxn}->name(), 'level', $lock);
|
||||||
|
|
||||||
|
if ( $lock == 3 ) {
|
||||||
my $sql = 'FLUSH TABLES WITH READ LOCK';
|
my $sql = 'FLUSH TABLES WITH READ LOCK';
|
||||||
MKDEBUG && _d($src->{dbh}, $sql);
|
MKDEBUG && _d($host->{Cxn}->name(), $sql);
|
||||||
$src->{dbh}->do($sql);
|
$host->{Cxn}->dbh()->do($sql);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
# Lock level 2 (per-table) or 1 (per-sync cycle)
|
# Lock level 2 (per-table) or 1 (per-chunk).
|
||||||
if ( $args{transaction} ) {
|
if ( $o->get('transaction') ) {
|
||||||
if ( $args{src_sth} ) {
|
my $sql = "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */";
|
||||||
# Execute the $src_sth on the source, so LOCK IN SHARE MODE/FOR
|
MKDEBUG && _d($host->{Cxn}->name(), $sql);
|
||||||
# UPDATE will lock the rows examined.
|
$host->{Cxn}->dbh()->do($sql);
|
||||||
MKDEBUG && _d('Executing statement on source to lock rows');
|
|
||||||
|
|
||||||
my $sql = "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */";
|
|
||||||
MKDEBUG && _d($src->{dbh}, $sql);
|
|
||||||
$src->{dbh}->do($sql);
|
|
||||||
|
|
||||||
$args{src_sth}->execute();
|
|
||||||
$result = 1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
$self->lock_table($src->{dbh}, 'source',
|
$self->lock_table(
|
||||||
$self->{Quoter}->quote($src->{db}, $src->{tbl}),
|
host => $host,
|
||||||
$args{changing_src} ? 'WRITE' : 'READ');
|
mode => $args{changing_src} ? 'WRITE' : 'READ',
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _lock_dst {
|
||||||
|
my ( $self, %args ) = @_;
|
||||||
|
my @required_args = qw(lock_level host src);
|
||||||
|
my ($lock_level, $host, $src) = @args{@required_args};
|
||||||
|
|
||||||
|
my $o = $self->{OptionParser};
|
||||||
|
my $lock = $o->get('lock');
|
||||||
|
MKDEBUG && _d('Locking', $host->{Cxn}->name(), 'level', $lock);
|
||||||
|
|
||||||
|
# Wait for the dest to catchup to the source, then lock the dest.
|
||||||
# If there is any error beyond this point, we need to unlock/commit.
|
# If there is any error beyond this point, we need to unlock/commit.
|
||||||
eval {
|
eval {
|
||||||
if ( my $timeout = $args{wait} ) {
|
if ( my $timeout = $o->get('wait') ) {
|
||||||
my $ms = $self->{MasterSlave};
|
my $ms = $self->{MasterSlave};
|
||||||
my $tries = $args{wait_retry_args}->{tries} || 3;
|
my $tries = 3;
|
||||||
my $wait;
|
my $wait;
|
||||||
$self->{Retry}->retry(
|
$self->{Retry}->retry(
|
||||||
tries => $tries,
|
tries => $tries,
|
||||||
wait => sub { sleep $args{wait_retry_args}->{wait} || 10 },
|
wait => sub { sleep 5; },
|
||||||
try => sub {
|
try => sub {
|
||||||
my ( %args ) = @_;
|
my ( %args ) = @_;
|
||||||
# Be careful using $args{...} in this callback! %args in
|
# Be careful using $args{...} in this callback! %args in
|
||||||
# here are the passed-in args, not the args to lock_and_wait().
|
# here are the passed-in args, not the args to the sub.
|
||||||
|
|
||||||
if ( $args{tryno} > 1 ) {
|
if ( $args{tryno} > 1 ) {
|
||||||
warn "Retrying MASTER_POS_WAIT() for --wait $timeout...";
|
warn "Retrying MASTER_POS_WAIT() for --wait $timeout...";
|
||||||
@@ -535,7 +445,7 @@ sub lock_and_wait {
|
|||||||
# $src_sth.
|
# $src_sth.
|
||||||
$wait = $ms->wait_for_master(
|
$wait = $ms->wait_for_master(
|
||||||
master_status => $ms->get_master_status($src->{misc_dbh}),
|
master_status => $ms->get_master_status($src->{misc_dbh}),
|
||||||
slave_dbh => $dst->{dbh},
|
slave_dbh => $host->{Cxn}->dbh(),
|
||||||
timeout => $timeout,
|
timeout => $timeout,
|
||||||
);
|
);
|
||||||
if ( defined $wait->{result} && $wait->{result} != -1 ) {
|
if ( defined $wait->{result} && $wait->{result} != -1 ) {
|
||||||
@@ -592,27 +502,24 @@ sub lock_and_wait {
|
|||||||
'(syncing via replication or sync-to-master)');
|
'(syncing via replication or sync-to-master)');
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
if ( $args{lock} == 3 ) {
|
if ( $lock == 3 ) {
|
||||||
my $sql = 'FLUSH TABLES WITH READ LOCK';
|
my $sql = 'FLUSH TABLES WITH READ LOCK';
|
||||||
MKDEBUG && _d($dst->{dbh}, ',', $sql);
|
MKDEBUG && _d($host->{Cxn}->name(), $sql);
|
||||||
$dst->{dbh}->do($sql);
|
$host->{Cxn}->dbh()->do($sql);
|
||||||
}
|
}
|
||||||
elsif ( !$args{transaction} ) {
|
elsif ( !$o->get('transaction') ) {
|
||||||
$self->lock_table($dst->{dbh}, 'dest',
|
$self->lock_table(
|
||||||
$self->{Quoter}->quote($dst->{db}, $dst->{tbl}),
|
host => $host,
|
||||||
$args{execute} ? 'WRITE' : 'READ');
|
mode => 'READ', # $args{execute} ? 'WRITE' : 'READ')
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if ( $EVAL_ERROR ) {
|
if ( $EVAL_ERROR ) {
|
||||||
# Must abort/unlock/commit so that we don't interfere with any further
|
# Must abort/unlock/commit so that we don't interfere with any further
|
||||||
# tables we try to do.
|
# tables we try to do.
|
||||||
if ( $args{src_sth}->{Active} ) {
|
foreach my $dbh ( $host->{Cxn}->dbh(), $src->{Cxn}->dbh() ) {
|
||||||
$args{src_sth}->finish();
|
MKDEBUG && _d('Caught error, unlocking/committing', $dbh);
|
||||||
}
|
|
||||||
foreach my $dbh ( $src->{dbh}, $dst->{dbh}, $src->{misc_dbh} ) {
|
|
||||||
next unless $dbh;
|
|
||||||
MKDEBUG && _d('Caught error, unlocking/committing on', $dbh);
|
|
||||||
$dbh->do('UNLOCK TABLES');
|
$dbh->do('UNLOCK TABLES');
|
||||||
$dbh->commit() unless $dbh->{AutoCommit};
|
$dbh->commit() unless $dbh->{AutoCommit};
|
||||||
}
|
}
|
||||||
@@ -620,7 +527,7 @@ sub lock_and_wait {
|
|||||||
die $EVAL_ERROR;
|
die $EVAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
return $result;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
# This query will check all needed privileges on the table without actually
|
# This query will check all needed privileges on the table without actually
|
||||||
@@ -651,6 +558,7 @@ sub have_all_privs {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
sub _d {
|
sub _d {
|
||||||
my ($package, undef, $line) = caller 0;
|
my ($package, undef, $line) = caller 0;
|
||||||
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
||||||
|
@@ -12,26 +12,22 @@ use English qw(-no_match_vars);
|
|||||||
use Test::More;
|
use Test::More;
|
||||||
|
|
||||||
# TableSyncer and its required modules:
|
# TableSyncer and its required modules:
|
||||||
|
use OptionParser;
|
||||||
|
use NibbleIterator;
|
||||||
use TableSyncer;
|
use TableSyncer;
|
||||||
use MasterSlave;
|
use MasterSlave;
|
||||||
use Quoter;
|
use Quoter;
|
||||||
use TableChecksum;
|
use RowChecksum;
|
||||||
use VersionParser;
|
|
||||||
use Retry;
|
use Retry;
|
||||||
# The sync plugins:
|
use TableParser;
|
||||||
use TableSyncChunk;
|
|
||||||
use TableSyncNibble;
|
|
||||||
use TableSyncGroupBy;
|
|
||||||
use TableSyncStream;
|
|
||||||
# Helper modules for the sync plugins:
|
|
||||||
use TableChunker;
|
|
||||||
use TableNibbler;
|
use TableNibbler;
|
||||||
# Modules for sync():
|
use TableParser;
|
||||||
use ChangeHandler;
|
use ChangeHandler;
|
||||||
use RowDiff;
|
use RowDiff;
|
||||||
# And other modules:
|
use RowSyncer;
|
||||||
use TableParser;
|
use RowChecksum;
|
||||||
use DSNParser;
|
use DSNParser;
|
||||||
|
use Cxn;
|
||||||
use Sandbox;
|
use Sandbox;
|
||||||
use PerconaTest;
|
use PerconaTest;
|
||||||
|
|
||||||
@@ -56,53 +52,46 @@ else {
|
|||||||
$sb->create_dbs($dbh, ['test']);
|
$sb->create_dbs($dbh, ['test']);
|
||||||
$sb->load_file('master', 't/lib/samples/before-TableSyncChunk.sql');
|
$sb->load_file('master', 't/lib/samples/before-TableSyncChunk.sql');
|
||||||
|
|
||||||
my $q = new Quoter();
|
|
||||||
my $tp = new TableParser(Quoter=>$q);
|
|
||||||
|
|
||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
# Make a TableSyncer object.
|
# Make a TableSyncer object.
|
||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
throws_ok(
|
my $ms = new MasterSlave();
|
||||||
sub { new TableSyncer() },
|
my $o = new OptionParser(description => 'TableSyncer');
|
||||||
qr/I need a MasterSlave/,
|
my $q = new Quoter();
|
||||||
'MasterSlave required'
|
my $tp = new TableParser(Quoter => $q);
|
||||||
);
|
my $tn = new TableNibbler(TableParser => $tp, Quoter => $q);
|
||||||
throws_ok(
|
my $rc = new RowChecksum(OptionParser => $o, Quoter => $q);
|
||||||
sub { new TableSyncer(MasterSlave=>1) },
|
my $rd = new RowDiff(dbh=>$dbh);
|
||||||
qr/I need a Quoter/,
|
my $rt = new Retry();
|
||||||
'Quoter required'
|
|
||||||
);
|
|
||||||
throws_ok(
|
|
||||||
sub { new TableSyncer(MasterSlave=>1, Quoter=>1) },
|
|
||||||
qr/I need a VersionParser/,
|
|
||||||
'VersionParser required'
|
|
||||||
);
|
|
||||||
throws_ok(
|
|
||||||
sub { new TableSyncer(MasterSlave=>1, Quoter=>1, VersionParser=>1) },
|
|
||||||
qr/I need a TableChecksum/,
|
|
||||||
'TableChecksum required'
|
|
||||||
);
|
|
||||||
|
|
||||||
my $rd = new RowDiff(dbh=>$src_dbh);
|
|
||||||
my $ms = new MasterSlave();
|
|
||||||
my $vp = new VersionParser();
|
|
||||||
my $rt = new Retry();
|
|
||||||
my $checksum = new TableChecksum(
|
|
||||||
Quoter => $q,
|
|
||||||
VersionParser => $vp,
|
|
||||||
);
|
|
||||||
my $syncer = new TableSyncer(
|
my $syncer = new TableSyncer(
|
||||||
MasterSlave => $ms,
|
MasterSlave => $ms,
|
||||||
|
OptionParser => $o,
|
||||||
Quoter => $q,
|
Quoter => $q,
|
||||||
TableChecksum => $checksum,
|
TableParser => $tp,
|
||||||
VersionParser => $vp,
|
TableNibbler => $tn,
|
||||||
DSNParser => $dp,
|
RowChecksum => $rc,
|
||||||
|
RowDiff => $rd,
|
||||||
Retry => $rt,
|
Retry => $rt,
|
||||||
);
|
);
|
||||||
isa_ok($syncer, 'TableSyncer');
|
isa_ok($syncer, 'TableSyncer');
|
||||||
|
|
||||||
my $chunker = new TableChunker( Quoter => $q, TableParser => $tp );
|
$o->get_specs("$trunk/bin/pt-table-sync");
|
||||||
my $nibbler = new TableNibbler( Quoter => $q, TableParser => $tp );
|
$o->get_opts();
|
||||||
|
|
||||||
|
my $src_cxn = new Cxn(
|
||||||
|
DSNParser => $dp,
|
||||||
|
OptionParser => $o,
|
||||||
|
dsn => "h=127,P=12345",
|
||||||
|
dbh => $src_dbh,
|
||||||
|
);
|
||||||
|
|
||||||
|
my $dst_cxn = new Cxn(
|
||||||
|
DSNParser => $dp,
|
||||||
|
OptionParser => $o,
|
||||||
|
dsn => "h=127,P=12346",
|
||||||
|
dbh => $dst_dbh,
|
||||||
|
);
|
||||||
|
|
||||||
# Global vars used/set by the subs below and accessed throughout the tests.
|
# Global vars used/set by the subs below and accessed throughout the tests.
|
||||||
my $src;
|
my $src;
|
||||||
@@ -110,37 +99,17 @@ my $dst;
|
|||||||
my $tbl_struct;
|
my $tbl_struct;
|
||||||
my %actions;
|
my %actions;
|
||||||
my @rows;
|
my @rows;
|
||||||
my ($sync_chunk, $sync_nibble, $sync_groupby, $sync_stream);
|
my $ch;
|
||||||
my $plugins = [];
|
my $rs;
|
||||||
|
|
||||||
# Call this func to re-make/reset the plugins.
|
|
||||||
sub make_plugins {
|
|
||||||
$sync_chunk = new TableSyncChunk(
|
|
||||||
TableChunker => $chunker,
|
|
||||||
Quoter => $q,
|
|
||||||
);
|
|
||||||
$sync_nibble = new TableSyncNibble(
|
|
||||||
TableNibbler => $nibbler,
|
|
||||||
TableChunker => $chunker,
|
|
||||||
TableParser => $tp,
|
|
||||||
Quoter => $q,
|
|
||||||
);
|
|
||||||
$sync_groupby = new TableSyncGroupBy( Quoter => $q );
|
|
||||||
$sync_stream = new TableSyncStream( Quoter => $q );
|
|
||||||
|
|
||||||
$plugins = [$sync_chunk, $sync_nibble, $sync_groupby, $sync_stream];
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
sub new_ch {
|
sub new_ch {
|
||||||
my ( $dbh, $queue ) = @_;
|
my ( $dbh, $queue ) = @_;
|
||||||
return new ChangeHandler(
|
$ch = new ChangeHandler(
|
||||||
Quoter => $q,
|
Quoter => $q,
|
||||||
left_db => $src->{db},
|
left_db => $src->{tbl}->{db},
|
||||||
left_tbl => $src->{tbl},
|
left_tbl => $src->{tbl}->{tbl},
|
||||||
right_db => $dst->{db},
|
right_db => $dst->{tbl}->{db},
|
||||||
right_tbl => $dst->{tbl},
|
right_tbl => $dst->{tbl}->{tbl},
|
||||||
actions => [
|
actions => [
|
||||||
sub {
|
sub {
|
||||||
my ( $sql, $change_dbh ) = @_;
|
my ( $sql, $change_dbh ) = @_;
|
||||||
@@ -162,6 +131,7 @@ sub new_ch {
|
|||||||
replace => 0,
|
replace => 0,
|
||||||
queue => defined $queue ? $queue : 1,
|
queue => defined $queue ? $queue : 1,
|
||||||
);
|
);
|
||||||
|
$ch->fetch_back($src_cxn->dbh());
|
||||||
}
|
}
|
||||||
|
|
||||||
# Shortens/automates a lot of the setup needed for calling
|
# Shortens/automates a lot of the setup needed for calling
|
||||||
@@ -173,99 +143,47 @@ sub sync_table {
|
|||||||
my ($src_db_tbl, $dst_db_tbl) = @args{qw(src dst)};
|
my ($src_db_tbl, $dst_db_tbl) = @args{qw(src dst)};
|
||||||
my ($src_db, $src_tbl) = $q->split_unquote($src_db_tbl);
|
my ($src_db, $src_tbl) = $q->split_unquote($src_db_tbl);
|
||||||
my ($dst_db, $dst_tbl) = $q->split_unquote($dst_db_tbl);
|
my ($dst_db, $dst_tbl) = $q->split_unquote($dst_db_tbl);
|
||||||
if ( $args{plugins} ) {
|
|
||||||
$plugins = $args{plugins};
|
@ARGV = $args{argv} ? @{$args{argv}} : ();
|
||||||
}
|
$o->get_opts();
|
||||||
else {
|
|
||||||
make_plugins();
|
|
||||||
}
|
|
||||||
$tbl_struct = $tp->parse(
|
$tbl_struct = $tp->parse(
|
||||||
$tp->get_create_table($src_dbh, $src_db, $src_tbl));
|
$tp->get_create_table($src_dbh, $src_db, $src_tbl));
|
||||||
$src = {
|
$src = {
|
||||||
dbh => $src_dbh,
|
Cxn => $src_cxn,
|
||||||
dsn => {h=>'127.1',P=>'12345',},
|
misc_dbh => $dbh,
|
||||||
misc_dbh => $dbh,
|
is_source => 1,
|
||||||
db => $src_db,
|
tbl => {
|
||||||
tbl => $src_tbl,
|
db => $src_db,
|
||||||
|
tbl => $src_tbl,
|
||||||
|
tbl_struct => $tbl_struct,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
$dst = {
|
$dst = {
|
||||||
dbh => $dst_dbh,
|
Cxn => $dst_cxn,
|
||||||
dsn => {h=>'127.1',P=>'12346',},
|
misc_dbh => $dbh,
|
||||||
db => $dst_db,
|
tbl => {
|
||||||
tbl => $dst_tbl,
|
db => $dst_db,
|
||||||
|
tbl => $dst_tbl,
|
||||||
|
tbl_struct => $tbl_struct,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
@rows = ();
|
@rows = ();
|
||||||
|
new_ch();
|
||||||
|
$rs = new RowSyncer(
|
||||||
|
ChangeHandler => $ch,
|
||||||
|
);
|
||||||
%actions = $syncer->sync_table(
|
%actions = $syncer->sync_table(
|
||||||
plugins => $plugins,
|
|
||||||
src => $src,
|
src => $src,
|
||||||
dst => $dst,
|
dst => $dst,
|
||||||
tbl_struct => $tbl_struct,
|
RowSyncer => $rs,
|
||||||
cols => $tbl_struct->{cols},
|
ChangeHandler => $ch,
|
||||||
chunk_size => $args{chunk_size} || 5,
|
|
||||||
dry_run => $args{dry_run},
|
|
||||||
function => $args{function} || 'SHA1',
|
|
||||||
lock => $args{lock},
|
|
||||||
transaction => $args{transaction},
|
|
||||||
callback => $args{callback},
|
|
||||||
RowDiff => $rd,
|
|
||||||
ChangeHandler => new_ch(),
|
|
||||||
trace => 0,
|
trace => 0,
|
||||||
);
|
);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
# ###########################################################################
|
|
||||||
# Test get_best_plugin() (formerly best_algorithm()).
|
|
||||||
# ###########################################################################
|
|
||||||
make_plugins();
|
|
||||||
$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'test', 'test5'));
|
|
||||||
is_deeply(
|
|
||||||
[
|
|
||||||
$syncer->get_best_plugin(
|
|
||||||
plugins => $plugins,
|
|
||||||
tbl_struct => $tbl_struct,
|
|
||||||
)
|
|
||||||
],
|
|
||||||
[ $sync_groupby ],
|
|
||||||
'Best plugin GroupBy'
|
|
||||||
);
|
|
||||||
|
|
||||||
$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'test', 'test3'));
|
|
||||||
my ($plugin, %plugin_args) = $syncer->get_best_plugin(
|
|
||||||
plugins => $plugins,
|
|
||||||
tbl_struct => $tbl_struct,
|
|
||||||
);
|
|
||||||
is_deeply(
|
|
||||||
[ $plugin, \%plugin_args, ],
|
|
||||||
[ $sync_chunk, { chunk_index => 'PRIMARY', chunk_col => 'a', } ],
|
|
||||||
'Best plugin Chunk'
|
|
||||||
);
|
|
||||||
|
|
||||||
# With the introduction of char chunking (issue 568), test6 can be chunked
|
|
||||||
# with Chunk or Nibble. Chunk will be prefered.
|
|
||||||
|
|
||||||
$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'test', 'test6'));
|
|
||||||
($plugin, %plugin_args) = $syncer->get_best_plugin(
|
|
||||||
plugins => $plugins,
|
|
||||||
tbl_struct => $tbl_struct,
|
|
||||||
);
|
|
||||||
is_deeply(
|
|
||||||
[ $plugin, \%plugin_args, ],
|
|
||||||
[ $sync_chunk, { chunk_index => 'a', chunk_col => 'a'} ],
|
|
||||||
'Best plugin Chunk (char chunking)'
|
|
||||||
);
|
|
||||||
# Remove TableSyncChunk to test that it can chunk that char col with Nibble too.
|
|
||||||
($plugin, %plugin_args) = $syncer->get_best_plugin(
|
|
||||||
plugins => [$sync_nibble, $sync_groupby, $sync_stream],
|
|
||||||
tbl_struct => $tbl_struct,
|
|
||||||
);
|
|
||||||
is_deeply(
|
|
||||||
[ $plugin, \%plugin_args, ],
|
|
||||||
[ $sync_nibble,{ chunk_index => 'a', key_cols => [qw(a)], small_table=>0 } ],
|
|
||||||
'Best plugin Nibble'
|
|
||||||
);
|
|
||||||
|
|
||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
# Test sync_table() for each plugin with a basic, 4 row data set.
|
# Test sync_table() for each plugin with a basic, 4 row data set.
|
||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
@@ -289,9 +207,9 @@ my $inserts = [
|
|||||||
$dst_dbh->do('TRUNCATE TABLE test.test2');
|
$dst_dbh->do('TRUNCATE TABLE test.test2');
|
||||||
|
|
||||||
sync_table(
|
sync_table(
|
||||||
src => "test.test1",
|
src => "test.test1",
|
||||||
dst => "test.test2",
|
dst => "test.test2",
|
||||||
dry_run => 1,
|
argv => [qw(--dry-run)],
|
||||||
);
|
);
|
||||||
is_deeply(
|
is_deeply(
|
||||||
\%actions,
|
\%actions,
|
||||||
@@ -300,9 +218,8 @@ is_deeply(
|
|||||||
INSERT => 0,
|
INSERT => 0,
|
||||||
REPLACE => 0,
|
REPLACE => 0,
|
||||||
UPDATE => 0,
|
UPDATE => 0,
|
||||||
ALGORITHM => 'Chunk',
|
|
||||||
},
|
},
|
||||||
'Dry run, no changes, Chunk plugin'
|
'Dry run, no changes'
|
||||||
);
|
);
|
||||||
|
|
||||||
is_deeply(
|
is_deeply(
|
||||||
@@ -319,42 +236,10 @@ is_deeply(
|
|||||||
|
|
||||||
# Now do the real syncs that should insert 4 rows into test2.
|
# Now do the real syncs that should insert 4 rows into test2.
|
||||||
|
|
||||||
# Sync with Chunk.
|
|
||||||
sync_table(
|
sync_table(
|
||||||
src => "test.test1",
|
src => "test.test1",
|
||||||
dst => "test.test2",
|
dst => "test.test2",
|
||||||
);
|
);
|
||||||
is_deeply(
|
|
||||||
\%actions,
|
|
||||||
{
|
|
||||||
DELETE => 0,
|
|
||||||
INSERT => 4,
|
|
||||||
REPLACE => 0,
|
|
||||||
UPDATE => 0,
|
|
||||||
ALGORITHM => 'Chunk',
|
|
||||||
},
|
|
||||||
'Sync with Chunk, 4 INSERTs'
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
\@rows,
|
|
||||||
$inserts,
|
|
||||||
'Sync with Chunk, ChangeHandler made INSERT statements'
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
|
|
||||||
$test1_rows,
|
|
||||||
'Sync with Chunk, dst rows match src rows'
|
|
||||||
);
|
|
||||||
|
|
||||||
# Sync with Chunk again, but use chunk_size = 1k which should be converted.
|
|
||||||
$dst_dbh->do('TRUNCATE TABLE test.test2');
|
|
||||||
sync_table(
|
|
||||||
src => "test.test1",
|
|
||||||
dst => "test.test2",
|
|
||||||
chunk_size => '1k',
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
is_deeply(
|
||||||
\%actions,
|
\%actions,
|
||||||
@@ -363,166 +248,52 @@ is_deeply(
|
|||||||
INSERT => 4,
|
INSERT => 4,
|
||||||
REPLACE => 0,
|
REPLACE => 0,
|
||||||
UPDATE => 0,
|
UPDATE => 0,
|
||||||
ALGORITHM => 'Chunk',
|
|
||||||
},
|
},
|
||||||
'Sync with Chunk chunk size 1k, 4 INSERTs'
|
'Basic sync 4 INSERT'
|
||||||
);
|
);
|
||||||
|
|
||||||
is_deeply(
|
is_deeply(
|
||||||
\@rows,
|
\@rows,
|
||||||
$inserts,
|
$inserts,
|
||||||
'Sync with Chunk chunk size 1k, ChangeHandler made INSERT statements'
|
'Basic sync ChangeHandler INSERT statements'
|
||||||
);
|
);
|
||||||
|
|
||||||
is_deeply(
|
is_deeply(
|
||||||
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
|
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
|
||||||
$test1_rows,
|
$test1_rows,
|
||||||
'Sync with Chunk chunk size 1k, dst rows match src rows'
|
'Basic sync dst rows match src rows'
|
||||||
);
|
|
||||||
|
|
||||||
# Sync with Nibble.
|
|
||||||
$dst_dbh->do('TRUNCATE TABLE test.test2');
|
|
||||||
sync_table(
|
|
||||||
src => "test.test1",
|
|
||||||
dst => "test.test2",
|
|
||||||
plugins => [ $sync_nibble ],
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
\%actions,
|
|
||||||
{
|
|
||||||
DELETE => 0,
|
|
||||||
INSERT => 4,
|
|
||||||
REPLACE => 0,
|
|
||||||
UPDATE => 0,
|
|
||||||
ALGORITHM => 'Nibble',
|
|
||||||
},
|
|
||||||
'Sync with Nibble, 4 INSERTs'
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
\@rows,
|
|
||||||
$inserts,
|
|
||||||
'Sync with Nibble, ChangeHandler made INSERT statements'
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
|
|
||||||
$test1_rows,
|
|
||||||
'Sync with Nibble, dst rows match src rows'
|
|
||||||
);
|
|
||||||
|
|
||||||
# Sync with GroupBy.
|
|
||||||
$dst_dbh->do('TRUNCATE TABLE test.test2');
|
|
||||||
sync_table(
|
|
||||||
src => "test.test1",
|
|
||||||
dst => "test.test2",
|
|
||||||
plugins => [ $sync_groupby ],
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
\%actions,
|
|
||||||
{
|
|
||||||
DELETE => 0,
|
|
||||||
INSERT => 4,
|
|
||||||
REPLACE => 0,
|
|
||||||
UPDATE => 0,
|
|
||||||
ALGORITHM => 'GroupBy',
|
|
||||||
},
|
|
||||||
'Sync with GroupBy, 4 INSERTs'
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
\@rows,
|
|
||||||
$inserts,
|
|
||||||
'Sync with GroupBy, ChangeHandler made INSERT statements'
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
|
|
||||||
$test1_rows,
|
|
||||||
'Sync with GroupBy, dst rows match src rows'
|
|
||||||
);
|
|
||||||
|
|
||||||
# Sync with Stream.
|
|
||||||
$dst_dbh->do('TRUNCATE TABLE test.test2');
|
|
||||||
sync_table(
|
|
||||||
src => "test.test1",
|
|
||||||
dst => "test.test2",
|
|
||||||
plugins => [ $sync_stream ],
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
\%actions,
|
|
||||||
{
|
|
||||||
DELETE => 0,
|
|
||||||
INSERT => 4,
|
|
||||||
REPLACE => 0,
|
|
||||||
UPDATE => 0,
|
|
||||||
ALGORITHM => 'Stream',
|
|
||||||
},
|
|
||||||
'Sync with Stream, 4 INSERTs'
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
\@rows,
|
|
||||||
$inserts,
|
|
||||||
'Sync with Stream, ChangeHandler made INSERT statements'
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
|
|
||||||
$test1_rows,
|
|
||||||
'Sync with Stream, dst rows match src rows'
|
|
||||||
);
|
);
|
||||||
|
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
# Check that the plugins can resolve unique key violations.
|
# Check that the plugins can resolve unique key violations.
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
make_plugins();
|
|
||||||
|
|
||||||
sync_table(
|
sync_table(
|
||||||
src => "test.test3",
|
src => "test.test3",
|
||||||
dst => "test.test4",
|
dst => "test.test4",
|
||||||
plugins => [ $sync_stream ],
|
|
||||||
);
|
);
|
||||||
|
|
||||||
is_deeply(
|
is_deeply(
|
||||||
$dst_dbh->selectall_arrayref('select * from test.test4 order by a', { Slice => {}} ),
|
$dst_dbh->selectall_arrayref('select * from test.test4 order by a', { Slice => {}} ),
|
||||||
[ { a => 1, b => 2 }, { a => 2, b => 1 } ],
|
[ { a => 1, b => 2 }, { a => 2, b => 1 } ],
|
||||||
'Resolves unique key violations with Stream'
|
'Resolves unique key violations'
|
||||||
);
|
|
||||||
|
|
||||||
sync_table(
|
|
||||||
src => "test.test3",
|
|
||||||
dst => "test.test4",
|
|
||||||
plugins => [ $sync_chunk ],
|
|
||||||
);
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
$dst_dbh->selectall_arrayref('select * from test.test4 order by a', { Slice => {}} ),
|
|
||||||
[ { a => 1, b => 2 }, { a => 2, b => 1 } ],
|
|
||||||
'Resolves unique key violations with Chunk'
|
|
||||||
);
|
);
|
||||||
|
|
||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
# Test locking.
|
# Test locking.
|
||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
make_plugins();
|
|
||||||
|
|
||||||
sync_table(
|
sync_table(
|
||||||
src => "test.test1",
|
src => "test.test1",
|
||||||
dst => "test.test2",
|
dst => "test.test2",
|
||||||
lock => 1,
|
argv => [qw(--lock 1)],
|
||||||
);
|
);
|
||||||
|
|
||||||
# The locks should be released.
|
# The locks should be released.
|
||||||
ok($src_dbh->do('select * from test.test4'), 'Cycle locks released');
|
ok($src_dbh->do('select * from test.test4'), 'Chunk locks released');
|
||||||
|
|
||||||
sync_table(
|
sync_table(
|
||||||
src => "test.test1",
|
src => "test.test1",
|
||||||
dst => "test.test2",
|
dst => "test.test2",
|
||||||
lock => 2,
|
argv => [qw(--lock 2)],
|
||||||
);
|
);
|
||||||
|
|
||||||
# The locks should be released.
|
# The locks should be released.
|
||||||
@@ -531,7 +302,7 @@ ok($src_dbh->do('select * from test.test4'), 'Table locks released');
|
|||||||
sync_table(
|
sync_table(
|
||||||
src => "test.test1",
|
src => "test.test1",
|
||||||
dst => "test.test2",
|
dst => "test.test2",
|
||||||
lock => 3,
|
argv => [qw(--lock 3)],
|
||||||
);
|
);
|
||||||
|
|
||||||
ok(
|
ok(
|
||||||
@@ -541,14 +312,9 @@ ok(
|
|||||||
|
|
||||||
eval {
|
eval {
|
||||||
$syncer->lock_and_wait(
|
$syncer->lock_and_wait(
|
||||||
src => $src,
|
|
||||||
dst => $dst,
|
|
||||||
lock => 3,
|
|
||||||
lock_level => 3,
|
lock_level => 3,
|
||||||
replicate => 0,
|
host => $src,
|
||||||
timeout_ok => 1,
|
src => $src,
|
||||||
transaction => 0,
|
|
||||||
wait => 60,
|
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
is($EVAL_ERROR, '', 'Locks in level 3');
|
is($EVAL_ERROR, '', 'Locks in level 3');
|
||||||
@@ -571,25 +337,32 @@ throws_ok (
|
|||||||
|
|
||||||
# Kill the DBHs it in the right order: there's a connection waiting on
|
# Kill the DBHs it in the right order: there's a connection waiting on
|
||||||
# a lock.
|
# a lock.
|
||||||
$src_dbh->disconnect();
|
$src_cxn = undef;
|
||||||
$dst_dbh->disconnect();
|
$dst_cxn = undef;
|
||||||
$src_dbh = $sb->get_dbh_for('master');
|
$src_dbh = $sb->get_dbh_for('master');
|
||||||
$dst_dbh = $sb->get_dbh_for('slave1');
|
$dst_dbh = $sb->get_dbh_for('slave1');
|
||||||
|
$src_cxn = new Cxn(
|
||||||
$src->{dbh} = $src_dbh;
|
DSNParser => $dp,
|
||||||
$dst->{dbh} = $dst_dbh;
|
OptionParser => $o,
|
||||||
|
dsn => "h=127,P=12345",
|
||||||
|
dbh => $src_dbh,
|
||||||
|
);
|
||||||
|
$dst_cxn = new Cxn(
|
||||||
|
DSNParser => $dp,
|
||||||
|
OptionParser => $o,
|
||||||
|
dsn => "h=127,P=12346",
|
||||||
|
dbh => $dst_dbh,
|
||||||
|
);
|
||||||
|
|
||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
# Test TableSyncGroupBy.
|
# Test TableSyncGroupBy.
|
||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
make_plugins();
|
|
||||||
$sb->load_file('master', 't/lib/samples/before-TableSyncGroupBy.sql');
|
$sb->load_file('master', 't/lib/samples/before-TableSyncGroupBy.sql');
|
||||||
sleep 1;
|
sleep 1;
|
||||||
|
|
||||||
sync_table(
|
sync_table(
|
||||||
src => "test.test1",
|
src => "test.test1",
|
||||||
dst => "test.test2",
|
dst => "test.test2",
|
||||||
plugins => [ $sync_groupby ],
|
|
||||||
);
|
);
|
||||||
|
|
||||||
is_deeply(
|
is_deeply(
|
||||||
@@ -608,11 +381,10 @@ is_deeply(
|
|||||||
],
|
],
|
||||||
'Table synced with GroupBy',
|
'Table synced with GroupBy',
|
||||||
);
|
);
|
||||||
|
exit;
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
# Issue 96: mk-table-sync: Nibbler infinite loop
|
# Issue 96: mk-table-sync: Nibbler infinite loop
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
make_plugins();
|
|
||||||
$sb->load_file('master', 't/lib/samples/issue_96.sql');
|
$sb->load_file('master', 't/lib/samples/issue_96.sql');
|
||||||
sleep 1;
|
sleep 1;
|
||||||
|
|
||||||
@@ -628,7 +400,6 @@ is_deeply(
|
|||||||
sync_table(
|
sync_table(
|
||||||
src => "issue_96.t",
|
src => "issue_96.t",
|
||||||
dst => "issue_96.t2",
|
dst => "issue_96.t2",
|
||||||
plugins => [ $sync_nibble ],
|
|
||||||
);
|
);
|
||||||
|
|
||||||
$r1 = $src_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t WHERE package_id=4');
|
$r1 = $src_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t WHERE package_id=4');
|
||||||
@@ -696,7 +467,6 @@ diag(`/tmp/12345/use -u root -e "DROP USER 'bob'"`);
|
|||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
# Test that the calback gives us the src and dst sql.
|
# Test that the calback gives us the src and dst sql.
|
||||||
# ###########################################################################
|
# ###########################################################################
|
||||||
make_plugins;
|
|
||||||
# Re-using issue_96.t from above. The tables are already in sync so there
|
# Re-using issue_96.t from above. The tables are already in sync so there
|
||||||
# should only be 1 sync cycle.
|
# should only be 1 sync cycle.
|
||||||
my @sqls;
|
my @sqls;
|
||||||
@@ -704,7 +474,6 @@ sync_table(
|
|||||||
src => "issue_96.t",
|
src => "issue_96.t",
|
||||||
dst => "issue_96.t2",
|
dst => "issue_96.t2",
|
||||||
chunk_size => 1000,
|
chunk_size => 1000,
|
||||||
plugins => [ $sync_nibble ],
|
|
||||||
callback => sub { push @sqls, @_; },
|
callback => sub { push @sqls, @_; },
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -762,6 +531,7 @@ my $dbh3 = $sb->get_dbh_for('master1');
|
|||||||
SKIP: {
|
SKIP: {
|
||||||
skip 'Cannot connect to sandbox master', 7 unless $dbh;
|
skip 'Cannot connect to sandbox master', 7 unless $dbh;
|
||||||
skip 'Cannot connect to second sandbox master', 7 unless $dbh3;
|
skip 'Cannot connect to second sandbox master', 7 unless $dbh3;
|
||||||
|
my $sync_chunk;
|
||||||
|
|
||||||
sub set_bidi_callbacks {
|
sub set_bidi_callbacks {
|
||||||
$sync_chunk->set_callback('same_row', sub {
|
$sync_chunk->set_callback('same_row', sub {
|
||||||
@@ -834,7 +604,6 @@ SKIP: {
|
|||||||
# Load remote data.
|
# Load remote data.
|
||||||
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql');
|
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql');
|
||||||
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql');
|
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql');
|
||||||
make_plugins();
|
|
||||||
set_bidi_callbacks();
|
set_bidi_callbacks();
|
||||||
$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'bidi', 't'));
|
$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'bidi', 't'));
|
||||||
|
|
||||||
@@ -849,15 +618,13 @@ SKIP: {
|
|||||||
dst => $dst,
|
dst => $dst,
|
||||||
tbl_struct => $tbl_struct,
|
tbl_struct => $tbl_struct,
|
||||||
cols => [qw(ts)], # Compare only ts col when chunks differ.
|
cols => [qw(ts)], # Compare only ts col when chunks differ.
|
||||||
plugins => $plugins,
|
|
||||||
function => 'SHA1',
|
|
||||||
ChangeHandler => new_ch($dbh3, 0), # here to override $dst_dbh.
|
ChangeHandler => new_ch($dbh3, 0), # here to override $dst_dbh.
|
||||||
RowDiff => $rd,
|
RowDiff => $rd,
|
||||||
chunk_size => 2,
|
chunk_size => 2,
|
||||||
);
|
);
|
||||||
@rows = ();
|
@rows = ();
|
||||||
|
|
||||||
$syncer->sync_table(%args, plugins => [$sync_chunk]);
|
$syncer->sync_table(%args);
|
||||||
|
|
||||||
my $res = $src_dbh->selectall_arrayref('select * from bidi.t order by id');
|
my $res = $src_dbh->selectall_arrayref('select * from bidi.t order by id');
|
||||||
is_deeply(
|
is_deeply(
|
||||||
@@ -880,12 +647,11 @@ SKIP: {
|
|||||||
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql');
|
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql');
|
||||||
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql');
|
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql');
|
||||||
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql');
|
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql');
|
||||||
make_plugins();
|
|
||||||
set_bidi_callbacks();
|
set_bidi_callbacks();
|
||||||
$args{ChangeHandler} = new_ch($dbh3, 0);
|
$args{ChangeHandler} = new_ch($dbh3, 0);
|
||||||
@rows = ();
|
@rows = ();
|
||||||
|
|
||||||
$syncer->sync_table(%args, plugins => [$sync_chunk], chunk_size => 10);
|
$syncer->sync_table(%args, chunk_size => 10);
|
||||||
|
|
||||||
$res = $src_dbh->selectall_arrayref('select * from bidi.t order by id');
|
$res = $src_dbh->selectall_arrayref('select * from bidi.t order by id');
|
||||||
is_deeply(
|
is_deeply(
|
||||||
@@ -908,12 +674,11 @@ SKIP: {
|
|||||||
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql');
|
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql');
|
||||||
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql');
|
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql');
|
||||||
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql');
|
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql');
|
||||||
make_plugins();
|
|
||||||
set_bidi_callbacks();
|
set_bidi_callbacks();
|
||||||
$args{ChangeHandler} = new_ch($dbh3, 0);
|
$args{ChangeHandler} = new_ch($dbh3, 0);
|
||||||
@rows = ();
|
@rows = ();
|
||||||
|
|
||||||
$syncer->sync_table(%args, plugins => [$sync_chunk], chunk_size => 100000);
|
$syncer->sync_table(%args, chunk_size => 100000);
|
||||||
|
|
||||||
$res = $src_dbh->selectall_arrayref('select * from bidi.t order by id');
|
$res = $src_dbh->selectall_arrayref('select * from bidi.t order by id');
|
||||||
is_deeply(
|
is_deeply(
|
||||||
@@ -934,7 +699,7 @@ SKIP: {
|
|||||||
# ########################################################################
|
# ########################################################################
|
||||||
$args{ChangeHandler} = new_ch($dbh3, 1);
|
$args{ChangeHandler} = new_ch($dbh3, 1);
|
||||||
throws_ok(
|
throws_ok(
|
||||||
sub { $syncer->sync_table(%args, bidirectional => 1, plugins => [$sync_chunk]) },
|
sub { $syncer->sync_table(%args, bidirectional => 1) },
|
||||||
qr/Queueing does not work with bidirectional syncing/,
|
qr/Queueing does not work with bidirectional syncing/,
|
||||||
'Queueing does not work with bidirectional syncing'
|
'Queueing does not work with bidirectional syncing'
|
||||||
);
|
);
|
||||||
@@ -945,7 +710,6 @@ SKIP: {
|
|||||||
# #############################################################################
|
# #############################################################################
|
||||||
# Test with transactions.
|
# Test with transactions.
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
make_plugins();
|
|
||||||
# Sandbox::get_dbh_for() defaults to AutoCommit=1. Autocommit must
|
# Sandbox::get_dbh_for() defaults to AutoCommit=1. Autocommit must
|
||||||
# be off else commit() will cause an error.
|
# be off else commit() will cause an error.
|
||||||
$dbh = $sb->get_dbh_for('master', {AutoCommit=>0});
|
$dbh = $sb->get_dbh_for('master', {AutoCommit=>0});
|
||||||
@@ -997,32 +761,18 @@ like(
|
|||||||
# #############################################################################
|
# #############################################################################
|
||||||
# Issue 672: mk-table-sync should COALESCE to avoid undef
|
# Issue 672: mk-table-sync should COALESCE to avoid undef
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
make_plugins();
|
|
||||||
$sb->load_file('master', "t/lib/samples/empty_tables.sql");
|
$sb->load_file('master', "t/lib/samples/empty_tables.sql");
|
||||||
|
|
||||||
foreach my $sync( $sync_chunk, $sync_nibble, $sync_groupby ) {
|
sync_table(
|
||||||
sync_table(
|
src => 'et.et1',
|
||||||
src => 'et.et1',
|
dst => 'et.et1',
|
||||||
dst => 'et.et1',
|
);
|
||||||
plugins => [ $sync ],
|
|
||||||
);
|
|
||||||
my $sync_name = ref $sync;
|
|
||||||
my $algo = $sync_name;
|
|
||||||
$algo =~ s/TableSync//;
|
|
||||||
|
|
||||||
is_deeply(
|
|
||||||
\@rows,
|
|
||||||
[],
|
|
||||||
"Sync empty tables with " . ref $sync,
|
|
||||||
);
|
|
||||||
|
|
||||||
is(
|
|
||||||
$actions{ALGORITHM},
|
|
||||||
$algo,
|
|
||||||
"$algo algo used to sync empty table"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
is_deeply(
|
||||||
|
\@rows,
|
||||||
|
[],
|
||||||
|
"Sync empty tables"
|
||||||
|
);
|
||||||
|
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
# Retry wait.
|
# Retry wait.
|
||||||
|
Reference in New Issue
Block a user