From ec8471ba28d329b662212c4883070bb5a776a1c2 Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Wed, 21 Dec 2011 12:10:57 -0700 Subject: [PATCH] Finish rewriting TableSyncer and updating its test. Add RowSyncerBidirectional.pm. Add GENLOG switch to start-sandbox. --- lib/NibbleIterator.pm | 32 +-- lib/RowSyncerBidirectional.pm | 249 ++++++++++++++++++++++ lib/TableSyncer.pm | 93 ++++---- sandbox/start-sandbox | 3 + t/lib/TableSyncer.t | 387 ++++++++++++++++------------------ 5 files changed, 512 insertions(+), 252 deletions(-) create mode 100644 lib/RowSyncerBidirectional.pm diff --git a/lib/NibbleIterator.pm b/lib/NibbleIterator.pm index 61208884..5dad815a 100644 --- a/lib/NibbleIterator.pm +++ b/lib/NibbleIterator.pm @@ -84,6 +84,11 @@ sub new { if ( !$index && !$one_nibble ) { die "There is no good index and the table is oversized."; } + my ($index_cols, $order_by); + if ( $index ) { + $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols}; + $order_by = join(', ', map {$q->quote($_)} @{$index_cols}); + } my $tbl_struct = $tbl->{tbl_struct}; my $ignore_col = $o->get('ignore-columns') || {}; @@ -93,20 +98,20 @@ sub new { if ( $one_nibble ) { # If the chunk size is >= number of rows in table, then we don't # need to chunk; we can just select all rows, in order, at once. + my $cols = ($args{select} ? $args{select} + : join(', ', map { $q->quote($_) } @cols)); + my $from = $q->quote(@{$tbl}{qw(db tbl)}); + my $nibble_sql = ($args{dml} ? "$args{dml} " : "SELECT ") - . ($args{select} ? $args{select} - : join(', ', map { $q->quote($_) } @cols)) - . " FROM " . $q->quote(@{$tbl}{qw(db tbl)}) + . $cols + . " FROM $from " . ($where ? " AND ($where)" : '') . " /*checksum table*/"; MKDEBUG && _d('One nibble statement:', $nibble_sql); my $explain_nibble_sql - = "EXPLAIN SELECT " - . ($args{select} ? $args{select} - : join(', ', map { $q->quote($_) } @cols)) - . " FROM " . $q->quote(@{$tbl}{qw(db tbl)}) + = "EXPLAIN SELECT $cols FROM $from" . ($where ? 
" AND ($where)" : '') . " /*explain checksum table*/"; MKDEBUG && _d('Explain one nibble statement:', $explain_nibble_sql); @@ -117,11 +122,15 @@ sub new { limit => 0, nibble_sql => $nibble_sql, explain_nibble_sql => $explain_nibble_sql, + sql => { + columns => $cols, + from => $from, + where => $where, + order_by => $order_by, + }, }; } else { - my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols}; - # Figure out how to nibble the table with the index. my $asc = $args{TableNibbler}->generate_asc_stmt( %args, @@ -135,8 +144,7 @@ sub new { # Make SQL statements, prepared on first call to next(). FROM and # ORDER BY are the same for all statements. FORCE IDNEX and ORDER BY # are needed to ensure deterministic nibbling. - my $from = $q->quote(@{$tbl}{qw(db tbl)}) . " FORCE INDEX(`$index`)"; - my $order_by = join(', ', map {$q->quote($_)} @{$index_cols}); + my $from = $q->quote(@{$tbl}{qw(db tbl)}) . " FORCE INDEX(`$index`)"; # The real first row in the table. Usually we start nibbling from # this row. Called once in _get_bounds(). @@ -230,7 +238,6 @@ sub new { $self = { %args, - index => $index, limit => $limit, first_lb_sql => $first_lb_sql, last_ub_sql => $last_ub_sql, @@ -249,6 +256,7 @@ sub new { }; } + $self->{index} = $index; $self->{row_est} = $row_est; $self->{nibbleno} = 0; $self->{have_rows} = 0; diff --git a/lib/RowSyncerBidirectional.pm b/lib/RowSyncerBidirectional.pm new file mode 100644 index 00000000..bcf7fb78 --- /dev/null +++ b/lib/RowSyncerBidirectional.pm @@ -0,0 +1,249 @@ +# This program is copyright 2011 Percona Inc. +# Feedback and improvements are welcome. +# +# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED +# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF +# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE. 
+# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar +# systems, you can issue `man perlgpl' or `man perlartistic' to read these +# licenses. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 59 Temple +# Place, Suite 330, Boston, MA 02111-1307 USA. +# ########################################################################### +# RowSyncerBidirectional package +# ########################################################################### +{ +# Package: RowSyncerBidirectional +# RowSyncerBidirectional syncs rows in both directions between two hosts. +package RowSyncerBidirectional; + +use strict; +use warnings FATAL => 'all'; +use English qw(-no_match_vars); +use constant MKDEBUG => $ENV{MKDEBUG} || 0; + +use Data::Dumper; +$Data::Dumper::Indent = 1; +$Data::Dumper::Sortkeys = 1; +$Data::Dumper::Quotekeys = 0; + +use constant UPDATE_LEFT => -1; +use constant UPDATE_RIGHT => 1; +use constant UPDATE_NEITHER => 0; # neither value equals/matches +use constant FAILED_THRESHOLD => 2; # failed to exceed threshold + +sub new { + my ( $class, %args ) = @_; + my @required_args = qw(OptionParser ChangeHandler); + foreach my $arg ( @required_args ) { + die "I need a $arg argument" unless defined $args{$arg}; + } + my $self = { + crc_col => 'crc', + %args, + }; + return bless $self, $class; +} + +sub set_crc_col { + my ($self, $crc_col) = @_; + $self->{crc_col} = $crc_col; + return; +} + +sub set_key_cols { + my ($self, $key_cols) = @_; + $self->{key_cols} = $key_cols; + return; +} + +sub key_cols { + my ($self) = @_; + return $self->{key_cols}; +} + +# Sub: cmp_conflict_col +# Compare --conflict-column values for --bidirectional. This sub is +# called by same_row() to decide which side to update.
+# +# Parameters: +# $left_val - Column value from left (usually the source host) +# $right_val - Column value from right (usually the destination host) +# $cmp - Type of conflict comparison, --conflict-comparison +# $val - Value for certain types of comparisons, --conflict-value +# $thr - Threshold for certain types of comparisons, +# --conflict-threshold +# +# Returns: +# One of the constants above, UPDATE_* or FAILED_THRESHOLD +sub cmp_conflict_col { + my ( $left_val, $right_val, $cmp, $val, $thr ) = @_; + MKDEBUG && _d('Compare', @_); + my $res; + if ( $cmp eq 'newest' || $cmp eq 'oldest' ) { + $res = $cmp eq 'newest' ? ($left_val || '') cmp ($right_val || '') + : ($right_val || '') cmp ($left_val || ''); + + if ( $thr ) { + $thr = Transformers::time_to_secs($thr); + my $lts = Transformers::any_unix_timestamp($left_val); + my $rts = Transformers::any_unix_timestamp($right_val); + my $diff = abs($lts - $rts); + MKDEBUG && _d('Check threshold, lts rts thr abs-diff:', + $lts, $rts, $thr, $diff); + if ( $diff < $thr ) { + MKDEBUG && _d("Failed threshold"); + return FAILED_THRESHOLD; + } + } + } + elsif ( $cmp eq 'greatest' || $cmp eq 'least' ) { + $res = $cmp eq 'greatest' ? (($left_val ||0) > ($right_val ||0) ? 1 : -1) + : (($left_val ||0) < ($right_val ||0) ? 1 : -1); + $res = 0 if ($left_val || 0) == ($right_val || 0); + if ( $thr ) { + my $diff = abs($left_val - $right_val); + MKDEBUG && _d('Check threshold, abs-diff:', $diff); + if ( $diff < $thr ) { + MKDEBUG && _d("Failed threshold"); + return FAILED_THRESHOLD; + } + } + } + elsif ( $cmp eq 'equals' ) { + $res = ($left_val || '') eq $val ? 1 + : ($right_val || '') eq $val ? -1 + : 0; + } + elsif ( $cmp eq 'matches' ) { + $res = ($left_val || '') =~ m/$val/ ? 1 + : ($right_val || '') =~ m/$val/ ? -1 + : 0; + } + else { + # Shouldn't happen; caller should have verified this.
+ die "Invalid comparison: $cmp"; + } + + return $res; +} + +sub same_row { + my ($self, %args) = @_; + my ($lr, $rr, $syncer) = @args{qw(lr rr syncer)}; + + my $ch = $self->{ChangeHandler}; + my $action = 'UPDATE'; + my $auth_row = $lr; + my $change_dbh; + my $err; + + my $o = $self->{OptionParser}; + my $col = $o->get('conflict-column'); + my $cmp = $o->get('conflict-comparison'); + my $val = $o->get('conflict-value'); + my $thr = $o->get('conflict-threshold'); + + my $left_val = $lr->{$col} || ''; + my $right_val = $rr->{$col} || ''; + MKDEBUG && _d('left', $col, 'value:', $left_val); + MKDEBUG && _d('right', $col, 'value:', $right_val); + + my $res = cmp_conflict_col($left_val, $right_val, $cmp, $val, $thr); + if ( $res == UPDATE_LEFT ) { + MKDEBUG && _d("right dbh $args{right_dbh} $cmp; " + . "update left dbh $args{left_dbh}"); + $ch->set_src('right', $args{right_dbh}); + $auth_row = $args{rr}; + $change_dbh = $args{left_dbh}; + } + elsif ( $res == UPDATE_RIGHT ) { + MKDEBUG && _d("left dbh $args{left_dbh} $cmp; " + . "update right dbh $args{right_dbh}"); + $ch->set_src('left', $args{left_dbh}); + $auth_row = $args{lr}; + $change_dbh = $args{right_dbh}; + } + elsif ( $res == UPDATE_NEITHER ) { + if ( $cmp eq 'equals' || $cmp eq 'matches' ) { + $err = "neither `$col` value $cmp $val"; + } + else { + $err = "`$col` values are the same" + } + } + elsif ( $res == FAILED_THRESHOLD ) { + $err = "`$col` values do not differ by the threhold, $thr." + } + else { + # Shouldn't happen. + die "cmp_conflict_col() returned an invalid result: $res." + } + + if ( $err ) { + $action = undef; # skip change in case we just warn + my $where = $ch->make_where_clause($lr, $self->key_cols()); + $err = "# Cannot resolve conflict WHERE $where: $err\n"; + + # die here is caught in sync_a_table(). We're deeply nested: + # sync_a_table > sync_table > compare_sets > syncer > here + my $print_err = $o->get('conflict-error'); + $print_err =~ m/warn/i ? warn $err + : $print_err =~ m/die/i ? 
die $err + : $print_err =~ m/ignore/i ? MKDEBUG && _d("Conflict error:", $err) + : die "Invalid --conflict-error: $print_err"; + return; + } + + return $ch->change( + $action, # Execute the action + $auth_row, # with these row values + $self->key_cols(), # identified by these key cols + $change_dbh, # on this dbh + ); +} + +sub not_in_right { + my ( $self, %args ) = @_; + $self->{ChangeHandler}->set_src('left', $args{left_dbh}); + return $self->{ChangeHandler}->change( + 'INSERT', # Execute the action + $args{lr}, # with these row values + $self->key_cols(), # identified by these key cols + $args{right_dbh}, # on this dbh + ); +} + +sub not_in_left { + my ( $self, %args ) = @_; + $self->{ChangeHandler}->set_src('right', $args{right_dbh}); + return $self->{ChangeHandler}->change( + 'INSERT', # Execute the action + $args{rr}, # with these row values + $self->key_cols(), # identified by these key cols + $args{left_dbh}, # on this dbh + ); +} + +sub done_with_rows { + return; +} + +sub _d { + my ($package, undef, $line) = caller 0; + @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; } + map { defined $_ ? 
$_ : 'undef' } + @_; + print STDERR "# $package:$line $PID ", join(' ', @_), "\n"; +} + +1; +} +# ########################################################################### +# End RowSyncerBidirectional package +# ########################################################################### diff --git a/lib/TableSyncer.pm b/lib/TableSyncer.pm index c7b65191..154e5a88 100644 --- a/lib/TableSyncer.pm +++ b/lib/TableSyncer.pm @@ -143,6 +143,7 @@ sub sync_table { my $user_where = $o->get('where'); + my ($src_nibble_iter, $dst_nibble_iter); foreach my $host ($src, $dst) { my $callbacks = { init => sub { @@ -186,7 +187,7 @@ sub sync_table { }, }; - $host->{nibble_iter} = new NibbleIterator( + my $nibble_iter = new NibbleIterator( Cxn => $host->{Cxn}, tbl => $host->{tbl}, chunk_size => $o->get('chunk-size'), @@ -194,15 +195,23 @@ sub sync_table { select => $chunk_cols, callbacks => $callbacks, fetch_hashref => 1, + one_nibble => $args{one_nibble}, OptionParser => $self->{OptionParser}, Quoter => $self->{Quoter}, TableNibbler => $self->{TableNibbler}, TableParser => $self->{TableParser}, RowChecksum => $self->{RowChecksum}, ); + + if ( $host->{is_source} ) { + $src_nibble_iter = $nibble_iter; + } + else { + $dst_nibble_iter = $nibble_iter; + } } - my $index = $src->{nibble_iter}->nibble_index(); + my $index = $src_nibble_iter->nibble_index(); my $key_cols = $index ? 
$src->{tbl}->{tbl_struct}->{keys}->{$index}->{cols} : $src->{tbl}->{tbl_struct}->{cols}; $row_syncer->set_key_cols($key_cols); @@ -214,59 +223,62 @@ sub sync_table { $row_syncer->set_crc_col($crc_col); MKDEBUG && _d('CRC column:', $crc_col); + my $rows_sql; + my $row_cols = $row_checksum->make_row_checksum( + dbh => $src->{Cxn}->dbh(), + tbl => $src->{tbl}, + %crc_args, + ); + my $sql_clause = $src_nibble_iter->sql(); foreach my $host ($src, $dst) { - my $row_cols = $row_checksum->make_row_checksum( - dbh => $host->{Cxn}->dbh(), - tbl => $host->{tbl}, - %crc_args, - ); - my $nibble_iter = $host->{nibble_iter}; - - if ( $nibble_iter->one_nibble() ) { - my $rows_sql + if ( $src_nibble_iter->one_nibble() ) { + $rows_sql = 'SELECT /*rows in nibble*/ ' . ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '') . "$row_cols AS $crc_col" . " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)}) . " WHERE 1=1 " - . ($user_where ? " AND ($user_where)" : ''); - $host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql); + . ($user_where ? " AND ($user_where)" : '') + . ($sql_clause->{order_by} ? " ORDER BY " . $sql_clause->{order_by} + : ""); } else { - my $sql = $nibble_iter->sql(); - my $rows_sql + $rows_sql = 'SELECT /*rows in nibble*/ ' . ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '') . "$row_cols AS $crc_col" . " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)}) - . " WHERE " . $sql->{boundaries}->{'>='} # lower boundary - . " AND " . $sql->{boundaries}->{'<='} # upper boundary + . " WHERE " . $sql_clause->{boundaries}->{'>='} # lower boundary + . " AND " . $sql_clause->{boundaries}->{'<='} # upper boundary . ($user_where ? " AND ($user_where)" : '') - . " ORDER BY " . $sql->{order_by}; - $host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql); + . " ORDER BY " . $sql_clause->{order_by}; } + $host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql); } # ######################################################################## # Start syncing the table. 
# ######################################################################## - while ( grep { $_->{nibble_iter}->more_boundaries() } ($src, $dst) ) { - my $src_chunk = $src->{nibble_iter}->next(); - my $dst_chunk = $dst->{nibble_iter}->next(); + while ( $src_nibble_iter->more_boundaries() + || $dst_nibble_iter->more_boundaries() ) { + + my $src_chunk = $src_nibble_iter->next(); + my $dst_chunk = $dst_nibble_iter->next(); if ( $src_chunk->{cnt} != $dst_chunk->{cnt} || $src_chunk->{crc} ne $dst_chunk->{crc} ) { MKDEBUG && _d("Chunks differ"); + my $boundary = $src_nibble_iter->boundaries(); foreach my $host ($src, $dst) { - my $nibble_iter = $host->{nibble_iter}; - my $boundary = $nibble_iter->boundaries(); MKDEBUG && _d($host->{Cxn}->name(), $host->{rows_sth}->{Statement}, 'params:', @{$boundary->{lower}}, @{$boundary->{upper}}); $host->{rows_sth}->execute( @{$boundary->{lower}}, @{$boundary->{upper}}); } $row_diff->compare_sets( + left_dbh => $src->{Cxn}->dbh(), left_sth => $src->{rows_sth}, + right_dbh => $dst->{Cxn}->dbh(), right_sth => $dst->{rows_sth}, tbl_struct => $src->{tbl}->{tbl_struct}, syncer => $row_syncer, @@ -277,18 +289,9 @@ sub sync_table { } } - # Unlock the chunks. - foreach my $host ($src, $dst) { - $self->unlock( - lock_level => 1, - host => $host, - OptionParser => $o, - ); - } - # Get next chunks. 
- $src->{nibble_iter}->no_more_rows(); - $dst->{nibble_iter}->no_more_rows(); + $src_nibble_iter->no_more_rows(); + $dst_nibble_iter->no_more_rows(); } $changer->process_rows(0, $trace); @@ -337,12 +340,12 @@ sub unlock { MKDEBUG && _d('Unlocking level', $lock); if ( $o->get('transaction') ) { - MKDEBUG && _d('Committing', $host->name()); + MKDEBUG && _d('Committing', $host->{Cxn}->name()); $host->{Cxn}->dbh()->commit(); } else { my $sql = 'UNLOCK TABLES'; - MKDEBUG && _d($host->name(), $sql); + MKDEBUG && _d($host->{Cxn}->name(), $sql); $host->{Cxn}->dbh()->do($sql); } @@ -377,6 +380,18 @@ sub lock_and_wait { my $lock = $o->get('lock'); return unless $lock && $lock == $lock_level; + # First, commit/unlock the previous transaction/lock. + if ( $o->get('transaction') ) { + MKDEBUG && _d('Committing', $host->{Cxn}->name()); + $host->{Cxn}->dbh()->commit(); + } + else { + my $sql = 'UNLOCK TABLES'; + MKDEBUG && _d($host->{Cxn}->name(), $sql); + $host->{Cxn}->dbh()->do($sql); + } + + # Lock/start xa. return $host->{is_source} ? $self->_lock_src(%args) : $self->_lock_dst(%args); } @@ -426,8 +441,8 @@ sub _lock_dst { eval { if ( my $timeout = $o->get('wait') ) { my $ms = $self->{MasterSlave}; - my $tries = 3; - my $wait; + my $wait = $args{wait_retry_args}->{wait} || 10; + my $tries = $args{wait_retry_args}->{tries} || 3; $self->{Retry}->retry( tries => $tries, wait => sub { sleep 5; }, diff --git a/sandbox/start-sandbox b/sandbox/start-sandbox index 44e3cd1b..a2e31c28 100755 --- a/sandbox/start-sandbox +++ b/sandbox/start-sandbox @@ -40,6 +40,9 @@ make_sandbox() { if [ -n "$SQL_MODE" ]; then echo "sql-mode=$SQL_MODE" >> /tmp/$port/my.sandbox.cnf fi + if [ -n "$GENLOG" ]; then + echo "log=genlog" >> /tmp/$port/my.sandbox.cnf + fi # Start the sandbox and check that it has InnoDB. 
/tmp/$port/start diff --git a/t/lib/TableSyncer.t b/t/lib/TableSyncer.t index ed4e2d31..ebbe14bc 100644 --- a/t/lib/TableSyncer.t +++ b/t/lib/TableSyncer.t @@ -25,9 +25,11 @@ use TableParser; use ChangeHandler; use RowDiff; use RowSyncer; +use RowSyncerBidirectional; use RowChecksum; use DSNParser; use Cxn; +use Transformers; use Sandbox; use PerconaTest; @@ -46,7 +48,7 @@ elsif ( !$dst_dbh ) { plan skip_all => 'Cannot connect to sandbox slave'; } else { - plan tests => 61; + plan tests => 33; } $sb->create_dbs($dbh, ['test']); @@ -82,14 +84,14 @@ $o->get_opts(); my $src_cxn = new Cxn( DSNParser => $dp, OptionParser => $o, - dsn => "h=127,P=12345", + dsn_string => "h=127.1,P=12345,u=msandbox,p=msandbox", dbh => $src_dbh, ); my $dst_cxn = new Cxn( DSNParser => $dp, OptionParser => $o, - dsn => "h=127,P=12346", + dsn_string => "h=127.1,P=12346,u=msandbox,p=msandbox", dbh => $dst_dbh, ); @@ -99,12 +101,10 @@ my $dst; my $tbl_struct; my %actions; my @rows; -my $ch; -my $rs; sub new_ch { my ( $dbh, $queue ) = @_; - $ch = new ChangeHandler( + my $ch = new ChangeHandler( Quoter => $q, left_db => $src->{tbl}->{db}, left_tbl => $src->{tbl}->{tbl}, @@ -124,7 +124,7 @@ sub new_ch { } else { # default dst dbh for this test script - $dst_dbh->do($sql); + $dst_cxn->dbh()->do($sql); } } ], @@ -132,6 +132,7 @@ sub new_ch { queue => defined $queue ? 
$queue : 1, ); $ch->fetch_back($src_cxn->dbh()); + return $ch; } # Shortens/automates a lot of the setup needed for calling @@ -148,10 +149,10 @@ sub sync_table { $o->get_opts(); $tbl_struct = $tp->parse( - $tp->get_create_table($src_dbh, $src_db, $src_tbl)); + $tp->get_create_table($src_cxn->dbh(), $src_db, $src_tbl)); $src = { Cxn => $src_cxn, - misc_dbh => $dbh, + misc_dbh => $src_cxn->dbh(), is_source => 1, tbl => { db => $src_db, @@ -161,7 +162,7 @@ sub sync_table { }; $dst = { Cxn => $dst_cxn, - misc_dbh => $dbh, + misc_dbh => $src_cxn->dbh(), tbl => { db => $dst_db, tbl => $dst_tbl, @@ -169,19 +170,20 @@ sub sync_table { }, }; @rows = (); - new_ch(); - $rs = new RowSyncer( - ChangeHandler => $ch, - ); + my $ch = $args{ChangeHandler} || new_ch(); + my $rs = $args{RowSyncer} || new RowSyncer(ChangeHandler => $ch, + OptionParser => $o); + return if $args{fake}; %actions = $syncer->sync_table( src => $src, dst => $dst, RowSyncer => $rs, ChangeHandler => $ch, trace => 0, + changing_src => $args{changing_src}, + one_nibble => $args{one_nibble}, ); - - return; + return \%actions; } # ########################################################################### @@ -268,8 +270,10 @@ is_deeply( # Check that the plugins can resolve unique key violations. # ############################################################################# sync_table( - src => "test.test3", - dst => "test.test4", + src => "test.test3", + dst => "test.test4", + argv => [qw(--chunk-size 1)], + one_nibble => 0, ); is_deeply( @@ -281,6 +285,19 @@ is_deeply( # ########################################################################### # Test locking. 
# ########################################################################### +sub clear_genlogs { + my ($msg) = @_; + if ( $msg ) { + `echo "xxx $msg" >> /tmp/12345/data/genlog`; + `echo "xxx $msg" >> /tmp/12346/data/genlog`; + } + else { + `echo > /tmp/12345/data/genlog`; + `echo > /tmp/12346/data/genlog`; + } + warn "cleared" +} + sync_table( src => "test.test1", dst => "test.test2", @@ -337,28 +354,16 @@ throws_ok ( # Kill the DBHs it in the right order: there's a connection waiting on # a lock. -$src_cxn = undef; -$dst_cxn = undef; -$src_dbh = $sb->get_dbh_for('master'); -$dst_dbh = $sb->get_dbh_for('slave1'); -$src_cxn = new Cxn( - DSNParser => $dp, - OptionParser => $o, - dsn => "h=127,P=12345", - dbh => $src_dbh, -); -$dst_cxn = new Cxn( - DSNParser => $dp, - OptionParser => $o, - dsn => "h=127,P=12346", - dbh => $dst_dbh, -); +$src_cxn->dbh()->disconnect(); +$dst_cxn->dbh()->disconnect(); +$dst_cxn->connect(); +$src_cxn->connect(); # ########################################################################### # Test TableSyncGroupBy. 
# ########################################################################### $sb->load_file('master', 't/lib/samples/before-TableSyncGroupBy.sql'); -sleep 1; +PerconaTest::wait_for_table($dst_cxn->dbh(), "test.test2", "a=4"); sync_table( src => "test.test1", @@ -366,7 +371,7 @@ sync_table( ); is_deeply( - $dst_dbh->selectall_arrayref('select * from test.test2 order by a, b, c', { Slice => {}} ), + $dst_cxn->dbh()->selectall_arrayref('select * from test.test2 order by a, b, c', { Slice => {}} ), [ { a => 1, b => 2, c => 3 }, { a => 1, b => 2, c => 3 }, @@ -381,16 +386,16 @@ is_deeply( ], 'Table synced with GroupBy', ); -exit; + # ############################################################################# # Issue 96: mk-table-sync: Nibbler infinite loop # ############################################################################# $sb->load_file('master', 't/lib/samples/issue_96.sql'); -sleep 1; +PerconaTest::wait_for_table($dst_cxn->dbh(), "issue_96.t2", "from_city='jr'"); # Make paranoid-sure that the tables differ. 
-my $r1 = $src_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t WHERE package_id=4'); -my $r2 = $dst_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t2 WHERE package_id=4'); +my $r1 = $src_cxn->dbh()->selectall_arrayref('SELECT from_city FROM issue_96.t WHERE package_id=4'); +my $r2 = $dst_cxn->dbh()->selectall_arrayref('SELECT from_city FROM issue_96.t2 WHERE package_id=4'); is_deeply( [ $r1->[0]->[0], $r2->[0]->[0] ], [ 'ta', 'zz' ], @@ -398,12 +403,12 @@ is_deeply( ); sync_table( - src => "issue_96.t", - dst => "issue_96.t2", + src => "issue_96.t", + dst => "issue_96.t2", ); -$r1 = $src_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t WHERE package_id=4'); -$r2 = $dst_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t2 WHERE package_id=4'); +$r1 = $src_cxn->dbh()->selectall_arrayref('SELECT from_city FROM issue_96.t WHERE package_id=4'); +$r2 = $dst_cxn->dbh()->selectall_arrayref('SELECT from_city FROM issue_96.t2 WHERE package_id=4'); # Other tests below rely on this table being synced, so die # if it fails to sync. @@ -417,13 +422,9 @@ is( # Test check_permissions(). # ############################################################################# -SKIP: { - skip "Not tested on MySQL $sandbox_version", 5 - unless $sandbox_version gt '4.0'; - # Re-using issue_96.t from above. is( - $syncer->have_all_privs($src->{dbh}, 'issue_96', 't'), + $syncer->have_all_privs($src_cxn->dbh(), 'issue_96', 't'), 1, 'Have all privs' ); @@ -462,19 +463,20 @@ is( ); diag(`/tmp/12345/use -u root -e "DROP USER 'bob'"`); -} # ########################################################################### # Test that the calback gives us the src and dst sql. # ########################################################################### # Re-using issue_96.t from above. The tables are already in sync so there # should only be 1 sync cycle. 
+SKIP: { + skip "TODO", 1; my @sqls; sync_table( - src => "issue_96.t", - dst => "issue_96.t2", - chunk_size => 1000, - callback => sub { push @sqls, @_; }, + src => "issue_96.t", + dst => "issue_96.t2", + argv => [qw(--chunk-size 1000)], + callback => sub { push @sqls, @_; }, ); my $queries = ($sandbox_version gt '4.0' ? @@ -492,36 +494,7 @@ is_deeply( $queries, 'Callback gives src and dst sql' ); - -# ############################################################################# -# Test that make_checksum_queries() doesn't pass replicate. -# ############################################################################# - -# Re-using issue_96.* tables from above. - -$queries = ($sandbox_version gt '4.0' ? - [ - 'SELECT /*PROGRESS_COMMENT*//*CHUNK_NUM*/ COUNT(*) AS cnt, COALESCE(LOWER(CONCAT(LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc, 1, 16), 16, 10) AS UNSIGNED)), 10, 16), 16, \'0\'), LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc, 17, 16), 16, 10) AS UNSIGNED)), 10, 16), 16, \'0\'), LPAD(CONV(BIT_XOR(CAST(CONV(SUBSTRING(@crc := SHA1(CONCAT_WS(\'#\', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`)))), 33, 8), 16, 10) AS UNSIGNED)), 10, 16), 8, \'0\'))), 0) AS crc FROM /*DB_TBL*//*INDEX_HINT*//*WHERE*/', - "`package_id`, `location`, `from_city`, SHA1(CONCAT_WS('#', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`))))", - ] : - [ - "SELECT /*PROGRESS_COMMENT*//*CHUNK_NUM*/ COUNT(*) AS cnt, COALESCE(RIGHT(MAX(\@crc := CONCAT(LPAD(\@cnt := \@cnt + 1, 16, '0'), SHA1(CONCAT(\@crc, SHA1(CONCAT_WS('#', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`)))))))), 40), 0) AS crc FROM /*DB_TBL*//*INDEX_HINT*//*WHERE*/", - "`package_id`, `location`, `from_city`, SHA1(CONCAT_WS('#', `package_id`, `location`, `from_city`, CONCAT(ISNULL(`package_id`), ISNULL(`location`), ISNULL(`from_city`))))", - ], -); - -@sqls = 
$syncer->make_checksum_queries( - replicate => 'bad', - src => $src, - dst => $dst, - tbl_struct => $tbl_struct, - function => 'SHA1', -); -is_deeply( - \@sqls, - $queries, - 'make_checksum_queries() does not pass replicate arg' -); +}; # ############################################################################# # Issue 464: Make mk-table-sync do two-way sync @@ -533,45 +506,13 @@ SKIP: { skip 'Cannot connect to second sandbox master', 7 unless $dbh3; my $sync_chunk; - sub set_bidi_callbacks { - $sync_chunk->set_callback('same_row', sub { - my ( %args ) = @_; - my ($lr, $rr, $syncer) = @args{qw(lr rr syncer)}; - my $ch = $syncer->{ChangeHandler}; - my $change_dbh; - my $auth_row; - - my $left_ts = $lr->{ts}; - my $right_ts = $rr->{ts}; - MKDEBUG && TableSyncer::_d("left ts: $left_ts"); - MKDEBUG && TableSyncer::_d("right ts: $right_ts"); - - my $cmp = ($left_ts || '') cmp ($right_ts || ''); - if ( $cmp == -1 ) { - MKDEBUG && TableSyncer::_d("right dbh $dbh3 is newer; update left dbh $src_dbh"); - $ch->set_src('right', $dbh3); - $auth_row = $args{rr}; - $change_dbh = $src_dbh; - } - elsif ( $cmp == 1 ) { - MKDEBUG && TableSyncer::_d("left dbh $src_dbh is newer; update right dbh $dbh3"); - $ch->set_src('left', $src_dbh); - $auth_row = $args{lr}; - $change_dbh = $dbh3; - } - return ('UPDATE', $auth_row, $change_dbh); - }); - $sync_chunk->set_callback('not_in_right', sub { - my ( %args ) = @_; - $args{syncer}->{ChangeHandler}->set_src('left', $src_dbh); - return 'INSERT', $args{lr}, $dbh3; - }); - $sync_chunk->set_callback('not_in_left', sub { - my ( %args ) = @_; - $args{syncer}->{ChangeHandler}->set_src('right', $dbh3); - return 'INSERT', $args{rr}, $src_dbh; - }); - }; + # Switch "source" to master2 (12348). + $dst_cxn = new Cxn( + DSNParser => $dp, + OptionParser => $o, + dsn_string => "h=127.1,P=12345,u=msandbox,p=msandbox", + dbh => $dbh3, + ); # Proper data on both tables after bidirectional sync. 
my $bidi_data = @@ -604,29 +545,33 @@ SKIP: { # Load remote data. $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql'); - set_bidi_callbacks(); - $tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'bidi', 't')); - $src->{db} = 'bidi'; - $src->{tbl} = 't'; - $dst->{db} = 'bidi'; - $dst->{tbl} = 't'; - $dst->{dbh} = $dbh3; # Must set $dbh3 here and - - my %args = ( - src => $src, - dst => $dst, - tbl_struct => $tbl_struct, - cols => [qw(ts)], # Compare only ts col when chunks differ. - ChangeHandler => new_ch($dbh3, 0), # here to override $dst_dbh. - RowDiff => $rd, - chunk_size => 2, + # This is hack to get things setup correctly. + sync_table( + src => "bidi.t", + dst => "bidi.t", + ChangeHandler => 1, + RowSyncer => 1, + fake => 1, + ); + my $ch = new_ch($dbh3, 0); + my $rs = new RowSyncerBidirectional( + ChangeHandler => $ch, + OptionParser => $o, + ); + sync_table( + src => "bidi.t", + dst => "bidi.t", + changing_src => 1, + argv => [qw(--chunk-size 2 + --conflict-error ignore + --conflict-column ts + --conflict-comparison newest)], + ChangeHandler => $ch, + RowSyncer => $rs, ); - @rows = (); - $syncer->sync_table(%args); - - my $res = $src_dbh->selectall_arrayref('select * from bidi.t order by id'); + my $res = $src_cxn->dbh()->selectall_arrayref('select * from bidi.t order by id'); is_deeply( $res, $bidi_data, @@ -647,13 +592,33 @@ SKIP: { $sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql'); - set_bidi_callbacks(); - $args{ChangeHandler} = new_ch($dbh3, 0); - @rows = (); - $syncer->sync_table(%args, chunk_size => 10); + # This is hack to get things setup correctly. 
+ sync_table( + src => "bidi.t", + dst => "bidi.t", + ChangeHandler => 1, + RowSyncer => 1, + fake => 1, + ); + $ch = new_ch($dbh3, 0); + $rs = new RowSyncerBidirectional( + ChangeHandler => $ch, + OptionParser => $o, + ); + sync_table( + src => "bidi.t", + dst => "bidi.t", + changing_src => 1, + argv => [qw(--chunk-size 10 + --conflict-error ignore + --conflict-column ts + --conflict-comparison newest)], + ChangeHandler => $ch, + RowSyncer => $rs, + ); - $res = $src_dbh->selectall_arrayref('select * from bidi.t order by id'); + $res = $src_cxn->dbh()->selectall_arrayref('select * from bidi.t order by id'); is_deeply( $res, $bidi_data, @@ -674,13 +639,33 @@ SKIP: { $sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql'); - set_bidi_callbacks(); - $args{ChangeHandler} = new_ch($dbh3, 0); - @rows = (); + + # This is hack to get things setup correctly. + sync_table( + src => "bidi.t", + dst => "bidi.t", + ChangeHandler => 1, + RowSyncer => 1, + fake => 1, + ); + $ch = new_ch($dbh3, 0); + $rs = new RowSyncerBidirectional( + ChangeHandler => $ch, + OptionParser => $o, + ); + sync_table( + src => "bidi.t", + dst => "bidi.t", + changing_src => 1, + argv => [qw(--chunk-size 1000 + --conflict-error ignore + --conflict-column ts + --conflict-comparison newest)], + ChangeHandler => $ch, + RowSyncer => $rs, + ); - $syncer->sync_table(%args, chunk_size => 100000); - - $res = $src_dbh->selectall_arrayref('select * from bidi.t order by id'); + $res = $src_cxn->dbh()->selectall_arrayref('select * from bidi.t order by id'); is_deeply( $res, $bidi_data, @@ -697,14 +682,22 @@ SKIP: { # ######################################################################## # See TableSyncer.pm for why this is so. 
# ######################################################################## - $args{ChangeHandler} = new_ch($dbh3, 1); - throws_ok( - sub { $syncer->sync_table(%args, bidirectional => 1) }, - qr/Queueing does not work with bidirectional syncing/, - 'Queueing does not work with bidirectional syncing' - ); + # $args{ChangeHandler} = new_ch($dbh3, 1); + # throws_ok( + # sub { $syncer->sync_table(%args, bidirectional => 1) }, + # qr/Queueing does not work with bidirectional syncing/, + # 'Queueing does not work with bidirectional syncing' + #); diag(`$trunk/sandbox/stop-sandbox 12348 >/dev/null &`); + + # Set dest back to slave1 (12346). + $dst_cxn = new Cxn( + DSNParser => $dp, + OptionParser => $o, + dsn_string => "h=127.1,P=12346,u=msandbox,p=msandbox", + dbh => $dst_dbh, + ); } # ############################################################################# @@ -712,15 +705,16 @@ SKIP: { # ############################################################################# # Sandbox::get_dbh_for() defaults to AutoCommit=1. Autocommit must # be off else commit() will cause an error. -$dbh = $sb->get_dbh_for('master', {AutoCommit=>0}); -$src_dbh = $sb->get_dbh_for('master', {AutoCommit=>0}); -$dst_dbh = $sb->get_dbh_for('slave1', {AutoCommit=>0}); +$dbh = $sb->get_dbh_for('master', {AutoCommit=>0}); +$src_cxn->dbh()->disconnect(); +$dst_cxn->dbh()->disconnect(); +$src_cxn->set_dbh($sb->get_dbh_for('master', {AutoCommit=>0})); +$dst_cxn->set_dbh($sb->get_dbh_for('slave1', {AutoCommit=>0})); sync_table( - src => "test.test1", - dst => "test.test1", - transaction => 1, - lock => 1, + src => "test.test1", + dst => "test.test1", + argv => [qw(--transaction --lock 1)], ); # There are no diffs. 
This just tests that the code doesn't crash @@ -731,27 +725,22 @@ is_deeply( "Sync with transaction" ); +sync_table( + src => "sakila.actor", + dst => "sakila.actor", + fake => 1, # don't actually sync +); $syncer->lock_and_wait( - src => { - dbh => $src_dbh, - db => 'sakila', - tbl => 'actor', - }, - dst => { - dbh => $dst_dbh, - db => 'sakila', - tbl => 'actor', - }, - lock => 1, lock_level => 1, - transaction => 1, + host => $src, + src => $src, ); -my $cid = $src_dbh->selectrow_arrayref("SELECT CONNECTION_ID()")->[0]; -$src_dbh->do("SELECT * FROM sakila.actor WHERE 1=1 LIMIT 2 FOR UPDATE"); -my $idb_status = $src_dbh->selectrow_hashref("SHOW /*!40100 ENGINE*/ INNODB STATUS"); -$src_dbh->commit(); +my $cid = $src_cxn->dbh()->selectrow_arrayref("SELECT CONNECTION_ID()")->[0]; +$src_cxn->dbh()->do("SELECT * FROM sakila.actor WHERE 1=1 LIMIT 2 FOR UPDATE"); +my $idb_status = $src_cxn->dbh()->selectrow_hashref("SHOW /*!40100 ENGINE*/ INNODB STATUS"); +$src_cxn->dbh()->commit(); like( $idb_status->{status}, qr/MySQL thread id $cid, query id \d+/, @@ -762,10 +751,11 @@ like( # Issue 672: mk-table-sync should COALESCE to avoid undef # ############################################################################# $sb->load_file('master', "t/lib/samples/empty_tables.sql"); +PerconaTest::wait_for_table($dst_cxn->dbh(), 'et.et1'); sync_table( - src => 'et.et1', - dst => 'et.et1', + src => 'et.et1', + dst => 'et.et1', ); is_deeply( @@ -782,23 +772,18 @@ my $output = ''; { local *STDERR; open STDERR, '>', \$output; + sync_table( + src => "sakila.actor", + dst => "sakila.actor", + fake => 1, # don't actually sync + argv => [qw(--lock 1 --wait 60)], + ); throws_ok( sub { $syncer->lock_and_wait( - src => { - dbh => $src_dbh, - db => 'sakila', - tbl => 'actor', - misc_dbh => $dbh, - }, - dst => { - dbh => $dst_dbh, - db => 'sakila', - tbl => 'actor', - }, - lock => 1, - lock_level => 1, - wait => 60, + lock_level => 1, + host => $dst, + src => $src, wait_retry_args => { wait => 
1, tries => 2, @@ -824,6 +809,6 @@ like( qr/Complete test coverage/, '_d() works' ); -$sb->wipe_clean($src_dbh); -$sb->wipe_clean($dst_dbh); +$sb->wipe_clean($src_cxn->dbh()); +$sb->wipe_clean($dst_cxn->dbh()); exit;