Revert to r243 to undo pt-table-sync changes.

This commit is contained in:
Daniel Nichter
2011-12-22 12:06:25 -07:00
parent 2760322c87
commit 1d8da408fc
10 changed files with 4800 additions and 4285 deletions

View File

@@ -1,4 +1,4 @@
# This program is copyright 2011 Percona Inc.
# This program is copyright 2007-2011 Baron Schwartz, 2011 Percona Inc.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
@@ -32,10 +32,16 @@ $Data::Dumper::Indent = 1;
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Quotekeys = 0;
# Arguments:
# * MasterSlave A MasterSlave module
# * Quoter A Quoter module
# * VersionParser A VersionParser module
# * TableChecksum A TableChecksum module
# * Retry A Retry module
# * DSNParser (optional)
sub new {
my ( $class, %args ) = @_;
my @required_args = qw(MasterSlave OptionParser Quoter TableParser
TableNibbler RowChecksum RowDiff Retry);
my @required_args = qw(MasterSlave Quoter VersionParser TableChecksum Retry);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless defined $args{$arg};
}
@@ -43,6 +49,29 @@ sub new {
return bless $self, $class;
}
# Return the first plugin from the arrayref of TableSync* plugins
# that can sync the given table struct. plugin->can_sync() usually
# returns a hashref that it wants back when plugin->prepare_to_sync()
# is called. Or, it may return nothing (false) to say that it can't
# sync the table.
sub get_best_plugin {
my ( $self, %args ) = @_;
foreach my $arg ( qw(plugins tbl_struct) ) {
die "I need a $arg argument" unless $args{$arg};
}
MKDEBUG && _d('Getting best plugin');
foreach my $plugin ( @{$args{plugins}} ) {
MKDEBUG && _d('Trying plugin', $plugin->name);
my ($can_sync, %plugin_args) = $plugin->can_sync(%args);
if ( $can_sync ) {
MKDEBUG && _d('Can sync with', $plugin->name, Dumper(\%plugin_args));
return $plugin, %plugin_args;
}
}
MKDEBUG && _d('No plugin can sync the table');
return;
}
# Required arguments:
# * plugins Arrayref of TableSync* modules, in order of preference
# * src Hashref with source (aka left) dbh, db, tbl
@@ -72,371 +101,340 @@ sub new {
# * wait locking
sub sync_table {
my ( $self, %args ) = @_;
my @required_args = qw(src dst RowSyncer ChangeHandler);
my @required_args = qw(plugins src dst tbl_struct cols chunk_size
RowDiff ChangeHandler);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($src, $dst, $row_syncer, $changer) = @args{@required_args};
MKDEBUG && _d('Syncing table with args:',
map { "$_: " . Dumper($args{$_}) }
qw(plugins src dst tbl_struct cols chunk_size));
my $diffs = $args{diffs};
my $changing_src = $args{changing_src};
my ($plugins, $src, $dst, $tbl_struct, $cols, $chunk_size, $rd, $ch)
= @args{@required_args};
my $dp = $self->{DSNParser};
$args{trace} = 1 unless defined $args{trace};
my $o = $self->{OptionParser};
my $q = $self->{Quoter};
my $row_diff = $self->{RowDiff};
my $row_checksum = $self->{RowChecksum};
# USE db on src and dst for cases like when replicate-do-db is being used.
foreach my $host ( $src, $dst ) {
$host->{Cxn}->dbh()->do("USE " . $q->quote($host->{tbl}->{db}));
if ( $args{bidirectional} && $args{ChangeHandler}->{queue} ) {
# This should be checked by the caller but just in case...
die "Queueing does not work with bidirectional syncing";
}
my $trace;
if ( !defined $args{trace} || $args{trace} ) {
chomp(my $hostname = `hostname`);
$trace = "src_host:" . $src->{Cxn}->name()
. " src_tbl:" . join('.', @{$src->{tbl}}{qw(db tbl)})
. " dst_host:" . $dst->{Cxn}->name()
. " dst_tbl:" . join('.', @{$dst->{tbl}}{qw(db tbl)})
. " changing_src:" . ($changing_src ? "yes" : "no")
. " diffs:" . ($diffs ? scalar @$diffs : "0")
. " " . join(" ", map { "$_:" . ($o->get($_) ? "yes" : "no") }
qw(lock transaction replicate bidirectional))
. " pid:$PID "
. ($ENV{USER} ? "user:$ENV{USER} " : "")
. ($hostname ? "host:$hostname" : "");
MKDEBUG && _d("Binlog trace message:", $trace);
}
$args{index_hint} = 1 unless defined $args{index_hint};
$args{lock} ||= 0;
$args{wait} ||= 0;
$args{transaction} ||= 0;
$args{timeout_ok} ||= 0;
# Make NibbleIterator for checksumming chunks of rows to see if
# there are any diffs.
my %crc_args = $row_checksum->get_crc_args(dbh => $src->{Cxn}->dbh());
my $chunk_cols;
if ( $diffs ) {
$chunk_cols = "0 AS cnt, '' AS crc";
}
else {
$chunk_cols = $row_checksum->make_chunk_checksum(
dbh => $src->{Cxn}->dbh(),
tbl => $src->{tbl},
%crc_args
);
}
my $q = $self->{Quoter};
my $vp = $self->{VersionParser};
if ( !defined $src->{select_lock} || !defined $dst->{select_lock} ) {
if ( $o->get('transaction') ) {
if ( $o->get('bidirectional') ) {
# Making changes on src and dst.
$src->{select_lock} = 'FOR UPDATE';
$dst->{select_lock} = 'FOR UPDATE';
}
elsif ( $changing_src ) {
# Making changes on master (src) which replicate to slave (dst).
$src->{select_lock} = 'FOR UPDATE';
$dst->{select_lock} = 'LOCK IN SHARE MODE';
}
else {
# Making changes on slave (dst).
$src->{select_lock} = 'LOCK IN SHARE MODE';
$dst->{select_lock} = 'FOR UPDATE';
}
}
else {
$src->{select_lock} = '';
$dst->{select_lock} = '';
}
MKDEBUG && _d('SELECT lock:', $src->{select_lock});
MKDEBUG && _d('SELECT lock:', $dst->{select_lock});
}
# ########################################################################
# Get and prepare the first plugin that can sync this table.
# ########################################################################
my ($plugin, %plugin_args) = $self->get_best_plugin(%args);
die "No plugin can sync $src->{db}.$src->{tbl}" unless $plugin;
my $user_where = $o->get('where');
my ($src_nibble_iter, $dst_nibble_iter);
foreach my $host ($src, $dst) {
my $callbacks = {
init => sub {
my (%args) = @_;
my $cxn = $args{Cxn};
my $tbl = $args{tbl};
my $nibble_iter = $args{NibbleIterator};
my $sths = $nibble_iter->statements();
my $oktonibble = 1;
if ( $o->get('explain') ) {
# --explain level 1: print the checksum and next boundary
# statements.
print "--\n"
. "-- "
. ($cxn->{is_source} ? "Source" : "Destination")
. " " . $cxn->name()
. " " . "$tbl->{db}.$tbl->{tbl}\n"
. "--\n\n";
my $statements = $nibble_iter->statements();
foreach my $sth ( sort keys %$statements ) {
next if $sth =~ m/^explain/;
if ( $statements->{$sth} ) {
print $statements->{$sth}->{Statement}, "\n\n";
}
}
if ( $o->get('explain') < 2 ) {
$oktonibble = 0; # don't nibble table; next table
}
}
else {
if ( $o->get('buffer-to-client') ) {
$host->{sth}->{mysql_use_result} = 1;
}
# Lock the table.
$self->lock_and_wait(
lock_level => 2,
host => $host,
src => $src,
changing_src => $changing_src,
);
}
return $oktonibble;
},
next_boundaries => sub {
my (%args) = @_;
my $tbl = $args{tbl};
if ( my $diff = $tbl->{diff} ) {
my $nibble_iter = $args{NibbleIterator};
my $boundary = $nibble_iter->boundaries();
$nibble_iter->set_boundary(
'upper', [ split ',', $diff->{upper_boundary} ]);
$nibble_iter->set_boundary(
'lower', [ split ',', $diff->{lower_boundary} ]);
}
return 1;
},
exec_nibble => sub {
my (%args) = @_;
my $tbl = $args{tbl};
my $nibble_iter = $args{NibbleIterator};
my $sths = $nibble_iter->statements();
my $boundary = $nibble_iter->boundaries();
# --explain level 2: print chunk,lower boundary values,upper
# boundary values.
if ( $o->get('explain') > 1 ) {
my $lb_quoted = join(',', @{$boundary->{lower} || []});
my $ub_quoted = join(',', @{$boundary->{upper} || []});
my $chunk = $nibble_iter->nibble_number();
printf "%d %s %s\n",
$chunk,
(defined $lb_quoted ? $lb_quoted : '1=1'),
(defined $ub_quoted ? $ub_quoted : '1=1');
if ( !$nibble_iter->more_boundaries() ) {
print "\n"; # blank line between this table and the next table
}
return 0; # next boundary
}
# Lock the chunk.
$self->lock_and_wait(
%args,
lock_level => 1,
host => $host,
src => $src,
changing_src => $changing_src,
);
# Execute the chunk checksum statement.
# The nibble iter will return the row.
MKDEBUG && _d('nibble', $args{Cxn}->name());
$sths->{nibble}->execute(@{$boundary->{lower}}, @{$boundary->{upper}});
return 1;
},
};
my $nibble_iter = new NibbleIterator(
Cxn => $host->{Cxn},
tbl => $host->{tbl},
chunk_size => $o->get('chunk-size'),
chunk_index => $diffs ? $diffs->[0]->{chunk_index}
: $o->get('chunk-index'),
manual_nibble => $diffs ? 1 : 0,
empty_results => 1,
select => $chunk_cols,
select_lock => $host->{select_lock},
callbacks => $callbacks,
fetch_hashref => 1,
one_nibble => $args{one_nibble},
OptionParser => $self->{OptionParser},
Quoter => $self->{Quoter},
TableNibbler => $self->{TableNibbler},
TableParser => $self->{TableParser},
RowChecksum => $self->{RowChecksum},
);
if ( $host->{Cxn}->{is_source} ) {
$src_nibble_iter = $nibble_iter;
}
else {
$dst_nibble_iter = $nibble_iter;
}
}
my $index = $src_nibble_iter->nibble_index();
my $key_cols = $index ? $src->{tbl}->{tbl_struct}->{keys}->{$index}->{cols}
: $src->{tbl}->{tbl_struct}->{cols};
$row_syncer->set_key_cols($key_cols);
my $crc_col = 'crc';
while ( $src->{tbl}->{tbl_struct}->{is_col}->{$crc_col} ) {
# The row-level (state 2) checksums use __crc, so the table can't use that.
my $crc_col = '__crc';
while ( $tbl_struct->{is_col}->{$crc_col} ) {
$crc_col = "_$crc_col"; # Prepend more _ until not a column.
}
$row_syncer->set_crc_col($crc_col);
MKDEBUG && _d('CRC column:', $crc_col);
my $rows_sql;
my $row_cols = $row_checksum->make_row_checksum(
dbh => $src->{Cxn}->dbh(),
tbl => $src->{tbl},
%crc_args,
);
my $sql_clause = $src_nibble_iter->sql();
foreach my $host ($src, $dst) {
if ( $src_nibble_iter->one_nibble() ) {
$rows_sql
= 'SELECT /*rows in nibble*/ '
. ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '')
. "$row_cols AS $crc_col"
. " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)})
. " WHERE 1=1 "
. ($user_where ? " AND ($user_where)" : '')
. ($sql_clause->{order_by} ? " ORDER BY " . $sql_clause->{order_by}
: "");
# Make an index hint for either the explicitly given chunk_index
# or the chunk_index chosen by the plugin if index_hint is true.
my $index_hint;
my $hint = ($vp->version_ge($src->{dbh}, '4.0.9')
&& $vp->version_ge($dst->{dbh}, '4.0.9') ? 'FORCE' : 'USE')
. ' INDEX';
if ( $args{chunk_index} ) {
MKDEBUG && _d('Using given chunk index for index hint');
$index_hint = "$hint (" . $q->quote($args{chunk_index}) . ")";
}
elsif ( $plugin_args{chunk_index} && $args{index_hint} ) {
MKDEBUG && _d('Using chunk index chosen by plugin for index hint');
$index_hint = "$hint (" . $q->quote($plugin_args{chunk_index}) . ")";
}
MKDEBUG && _d('Index hint:', $index_hint);
eval {
$plugin->prepare_to_sync(
%args,
%plugin_args,
dbh => $src->{dbh},
db => $src->{db},
tbl => $src->{tbl},
crc_col => $crc_col,
index_hint => $index_hint,
);
};
if ( $EVAL_ERROR ) {
# At present, no plugin should fail to prepare, but just in case...
die 'Failed to prepare TableSync', $plugin->name, ' plugin: ',
$EVAL_ERROR;
}
# Some plugins like TableSyncChunk use checksum queries, others like
# TableSyncGroupBy do not. For those that do, make chunk (state 0)
# and row (state 2) checksum queries.
if ( $plugin->uses_checksum() ) {
eval {
my ($chunk_sql, $row_sql) = $self->make_checksum_queries(%args);
$plugin->set_checksum_queries($chunk_sql, $row_sql);
};
if ( $EVAL_ERROR ) {
# This happens if src and dst are really different and the same
# checksum algo and hash func can't be used on both.
die "Failed to make checksum queries: $EVAL_ERROR";
}
else {
$rows_sql
= 'SELECT /*rows in nibble*/ '
. ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '')
. "$row_cols AS $crc_col"
. " FROM " . $q->quote(@{$host->{tbl}}{qw(db tbl)})
. " WHERE " . $sql_clause->{boundaries}->{'>='} # lower boundary
. " AND " . $sql_clause->{boundaries}->{'<='} # upper boundary
. ($user_where ? " AND ($user_where)" : '')
. " ORDER BY " . $sql_clause->{order_by};
}
$host->{rows_sth} = $host->{Cxn}->dbh()->prepare($rows_sql);
}
# ########################################################################
# Plugin is ready, return now if this is a dry run.
# ########################################################################
if ( $args{dry_run} ) {
return $ch->get_changes(), ALGORITHM => $plugin->name;
}
# ########################################################################
# Start syncing the table.
# ########################################################################
while ( $src_nibble_iter->more_boundaries()
|| $dst_nibble_iter->more_boundaries() ) {
if ( $diffs ) {
my $diff = shift @$diffs;
if ( !$diff ) {
MKDEBUG && _d('No more checksum diffs');
last;
}
MKDEBUG && _d('Syncing checksum diff', Dumper($diff));
$src->{tbl}->{diff} = $diff;
$dst->{tbl}->{diff} = $diff;
}
my $src_chunk = $src_nibble_iter->next();
my $dst_chunk = $dst_nibble_iter->next();
MKDEBUG && _d('Got chunk');
if ( $diffs
|| ($src_chunk->{cnt} || 0) != ($dst_chunk->{cnt} || 0)
|| ($src_chunk->{crc} || '') ne ($dst_chunk->{crc} || '') ) {
MKDEBUG && _d("Chunks differ");
my $boundary = $src_nibble_iter->boundaries();
foreach my $host ($src, $dst) {
MKDEBUG && _d($host->{Cxn}->name(), $host->{rows_sth}->{Statement},
'params:', @{$boundary->{lower}}, @{$boundary->{upper}});
$host->{rows_sth}->execute(
@{$boundary->{lower}}, @{$boundary->{upper}});
}
$row_diff->compare_sets(
left_dbh => $src->{Cxn}->dbh(),
left_sth => $src->{rows_sth},
right_dbh => $dst->{Cxn}->dbh(),
right_sth => $dst->{rows_sth},
tbl_struct => $src->{tbl}->{tbl_struct},
syncer => $row_syncer,
);
$changer->process_rows(1, $trace);
foreach my $host ($src, $dst) {
$host->{rows_sth}->finish();
}
if ( !$o->get('lock') && $o->get('transaction') ) {
foreach my $host ($src, $dst) {
$host->{Cxn}->dbh()->commit()
unless $host->{Cxn}->dbh()->{AutoCommit};
}
}
}
# Get next chunks.
$src_nibble_iter->no_more_rows();
$dst_nibble_iter->no_more_rows();
# USE db on src and dst for cases like when replicate-do-db is being used.
eval {
$src->{dbh}->do("USE `$src->{db}`");
$dst->{dbh}->do("USE `$dst->{db}`");
};
if ( $EVAL_ERROR ) {
# This shouldn't happen, but just in case. (The db and tbl on src
# and dst should be checked before calling this sub.)
die "Failed to USE database on source or destination: $EVAL_ERROR";
}
$changer->process_rows(0, $trace);
# For bidirectional syncing it's important to know on which dbh
# changes are made or rows are fetched. This identifies the dbhs,
# then you can search for each one by its address like
# "dbh DBI::db=HASH(0x1028b38)".
MKDEBUG && _d('left dbh', $src->{dbh});
MKDEBUG && _d('right dbh', $dst->{dbh});
# Unlock the table.
foreach my $host ($src, $dst) {
$self->unlock(
lock_level => 2,
host => $host,
OptionParser => $o,
chomp(my $hostname = `hostname`);
my $trace_msg
= $args{trace} ? "src_db:$src->{db} src_tbl:$src->{tbl} "
. ($dp && $src->{dsn} ? "src_dsn:".$dp->as_string($src->{dsn}) : "")
. " dst_db:$dst->{db} dst_tbl:$dst->{tbl} "
. ($dp && $dst->{dsn} ? "dst_dsn:".$dp->as_string($dst->{dsn}) : "")
. " " . join(" ", map { "$_:" . ($args{$_} || 0) }
qw(lock transaction changing_src replicate bidirectional))
. " pid:$PID "
. ($ENV{USER} ? "user:$ENV{USER} " : "")
. ($hostname ? "host:$hostname" : "")
: "";
MKDEBUG && _d("Binlog trace message:", $trace_msg);
$self->lock_and_wait(%args, lock_level => 2); # per-table lock
my $callback = $args{callback};
my $cycle = 0;
while ( !$plugin->done() ) {
# Do as much of the work as possible before opening a transaction or
# locking the tables.
MKDEBUG && _d('Beginning sync cycle', $cycle);
my $src_sql = $plugin->get_sql(
database => $src->{db},
table => $src->{tbl},
where => $args{where},
);
my $dst_sql = $plugin->get_sql(
database => $dst->{db},
table => $dst->{tbl},
where => $args{where},
);
if ( $args{transaction} ) {
if ( $args{bidirectional} ) {
# Making changes on src and dst.
$src_sql .= ' FOR UPDATE';
$dst_sql .= ' FOR UPDATE';
}
elsif ( $args{changing_src} ) {
# Making changes on master (src) which replicate to slave (dst).
$src_sql .= ' FOR UPDATE';
$dst_sql .= ' LOCK IN SHARE MODE';
}
else {
# Making changes on slave (dst).
$src_sql .= ' LOCK IN SHARE MODE';
$dst_sql .= ' FOR UPDATE';
}
}
MKDEBUG && _d('src:', $src_sql);
MKDEBUG && _d('dst:', $dst_sql);
# Give callback a chance to do something with the SQL statements.
$callback->($src_sql, $dst_sql) if $callback;
# Prepare each host for next sync cycle. This does stuff
# like reset/init MySQL accumulator vars, etc.
$plugin->prepare_sync_cycle($src);
$plugin->prepare_sync_cycle($dst);
# Prepare SQL statements on host. These aren't real prepared
# statements (i.e. no ? placeholders); we just need sths to
# pass to compare_sets(). Also, we can control buffering
# (mysql_use_result) on the sths.
my $src_sth = $src->{dbh}->prepare($src_sql);
my $dst_sth = $dst->{dbh}->prepare($dst_sql);
if ( $args{buffer_to_client} ) {
$src_sth->{mysql_use_result} = 1;
$dst_sth->{mysql_use_result} = 1;
}
# The first cycle should lock to begin work; after that, unlock only if
# the plugin says it's OK (it may want to dig deeper on the rows it
# currently has locked).
my $executed_src = 0;
if ( !$cycle || !$plugin->pending_changes() ) {
# per-sync cycle lock
$executed_src
= $self->lock_and_wait(%args, src_sth => $src_sth, lock_level => 1);
}
# The source sth might have already been executed by lock_and_wait().
$src_sth->execute() unless $executed_src;
$dst_sth->execute();
# Compare rows in the two sths. If any differences are found
# (same_row, not_in_left, not_in_right), the appropriate $syncer
# methods are called to handle them. Changes may be immediate, or...
$rd->compare_sets(
left_sth => $src_sth,
right_sth => $dst_sth,
left_dbh => $src->{dbh},
right_dbh => $dst->{dbh},
syncer => $plugin,
tbl_struct => $tbl_struct,
);
# ...changes may be queued and executed now.
$ch->process_rows(1, $trace_msg);
MKDEBUG && _d('Finished sync cycle', $cycle);
$cycle++;
}
return $changer->get_changes();
$ch->process_rows(0, $trace_msg);
$self->unlock(%args, lock_level => 2);
return $ch->get_changes(), ALGORITHM => $plugin->name;
}
sub lock_table {
sub make_checksum_queries {
my ( $self, %args ) = @_;
my @required_args = qw(host mode);
my @required_args = qw(src dst tbl_struct);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($host, $mode) = @args{@required_args};
my $q = $self->{Quoter};
my $sql = "LOCK TABLES "
. $q->quote(@{$host->{tbl}}{qw(db tbl)})
. " $mode";
MKDEBUG && _d($host->{Cxn}->name(), $sql);
$host->{Cxn}->dbh()->do($sql);
return;
my ($src, $dst, $tbl_struct) = @args{@required_args};
my $checksum = $self->{TableChecksum};
# Decide on checksumming strategy and store checksum query prototypes for
# later.
my $src_algo = $checksum->best_algorithm(
algorithm => 'BIT_XOR',
dbh => $src->{dbh},
where => 1,
chunk => 1,
count => 1,
);
my $dst_algo = $checksum->best_algorithm(
algorithm => 'BIT_XOR',
dbh => $dst->{dbh},
where => 1,
chunk => 1,
count => 1,
);
if ( $src_algo ne $dst_algo ) {
die "Source and destination checksum algorithms are different: ",
"$src_algo on source, $dst_algo on destination"
}
MKDEBUG && _d('Chosen algo:', $src_algo);
my $src_func = $checksum->choose_hash_func(dbh => $src->{dbh}, %args);
my $dst_func = $checksum->choose_hash_func(dbh => $dst->{dbh}, %args);
if ( $src_func ne $dst_func ) {
die "Source and destination hash functions are different: ",
"$src_func on source, $dst_func on destination";
}
MKDEBUG && _d('Chosen hash func:', $src_func);
# Since the checksum algo and hash func are the same on src and dst
# it doesn't matter if we use src_algo/func or dst_algo/func.
my $crc_wid = $checksum->get_crc_wid($src->{dbh}, $src_func);
my ($crc_type) = $checksum->get_crc_type($src->{dbh}, $src_func);
my $opt_slice;
if ( $src_algo eq 'BIT_XOR' && $crc_type !~ m/int$/ ) {
$opt_slice = $checksum->optimize_xor(
dbh => $src->{dbh},
function => $src_func
);
}
my $chunk_sql = $checksum->make_checksum_query(
%args,
db => $src->{db},
tbl => $src->{tbl},
algorithm => $src_algo,
function => $src_func,
crc_wid => $crc_wid,
crc_type => $crc_type,
opt_slice => $opt_slice,
replicate => undef, # replicate means something different to this sub
); # than what we use it for; do not pass it!
MKDEBUG && _d('Chunk sql:', $chunk_sql);
my $row_sql = $checksum->make_row_checksum(
%args,
function => $src_func,
);
MKDEBUG && _d('Row sql:', $row_sql);
return $chunk_sql, $row_sql;
}
sub lock_table {
my ( $self, $dbh, $where, $db_tbl, $mode ) = @_;
my $query = "LOCK TABLES $db_tbl $mode";
MKDEBUG && _d($query);
$dbh->do($query);
MKDEBUG && _d('Acquired table lock on', $where, 'in', $mode, 'mode');
}
# Doesn't work quite the same way as lock_and_wait. It will unlock any LOWER
# priority lock level, not just the exact same one.
sub unlock {
my ( $self, %args ) = @_;
my @required_args = qw(lock_level host);
foreach my $arg ( @required_args ) {
foreach my $arg ( qw(src dst lock transaction lock_level) ) {
die "I need a $arg argument" unless defined $args{$arg};
}
my ($lock_level, $host) = @args{@required_args};
my $o = $self->{OptionParser};
my $src = $args{src};
my $dst = $args{dst};
my $lock = $o->get('lock');
return unless $lock && $lock <= $lock_level;
MKDEBUG && _d('Unlocking level', $lock);
return unless $args{lock} && $args{lock} <= $args{lock_level};
if ( $o->get('transaction') ) {
MKDEBUG && _d('Committing', $host->{Cxn}->name());
$host->{Cxn}->dbh()->commit();
}
else {
my $sql = 'UNLOCK TABLES';
MKDEBUG && _d($host->{Cxn}->name(), $sql);
$host->{Cxn}->dbh()->do($sql);
# First, unlock/commit.
foreach my $dbh ( $src->{dbh}, $dst->{dbh} ) {
if ( $args{transaction} ) {
MKDEBUG && _d('Committing', $dbh);
$dbh->commit();
}
else {
my $sql = 'UNLOCK TABLES';
MKDEBUG && _d($dbh, $sql);
$dbh->do($sql);
}
}
return;
@@ -460,86 +458,73 @@ sub unlock {
# $src_sth was executed.
sub lock_and_wait {
my ( $self, %args ) = @_;
my @required_args = qw(lock_level host src);
foreach my $arg ( @required_args ) {
my $result = 0;
foreach my $arg ( qw(src dst lock lock_level) ) {
die "I need a $arg argument" unless defined $args{$arg};
}
my ($lock_level, $host, $src) = @args{@required_args};
my $o = $self->{OptionParser};
my $src = $args{src};
my $dst = $args{dst};
my $lock = $o->get('lock');
return unless $lock && $lock == $lock_level;
return unless $args{lock} && $args{lock} == $args{lock_level};
MKDEBUG && _d('lock and wait, lock level', $args{lock});
# First, commit/unlock the previous transaction/lock.
if ( $o->get('transaction') ) {
MKDEBUG && _d('Committing', $host->{Cxn}->name());
$host->{Cxn}->dbh()->commit();
}
else {
my $sql = 'UNLOCK TABLES';
MKDEBUG && _d($host->{Cxn}->name(), $sql);
$host->{Cxn}->dbh()->do($sql);
}
# Lock/start xa.
return $host->{Cxn}->{is_source} ? $self->_lock_src(%args)
: $self->_lock_dst(%args);
}
sub _lock_src {
my ( $self, %args ) = @_;
my @required_args = qw(lock_level host src);
my ($lock_level, $host, $src) = @args{@required_args};
my $o = $self->{OptionParser};
my $lock = $o->get('lock');
MKDEBUG && _d('Locking', $host->{Cxn}->name(), 'level', $lock);
if ( $lock == 3 ) {
my $sql = 'FLUSH TABLES WITH READ LOCK';
MKDEBUG && _d($host->{Cxn}->name(), $sql);
$host->{Cxn}->dbh()->do($sql);
}
else {
# Lock level 2 (per-table) or 1 (per-chunk).
if ( $o->get('transaction') ) {
my $sql = "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */";
MKDEBUG && _d($host->{Cxn}->name(), $sql);
$host->{Cxn}->dbh()->do($sql);
foreach my $dbh ( $src->{dbh}, $dst->{dbh} ) {
if ( $args{transaction} ) {
MKDEBUG && _d('Committing', $dbh);
$dbh->commit();
}
else {
$self->lock_table(
host => $host,
mode => $args{changing_src} ? 'WRITE' : 'READ',
);
my $sql = 'UNLOCK TABLES';
MKDEBUG && _d($dbh, $sql);
$dbh->do($sql);
}
}
return;
}
sub _lock_dst {
my ( $self, %args ) = @_;
my @required_args = qw(lock_level host src);
my ($lock_level, $host, $src) = @args{@required_args};
# User wants us to lock for consistency. But lock only on source initially;
# might have to wait for the slave to catch up before locking on the dest.
if ( $args{lock} == 3 ) {
my $sql = 'FLUSH TABLES WITH READ LOCK';
MKDEBUG && _d($src->{dbh}, $sql);
$src->{dbh}->do($sql);
}
else {
# Lock level 2 (per-table) or 1 (per-sync cycle)
if ( $args{transaction} ) {
if ( $args{src_sth} ) {
# Execute the $src_sth on the source, so LOCK IN SHARE MODE/FOR
# UPDATE will lock the rows examined.
MKDEBUG && _d('Executing statement on source to lock rows');
my $o = $self->{OptionParser};
my $lock = $o->get('lock');
MKDEBUG && _d('Locking', $host->{Cxn}->name(), 'level', $lock);
my $sql = "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */";
MKDEBUG && _d($src->{dbh}, $sql);
$src->{dbh}->do($sql);
$args{src_sth}->execute();
$result = 1;
}
}
else {
$self->lock_table($src->{dbh}, 'source',
$self->{Quoter}->quote($src->{db}, $src->{tbl}),
$args{changing_src} ? 'WRITE' : 'READ');
}
}
# Wait for the dest to catchup to the source, then lock the dest.
# If there is any error beyond this point, we need to unlock/commit.
eval {
if ( my $timeout = $o->get('wait') ) {
if ( my $timeout = $args{wait} ) {
my $ms = $self->{MasterSlave};
my $wait;
my $tries = $args{wait_retry_args}->{tries} || 3;
my $wait;
$self->{Retry}->retry(
tries => $tries,
wait => sub { sleep 5; },
wait => sub { sleep $args{wait_retry_args}->{wait} || 10 },
try => sub {
my ( %args ) = @_;
# Be careful using $args{...} in this callback! %args in
# here are the passed-in args, not the args to the sub.
# here are the passed-in args, not the args to lock_and_wait().
if ( $args{tryno} > 1 ) {
warn "Retrying MASTER_POS_WAIT() for --wait $timeout...";
@@ -549,8 +534,8 @@ sub _lock_dst {
# because the main dbh might be in use due to executing
# $src_sth.
$wait = $ms->wait_for_master(
master_status => $ms->get_master_status($src->{Cxn}->aux_dbh()),
slave_dbh => $host->{Cxn}->dbh(),
master_status => $ms->get_master_status($src->{misc_dbh}),
slave_dbh => $dst->{dbh},
timeout => $timeout,
);
if ( defined $wait->{result} && $wait->{result} != -1 ) {
@@ -607,24 +592,27 @@ sub _lock_dst {
'(syncing via replication or sync-to-master)');
}
else {
if ( $lock == 3 ) {
if ( $args{lock} == 3 ) {
my $sql = 'FLUSH TABLES WITH READ LOCK';
MKDEBUG && _d($host->{Cxn}->name(), $sql);
$host->{Cxn}->dbh()->do($sql);
MKDEBUG && _d($dst->{dbh}, ',', $sql);
$dst->{dbh}->do($sql);
}
elsif ( !$o->get('transaction') ) {
$self->lock_table(
host => $host,
mode => 'READ', # $args{execute} ? 'WRITE' : 'READ')
);
elsif ( !$args{transaction} ) {
$self->lock_table($dst->{dbh}, 'dest',
$self->{Quoter}->quote($dst->{db}, $dst->{tbl}),
$args{execute} ? 'WRITE' : 'READ');
}
}
};
if ( $EVAL_ERROR ) {
# Must abort/unlock/commit so that we don't interfere with any further
# tables we try to do.
foreach my $dbh ( $host->{Cxn}->dbh(), $src->{Cxn}->dbh() ) {
MKDEBUG && _d('Caught error, unlocking/committing', $dbh);
if ( $args{src_sth}->{Active} ) {
$args{src_sth}->finish();
}
foreach my $dbh ( $src->{dbh}, $dst->{dbh}, $src->{misc_dbh} ) {
next unless $dbh;
MKDEBUG && _d('Caught error, unlocking/committing on', $dbh);
$dbh->do('UNLOCK TABLES');
$dbh->commit() unless $dbh->{AutoCommit};
}
@@ -632,7 +620,7 @@ sub _lock_dst {
die $EVAL_ERROR;
}
return;
return $result;
}
# This query will check all needed privileges on the table without actually
@@ -663,7 +651,6 @@ sub have_all_privs {
return 0;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }