Finish rewriting TableSyncer and updating its test. Add RowSyncerBidirectional.pm. Add GENLOG switch to start-sandbox.

This commit is contained in:
Daniel Nichter
2011-12-21 12:10:57 -07:00
parent 4c8fd5c080
commit ec8471ba28
5 changed files with 512 additions and 252 deletions

View File

@@ -143,6 +143,7 @@ sub sync_table {
my $user_where = $o->get('where');
my ($src_nibble_iter, $dst_nibble_iter);
foreach my $host ($src, $dst) {
my $callbacks = {
init => sub {
@@ -186,7 +187,7 @@ sub sync_table {
},
};
$host->{nibble_iter} = new NibbleIterator(
my $nibble_iter = new NibbleIterator(
Cxn => $host->{Cxn},
tbl => $host->{tbl},
chunk_size => $o->get('chunk-size'),
@@ -194,15 +195,23 @@ sub sync_table {
select => $chunk_cols,
callbacks => $callbacks,
fetch_hashref => 1,
one_nibble => $args{one_nibble},
OptionParser => $self->{OptionParser},
Quoter => $self->{Quoter},
TableNibbler => $self->{TableNibbler},
TableParser => $self->{TableParser},
RowChecksum => $self->{RowChecksum},
);
if ( $host->{is_source} ) {
$src_nibble_iter = $nibble_iter;
}
else {
$dst_nibble_iter = $nibble_iter;
}
}
my $index = $src->{nibble_iter}->nibble_index();
my $index = $src_nibble_iter->nibble_index();
my $key_cols = $index ? $src->{tbl}->{tbl_struct}->{keys}->{$index}->{cols}
: $src->{tbl}->{tbl_struct}->{cols};
$row_syncer->set_key_cols($key_cols);
@@ -214,59 +223,62 @@ sub sync_table {
$row_syncer->set_crc_col($crc_col);
MKDEBUG && _d('CRC column:', $crc_col);
my $rows_sql;
my $row_cols = $row_checksum->make_row_checksum(
dbh => $src->{Cxn}->dbh(),
tbl => $src->{tbl},
%crc_args,
);
my $sql_clause = $src_nibble_iter->sql();
foreach my $host ($src, $dst) {
my $row_cols = $row_checksum->make_row_checksum(
dbh => $host->{Cxn}->dbh(),
tbl => $host->{tbl},
%crc_args,
);
my $nibble_iter = $host->{nibble_iter};
if ( $nibble_iter->one_nibble() ) {
my $rows_sql
if ( $src_nibble_iter->one_nibble() ) {
$rows_sql
= 'SELECT /*rows in nibble*/ '
. ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '')
. "$row_cols AS $crc_col"
. " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)})
. " WHERE 1=1 "
. ($user_where ? " AND ($user_where)" : '');
$host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql);
. ($user_where ? " AND ($user_where)" : '')
. ($sql_clause->{order_by} ? " ORDER BY " . $sql_clause->{order_by}
: "");
}
else {
my $sql = $nibble_iter->sql();
my $rows_sql
$rows_sql
= 'SELECT /*rows in nibble*/ '
. ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '')
. "$row_cols AS $crc_col"
. " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)})
. " WHERE " . $sql->{boundaries}->{'>='} # lower boundary
. " AND " . $sql->{boundaries}->{'<='} # upper boundary
. " WHERE " . $sql_clause->{boundaries}->{'>='} # lower boundary
. " AND " . $sql_clause->{boundaries}->{'<='} # upper boundary
. ($user_where ? " AND ($user_where)" : '')
. " ORDER BY " . $sql->{order_by};
$host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql);
. " ORDER BY " . $sql_clause->{order_by};
}
$host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql);
}
# ########################################################################
# Start syncing the table.
# ########################################################################
while ( grep { $_->{nibble_iter}->more_boundaries() } ($src, $dst) ) {
my $src_chunk = $src->{nibble_iter}->next();
my $dst_chunk = $dst->{nibble_iter}->next();
while ( $src_nibble_iter->more_boundaries()
|| $dst_nibble_iter->more_boundaries() ) {
my $src_chunk = $src_nibble_iter->next();
my $dst_chunk = $dst_nibble_iter->next();
if ( $src_chunk->{cnt} != $dst_chunk->{cnt}
|| $src_chunk->{crc} ne $dst_chunk->{crc} ) {
MKDEBUG && _d("Chunks differ");
my $boundary = $src_nibble_iter->boundaries();
foreach my $host ($src, $dst) {
my $nibble_iter = $host->{nibble_iter};
my $boundary = $nibble_iter->boundaries();
MKDEBUG && _d($host->{Cxn}->name(), $host->{rows_sth}->{Statement},
'params:', @{$boundary->{lower}}, @{$boundary->{upper}});
$host->{rows_sth}->execute(
@{$boundary->{lower}}, @{$boundary->{upper}});
}
$row_diff->compare_sets(
left_dbh => $src->{Cxn}->dbh(),
left_sth => $src->{rows_sth},
right_dbh => $dst->{Cxn}->dbh(),
right_sth => $dst->{rows_sth},
tbl_struct => $src->{tbl}->{tbl_struct},
syncer => $row_syncer,
@@ -277,18 +289,9 @@ sub sync_table {
}
}
# Unlock the chunks.
foreach my $host ($src, $dst) {
$self->unlock(
lock_level => 1,
host => $host,
OptionParser => $o,
);
}
# Get next chunks.
$src->{nibble_iter}->no_more_rows();
$dst->{nibble_iter}->no_more_rows();
$src_nibble_iter->no_more_rows();
$dst_nibble_iter->no_more_rows();
}
$changer->process_rows(0, $trace);
@@ -337,12 +340,12 @@ sub unlock {
MKDEBUG && _d('Unlocking level', $lock);
if ( $o->get('transaction') ) {
MKDEBUG && _d('Committing', $host->name());
MKDEBUG && _d('Committing', $host->{Cxn}->name());
$host->{Cxn}->dbh()->commit();
}
else {
my $sql = 'UNLOCK TABLES';
MKDEBUG && _d($host->name(), $sql);
MKDEBUG && _d($host->{Cxn}->name(), $sql);
$host->{Cxn}->dbh()->do($sql);
}
@@ -377,6 +380,18 @@ sub lock_and_wait {
my $lock = $o->get('lock');
return unless $lock && $lock == $lock_level;
# First, commit/unlock the previous transaction/lock.
if ( $o->get('transaction') ) {
MKDEBUG && _d('Committing', $host->{Cxn}->name());
$host->{Cxn}->dbh()->commit();
}
else {
my $sql = 'UNLOCK TABLES';
MKDEBUG && _d($host->{Cxn}->name(), $sql);
$host->{Cxn}->dbh()->do($sql);
}
# Lock/start xa.
return $host->{is_source} ? $self->_lock_src(%args)
: $self->_lock_dst(%args);
}
@@ -426,8 +441,8 @@ sub _lock_dst {
eval {
if ( my $timeout = $o->get('wait') ) {
my $ms = $self->{MasterSlave};
my $tries = 3;
my $wait;
my $wait = $args{wait_retry_args}->{wait} || 10;
my $tries = $args{wait_retry_args}->{tries} || 3;
$self->{Retry}->retry(
tries => $tries,
wait => sub { sleep 5; },