From 4c8fd5c08089accbcbad4b307234d14e38122368 Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Wed, 21 Dec 2011 01:19:50 -0700 Subject: [PATCH] Rewrite TableSyncer to use only NibbleIterator. Add RowSyncer. Return hashref from NibbleIterator. --- lib/NibbleIterator.pm | 28 +- lib/RowChecksum.pm | 2 +- lib/RowSyncer.pm | 104 +++++++ lib/TableSyncer.pm | 664 ++++++++++++++++++------------------------ t/lib/TableSyncer.t | 480 ++++++++---------------------- 5 files changed, 526 insertions(+), 752 deletions(-) create mode 100644 lib/RowSyncer.pm diff --git a/lib/NibbleIterator.pm b/lib/NibbleIterator.pm index 88d4d6a2..61208884 100644 --- a/lib/NibbleIterator.pm +++ b/lib/NibbleIterator.pm @@ -64,8 +64,11 @@ sub new { my $where = $o->get('where'); my ($row_est, $mysql_index) = get_row_estimate(%args, where => $where); + my $chunk_size_limit = $o->has('chunk-size-limit') + ? $o->get('chunk-size-limit') + : 1; my $one_nibble = !defined $args{one_nibble} || $args{one_nibble} - ? $row_est <= $chunk_size * $o->get('chunk-size-limit') + ? $row_est <= $chunk_size * $chunk_size_limit : 0; MKDEBUG && _d('One nibble:', $one_nibble ? 'yes' : 'no'); @@ -246,11 +249,12 @@ sub new { }; } - $self->{row_est} = $row_est; - $self->{nibbleno} = 0; - $self->{have_rows} = 0; - $self->{rowno} = 0; - $self->{oktonibble} = 1; + $self->{row_est} = $row_est; + $self->{nibbleno} = 0; + $self->{have_rows} = 0; + $self->{rowno} = 0; + $self->{oktonibble} = 1; + $self->{no_more_boundaries} = 0; return bless $self, $class; } @@ -307,12 +311,14 @@ sub next { if ( $self->{have_rows} ) { # Return rows in nibble. sth->{Active} is always true with # DBD::mysql v3, so we track the status manually. - my $row = $self->{nibble_sth}->fetchrow_arrayref(); + my $row = $self->{fetch_hashref} + ? $self->{nibble_sth}->fetchrow_hashref() + : $self->{nibble_sth}->fetchrow_arrayref(); if ( $row ) { $self->{rowno}++; MKDEBUG && _d('Row', $self->{rowno}, 'in nibble',$self->{nibbleno}); # fetchrow_arraryref re-uses an internal arrayref, so we must copy. - return [ @$row ]; + return $self->{fetch_hashref} ? $row : [ @$row ]; } } @@ -413,6 +419,12 @@ sub more_boundaries { return !$self->{no_more_boundaries}; } +sub no_more_rows { + my ($self) = @_; + $self->{have_rows} = 0; + return; +} + sub row_estimate { my ($self) = @_; return $self->{row_est}; diff --git a/lib/RowChecksum.pm b/lib/RowChecksum.pm index 03b5dd58..955c3826 100644 --- a/lib/RowChecksum.pm +++ b/lib/RowChecksum.pm @@ -93,7 +93,7 @@ sub make_row_checksum { } if ( uc $func ne 'FNV_64' && uc $func ne 'FNV1A_64' ) { - my $sep = $o->get('separator') || '#'; + my $sep = ($o->has('separator') && $o->get('separator')) || '#'; $sep =~ s/'//g; $sep ||= '#'; diff --git a/lib/RowSyncer.pm b/lib/RowSyncer.pm new file mode 100644 index 00000000..70de326e --- /dev/null +++ b/lib/RowSyncer.pm @@ -0,0 +1,104 @@ +# This program is copyright 2011 Percona Inc. +# Feedback and improvements are welcome. +# +# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED +# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF +# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar +# systems, you can issue `man perlgpl' or `man perlartistic' to read these +# licenses. 
+# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 59 Temple +# Place, Suite 330, Boston, MA 02111-1307 USA. +# ########################################################################### +# RowSyncer package +# ########################################################################### +{ +# Package: RowSyncer +# RowSyncer syncs a destination row to a source row. +package RowSyncer; + +use strict; +use warnings FATAL => 'all'; +use English qw(-no_match_vars); +use constant MKDEBUG => $ENV{MKDEBUG} || 0; + +use Data::Dumper; +$Data::Dumper::Indent = 1; +$Data::Dumper::Sortkeys = 1; +$Data::Dumper::Quotekeys = 0; + +sub new { + my ( $class, %args ) = @_; + my @required_args = qw(ChangeHandler); + foreach my $arg ( @required_args ) { + die "I need a $arg argument" unless defined $args{$arg}; + } + my $self = { + crc_col => 'crc', + %args, + }; + return bless $self, $class; +} + +sub set_crc_col { + my ($self, $crc_col) = @_; + $self->{crc_col} = $crc_col; + return; +} + +sub set_key_cols { + my ($self, $key_cols) = @_; + $self->{key_cols} = $key_cols; + return; +} + +sub key_cols { + my ($self) = @_; + return $self->{key_cols}; +} + +sub same_row { + my ($self, %args) = @_; + my ($lr, $rr) = @args{qw(lr rr)}; + if ( $lr->{$self->{crc_col}} ne $rr->{$self->{crc_col}} ) { + $self->{ChangeHandler}->change('UPDATE', $lr, $self->key_cols()); + } + return; +} + +sub not_in_right { + my ( $self, %args ) = @_; + # Row isn't in the dest, re-insert it in the source. + $self->{ChangeHandler}->change('INSERT', $args{lr}, $self->key_cols()); + return; +} + +sub not_in_left { + my ( $self, %args ) = @_; + # Row isn't in source, delete it from the dest. + $self->{ChangeHandler}->change('DELETE', $args{rr}, $self->key_cols()); + return; +} + +sub done_with_rows { + return; +} + +sub _d { + my ($package, undef, $line) = caller 0; + @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; } + map { defined $_ ? $_ : 'undef' } + @_; + print STDERR "# $package:$line $PID ", join(' ', @_), "\n"; +} + +1; +} +# ########################################################################### +# End RowSyncer package +# ########################################################################### diff --git a/lib/TableSyncer.pm b/lib/TableSyncer.pm index 7b7afc6a..c7b65191 100644 --- a/lib/TableSyncer.pm +++ b/lib/TableSyncer.pm @@ -1,4 +1,4 @@ -# This program is copyright 2007-2011 Baron Schwartz, 2011 Percona Inc. +# This program is copyright 2011 Percona Inc. # Feedback and improvements are welcome. # # THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED @@ -32,16 +32,10 @@ $Data::Dumper::Indent = 1; $Data::Dumper::Sortkeys = 1; $Data::Dumper::Quotekeys = 0; -# Arguments: -# * MasterSlave A MasterSlave module -# * Quoter A Quoter module -# * VersionParser A VersionParser module -# * TableChecksum A TableChecksum module -# * Retry A Retry module -# * DSNParser (optional) sub new { my ( $class, %args ) = @_; - my @required_args = qw(MasterSlave Quoter VersionParser TableChecksum Retry); + my @required_args = qw(MasterSlave OptionParser Quoter TableParser + TableNibbler RowChecksum RowDiff Retry); foreach my $arg ( @required_args ) { die "I need a $arg argument" unless defined $args{$arg}; } @@ -49,29 +43,6 @@ sub new { return bless $self, $class; } -# Return the first plugin from the arrayref of TableSync* plugins -# that can sync the given table struct. 
plugin->can_sync() usually -# returns a hashref that it wants back when plugin->prepare_to_sync() -# is called. Or, it may return nothing (false) to say that it can't -# sync the table. -sub get_best_plugin { - my ( $self, %args ) = @_; - foreach my $arg ( qw(plugins tbl_struct) ) { - die "I need a $arg argument" unless $args{$arg}; - } - MKDEBUG && _d('Getting best plugin'); - foreach my $plugin ( @{$args{plugins}} ) { - MKDEBUG && _d('Trying plugin', $plugin->name); - my ($can_sync, %plugin_args) = $plugin->can_sync(%args); - if ( $can_sync ) { - MKDEBUG && _d('Can sync with', $plugin->name, Dumper(\%plugin_args)); - return $plugin, %plugin_args; - } - } - MKDEBUG && _d('No plugin can sync the table'); - return; -} - # Required arguments: # * plugins Arrayref of TableSync* modules, in order of preference # * src Hashref with source (aka left) dbh, db, tbl @@ -101,340 +72,278 @@ sub get_best_plugin { # * wait locking sub sync_table { my ( $self, %args ) = @_; - my @required_args = qw(plugins src dst tbl_struct cols chunk_size - RowDiff ChangeHandler); + my @required_args = qw(src dst RowSyncer ChangeHandler); foreach my $arg ( @required_args ) { die "I need a $arg argument" unless $args{$arg}; } - MKDEBUG && _d('Syncing table with args:', - map { "$_: " . Dumper($args{$_}) } - qw(plugins src dst tbl_struct cols chunk_size)); + my ($src, $dst, $row_syncer, $changer) = @args{@required_args}; - my ($plugins, $src, $dst, $tbl_struct, $cols, $chunk_size, $rd, $ch) - = @args{@required_args}; - my $dp = $self->{DSNParser}; - $args{trace} = 1 unless defined $args{trace}; + my $o = $self->{OptionParser}; + my $q = $self->{Quoter}; + my $row_diff = $self->{RowDiff}; + my $row_checksum = $self->{RowChecksum}; - if ( $args{bidirectional} && $args{ChangeHandler}->{queue} ) { - # This should be checked by the caller but just in case... - die "Queueing does not work with bidirectional syncing"; + # USE db on src and dst for cases like when replicate-do-db is being used. + foreach my $host ( $src, $dst ) { + $host->{Cxn}->dbh()->do("USE " . $q->quote($host->{tbl}->{db})); } - $args{index_hint} = 1 unless defined $args{index_hint}; - $args{lock} ||= 0; - $args{wait} ||= 0; - $args{transaction} ||= 0; - $args{timeout_ok} ||= 0; + return $changer->get_changes() if $o->get('dry-run'); - my $q = $self->{Quoter}; - my $vp = $self->{VersionParser}; + my $trace; + if ( !defined $args{trace} || $args{trace} ) { + chomp(my $hostname = `hostname`); + $trace = "src_host:" . $src->{Cxn}->name() + . " src_tbl:" . join('.', @{$src->{tbl}}{qw(db tbl)}) + . "dst_host:" . $dst->{Cxn}->name() + . " dst_tbl:" . join('.', @{$dst->{tbl}}{qw(db tbl)}) + . " changing_src: " . ($args{changing_src} ? "yes" : "no") + . " " . join(" ", map { "$_:" . ($o->get($_) ? "yes" : "no") } + qw(lock transaction replicate bidirectional)) + . " pid:$PID " + . ($ENV{USER} ? "user:$ENV{USER} " : "") + . ($hostname ? "host:$hostname" : ""); + MKDEBUG && _d("Binlog trace message:", $trace); + } - # ######################################################################## - # Get and prepare the first plugin that can sync this table. - # ######################################################################## - my ($plugin, %plugin_args) = $self->get_best_plugin(%args); - die "No plugin can sync $src->{db}.$src->{tbl}" unless $plugin; + # Make NibbleIterator for checksumming chunks of rows to see if + # there are any diffs. 
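+   # The chunk checksum statement returns a single row per nibble, roughly
+   # COUNT(*) AS cnt plus an aggregated row checksum AS crc, which is why the
+   # sync loop below only compares $chunk->{cnt} and $chunk->{crc}.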
+ my %crc_args = $row_checksum->get_crc_args(dbh => $src->{Cxn}->dbh()); + my $chunk_cols = $row_checksum->make_chunk_checksum( + dbh => $src->{Cxn}->dbh(), + tbl => $src->{tbl}, + %crc_args + ); - # The row-level (state 2) checksums use __crc, so the table can't use that. - my $crc_col = '__crc'; - while ( $tbl_struct->{is_col}->{$crc_col} ) { + if ( !defined $src->{sql_lock} || !defined $dst->{dst_lock} ) { + if ( $o->get('transaction') ) { + if ( $o->get('bidirectional') ) { + # Making changes on src and dst. + $src->{sql_lock} = 'FOR UPDATE'; + $dst->{sql_lock} = 'FOR UPDATE'; + } + elsif ( $args{changing_src} ) { + # Making changes on master (src) which replicate to slave (dst). + $src->{sql_lock} = 'FOR UPDATE'; + $dst->{sql_lock} = 'LOCK IN SHARE MODE'; + } + else { + # Making changes on slave (dst). + $src->{sql_lock} = 'LOCK IN SHARE MODE'; + $dst->{sql_lock} = 'FOR UPDATE'; + } + } + else { + $src->{sql_lock} = ''; + $dst->{sql_lock} = ''; + } + MKDEBUG && _d('src sql lock:', $src->{sql_lock}); + MKDEBUG && _d('dst sql lock:', $dst->{sql_lock}); + } + + my $user_where = $o->get('where'); + + foreach my $host ($src, $dst) { + my $callbacks = { + init => sub { + my (%args) = @_; + my $nibble_iter = $args{NibbleIterator}; + my $sths = $nibble_iter->statements(); + + if ( $o->get('buffer-to-client') ) { + $host->{sth}->{mysql_use_result} = 1; + } + + # Lock the table. + $self->lock_and_wait( + lock_level => 2, + host => $host, + src => $src, + OptionParser => $o, + ); + + return 1; + }, + exec_nibble => sub { + my (%args) = @_; + my $nibble_iter = $args{NibbleIterator}; + my $sths = $nibble_iter->statements(); + my $boundary = $nibble_iter->boundaries(); + + # Lock the chunk. + $self->lock_and_wait( + lock_level => 1, + host => $host, + src => $src, + OptionParser => $o, + ); + + # Execute the chunk checksum statement. + # The nibble iter will return the row. + MKDEBUG && _d('nibble', $args{Cxn}->name()); + $sths->{nibble}->execute(@{$boundary->{lower}}, @{$boundary->{upper}}); + return $sths->{nibble}->rows(); + }, + }; + + $host->{nibble_iter} = new NibbleIterator( + Cxn => $host->{Cxn}, + tbl => $host->{tbl}, + chunk_size => $o->get('chunk-size'), + chunk_index => $o->get('chunk-index'), + select => $chunk_cols, + callbacks => $callbacks, + fetch_hashref => 1, + OptionParser => $self->{OptionParser}, + Quoter => $self->{Quoter}, + TableNibbler => $self->{TableNibbler}, + TableParser => $self->{TableParser}, + RowChecksum => $self->{RowChecksum}, + ); + } + + my $index = $src->{nibble_iter}->nibble_index(); + my $key_cols = $index ? $src->{tbl}->{tbl_struct}->{keys}->{$index}->{cols} + : $src->{tbl}->{tbl_struct}->{cols}; + $row_syncer->set_key_cols($key_cols); + + my $crc_col = 'crc'; + while ( $src->{tbl}->{tbl_struct}->{is_col}->{$crc_col} ) { $crc_col = "_$crc_col"; # Prepend more _ until not a column. } + $row_syncer->set_crc_col($crc_col); MKDEBUG && _d('CRC column:', $crc_col); - # Make an index hint for either the explicitly given chunk_index - # or the chunk_index chosen by the plugin if index_hint is true. - my $index_hint; - my $hint = ($vp->version_ge($src->{dbh}, '4.0.9') - && $vp->version_ge($dst->{dbh}, '4.0.9') ? 'FORCE' : 'USE') - . ' INDEX'; - if ( $args{chunk_index} ) { - MKDEBUG && _d('Using given chunk index for index hint'); - $index_hint = "$hint (" . $q->quote($args{chunk_index}) . ")"; - } - elsif ( $plugin_args{chunk_index} && $args{index_hint} ) { - MKDEBUG && _d('Using chunk index chosen by plugin for index hint'); - $index_hint = "$hint (" . 
$q->quote($plugin_args{chunk_index}) . ")"; - } - MKDEBUG && _d('Index hint:', $index_hint); - - eval { - $plugin->prepare_to_sync( - %args, - %plugin_args, - dbh => $src->{dbh}, - db => $src->{db}, - tbl => $src->{tbl}, - crc_col => $crc_col, - index_hint => $index_hint, + foreach my $host ($src, $dst) { + my $row_cols = $row_checksum->make_row_checksum( + dbh => $host->{Cxn}->dbh(), + tbl => $host->{tbl}, + %crc_args, ); - }; - if ( $EVAL_ERROR ) { - # At present, no plugin should fail to prepare, but just in case... - die 'Failed to prepare TableSync', $plugin->name, ' plugin: ', - $EVAL_ERROR; - } + my $nibble_iter = $host->{nibble_iter}; - # Some plugins like TableSyncChunk use checksum queries, others like - # TableSyncGroupBy do not. For those that do, make chunk (state 0) - # and row (state 2) checksum queries. - if ( $plugin->uses_checksum() ) { - eval { - my ($chunk_sql, $row_sql) = $self->make_checksum_queries(%args); - $plugin->set_checksum_queries($chunk_sql, $row_sql); - }; - if ( $EVAL_ERROR ) { - # This happens if src and dst are really different and the same - # checksum algo and hash func can't be used on both. - die "Failed to make checksum queries: $EVAL_ERROR"; + if ( $nibble_iter->one_nibble() ) { + my $rows_sql + = 'SELECT /*rows in nibble*/ ' + . ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '') + . "$row_cols AS $crc_col" + . " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)}) + . " WHERE 1=1 " + . ($user_where ? " AND ($user_where)" : ''); + $host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql); + } + else { + my $sql = $nibble_iter->sql(); + my $rows_sql + = 'SELECT /*rows in nibble*/ ' + . ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '') + . "$row_cols AS $crc_col" + . " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)}) + . " WHERE " . $sql->{boundaries}->{'>='} # lower boundary + . " AND " . $sql->{boundaries}->{'<='} # upper boundary + . ($user_where ? " AND ($user_where)" : '') + . " ORDER BY " . $sql->{order_by}; + $host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql); } - } - - # ######################################################################## - # Plugin is ready, return now if this is a dry run. - # ######################################################################## - if ( $args{dry_run} ) { - return $ch->get_changes(), ALGORITHM => $plugin->name; } # ######################################################################## # Start syncing the table. # ######################################################################## + while ( grep { $_->{nibble_iter}->more_boundaries() } ($src, $dst) ) { + my $src_chunk = $src->{nibble_iter}->next(); + my $dst_chunk = $dst->{nibble_iter}->next(); - # USE db on src and dst for cases like when replicate-do-db is being used. - eval { - $src->{dbh}->do("USE `$src->{db}`"); - $dst->{dbh}->do("USE `$dst->{db}`"); - }; - if ( $EVAL_ERROR ) { - # This shouldn't happen, but just in case. (The db and tbl on src - # and dst should be checked before calling this sub.) - die "Failed to USE database on source or destination: $EVAL_ERROR"; - } - - # For bidirectional syncing it's important to know on which dbh - # changes are made or rows are fetched. This identifies the dbhs, - # then you can search for each one by its address like - # "dbh DBI::db=HASH(0x1028b38)". - MKDEBUG && _d('left dbh', $src->{dbh}); - MKDEBUG && _d('right dbh', $dst->{dbh}); - - chomp(my $hostname = `hostname`); - my $trace_msg - = $args{trace} ? "src_db:$src->{db} src_tbl:$src->{tbl} " - . ($dp && $src->{dsn} ? 
"src_dsn:".$dp->as_string($src->{dsn}) : "") - . " dst_db:$dst->{db} dst_tbl:$dst->{tbl} " - . ($dp && $dst->{dsn} ? "dst_dsn:".$dp->as_string($dst->{dsn}) : "") - . " " . join(" ", map { "$_:" . ($args{$_} || 0) } - qw(lock transaction changing_src replicate bidirectional)) - . " pid:$PID " - . ($ENV{USER} ? "user:$ENV{USER} " : "") - . ($hostname ? "host:$hostname" : "") - : ""; - MKDEBUG && _d("Binlog trace message:", $trace_msg); - - $self->lock_and_wait(%args, lock_level => 2); # per-table lock - - my $callback = $args{callback}; - my $cycle = 0; - while ( !$plugin->done() ) { - - # Do as much of the work as possible before opening a transaction or - # locking the tables. - MKDEBUG && _d('Beginning sync cycle', $cycle); - my $src_sql = $plugin->get_sql( - database => $src->{db}, - table => $src->{tbl}, - where => $args{where}, - ); - my $dst_sql = $plugin->get_sql( - database => $dst->{db}, - table => $dst->{tbl}, - where => $args{where}, - ); - - if ( $args{transaction} ) { - if ( $args{bidirectional} ) { - # Making changes on src and dst. - $src_sql .= ' FOR UPDATE'; - $dst_sql .= ' FOR UPDATE'; + if ( $src_chunk->{cnt} != $dst_chunk->{cnt} + || $src_chunk->{crc} ne $dst_chunk->{crc} ) { + MKDEBUG && _d("Chunks differ"); + foreach my $host ($src, $dst) { + my $nibble_iter = $host->{nibble_iter}; + my $boundary = $nibble_iter->boundaries(); + MKDEBUG && _d($host->{Cxn}->name(), $host->{rows_sth}->{Statement}, + 'params:', @{$boundary->{lower}}, @{$boundary->{upper}}); + $host->{rows_sth}->execute( + @{$boundary->{lower}}, @{$boundary->{upper}}); } - elsif ( $args{changing_src} ) { - # Making changes on master (src) which replicate to slave (dst). - $src_sql .= ' FOR UPDATE'; - $dst_sql .= ' LOCK IN SHARE MODE'; - } - else { - # Making changes on slave (dst). - $src_sql .= ' LOCK IN SHARE MODE'; - $dst_sql .= ' FOR UPDATE'; + $row_diff->compare_sets( + left_sth => $src->{rows_sth}, + right_sth => $dst->{rows_sth}, + tbl_struct => $src->{tbl}->{tbl_struct}, + syncer => $row_syncer, + ); + $changer->process_rows(1, $trace); + foreach my $host ($src, $dst) { + $host->{rows_sth}->finish(); } } - MKDEBUG && _d('src:', $src_sql); - MKDEBUG && _d('dst:', $dst_sql); - # Give callback a chance to do something with the SQL statements. - $callback->($src_sql, $dst_sql) if $callback; - - # Prepare each host for next sync cycle. This does stuff - # like reset/init MySQL accumulator vars, etc. - $plugin->prepare_sync_cycle($src); - $plugin->prepare_sync_cycle($dst); - - # Prepare SQL statements on host. These aren't real prepared - # statements (i.e. no ? placeholders); we just need sths to - # pass to compare_sets(). Also, we can control buffering - # (mysql_use_result) on the sths. - my $src_sth = $src->{dbh}->prepare($src_sql); - my $dst_sth = $dst->{dbh}->prepare($dst_sql); - if ( $args{buffer_to_client} ) { - $src_sth->{mysql_use_result} = 1; - $dst_sth->{mysql_use_result} = 1; + # Unlock the chunks. + foreach my $host ($src, $dst) { + $self->unlock( + lock_level => 1, + host => $host, + OptionParser => $o, + ); } - # The first cycle should lock to begin work; after that, unlock only if - # the plugin says it's OK (it may want to dig deeper on the rows it - # currently has locked). - my $executed_src = 0; - if ( !$cycle || !$plugin->pending_changes() ) { - # per-sync cycle lock - $executed_src - = $self->lock_and_wait(%args, src_sth => $src_sth, lock_level => 1); - } - - # The source sth might have already been executed by lock_and_wait(). 
- $src_sth->execute() unless $executed_src; - $dst_sth->execute(); - - # Compare rows in the two sths. If any differences are found - # (same_row, not_in_left, not_in_right), the appropriate $syncer - # methods are called to handle them. Changes may be immediate, or... - $rd->compare_sets( - left_sth => $src_sth, - right_sth => $dst_sth, - left_dbh => $src->{dbh}, - right_dbh => $dst->{dbh}, - syncer => $plugin, - tbl_struct => $tbl_struct, - ); - # ...changes may be queued and executed now. - $ch->process_rows(1, $trace_msg); - - MKDEBUG && _d('Finished sync cycle', $cycle); - $cycle++; + # Get next chunks. + $src->{nibble_iter}->no_more_rows(); + $dst->{nibble_iter}->no_more_rows(); } - $ch->process_rows(0, $trace_msg); + $changer->process_rows(0, $trace); - $self->unlock(%args, lock_level => 2); - - return $ch->get_changes(), ALGORITHM => $plugin->name; -} - -sub make_checksum_queries { - my ( $self, %args ) = @_; - my @required_args = qw(src dst tbl_struct); - foreach my $arg ( @required_args ) { - die "I need a $arg argument" unless $args{$arg}; - } - my ($src, $dst, $tbl_struct) = @args{@required_args}; - my $checksum = $self->{TableChecksum}; - - # Decide on checksumming strategy and store checksum query prototypes for - # later. - my $src_algo = $checksum->best_algorithm( - algorithm => 'BIT_XOR', - dbh => $src->{dbh}, - where => 1, - chunk => 1, - count => 1, - ); - my $dst_algo = $checksum->best_algorithm( - algorithm => 'BIT_XOR', - dbh => $dst->{dbh}, - where => 1, - chunk => 1, - count => 1, - ); - if ( $src_algo ne $dst_algo ) { - die "Source and destination checksum algorithms are different: ", - "$src_algo on source, $dst_algo on destination" - } - MKDEBUG && _d('Chosen algo:', $src_algo); - - my $src_func = $checksum->choose_hash_func(dbh => $src->{dbh}, %args); - my $dst_func = $checksum->choose_hash_func(dbh => $dst->{dbh}, %args); - if ( $src_func ne $dst_func ) { - die "Source and destination hash functions are different: ", - "$src_func on source, $dst_func on destination"; - } - MKDEBUG && _d('Chosen hash func:', $src_func); - - # Since the checksum algo and hash func are the same on src and dst - # it doesn't matter if we use src_algo/func or dst_algo/func. - - my $crc_wid = $checksum->get_crc_wid($src->{dbh}, $src_func); - my ($crc_type) = $checksum->get_crc_type($src->{dbh}, $src_func); - my $opt_slice; - if ( $src_algo eq 'BIT_XOR' && $crc_type !~ m/int$/ ) { - $opt_slice = $checksum->optimize_xor( - dbh => $src->{dbh}, - function => $src_func + # Unlock the table. + foreach my $host ($src, $dst) { + $self->unlock( + lock_level => 2, + host => $host, + OptionParser => $o, ); } - my $chunk_sql = $checksum->make_checksum_query( - %args, - db => $src->{db}, - tbl => $src->{tbl}, - algorithm => $src_algo, - function => $src_func, - crc_wid => $crc_wid, - crc_type => $crc_type, - opt_slice => $opt_slice, - replicate => undef, # replicate means something different to this sub - ); # than what we use it for; do not pass it! 
- MKDEBUG && _d('Chunk sql:', $chunk_sql); - my $row_sql = $checksum->make_row_checksum( - %args, - function => $src_func, - ); - MKDEBUG && _d('Row sql:', $row_sql); - return $chunk_sql, $row_sql; + return $changer->get_changes(); } sub lock_table { - my ( $self, $dbh, $where, $db_tbl, $mode ) = @_; - my $query = "LOCK TABLES $db_tbl $mode"; - MKDEBUG && _d($query); - $dbh->do($query); - MKDEBUG && _d('Acquired table lock on', $where, 'in', $mode, 'mode'); + my ( $self, %args ) = @_; + my @required_args = qw(host mode); + foreach my $arg ( @required_args ) { + die "I need a $arg argument" unless $args{$arg}; + } + my ($host, $mode) = @args{@required_args}; + my $q = $self->{Quoter}; + my $sql = "LOCK TABLES " + . $q->quote(@{$host->{tbl}}{qw(db tbl)}) + . " $mode"; + MKDEBUG && _d($host->{Cxn}->name(), $sql); + $host->{Cxn}->dbh()->do($sql); + return; } # Doesn't work quite the same way as lock_and_wait. It will unlock any LOWER # priority lock level, not just the exact same one. sub unlock { my ( $self, %args ) = @_; - - foreach my $arg ( qw(src dst lock transaction lock_level) ) { + my @required_args = qw(lock_level host); + foreach my $arg ( @required_args ) { die "I need a $arg argument" unless defined $args{$arg}; } - my $src = $args{src}; - my $dst = $args{dst}; + my ($lock_level, $host) = @args{@required_args}; + my $o = $self->{OptionParser}; - return unless $args{lock} && $args{lock} <= $args{lock_level}; + my $lock = $o->get('lock'); + return unless $lock && $lock <= $lock_level; + MKDEBUG && _d('Unlocking level', $lock); - # First, unlock/commit. - foreach my $dbh ( $src->{dbh}, $dst->{dbh} ) { - if ( $args{transaction} ) { - MKDEBUG && _d('Committing', $dbh); - $dbh->commit(); - } - else { - my $sql = 'UNLOCK TABLES'; - MKDEBUG && _d($dbh, $sql); - $dbh->do($sql); - } + if ( $o->get('transaction') ) { + MKDEBUG && _d('Committing', $host->name()); + $host->{Cxn}->dbh()->commit(); + } + else { + my $sql = 'UNLOCK TABLES'; + MKDEBUG && _d($host->name(), $sql); + $host->{Cxn}->dbh()->do($sql); } return; @@ -458,73 +367,74 @@ sub unlock { # $src_sth was executed. sub lock_and_wait { my ( $self, %args ) = @_; - my $result = 0; - - foreach my $arg ( qw(src dst lock lock_level) ) { + my @required_args = qw(lock_level host src); + foreach my $arg ( @required_args ) { die "I need a $arg argument" unless defined $args{$arg}; } - my $src = $args{src}; - my $dst = $args{dst}; + my ($lock_level, $host, $src) = @args{@required_args}; + my $o = $self->{OptionParser}; - return unless $args{lock} && $args{lock} == $args{lock_level}; - MKDEBUG && _d('lock and wait, lock level', $args{lock}); + my $lock = $o->get('lock'); + return unless $lock && $lock == $lock_level; - # First, commit/unlock the previous transaction/lock. - foreach my $dbh ( $src->{dbh}, $dst->{dbh} ) { - if ( $args{transaction} ) { - MKDEBUG && _d('Committing', $dbh); - $dbh->commit(); - } - else { - my $sql = 'UNLOCK TABLES'; - MKDEBUG && _d($dbh, $sql); - $dbh->do($sql); - } - } + return $host->{is_source} ? $self->_lock_src(%args) + : $self->_lock_dst(%args); +} - # User wants us to lock for consistency. But lock only on source initially; - # might have to wait for the slave to catch up before locking on the dest. 
- if ( $args{lock} == 3 ) { +sub _lock_src { + my ( $self, %args ) = @_; + my @required_args = qw(lock_level host src); + my ($lock_level, $host, $src) = @args{@required_args}; + + my $o = $self->{OptionParser}; + my $lock = $o->get('lock'); + MKDEBUG && _d('Locking', $host->{Cxn}->name(), 'level', $lock); + + if ( $lock == 3 ) { my $sql = 'FLUSH TABLES WITH READ LOCK'; - MKDEBUG && _d($src->{dbh}, $sql); - $src->{dbh}->do($sql); + MKDEBUG && _d($host->{Cxn}->name(), $sql); + $host->{Cxn}->dbh()->do($sql); } else { - # Lock level 2 (per-table) or 1 (per-sync cycle) - if ( $args{transaction} ) { - if ( $args{src_sth} ) { - # Execute the $src_sth on the source, so LOCK IN SHARE MODE/FOR - # UPDATE will lock the rows examined. - MKDEBUG && _d('Executing statement on source to lock rows'); - - my $sql = "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */"; - MKDEBUG && _d($src->{dbh}, $sql); - $src->{dbh}->do($sql); - - $args{src_sth}->execute(); - $result = 1; - } + # Lock level 2 (per-table) or 1 (per-chunk). + if ( $o->get('transaction') ) { + my $sql = "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */"; + MKDEBUG && _d($host->{Cxn}->name(), $sql); + $host->{Cxn}->dbh()->do($sql); } else { - $self->lock_table($src->{dbh}, 'source', - $self->{Quoter}->quote($src->{db}, $src->{tbl}), - $args{changing_src} ? 'WRITE' : 'READ'); + $self->lock_table( + host => $host, + mode => $args{changing_src} ? 'WRITE' : 'READ', + ); } } + return; +} +sub _lock_dst { + my ( $self, %args ) = @_; + my @required_args = qw(lock_level host src); + my ($lock_level, $host, $src) = @args{@required_args}; + + my $o = $self->{OptionParser}; + my $lock = $o->get('lock'); + MKDEBUG && _d('Locking', $host->{Cxn}->name(), 'level', $lock); + + # Wait for the dest to catchup to the source, then lock the dest. # If there is any error beyond this point, we need to unlock/commit. eval { - if ( my $timeout = $args{wait} ) { + if ( my $timeout = $o->get('wait') ) { my $ms = $self->{MasterSlave}; - my $tries = $args{wait_retry_args}->{tries} || 3; + my $tries = 3; my $wait; $self->{Retry}->retry( tries => $tries, - wait => sub { sleep $args{wait_retry_args}->{wait} || 10 }, + wait => sub { sleep 5; }, try => sub { my ( %args ) = @_; # Be careful using $args{...} in this callback! %args in - # here are the passed-in args, not the args to lock_and_wait(). + # here are the passed-in args, not the args to the sub. if ( $args{tryno} > 1 ) { warn "Retrying MASTER_POS_WAIT() for --wait $timeout..."; @@ -535,7 +445,7 @@ sub lock_and_wait { # $src_sth. $wait = $ms->wait_for_master( master_status => $ms->get_master_status($src->{misc_dbh}), - slave_dbh => $dst->{dbh}, + slave_dbh => $host->{Cxn}->dbh(), timeout => $timeout, ); if ( defined $wait->{result} && $wait->{result} != -1 ) { @@ -592,27 +502,24 @@ sub lock_and_wait { '(syncing via replication or sync-to-master)'); } else { - if ( $args{lock} == 3 ) { + if ( $lock == 3 ) { my $sql = 'FLUSH TABLES WITH READ LOCK'; - MKDEBUG && _d($dst->{dbh}, ',', $sql); - $dst->{dbh}->do($sql); + MKDEBUG && _d($host->{Cxn}->name(), $sql); + $host->{Cxn}->dbh()->do($sql); } - elsif ( !$args{transaction} ) { - $self->lock_table($dst->{dbh}, 'dest', - $self->{Quoter}->quote($dst->{db}, $dst->{tbl}), - $args{execute} ? 'WRITE' : 'READ'); + elsif ( !$o->get('transaction') ) { + $self->lock_table( + host => $host, + mode => 'READ', # $args{execute} ? 'WRITE' : 'READ') + ); } } }; if ( $EVAL_ERROR ) { # Must abort/unlock/commit so that we don't interfere with any further # tables we try to do. 
- if ( $args{src_sth}->{Active} ) { - $args{src_sth}->finish(); - } - foreach my $dbh ( $src->{dbh}, $dst->{dbh}, $src->{misc_dbh} ) { - next unless $dbh; - MKDEBUG && _d('Caught error, unlocking/committing on', $dbh); + foreach my $dbh ( $host->{Cxn}->dbh(), $src->{Cxn}->dbh() ) { + MKDEBUG && _d('Caught error, unlocking/committing', $dbh); $dbh->do('UNLOCK TABLES'); $dbh->commit() unless $dbh->{AutoCommit}; } @@ -620,7 +527,7 @@ sub lock_and_wait { die $EVAL_ERROR; } - return $result; + return; } # This query will check all needed privileges on the table without actually @@ -651,6 +558,7 @@ sub have_all_privs { return 0; } + sub _d { my ($package, undef, $line) = caller 0; @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; } diff --git a/t/lib/TableSyncer.t b/t/lib/TableSyncer.t index 3ac42961..ed4e2d31 100644 --- a/t/lib/TableSyncer.t +++ b/t/lib/TableSyncer.t @@ -12,26 +12,22 @@ use English qw(-no_match_vars); use Test::More; # TableSyncer and its required modules: +use OptionParser; +use NibbleIterator; use TableSyncer; use MasterSlave; use Quoter; -use TableChecksum; -use VersionParser; +use RowChecksum; use Retry; -# The sync plugins: -use TableSyncChunk; -use TableSyncNibble; -use TableSyncGroupBy; -use TableSyncStream; -# Helper modules for the sync plugins: -use TableChunker; +use TableParser; use TableNibbler; -# Modules for sync(): +use TableParser; use ChangeHandler; use RowDiff; -# And other modules: -use TableParser; +use RowSyncer; +use RowChecksum; use DSNParser; +use Cxn; use Sandbox; use PerconaTest; @@ -56,53 +52,46 @@ else { $sb->create_dbs($dbh, ['test']); $sb->load_file('master', 't/lib/samples/before-TableSyncChunk.sql'); -my $q = new Quoter(); -my $tp = new TableParser(Quoter=>$q); - # ########################################################################### # Make a TableSyncer object. 
# ########################################################################### -throws_ok( - sub { new TableSyncer() }, - qr/I need a MasterSlave/, - 'MasterSlave required' -); -throws_ok( - sub { new TableSyncer(MasterSlave=>1) }, - qr/I need a Quoter/, - 'Quoter required' -); -throws_ok( - sub { new TableSyncer(MasterSlave=>1, Quoter=>1) }, - qr/I need a VersionParser/, - 'VersionParser required' -); -throws_ok( - sub { new TableSyncer(MasterSlave=>1, Quoter=>1, VersionParser=>1) }, - qr/I need a TableChecksum/, - 'TableChecksum required' -); +my $ms = new MasterSlave(); +my $o = new OptionParser(description => 'TableSyncer'); +my $q = new Quoter(); +my $tp = new TableParser(Quoter => $q); +my $tn = new TableNibbler(TableParser => $tp, Quoter => $q); +my $rc = new RowChecksum(OptionParser => $o, Quoter => $q); +my $rd = new RowDiff(dbh=>$dbh); +my $rt = new Retry(); -my $rd = new RowDiff(dbh=>$src_dbh); -my $ms = new MasterSlave(); -my $vp = new VersionParser(); -my $rt = new Retry(); -my $checksum = new TableChecksum( - Quoter => $q, - VersionParser => $vp, -); my $syncer = new TableSyncer( MasterSlave => $ms, + OptionParser => $o, Quoter => $q, - TableChecksum => $checksum, - VersionParser => $vp, - DSNParser => $dp, + TableParser => $tp, + TableNibbler => $tn, + RowChecksum => $rc, + RowDiff => $rd, Retry => $rt, ); isa_ok($syncer, 'TableSyncer'); -my $chunker = new TableChunker( Quoter => $q, TableParser => $tp ); -my $nibbler = new TableNibbler( Quoter => $q, TableParser => $tp ); +$o->get_specs("$trunk/bin/pt-table-sync"); +$o->get_opts(); + +my $src_cxn = new Cxn( + DSNParser => $dp, + OptionParser => $o, + dsn => "h=127,P=12345", + dbh => $src_dbh, +); + +my $dst_cxn = new Cxn( + DSNParser => $dp, + OptionParser => $o, + dsn => "h=127,P=12346", + dbh => $dst_dbh, +); # Global vars used/set by the subs below and accessed throughout the tests. my $src; @@ -110,37 +99,17 @@ my $dst; my $tbl_struct; my %actions; my @rows; -my ($sync_chunk, $sync_nibble, $sync_groupby, $sync_stream); -my $plugins = []; - -# Call this func to re-make/reset the plugins. -sub make_plugins { - $sync_chunk = new TableSyncChunk( - TableChunker => $chunker, - Quoter => $q, - ); - $sync_nibble = new TableSyncNibble( - TableNibbler => $nibbler, - TableChunker => $chunker, - TableParser => $tp, - Quoter => $q, - ); - $sync_groupby = new TableSyncGroupBy( Quoter => $q ); - $sync_stream = new TableSyncStream( Quoter => $q ); - - $plugins = [$sync_chunk, $sync_nibble, $sync_groupby, $sync_stream]; - - return; -} +my $ch; +my $rs; sub new_ch { my ( $dbh, $queue ) = @_; - return new ChangeHandler( + $ch = new ChangeHandler( Quoter => $q, - left_db => $src->{db}, - left_tbl => $src->{tbl}, - right_db => $dst->{db}, - right_tbl => $dst->{tbl}, + left_db => $src->{tbl}->{db}, + left_tbl => $src->{tbl}->{tbl}, + right_db => $dst->{tbl}->{db}, + right_tbl => $dst->{tbl}->{tbl}, actions => [ sub { my ( $sql, $change_dbh ) = @_; @@ -162,6 +131,7 @@ sub new_ch { replace => 0, queue => defined $queue ? $queue : 1, ); + $ch->fetch_back($src_cxn->dbh()); } # Shortens/automates a lot of the setup needed for calling @@ -173,99 +143,47 @@ sub sync_table { my ($src_db_tbl, $dst_db_tbl) = @args{qw(src dst)}; my ($src_db, $src_tbl) = $q->split_unquote($src_db_tbl); my ($dst_db, $dst_tbl) = $q->split_unquote($dst_db_tbl); - if ( $args{plugins} ) { - $plugins = $args{plugins}; - } - else { - make_plugins(); - } + + @ARGV = $args{argv} ? 
@{$args{argv}} : (); + $o->get_opts(); + $tbl_struct = $tp->parse( $tp->get_create_table($src_dbh, $src_db, $src_tbl)); $src = { - dbh => $src_dbh, - dsn => {h=>'127.1',P=>'12345',}, - misc_dbh => $dbh, - db => $src_db, - tbl => $src_tbl, + Cxn => $src_cxn, + misc_dbh => $dbh, + is_source => 1, + tbl => { + db => $src_db, + tbl => $src_tbl, + tbl_struct => $tbl_struct, + }, }; $dst = { - dbh => $dst_dbh, - dsn => {h=>'127.1',P=>'12346',}, - db => $dst_db, - tbl => $dst_tbl, + Cxn => $dst_cxn, + misc_dbh => $dbh, + tbl => { + db => $dst_db, + tbl => $dst_tbl, + tbl_struct => $tbl_struct, + }, }; @rows = (); + new_ch(); + $rs = new RowSyncer( + ChangeHandler => $ch, + ); %actions = $syncer->sync_table( - plugins => $plugins, src => $src, dst => $dst, - tbl_struct => $tbl_struct, - cols => $tbl_struct->{cols}, - chunk_size => $args{chunk_size} || 5, - dry_run => $args{dry_run}, - function => $args{function} || 'SHA1', - lock => $args{lock}, - transaction => $args{transaction}, - callback => $args{callback}, - RowDiff => $rd, - ChangeHandler => new_ch(), + RowSyncer => $rs, + ChangeHandler => $ch, trace => 0, ); return; } -# ########################################################################### -# Test get_best_plugin() (formerly best_algorithm()). -# ########################################################################### -make_plugins(); -$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'test', 'test5')); -is_deeply( - [ - $syncer->get_best_plugin( - plugins => $plugins, - tbl_struct => $tbl_struct, - ) - ], - [ $sync_groupby ], - 'Best plugin GroupBy' -); - -$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'test', 'test3')); -my ($plugin, %plugin_args) = $syncer->get_best_plugin( - plugins => $plugins, - tbl_struct => $tbl_struct, -); -is_deeply( - [ $plugin, \%plugin_args, ], - [ $sync_chunk, { chunk_index => 'PRIMARY', chunk_col => 'a', } ], - 'Best plugin Chunk' -); - -# With the introduction of char chunking (issue 568), test6 can be chunked -# with Chunk or Nibble. Chunk will be prefered. - -$tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'test', 'test6')); -($plugin, %plugin_args) = $syncer->get_best_plugin( - plugins => $plugins, - tbl_struct => $tbl_struct, -); -is_deeply( - [ $plugin, \%plugin_args, ], - [ $sync_chunk, { chunk_index => 'a', chunk_col => 'a'} ], - 'Best plugin Chunk (char chunking)' -); -# Remove TableSyncChunk to test that it can chunk that char col with Nibble too. -($plugin, %plugin_args) = $syncer->get_best_plugin( - plugins => [$sync_nibble, $sync_groupby, $sync_stream], - tbl_struct => $tbl_struct, -); -is_deeply( - [ $plugin, \%plugin_args, ], - [ $sync_nibble,{ chunk_index => 'a', key_cols => [qw(a)], small_table=>0 } ], - 'Best plugin Nibble' -); - # ########################################################################### # Test sync_table() for each plugin with a basic, 4 row data set. # ########################################################################### @@ -289,9 +207,9 @@ my $inserts = [ $dst_dbh->do('TRUNCATE TABLE test.test2'); sync_table( - src => "test.test1", - dst => "test.test2", - dry_run => 1, + src => "test.test1", + dst => "test.test2", + argv => [qw(--dry-run)], ); is_deeply( \%actions, @@ -300,9 +218,8 @@ is_deeply( INSERT => 0, REPLACE => 0, UPDATE => 0, - ALGORITHM => 'Chunk', }, - 'Dry run, no changes, Chunk plugin' + 'Dry run, no changes' ); is_deeply( @@ -319,42 +236,10 @@ is_deeply( # Now do the real syncs that should insert 4 rows into test2. -# Sync with Chunk. 
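+# Basic sync (the plugin-specific Chunk/Nibble/GroupBy/Stream syncs are gone).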
sync_table( src => "test.test1", dst => "test.test2", ); -is_deeply( - \%actions, - { - DELETE => 0, - INSERT => 4, - REPLACE => 0, - UPDATE => 0, - ALGORITHM => 'Chunk', - }, - 'Sync with Chunk, 4 INSERTs' -); - -is_deeply( - \@rows, - $inserts, - 'Sync with Chunk, ChangeHandler made INSERT statements' -); - -is_deeply( - $dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'), - $test1_rows, - 'Sync with Chunk, dst rows match src rows' -); - -# Sync with Chunk again, but use chunk_size = 1k which should be converted. -$dst_dbh->do('TRUNCATE TABLE test.test2'); -sync_table( - src => "test.test1", - dst => "test.test2", - chunk_size => '1k', -); is_deeply( \%actions, @@ -363,166 +248,52 @@ is_deeply( INSERT => 4, REPLACE => 0, UPDATE => 0, - ALGORITHM => 'Chunk', }, - 'Sync with Chunk chunk size 1k, 4 INSERTs' + 'Basic sync 4 INSERT' ); is_deeply( \@rows, $inserts, - 'Sync with Chunk chunk size 1k, ChangeHandler made INSERT statements' + 'Basic sync ChangeHandler INSERT statements' ); is_deeply( $dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'), $test1_rows, - 'Sync with Chunk chunk size 1k, dst rows match src rows' -); - -# Sync with Nibble. -$dst_dbh->do('TRUNCATE TABLE test.test2'); -sync_table( - src => "test.test1", - dst => "test.test2", - plugins => [ $sync_nibble ], -); - -is_deeply( - \%actions, - { - DELETE => 0, - INSERT => 4, - REPLACE => 0, - UPDATE => 0, - ALGORITHM => 'Nibble', - }, - 'Sync with Nibble, 4 INSERTs' -); - -is_deeply( - \@rows, - $inserts, - 'Sync with Nibble, ChangeHandler made INSERT statements' -); - -is_deeply( - $dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'), - $test1_rows, - 'Sync with Nibble, dst rows match src rows' -); - -# Sync with GroupBy. -$dst_dbh->do('TRUNCATE TABLE test.test2'); -sync_table( - src => "test.test1", - dst => "test.test2", - plugins => [ $sync_groupby ], -); - -is_deeply( - \%actions, - { - DELETE => 0, - INSERT => 4, - REPLACE => 0, - UPDATE => 0, - ALGORITHM => 'GroupBy', - }, - 'Sync with GroupBy, 4 INSERTs' -); - -is_deeply( - \@rows, - $inserts, - 'Sync with GroupBy, ChangeHandler made INSERT statements' -); - -is_deeply( - $dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'), - $test1_rows, - 'Sync with GroupBy, dst rows match src rows' -); - -# Sync with Stream. -$dst_dbh->do('TRUNCATE TABLE test.test2'); -sync_table( - src => "test.test1", - dst => "test.test2", - plugins => [ $sync_stream ], -); - -is_deeply( - \%actions, - { - DELETE => 0, - INSERT => 4, - REPLACE => 0, - UPDATE => 0, - ALGORITHM => 'Stream', - }, - 'Sync with Stream, 4 INSERTs' -); - -is_deeply( - \@rows, - $inserts, - 'Sync with Stream, ChangeHandler made INSERT statements' -); - -is_deeply( - $dst_dbh->selectall_arrayref('SELECT * FROM test.test2 ORDER BY a, b'), - $test1_rows, - 'Sync with Stream, dst rows match src rows' + 'Basic sync dst rows match src rows' ); # ############################################################################# # Check that the plugins can resolve unique key violations. 
# ############################################################################# -make_plugins(); - sync_table( - src => "test.test3", - dst => "test.test4", - plugins => [ $sync_stream ], + src => "test.test3", + dst => "test.test4", ); is_deeply( $dst_dbh->selectall_arrayref('select * from test.test4 order by a', { Slice => {}} ), [ { a => 1, b => 2 }, { a => 2, b => 1 } ], - 'Resolves unique key violations with Stream' -); - -sync_table( - src => "test.test3", - dst => "test.test4", - plugins => [ $sync_chunk ], -); - -is_deeply( - $dst_dbh->selectall_arrayref('select * from test.test4 order by a', { Slice => {}} ), - [ { a => 1, b => 2 }, { a => 2, b => 1 } ], - 'Resolves unique key violations with Chunk' + 'Resolves unique key violations' ); # ########################################################################### # Test locking. # ########################################################################### -make_plugins(); - sync_table( src => "test.test1", dst => "test.test2", - lock => 1, + argv => [qw(--lock 1)], ); # The locks should be released. -ok($src_dbh->do('select * from test.test4'), 'Cycle locks released'); +ok($src_dbh->do('select * from test.test4'), 'Chunk locks released'); sync_table( src => "test.test1", dst => "test.test2", - lock => 2, + argv => [qw(--lock 2)], ); # The locks should be released. @@ -531,7 +302,7 @@ ok($src_dbh->do('select * from test.test4'), 'Table locks released'); sync_table( src => "test.test1", dst => "test.test2", - lock => 3, + argv => [qw(--lock 3)], ); ok( @@ -541,14 +312,9 @@ ok( eval { $syncer->lock_and_wait( - src => $src, - dst => $dst, - lock => 3, lock_level => 3, - replicate => 0, - timeout_ok => 1, - transaction => 0, - wait => 60, + host => $src, + src => $src, ); }; is($EVAL_ERROR, '', 'Locks in level 3'); @@ -571,25 +337,32 @@ throws_ok ( # Kill the DBHs it in the right order: there's a connection waiting on # a lock. -$src_dbh->disconnect(); -$dst_dbh->disconnect(); +$src_cxn = undef; +$dst_cxn = undef; $src_dbh = $sb->get_dbh_for('master'); $dst_dbh = $sb->get_dbh_for('slave1'); - -$src->{dbh} = $src_dbh; -$dst->{dbh} = $dst_dbh; +$src_cxn = new Cxn( + DSNParser => $dp, + OptionParser => $o, + dsn => "h=127,P=12345", + dbh => $src_dbh, +); +$dst_cxn = new Cxn( + DSNParser => $dp, + OptionParser => $o, + dsn => "h=127,P=12346", + dbh => $dst_dbh, +); # ########################################################################### # Test TableSyncGroupBy. # ########################################################################### -make_plugins(); $sb->load_file('master', 't/lib/samples/before-TableSyncGroupBy.sql'); sleep 1; sync_table( src => "test.test1", dst => "test.test2", - plugins => [ $sync_groupby ], ); is_deeply( @@ -608,11 +381,10 @@ is_deeply( ], 'Table synced with GroupBy', ); - +exit; # ############################################################################# # Issue 96: mk-table-sync: Nibbler infinite loop # ############################################################################# -make_plugins(); $sb->load_file('master', 't/lib/samples/issue_96.sql'); sleep 1; @@ -628,7 +400,6 @@ is_deeply( sync_table( src => "issue_96.t", dst => "issue_96.t2", - plugins => [ $sync_nibble ], ); $r1 = $src_dbh->selectall_arrayref('SELECT from_city FROM issue_96.t WHERE package_id=4'); @@ -696,7 +467,6 @@ diag(`/tmp/12345/use -u root -e "DROP USER 'bob'"`); # ########################################################################### # Test that the calback gives us the src and dst sql. 
# ########################################################################### -make_plugins; # Re-using issue_96.t from above. The tables are already in sync so there # should only be 1 sync cycle. my @sqls; @@ -704,7 +474,6 @@ sync_table( src => "issue_96.t", dst => "issue_96.t2", chunk_size => 1000, - plugins => [ $sync_nibble ], callback => sub { push @sqls, @_; }, ); @@ -762,6 +531,7 @@ my $dbh3 = $sb->get_dbh_for('master1'); SKIP: { skip 'Cannot connect to sandbox master', 7 unless $dbh; skip 'Cannot connect to second sandbox master', 7 unless $dbh3; + my $sync_chunk; sub set_bidi_callbacks { $sync_chunk->set_callback('same_row', sub { @@ -834,7 +604,6 @@ SKIP: { # Load remote data. $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql'); - make_plugins(); set_bidi_callbacks(); $tbl_struct = $tp->parse($tp->get_create_table($src_dbh, 'bidi', 't')); @@ -849,15 +618,13 @@ SKIP: { dst => $dst, tbl_struct => $tbl_struct, cols => [qw(ts)], # Compare only ts col when chunks differ. - plugins => $plugins, - function => 'SHA1', ChangeHandler => new_ch($dbh3, 0), # here to override $dst_dbh. RowDiff => $rd, chunk_size => 2, ); @rows = (); - $syncer->sync_table(%args, plugins => [$sync_chunk]); + $syncer->sync_table(%args); my $res = $src_dbh->selectall_arrayref('select * from bidi.t order by id'); is_deeply( @@ -880,12 +647,11 @@ SKIP: { $sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql'); - make_plugins(); set_bidi_callbacks(); $args{ChangeHandler} = new_ch($dbh3, 0); @rows = (); - $syncer->sync_table(%args, plugins => [$sync_chunk], chunk_size => 10); + $syncer->sync_table(%args, chunk_size => 10); $res = $src_dbh->selectall_arrayref('select * from bidi.t order by id'); is_deeply( @@ -908,12 +674,11 @@ SKIP: { $sb->load_file('master', 't/pt-table-sync/samples/bidirectional/master-data.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/table.sql'); $sb->load_file('master1', 't/pt-table-sync/samples/bidirectional/remote-1.sql'); - make_plugins(); set_bidi_callbacks(); $args{ChangeHandler} = new_ch($dbh3, 0); @rows = (); - $syncer->sync_table(%args, plugins => [$sync_chunk], chunk_size => 100000); + $syncer->sync_table(%args, chunk_size => 100000); $res = $src_dbh->selectall_arrayref('select * from bidi.t order by id'); is_deeply( @@ -934,7 +699,7 @@ SKIP: { # ######################################################################## $args{ChangeHandler} = new_ch($dbh3, 1); throws_ok( - sub { $syncer->sync_table(%args, bidirectional => 1, plugins => [$sync_chunk]) }, + sub { $syncer->sync_table(%args, bidirectional => 1) }, qr/Queueing does not work with bidirectional syncing/, 'Queueing does not work with bidirectional syncing' ); @@ -945,7 +710,6 @@ SKIP: { # ############################################################################# # Test with transactions. # ############################################################################# -make_plugins(); # Sandbox::get_dbh_for() defaults to AutoCommit=1. Autocommit must # be off else commit() will cause an error. 
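+# (sync_table() commits through unlock() when --transaction is given, so
+# these handles must not autocommit.)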
$dbh = $sb->get_dbh_for('master', {AutoCommit=>0}); @@ -997,32 +761,18 @@ like( # ############################################################################# # Issue 672: mk-table-sync should COALESCE to avoid undef # ############################################################################# -make_plugins(); $sb->load_file('master', "t/lib/samples/empty_tables.sql"); -foreach my $sync( $sync_chunk, $sync_nibble, $sync_groupby ) { - sync_table( - src => 'et.et1', - dst => 'et.et1', - plugins => [ $sync ], - ); - my $sync_name = ref $sync; - my $algo = $sync_name; - $algo =~ s/TableSync//; - - is_deeply( - \@rows, - [], - "Sync empty tables with " . ref $sync, - ); - - is( - $actions{ALGORITHM}, - $algo, - "$algo algo used to sync empty table" - ); -} +sync_table( + src => 'et.et1', + dst => 'et.et1', +); +is_deeply( + \@rows, + [], + "Sync empty tables" +); # ############################################################################# # Retry wait.
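
For reference, a minimal sketch of how the rewritten sync_table() is wired together, distilled from the new test helper above. It assumes $src_cxn, $dst_cxn (Cxn objects), $src_dbh, $ch (a ChangeHandler), $tbl_struct, and $trunk are set up exactly as in t/lib/TableSyncer.t; the table names are illustrative.

   use OptionParser;
   use Quoter;
   use TableParser;
   use TableNibbler;
   use RowChecksum;
   use RowDiff;
   use RowSyncer;
   use ChangeHandler;
   use MasterSlave;
   use Retry;
   use TableSyncer;

   my $o = new OptionParser(description => 'TableSyncer');
   $o->get_specs("$trunk/bin/pt-table-sync");
   $o->get_opts();   # --lock, --transaction, --chunk-size, etc. come from @ARGV

   my $q  = new Quoter();
   my $tp = new TableParser(Quoter => $q);

   my $syncer = new TableSyncer(
      MasterSlave  => new MasterSlave(),
      OptionParser => $o,
      Quoter       => $q,
      TableParser  => $tp,
      TableNibbler => new TableNibbler(TableParser => $tp, Quoter => $q),
      RowChecksum  => new RowChecksum(OptionParser => $o, Quoter => $q),
      RowDiff      => new RowDiff(dbh => $src_dbh),
      Retry        => new Retry(),
   );

   # RowSyncer turns row differences into ChangeHandler INSERT/UPDATE/DELETE calls.
   my $rs = new RowSyncer(ChangeHandler => $ch);

   my %changes = $syncer->sync_table(
      src => {
         Cxn       => $src_cxn,
         misc_dbh  => $src_dbh,
         is_source => 1,
         tbl => { db => 'test', tbl => 'test1', tbl_struct => $tbl_struct },
      },
      dst => {
         Cxn => $dst_cxn,
         tbl => { db => 'test', tbl => 'test2', tbl_struct => $tbl_struct },
      },
      RowSyncer     => $rs,
      ChangeHandler => $ch,
   );
   # %changes maps DELETE/INSERT/REPLACE/UPDATE to the number of changes made.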