Fix TableSyncer trace msg, implement --explain, fix locking/committing. Add aux dbh, dbh opts, and disconnect() to Cxn.

This commit is contained in:
Daniel Nichter
2011-12-21 21:12:03 -07:00
parent ec8471ba28
commit 457fa70668
3 changed files with 130 additions and 55 deletions

View File

@@ -54,6 +54,7 @@ use constant PERCONA_TOOLKIT_TEST_USE_DSN_NAMES => $ENV{PERCONA_TOOLKIT_TEST_USE
# Optional Arguments:
# dbh - Pre-created, uninitialized dbh
# set - Callback to set vars on dbh when dbh is first connected
# aux - Create a secondy (auxiliary) dbh, get with <aux_dbh()>.
#
# Returns:
# Cxn object
@@ -101,6 +102,8 @@ sub new {
dsn_name => $dp->as_string($dsn, [qw(h P S)]),
hostname => '',
set => $args{set},
aux => $args{aux},
dbh_opts => $args{dbh_opts} || {AutoCommit => 1},
dbh_set => 0,
OptionParser => $o,
DSNParser => $dp,
@@ -122,13 +125,33 @@ sub connect {
$dsn->{p} = OptionParser::prompt_noecho("Enter MySQL password: ");
$self->{asked_for_pass} = 1;
}
$dbh = $dp->get_dbh($dp->get_cxn_params($dsn), { AutoCommit => 1 });
$dbh = $dp->get_dbh($dp->get_cxn_params($dsn), $self->{dbh_opts});
}
MKDEBUG && _d($dbh, 'Connected dbh to', $self->{name});
if ( $self->{aux} && (!$self->{aux_dbh} || !$self->{aux_dbh}->ping()) ) {
my $aux_dbh = $dp->get_dbh($dp->get_cxn_params($dsn), {AutoCommit => 1});
MKDEBUG && _d($aux_dbh, 'Connected aux dbh to', $self->{name});
$dbh->{FetchHashKeyName} = 'NAME_lc';
$self->{aux_dbh} = $aux_dbh;
}
return $self->set_dbh($dbh);
}
sub disconnect {
my ($self) = @_;
if ( $self->{dbh} ) {
MKDEBUG && _d('Disconnecting dbh', $self->{dbh}, $self->{name});
$self->{dbh}->disconnect();
}
if ( $self->{aux_dbh} ) {
MKDEBUG && _d('Disconnecting aux dbh', $self->{aux_dbh});
$self->{aux_dbh}->disconnect();
}
return;
}
sub set_dbh {
my ($self, $dbh) = @_;
@@ -148,7 +171,7 @@ sub set_dbh {
# Set stuff for this dbh (i.e. initialize it).
$dbh->{FetchHashKeyName} = 'NAME_lc';
# Update the cxn's name. Until we connect, the DSN parts
# h and P are used. Once connected, use @@hostname.
my $sql = 'SELECT @@hostname, @@server_id';
@@ -176,6 +199,11 @@ sub dbh {
return $self->{dbh};
}
sub aux_dbh {
my ($self) = @_;
return $self->{aux_dbh};
}
# Sub: dsn
# Return the cxn's dsn.
sub dsn {
@@ -197,6 +225,10 @@ sub DESTROY {
MKDEBUG && _d('Disconnecting dbh', $self->{dbh}, $self->{name});
$self->{dbh}->disconnect();
}
if ( $self->{aux_dbh} ) {
MKDEBUG && _d('Disconnecting aux dbh', $self->{aux_dbh});
$self->{aux_dbh}->disconnect();
}
return;
}

View File

@@ -77,6 +77,7 @@ sub sync_table {
die "I need a $arg argument" unless $args{$arg};
}
my ($src, $dst, $row_syncer, $changer) = @args{@required_args};
my $changing_src = $args{changing_src};
my $o = $self->{OptionParser};
my $q = $self->{Quoter};
@@ -88,16 +89,14 @@ sub sync_table {
$host->{Cxn}->dbh()->do("USE " . $q->quote($host->{tbl}->{db}));
}
return $changer->get_changes() if $o->get('dry-run');
my $trace;
if ( !defined $args{trace} || $args{trace} ) {
chomp(my $hostname = `hostname`);
$trace = "src_host:" . $src->{Cxn}->name()
. " src_tbl:" . join('.', @{$src->{tbl}}{qw(db tbl)})
. "dst_host:" . $dst->{Cxn}->name()
. " dst_host:" . $dst->{Cxn}->name()
. " dst_tbl:" . join('.', @{$dst->{tbl}}{qw(db tbl)})
. " changing_src: " . ($args{changing_src} ? "yes" : "no")
. " changing_src:" . ($changing_src ? "yes" : "no")
. " " . join(" ", map { "$_:" . ($o->get($_) ? "yes" : "no") }
qw(lock transaction replicate bidirectional))
. " pid:$PID "
@@ -122,7 +121,7 @@ sub sync_table {
$src->{sql_lock} = 'FOR UPDATE';
$dst->{sql_lock} = 'FOR UPDATE';
}
elsif ( $args{changing_src} ) {
elsif ( $changing_src ) {
# Making changes on master (src) which replicate to slave (dst).
$src->{sql_lock} = 'FOR UPDATE';
$dst->{sql_lock} = 'LOCK IN SHARE MODE';
@@ -148,22 +147,48 @@ sub sync_table {
my $callbacks = {
init => sub {
my (%args) = @_;
my $cxn = $args{Cxn};
my $tbl = $args{tbl};
my $nibble_iter = $args{NibbleIterator};
my $sths = $nibble_iter->statements();
my $oktonibble = 1;
if ( $o->get('buffer-to-client') ) {
$host->{sth}->{mysql_use_result} = 1;
if ( $o->get('explain') ) {
# --explain level 1: print the checksum and next boundary
# statements.
print "--\n"
. "-- "
. ($cxn->{is_source} ? "Source" : "Destination")
. " " . $cxn->name()
. " " . "$tbl->{db}.$tbl->{tbl}\n"
. "--\n\n";
my $statements = $nibble_iter->statements();
foreach my $sth ( sort keys %$statements ) {
next if $sth =~ m/^explain/;
if ( $statements->{$sth} ) {
print $statements->{$sth}->{Statement}, "\n\n";
}
}
if ( $o->get('explain') < 2 ) {
$oktonibble = 0; # don't nibble table; next table
}
}
else {
if ( $o->get('buffer-to-client') ) {
$host->{sth}->{mysql_use_result} = 1;
}
# Lock the table.
$self->lock_and_wait(
lock_level => 2,
host => $host,
src => $src,
changing_src => $changing_src,
);
}
# Lock the table.
$self->lock_and_wait(
lock_level => 2,
host => $host,
src => $src,
OptionParser => $o,
);
return 1;
return $oktonibble;
},
exec_nibble => sub {
my (%args) = @_;
@@ -171,12 +196,29 @@ sub sync_table {
my $sths = $nibble_iter->statements();
my $boundary = $nibble_iter->boundaries();
# --explain level 2: print chunk,lower boundary values,upper
# boundary values.
if ( $o->get('explain') > 1 ) {
my $lb_quoted = join(',', @{$boundary->{lower} || []});
my $ub_quoted = join(',', @{$boundary->{upper} || []});
my $chunk = $nibble_iter->nibble_number();
printf "%d %s %s\n",
$chunk,
(defined $lb_quoted ? $lb_quoted : '1=1'),
(defined $ub_quoted ? $ub_quoted : '1=1');
if ( !$nibble_iter->more_boundaries() ) {
print "\n"; # blank line between this table and the next table
}
return 0; # next boundary
}
# Lock the chunk.
$self->lock_and_wait(
%args,
lock_level => 1,
host => $host,
src => $src,
OptionParser => $o,
changing_src => $changing_src,
);
# Execute the chunk checksum statement.
@@ -203,7 +245,7 @@ sub sync_table {
RowChecksum => $self->{RowChecksum},
);
if ( $host->{is_source} ) {
if ( $host->{Cxn}->{is_source} ) {
$src_nibble_iter = $nibble_iter;
}
else {
@@ -265,8 +307,8 @@ sub sync_table {
my $src_chunk = $src_nibble_iter->next();
my $dst_chunk = $dst_nibble_iter->next();
if ( $src_chunk->{cnt} != $dst_chunk->{cnt}
|| $src_chunk->{crc} ne $dst_chunk->{crc} ) {
if ( ($src_chunk->{cnt} || 0) != ($dst_chunk->{cnt} || 0)
|| ($src_chunk->{crc} || '') ne ($dst_chunk->{crc} || '') ) {
MKDEBUG && _d("Chunks differ");
my $boundary = $src_nibble_iter->boundaries();
foreach my $host ($src, $dst) {
@@ -292,6 +334,10 @@ sub sync_table {
# Get next chunks.
$src_nibble_iter->no_more_rows();
$dst_nibble_iter->no_more_rows();
my $changes_dbh = $changing_src ? $src->{Cxn}->dbh()
: $dst->{Cxn}->dbh();
$changes_dbh->commit() unless $changes_dbh->{AutoCommit};
}
$changer->process_rows(0, $trace);
@@ -392,8 +438,8 @@ sub lock_and_wait {
}
# Lock/start xa.
return $host->{is_source} ? $self->_lock_src(%args)
: $self->_lock_dst(%args);
return $host->{Cxn}->{is_source} ? $self->_lock_src(%args)
: $self->_lock_dst(%args);
}
sub _lock_src {
@@ -441,7 +487,7 @@ sub _lock_dst {
eval {
if ( my $timeout = $o->get('wait') ) {
my $ms = $self->{MasterSlave};
my $wait = $args{wait_retry_args}->{wait} || 10;
my $wait;
my $tries = $args{wait_retry_args}->{tries} || 3;
$self->{Retry}->retry(
tries => $tries,
@@ -459,7 +505,7 @@ sub _lock_dst {
# because the main dbh might be in use due to executing
# $src_sth.
$wait = $ms->wait_for_master(
master_status => $ms->get_master_status($src->{misc_dbh}),
master_status => $ms->get_master_status($src->{Cxn}->aux_dbh()),
slave_dbh => $host->{Cxn}->dbh(),
timeout => $timeout,
);