Files
percona-toolkit/t/lib/TableSyncer.t

1029 lines
31 KiB
Perl

#!/usr/bin/perl
BEGIN {
die "The PERCONA_TOOLKIT_BRANCH environment variable is not set.\n"
unless $ENV{PERCONA_TOOLKIT_BRANCH} && -d $ENV{PERCONA_TOOLKIT_BRANCH};
unshift @INC, "$ENV{PERCONA_TOOLKIT_BRANCH}/lib";
};
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Test::More;
# TableSyncer and its required modules:
use TableSyncer;
use MasterSlave;
use Quoter;
use TableChecksum;
use VersionParser;
use Retry;
# The sync plugins:
use TableSyncChunk;
use TableSyncNibble;
use TableSyncGroupBy;
use TableSyncStream;
# Helper modules for the sync plugins:
use TableChunker;
use TableNibbler;
# Modules for sync():
use ChangeHandler;
use RowDiff;
# And other modules:
use TableParser;
use DSNParser;
use Sandbox;
use PerconaTest;
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
my $dp = new DSNParser(opts=>$dsn_opts);
my $sb = new Sandbox(basedir => '/tmp', DSNParser => $dp);
my $dbh = $sb->get_dbh_for('master');
my $src_dbh = $sb->get_dbh_for('master');
my $dst_dbh = $sb->get_dbh_for('slave1');
if ( !$src_dbh || !$dbh ) {
plan skip_all => 'Cannot connect to sandbox master';
}
elsif ( !$dst_dbh ) {
plan skip_all => 'Cannot connect to sandbox slave';
}
$sb->create_dbs($dbh, ['test']);
$sb->load_file('master', 't/lib/samples/before-TableSyncChunk.sql');
my $q = new Quoter();
my $tp = new TableParser(Quoter=>$q);
# ###########################################################################
# Make a TableSyncer object.
# ###########################################################################
throws_ok(
sub { new TableSyncer() },
qr/I need a MasterSlave/,
'MasterSlave required'
);
throws_ok(
sub { new TableSyncer(MasterSlave=>1) },
qr/I need a Quoter/,
'Quoter required'
);
throws_ok(
sub { new TableSyncer(MasterSlave=>1, Quoter=>1) },
qr/I need a TableChecksum/,
'TableChecksum required'
);
my $rd = new RowDiff(dbh=>$src_dbh);
my $ms = new MasterSlave(OptionParser=>1,DSNParser=>1,Quoter=>1);
my $rt = new Retry();
my $checksum = new TableChecksum(
Quoter => $q,
);
my $syncer = new TableSyncer(
MasterSlave => $ms,
Quoter => $q,
TableChecksum => $checksum,
DSNParser => $dp,
Retry => $rt,
);
isa_ok($syncer, 'TableSyncer');
my $chunker = new TableChunker( Quoter => $q, TableParser => $tp );
my $nibbler = new TableNibbler( Quoter => $q, TableParser => $tp );
# Global vars used/set by the subs below and accessed throughout the tests.
my $src;
my $dst;
my $tbl_struct;
my %actions;
my @rows;
my ($sync_chunk, $sync_nibble, $sync_groupby, $sync_stream);
my $plugins = [];
# Call this func to re-make/reset the plugins.
sub make_plugins {
$sync_chunk = new TableSyncChunk(
TableChunker => $chunker,
Quoter => $q,
);
$sync_nibble = new TableSyncNibble(
TableNibbler => $nibbler,
TableChunker => $chunker,
TableParser => $tp,
Quoter => $q,
);
$sync_groupby = new TableSyncGroupBy( Quoter => $q );
$sync_stream = new TableSyncStream( Quoter => $q );
$plugins = [$sync_chunk, $sync_nibble, $sync_groupby, $sync_stream];
return;
}
sub new_ch {
my ( $dbh, $queue ) = @_;
return new ChangeHandler(
Quoter => $q,
left_db => $src->{db},
left_tbl => $src->{tbl},
right_db => $dst->{db},
right_tbl => $dst->{tbl},
actions => [
sub {
my ( $sql, $change_dbh ) = @_;
push @rows, $sql;
if ( $change_dbh ) {
# dbh passed through change() or process_rows()
$change_dbh->do($sql);
}
elsif ( $dbh ) {
# dbh passed to this sub
$dbh->do($sql);
}
else {
# default dst dbh for this test script
$dst_dbh->do($sql);
}
}
],
replace => 0,
queue => defined $queue ? $queue : 1,
);
}
# Shortens/automates a lot of the setup needed for calling
# TableSyncer::sync_table. At minimum, you can pass just
# the src and dst args which are db.tbl args to sync. Various
# global vars are set: @rows, %actions, etc.
sub sync_table {
my ( %args ) = @_;
my ($src_db_tbl, $dst_db_tbl) = @args{qw(src dst)};
my ($src_db, $src_tbl) = $q->split_unquote($src_db_tbl);
my ($dst_db, $dst_tbl) = $q->split_unquote($dst_db_tbl);
if ( $args{plugins} ) {
$plugins = $args{plugins};
}
else {
make_plugins();
}
$tbl_struct = $tp->parse(
$tp->get_create_table($src_dbh, $src_db, $src_tbl));
$src = {
dbh => $src_dbh,
dsn => {h=>'127.1',P=>'12345',},
misc_dbh => $dbh,
db => $src_db,
tbl => $src_tbl,
};
$dst = {
dbh => $dst_dbh,
dsn => {h=>'127.1',P=>'12346',},
db => $dst_db,
tbl => $dst_tbl,
};
@rows = ();
%actions = $syncer->sync_table(
plugins => $plugins,
src => $src,
dst => $dst,
tbl_struct => $tbl_struct,
cols => $tbl_struct->{cols},
chunk_size => $args{chunk_size} || 5,
dry_run => $args{dry_run},
function => $args{function} || 'SHA1',
lock => $args{lock},
transaction => $args{transaction},
callback => $args{callback},
RowDiff => $rd,
ChangeHandler => new_ch(),
trace => 0,
);
return;
}
# ###########################################################################
# Test get_best_plugin() (formerly best_algorithm()).
# ###########################################################################
make_plugins();
$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'test', 'test5'));
is_deeply(
[
$syncer->get_best_plugin(
plugins => $plugins,
tbl_struct => $tbl_struct,
)
],
[ $sync_groupby ],
'Best plugin GroupBy'
);
$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'test', 'test3'));
my ($plugin, %plugin_args) = $syncer->get_best_plugin(
plugins => $plugins,
tbl_struct => $tbl_struct,
);
is_deeply(
[ $plugin, \%plugin_args, ],
[ $sync_chunk, { chunk_index => 'PRIMARY', chunk_col => 'a', } ],
'Best plugin Chunk'
);
# With the introduction of char chunking (issue 568), test6 can be chunked
# with Chunk or Nibble. Chunk will be prefered.
$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'test', 'test6'));
($plugin, %plugin_args) = $syncer->get_best_plugin(
plugins => $plugins,
tbl_struct => $tbl_struct,
);
is_deeply(
[ $plugin, \%plugin_args, ],
[ $sync_chunk, { chunk_index => 'a', chunk_col => 'a'} ],
'Best plugin Chunk (char chunking)'
);
# Remove TableSyncChunk to test that it can chunk that char col with Nibble too.
($plugin, %plugin_args) = $syncer->get_best_plugin(
plugins => [$sync_nibble, $sync_groupby, $sync_stream],
tbl_struct => $tbl_struct,
);
is_deeply(
[ $plugin, \%plugin_args, ],
[ $sync_nibble,{ chunk_index => 'a', key_cols => [qw(a)], small_table=>0 } ],
'Best plugin Nibble'
);
# ###########################################################################
# Test sync_table() for each plugin with a basic, 4 row data set.
# ###########################################################################
# test1 has 4 rows and test2, which is the same struct, is empty.
# So after sync, test2 should have the same 4 rows as test1.
my $test1_rows = [
[qw(1 en)],
[qw(2 ca)],
[qw(3 ab)],
[qw(4 bz)],
];
my $inserts = [
"INSERT INTO `test`.`test2`(`a`, `b`) VALUES ('1', 'en')",
"INSERT INTO `test`.`test2`(`a`, `b`) VALUES ('2', 'ca')",
"INSERT INTO `test`.`test2`(`a`, `b`) VALUES ('3', 'ab')",
"INSERT INTO `test`.`test2`(`a`, `b`) VALUES ('4', 'bz')",
];
# First, do a dry run sync, so nothing should happen.
$dst_dbh->do('TRUNCATE TABLE test.test2');
sync_table(
src => "test.test1",
dst => "test.test2",
dry_run => 1,
);
is_deeply(
\%actions,
{
DELETE => 0,
INSERT => 0,
REPLACE => 0,
UPDATE => 0,
ALGORITHM => 'Chunk',
},
'Dry run, no changes, Chunk plugin'
);
is_deeply(
\@rows,
[],
'Dry run, no SQL statements made'
);
is_deeply(
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
[],
'Dry run, no rows changed'
);
# Now do the real syncs that should insert 4 rows into test2.
# Sync with Chunk.
sync_table(
src => "test.test1",
dst => "test.test2",
);
is_deeply(
\%actions,
{
DELETE => 0,
INSERT => 4,
REPLACE => 0,
UPDATE => 0,
ALGORITHM => 'Chunk',
},
'Sync with Chunk, 4 INSERTs'
);
is_deeply(
\@rows,
$inserts,
'Sync with Chunk, ChangeHandler made INSERT statements'
);
is_deeply(
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
$test1_rows,
'Sync with Chunk, dst rows match src rows'
);
# Sync with Chunk again, but use chunk_size = 1k which should be converted.
$dst_dbh->do('TRUNCATE TABLE test.test2');
sync_table(
src => "test.test1",
dst => "test.test2",
chunk_size => '1k',
);
is_deeply(
\%actions,
{
DELETE => 0,
INSERT => 4,
REPLACE => 0,
UPDATE => 0,
ALGORITHM => 'Chunk',
},
'Sync with Chunk chunk size 1k, 4 INSERTs'
);
is_deeply(
\@rows,
$inserts,
'Sync with Chunk chunk size 1k, ChangeHandler made INSERT statements'
);
is_deeply(
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
$test1_rows,
'Sync with Chunk chunk size 1k, dst rows match src rows'
);
# Sync with Nibble.
$dst_dbh->do('TRUNCATE TABLE test.test2');
sync_table(
src => "test.test1",
dst => "test.test2",
plugins => [ $sync_nibble ],
);
is_deeply(
\%actions,
{
DELETE => 0,
INSERT => 4,
REPLACE => 0,
UPDATE => 0,
ALGORITHM => 'Nibble',
},
'Sync with Nibble, 4 INSERTs'
);
is_deeply(
\@rows,
$inserts,
'Sync with Nibble, ChangeHandler made INSERT statements'
);
is_deeply(
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
$test1_rows,
'Sync with Nibble, dst rows match src rows'
);
# Sync with GroupBy.
$dst_dbh->do('TRUNCATE TABLE test.test2');
sync_table(
src => "test.test1",
dst => "test.test2",
plugins => [ $sync_groupby ],
);
is_deeply(
\%actions,
{
DELETE => 0,
INSERT => 4,
REPLACE => 0,
UPDATE => 0,
ALGORITHM => 'GroupBy',
},
'Sync with GroupBy, 4 INSERTs'
);
is_deeply(
\@rows,
$inserts,
'Sync with GroupBy, ChangeHandler made INSERT statements'
);
is_deeply(
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
$test1_rows,
'Sync with GroupBy, dst rows match src rows'
);
# Sync with Stream.
$dst_dbh->do('TRUNCATE TABLE test.test2');
sync_table(
src => "test.test1",
dst => "test.test2",
plugins => [ $sync_stream ],
);
is_deeply(
\%actions,
{
DELETE => 0,
INSERT => 4,
REPLACE => 0,
UPDATE => 0,
ALGORITHM => 'Stream',
},
'Sync with Stream, 4 INSERTs'
);
is_deeply(
\@rows,
$inserts,
'Sync with Stream, ChangeHandler made INSERT statements'
);
is_deeply(
$dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'),
$test1_rows,
'Sync with Stream, dst rows match src rows'
);
# #############################################################################
# Check that the plugins can resolve unique key violations.
# #############################################################################
make_plugins();
sync_table(
src => "test.test3",
dst => "test.test4",
plugins => [ $sync_stream ],
);
is_deeply(
$dst_dbh->selectall_arrayref('select * from test.test4 order by a', { Slice => {}} ),
[ { a => 1, b => 2 }, { a => 2, b => 1 } ],
'Resolves unique key violations with Stream'
);
sync_table(
src => "test.test3",
dst => "test.test4",
plugins => [ $sync_chunk ],
);
is_deeply(
$dst_dbh->selectall_arrayref('select * from test.test4 order by a', { Slice => {}} ),
[ { a => 1, b => 2 }, { a => 2, b => 1 } ],
'Resolves unique key violations with Chunk'
);
# ###########################################################################
# Test locking.
# ###########################################################################
make_plugins();
sync_table(
src => "test.test1",
dst => "test.test2",
lock => 1,
);
# The locks should be released.
ok($src_dbh->do('select * from test.test4'), 'Cycle locks released');
sync_table(
src => "test.test1",
dst => "test.test2",
lock => 2,
);
# The locks should be released.
ok($src_dbh->do('select * from test.test4'), 'Table locks released');
sync_table(
src => "test.test1",
dst => "test.test2",
lock => 3,
);
ok(
$dbh->do('replace into test.test3 select * from test.test3 limit 0'),
'Does not lock in level 3 locking'
);
eval {
$syncer->lock_and_wait(
src => $src,
dst => $dst,
lock => 3,
lock_level => 3,
replicate => 0,
timeout_ok => 1,
transaction => 0,
wait => 60,
);
};
is($EVAL_ERROR, '', 'Locks in level 3');
# See DBI man page.
use POSIX ':signal_h';
my $mask = POSIX::SigSet->new(SIGALRM); # signals to mask in the handler
my $action = POSIX::SigAction->new( sub { die "maatkit timeout" }, $mask, );
my $oldaction = POSIX::SigAction->new();
sigaction( SIGALRM, $action, $oldaction );
throws_ok (
sub {
alarm 1;
$dbh->do('replace into test.test3 select * from test.test3 limit 0');
},
qr/maatkit timeout/,
"Level 3 lock NOT released",
);
# Kill the DBHs it in the right order: there's a connection waiting on
# a lock.
$src_dbh->disconnect();
$dst_dbh->disconnect();
$src_dbh = $sb->get_dbh_for('master');
$dst_dbh = $sb->get_dbh_for('slave1');
$src->{dbh} = $src_dbh;
$dst->{dbh} = $dst_dbh;
# ###########################################################################
# Test TableSyncGroupBy.
# ###########################################################################
make_plugins();
$sb->load_file('master', 't/lib/samples/before-TableSyncGroupBy.sql');
sync_table(
src => "test.test1",
dst => "test.test2",
plugins => [ $sync_groupby ],
);
is_deeply(
$dst_dbh->selectall_arrayref('select * from test.test2 order by a, b, c', { Slice => {}} ),
[
{ a => 1, b => 2, c => 3 },
{ a => 1, b => 2, c => 3 },
{ a => 1, b => 2, c => 3 },
{ a => 1, b => 2, c => 3 },
{ a => 2, b => 2, c => 3 },
{ a => 2, b => 2, c => 3 },
{ a => 2, b => 2, c => 3 },
{ a => 2, b => 2, c => 3 },
{ a => 3, b => 2, c => 3 },
{ a => 3, b => 2, c => 3 },
],
'Table synced with GroupBy',
);
# #############################################################################
# Issue 96: mk-table-sync: Nibbler infinite loop
# #############################################################################
make_plugins();
$sb->load_file('master', 't/lib/samples/issue_96.sql');
# Make paranoid-sure that the tables differ.
my $r1 = $src_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t WHERE package_id=4');
my $r2 = $dst_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t2 WHERE package_id=4');
is_deeply(
[ $r1->[0]->[0], $r2->[0]->[0] ],
[ 'ta', 'zz' ],
'Infinite loop table differs (issue 96)'
);
sync_table(
src => "issue_96.t",
dst => "issue_96.t2",
plugins => [ $sync_nibble ],
);
$r1 = $src_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t WHERE package_id=4');
$r2 = $dst_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t2 WHERE package_id=4');
# Other tests below rely on this table being synced, so die
# if it fails to sync.
is(
$r1->[0]->[0],
$r2->[0]->[0],
'Sync infinite loop table (issue 96)'
) or die "Failed to sync issue_96.t";
# #############################################################################
# Test check_permissions().
# #############################################################################
# have_all_privs() removed due to
# https://bugs.launchpad.net/percona-toolkit/+bug/1036747
# ###########################################################################
# Test that the calback gives us the src and dst sql.
# ###########################################################################
make_plugins;
# Re-using issue_96.t from above. The tables are already in sync so there
# should only be 1 sync cycle.
my @sqls;
sync_table(
src => "issue_96.t",
dst => "issue_96.t2",
chunk_size => 1000,
plugins => [ $sync_nibble ],
callback => sub { push @sqls, @_; },
);
my $queries = ($sandbox_version gt '4.0' ?
[
'SELECT /*issue_96.t:1/1*/ 0 AS chunk_num, COUNT(*) AS cnt, COALESCE(LOWER(CONCAT(LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc, 1, 16), 16, 10) AS UNSIGNED)), 10, 16), 16, \'0\'), LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc, 17, 16), 16, 10) AS UNSIGNED)), 10, 16), 16, \'0\'), LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc := SHA1(CONCAT_WS(\'#\', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`)))), 33, 8), 16, 10) AS UNSIGNED)), 10, 16), 8, \'0\'))), 0) AS crc FROM `issue_96`.`t` FORCE INDEX (`package_id`) WHERE (1=1)',
'SELECT /*issue_96.t2:1/1*/ 0 AS chunk_num, COUNT(*) AS cnt, COALESCE(LOWER(CONCAT(LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc, 1, 16), 16, 10) AS UNSIGNED)), 10, 16), 16, \'0\'), LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc, 17, 16), 16, 10) AS UNSIGNED)), 10, 16), 16, \'0\'), LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc := SHA1(CONCAT_WS(\'#\', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`)))), 33, 8), 16, 10) AS UNSIGNED)), 10, 16), 8, \'0\'))), 0) AS crc FROM `issue_96`.`t2` FORCE INDEX (`package_id`) WHERE (1=1)',
] :
[
"SELECT /*issue_96.t:1/1*/ 0 AS chunk_num, COUNT(*) AS cnt, COALESCE(RIGHT(MAX(\@crc := CONCAT(LPAD(\@cnt := \@cnt + 1, 16, '0'), SHA1(CONCAT(\@crc, SHA1(CONCAT_WS('#', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`)))))))), 40), 0) AS crc FROM `issue_96`.`t` FORCE INDEX (`package_id`) WHERE (1=1)",
"SELECT /*issue_96.t2:1/1*/ 0 AS chunk_num, COUNT(*) AS cnt, COALESCE(RIGHT(MAX(\@crc := CONCAT(LPAD(\@cnt := \@cnt + 1, 16, '0'), SHA1(CONCAT(\@crc, SHA1(CONCAT_WS('#', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`)))))))), 40), 0) AS crc FROM `issue_96`.`t2` FORCE INDEX (`package_id`) WHERE (1=1)",
],
);
is_deeply(
\@sqls,
$queries,
'Callback gives src and dst sql'
);
# #############################################################################
# Test that make_checksum_queries() doesn't pass replicate.
# #############################################################################
# Re-using issue_96.* tables from above.
$queries = ($sandbox_version gt '4.0' ?
[
'SELECT /*PROGRESS_COMMENT*//*CHUNK_NUM*/ COUNT(*) AS cnt, COALESCE(LOWER(CONCAT(LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc, 1, 16), 16, 10) AS UNSIGNED)), 10, 16), 16, \'0\'), LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc, 17, 16), 16, 10) AS UNSIGNED)), 10, 16), 16, \'0\'), LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc := SHA1(CONCAT_WS(\'#\', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`)))), 33, 8), 16, 10) AS UNSIGNED)), 10, 16), 8, \'0\'))), 0) AS crc FROM /*DB_TBL*//*INDEX_HINT*//*WHERE*/',
"`package_id`, `location`, `from_city`, SHA1(CONCAT_WS('#', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`))))",
] :
[
"SELECT /*PROGRESS_COMMENT*//*CHUNK_NUM*/ COUNT(*) AS cnt, COALESCE(RIGHT(MAX(\@crc := CONCAT(LPAD(\@cnt := \@cnt + 1, 16, '0'), SHA1(CONCAT(\@crc, SHA1(CONCAT_WS('#', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`)))))))), 40), 0) AS crc FROM /*DB_TBL*//*INDEX_HINT*//*WHERE*/",
"`package_id`, `location`, `from_city`, SHA1(CONCAT_WS('#', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`))))",
],
);
@sqls = $syncer->make_checksum_queries(
replicate => 'bad',
src => $src,
dst => $dst,
tbl_struct => $tbl_struct,
function => 'SHA1',
);
is_deeply(
\@sqls,
$queries,
'make_checksum_queries() does not pass replicate arg'
);
# #############################################################################
# Issue 464: Make mk-table-sync do two-way sync
# #############################################################################
diag(`$trunk/sandbox/start-sandbox master 12348 >/dev/null`);
my $dbh3 = $sb->get_dbh_for('master1');
SKIP: {
skip 'Cannot connect to sandbox master', 7 unless $dbh;
skip 'Cannot connect to second sandbox master', 7 unless $dbh3;
sub set_bidi_callbacks {
$sync_chunk->set_callback('same_row', sub {
my ( %args ) = @_;
my ($lr, $rr, $syncer) = @args{qw(lr rr syncer)};
my $ch = $syncer->{ChangeHandler};
my $change_dbh;
my $auth_row;
my $left_ts = $lr->{ts};
my $right_ts = $rr->{ts};
PTDEBUG && TableSyncer::_d("left ts: $left_ts");
PTDEBUG && TableSyncer::_d("right ts: $right_ts");
my $cmp = ($left_ts || '') cmp ($right_ts || '');
if ( $cmp == -1 ) {
PTDEBUG && TableSyncer::_d("right dbh $dbh3 is newer; update left dbh $src_dbh");
$ch->set_src('right', $dbh3);
$auth_row = $args{rr};
$change_dbh = $src_dbh;
}
elsif ( $cmp == 1 ) {
PTDEBUG && TableSyncer::_d("left dbh $src_dbh is newer; update right dbh $dbh3");
$ch->set_src('left', $src_dbh);
$auth_row = $args{lr};
$change_dbh = $dbh3;
}
return ('UPDATE', $auth_row, $change_dbh);
});
$sync_chunk->set_callback('not_in_right', sub {
my ( %args ) = @_;
$args{syncer}->{ChangeHandler}->set_src('left', $src_dbh);
return 'INSERT', $args{lr}, $dbh3;
});
$sync_chunk->set_callback('not_in_left', sub {
my ( %args ) = @_;
$args{syncer}->{ChangeHandler}->set_src('right', $dbh3);
return 'INSERT', $args{rr}, $src_dbh;
});
};
# Proper data on both tables after bidirectional sync.
my $bidi_data =
[
[1, 'abc', 1, '2010-02-01 05:45:30'],
[2, 'def', 2, '2010-01-31 06:11:11'],
[3, 'ghi', 5, '2010-02-01 09:17:52'],
[4, 'jkl', 6, '2010-02-01 10:11:33'],
[5, undef, 0, '2010-02-02 05:10:00'],
[6, 'p', 4, '2010-01-31 10:17:00'],
[7, 'qrs', 5, '2010-02-01 10:11:11'],
[8, 'tuv', 6, '2010-01-31 10:17:20'],
[9, 'wxy', 7, '2010-02-01 10:17:00'],
[10, 'z', 8, '2010-01-31 10:17:08'],
[11, '?', 0, '2010-01-29 11:17:12'],
[12, '', 0, '2010-02-01 11:17:00'],
[13, 'hmm', 1, '2010-02-02 12:17:31'],
[14, undef, 0, '2010-01-31 10:17:00'],
[15, 'gtg', 7, '2010-02-02 06:01:08'],
[17, 'good', 1, '2010-02-02 21:38:03'],
[20, 'new', 100, '2010-02-01 04:15:36'],
];
# ########################################################################
# First bidi test with chunk size=2, roughly 9 chunks.
# ########################################################################
# Load "master" data.
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/table.sql');
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql');
# Load remote data.
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql');
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql');
make_plugins();
set_bidi_callbacks();
$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'bidi', 't'));
$src->{db} = 'bidi';
$src->{tbl} = 't';
$dst->{db} = 'bidi';
$dst->{tbl} = 't';
$dst->{dbh} = $dbh3; # Must set $dbh3 here and
my %args = (
src => $src,
dst => $dst,
tbl_struct => $tbl_struct,
cols => [qw(ts)], # Compare only ts col when chunks differ.
plugins => $plugins,
function => 'SHA1',
ChangeHandler => new_ch($dbh3, 0), # here to override $dst_dbh.
RowDiff => $rd,
chunk_size => 2,
);
@rows = ();
$syncer->sync_table(%args, plugins => [$sync_chunk]);
my $res = $src_dbh->selectall_arrayref('select * from bidi.t order by id');
is_deeply(
$res,
$bidi_data,
'Bidirectional sync "master" (chunk size 2)'
);
$res = $dbh3->selectall_arrayref('select * from bidi.t order by id');
is_deeply(
$res,
$bidi_data,
'Bidirectional sync remote-1 (chunk size 2)'
);
# ########################################################################
# Test it again with a larger chunk size, roughly half the table.
# ########################################################################
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/table.sql');
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql');
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql');
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql');
make_plugins();
set_bidi_callbacks();
$args{ChangeHandler} = new_ch($dbh3, 0);
@rows = ();
$syncer->sync_table(%args, plugins => [$sync_chunk], chunk_size => 10);
$res = $src_dbh->selectall_arrayref('select * from bidi.t order by id');
is_deeply(
$res,
$bidi_data,
'Bidirectional sync "master" (chunk size 10)'
);
$res = $dbh3->selectall_arrayref('select * from bidi.t order by id');
is_deeply(
$res,
$bidi_data,
'Bidirectional sync remote-1 (chunk size 10)'
);
# ########################################################################
# Chunk whole table.
# ########################################################################
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/table.sql');
$sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql');
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql');
$sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql');
make_plugins();
set_bidi_callbacks();
$args{ChangeHandler} = new_ch($dbh3, 0);
@rows = ();
$syncer->sync_table(%args, plugins => [$sync_chunk], chunk_size => 100000);
$res = $src_dbh->selectall_arrayref('select * from bidi.t order by id');
is_deeply(
$res,
$bidi_data,
'Bidirectional sync "master" (whole table chunk)'
);
$res = $dbh3->selectall_arrayref('select * from bidi.t order by id');
is_deeply(
$res,
$bidi_data,
'Bidirectional sync remote-1 (whole table chunk)'
);
# ########################################################################
# See TableSyncer.pm for why this is so.
# ########################################################################
$args{ChangeHandler} = new_ch($dbh3, 1);
throws_ok(
sub { $syncer->sync_table(%args, bidirectional => 1, plugins => [$sync_chunk]) },
qr/Queueing does not work with bidirectional syncing/,
'Queueing does not work with bidirectional syncing'
);
diag(`$trunk/sandbox/stop-sandbox 12348 >/dev/null &`);
$dbh3->disconnect();
}
# #############################################################################
# Test with transactions.
# #############################################################################
make_plugins();
# Sandbox::get_dbh_for() defaults to AutoCommit=1. Autocommit must
# be off else commit() will cause an error.
$dbh = $sb->get_dbh_for('master', {AutoCommit=>0});
$src_dbh = $sb->get_dbh_for('master', {AutoCommit=>0});
$dst_dbh = $sb->get_dbh_for('slave1', {AutoCommit=>0});
sync_table(
src => "test.test1",
dst => "test.test1",
transaction => 1,
lock => 1,
);
# There are no diffs. This just tests that the code doesn't crash
# when transaction is true.
is_deeply(
\@rows,
[],
"Sync with transaction"
);
$syncer->lock_and_wait(
src => {
dbh => $src_dbh,
db => 'sakila',
tbl => 'actor',
},
dst => {
dbh => $dst_dbh,
db => 'sakila',
tbl => 'actor',
},
lock => 1,
lock_level => 1,
transaction => 1,
);
my $cid = $src_dbh->selectrow_arrayref("SELECT CONNECTION_ID()")->[0];
$src_dbh->do("SELECT * FROM sakila.actor WHERE 1=1 LIMIT 2 FOR UPDATE");
my $idb_status = $src_dbh->selectrow_hashref("SHOW /*!40100 ENGINE*/ INNODB STATUS");
$src_dbh->commit();
like(
$idb_status->{status},
qr/MySQL thread id $cid, .*?query id \d+/,
"Open transaction"
);
# #############################################################################
# Issue 672: mk-table-sync should COALESCE to avoid undef
# #############################################################################
make_plugins();
$sb->load_file('master', "t/lib/samples/empty_tables.sql");
foreach my $sync( $sync_chunk, $sync_nibble, $sync_groupby ) {
sync_table(
src => 'et.et1',
dst => 'et.et1',
plugins => [ $sync ],
);
my $sync_name = ref $sync;
my $algo = $sync_name;
$algo =~ s/TableSync//;
is_deeply(
\@rows,
[],
"Sync empty tables with " . ref $sync,
);
is(
$actions{ALGORITHM},
$algo,
"$algo algo used to sync empty table"
);
}
# #############################################################################
# Retry wait.
# #############################################################################
diag(`/tmp/12346/use -e "stop slave"`);
my $output = '';
{
local *STDERR;
open STDERR, '>', \$output;
throws_ok(
sub {
$syncer->lock_and_wait(
src => {
dbh => $src_dbh,
db => 'sakila',
tbl => 'actor',
misc_dbh => $dbh,
},
dst => {
dbh => $dst_dbh,
db => 'sakila',
tbl => 'actor',
},
lock => 1,
lock_level => 1,
wait => 60,
wait_retry_args => {
wait => 1,
tries => 2,
},
);
},
qr/Slave did not catch up to its master after 2 attempts of waiting 60/,
"Retries wait"
);
}
diag(`/tmp/12347/use -e "stop slave"`);
diag(`/tmp/12346/use -e "start slave"`);
diag(`/tmp/12347/use -e "start slave"`);
# #############################################################################
# Done.
# #############################################################################
{
local *STDERR;
open STDERR, '>', \$output;
$syncer->_d('Complete test coverage');
}
like(
$output,
qr/Complete test coverage/,
'_d() works'
);
$src_dbh->disconnect() if $src_dbh;
$dst_dbh->disconnect() if $dst_dbh;
$sb->wipe_clean($dbh);
ok($sb->ok(), "Sandbox servers") or BAIL_OUT(__FILE__ . " broke the sandbox");
done_testing;
exit;