Re-implement --explain.

This commit is contained in:
Daniel Nichter
2011-10-11 12:33:36 -06:00
parent 2de29214b9
commit ce9d7d461a
2 changed files with 215 additions and 167 deletions

View File

@@ -5350,6 +5350,8 @@ sub main {
my $set_on_connect = sub {
my ($dbh) = @_;
return if $o->get('explain');
my $sql = 'SET /*!50108 @@binlog_format := "STATEMENT"*/';
MKDEBUG && _d($dbh, $sql);
$dbh->do($sql);
@@ -5413,177 +5415,203 @@ sub main {
return $cxn;
};
# The dbh and dsn can be used before checksumming starts, but once
# inside the main TABLE loop, only use the master cxn because its
# dbh may be recreated.
my $master_cxn = $make_cxn->(dsn_string => shift @ARGV);
my $dbh = $master_cxn->dbh(); # just for brevity
my $dsn = $master_cxn->dsn(); # just for brevity
my $master_dbh = $master_cxn->dbh(); # just for brevity
my $master_dsn = $master_cxn->dsn(); # just for brevity
# ########################################################################
# Find and connect to slaves.
# If this is not a dry run (--explain was not specified), then we're
# going to checksum the tables, so do the necessary preparations and
# checks. Else, this all can be skipped because all we need for a
# dry run is a connection to the master.
# ########################################################################
my $q = new Quoter();
my $vp = new VersionParser();
my $ms = new MasterSlave(VersionParser => $vp);
my $slaves = $ms->get_slaves(
dbh => $dbh,
dsn => $dsn,
OptionParser => $o,
DSNParser => $dp,
Quoter => $q,
make_cxn => $make_cxn,
);
MKDEBUG && _d(scalar @$slaves, 'slaves found');
my $q = new Quoter();
my $tp = new TableParser(Quoter => $q);
my $rc = new RowChecksum(Quoter=> $q, OptionParser => $o);
my $vp = new VersionParser();
my $ms = new MasterSlave(VersionParser => $vp);
my $slave_lag_cxns;
if ( $o->get('check-slave-lag') ) {
MKDEBUG && _d('Will use --check-slave-lag to check for slave lag');
# OptionParser can't auto-copy DSN vals from a cmd line DSN
# to an opt DSN, so we copy them manually.
my $dsn = $dp->copy($master_cxn->dsn(), $o->get('check-slave-lag'));
my $cxn = $make_cxn->(dsn => $dsn);
$slave_lag_cxns = [ $cxn ];
}
else {
MKDEBUG && _d('Will check slave lag on all slaves');
$slave_lag_cxns = $slaves;
}
my $slaves; # all slaves (that we can find)
my $slave_lag_cxns; # slaves whose lag we'll check
my $replica_lag; # ReplicaLagWaiter object
# ########################################################################
# Check replication slaves and possibly exit.
# ########################################################################
my $rc = new RowChecksum(Quoter=> $q, OptionParser => $o);
my $repl_table = $q->quote($q->split_unquote($o->get('replicate')));
my $fetch_sth; # fetch chunk from repl table
my $update_sth; # update master_cnt and master_cnt in repl table
my $delete_sth; # delete checksums for one db.tbl from repl table
if ( $o->get('replicate-check') && !$o->get('recheck') ) {
MKDEBUG && _d('Will --replicate-check and exit');
if ( !$o->get('explain') ) {
# #####################################################################
# Find and connect to slaves.
# #####################################################################
$slaves = $ms->get_slaves(
dbh => $master_dbh,
dsn => $master_dsn,
OptionParser => $o,
DSNParser => $dp,
Quoter => $q,
make_cxn => $make_cxn,
);
MKDEBUG && _d(scalar @$slaves, 'slaves found');
foreach my $slave ( @$slaves ) {
my $diffs = $rc->find_replication_differences(
dbh => $slave->{dbh},
repl_table => $repl_table,
);
MKDEBUG && _d(scalar @$diffs, 'checksum diffs on', $slave->{dsn}->{n});
if ( @$diffs ) {
$exit_status |= 1;
if ( $o->get('quiet') < 2 ) {
print_checksum_diffs(
cxn => $slave,
diffs => $diffs,
);
if ( $o->get('check-slave-lag') ) {
MKDEBUG && _d('Will use --check-slave-lag to check for slave lag');
# OptionParser can't auto-copy DSN vals from a cmd line DSN
# to an opt DSN, so we copy them manually.
my $dsn = $dp->copy($master_dsn, $o->get('check-slave-lag'));
my $cxn = $make_cxn->(dsn => $dsn);
$slave_lag_cxns = [ $cxn ];
}
else {
MKDEBUG && _d('Will check slave lag on all slaves');
$slave_lag_cxns = $slaves;
}
# #####################################################################
# Check replication slaves and possibly exit.
# #####################################################################
if ( $o->get('replicate-check') && !$o->get('recheck') ) {
MKDEBUG && _d('Will --replicate-check and exit');
foreach my $slave ( @$slaves ) {
my $diffs = $rc->find_replication_differences(
dbh => $slave->dbh(),
repl_table => $repl_table,
);
MKDEBUG && _d(scalar @$diffs, 'checksum diffs on',
$slave->dsn()->{n});
if ( @$diffs ) {
$exit_status |= 1;
if ( $o->get('quiet') < 2 ) {
print_checksum_diffs(
cxn => $slave,
diffs => $diffs,
);
}
}
}
MKDEBUG && _d('Exit status', $exit_status, 'oktorun', $oktorun);
return $exit_status;
}
MKDEBUG && _d('Exit status', $exit_status, 'oktorun', $oktorun);
return $exit_status;
}
# #####################################################################
# Check that the replication table exists, or possibly create it.
# #####################################################################
check_repl_table(
dbh => $master_dbh,
repl_table => $repl_table,
OptionParser => $o,
TableParser => $tp,
Quoter => $q,
);
# ########################################################################
# Check that the replication table exists, or possibly create it.
# ########################################################################
my $tp = new TableParser(Quoter => $q);
check_repl_table(
dbh => $dbh,
repl_table => $repl_table,
OptionParser => $o,
TableParser => $tp,
Quoter => $q,
);
# ########################################################################
# Check for replication filters.
# ########################################################################
if ( $o->get('check-replication-filters') ) {
MKDEBUG && _d("Checking slave replication filters");
my @all_repl_filters;
foreach my $host ( @$slaves ) {
my $repl_filters = $ms->get_replication_filters(dbh=>$host->{dbh});
if ( keys %$repl_filters ) {
my $host = $dp->as_string($host->{dsn});
push @all_repl_filters,
{ name => $host,
filters => $repl_filters,
};
# #####################################################################
# Check for replication filters.
# #####################################################################
if ( $o->get('check-replication-filters') ) {
MKDEBUG && _d("Checking slave replication filters");
my @all_repl_filters;
foreach my $slave ( @$slaves ) {
my $repl_filters = $ms->get_replication_filters(
dbh => $slave->dbh(),
);
if ( keys %$repl_filters ) {
push @all_repl_filters,
{ name => $slave->dsn()->{n},
filters => $repl_filters,
};
}
}
if ( @all_repl_filters ) {
my $msg = "Replication filters are set on these hosts:\n";
foreach my $host ( @all_repl_filters ) {
my $filters = $host->{filters};
$msg .= " $host->{name}\n"
. join("\n", map { " $_ = $host->{filters}->{$_}" }
keys %{$host->{filters}})
. "\n";
}
$msg .= "Please read the --check-replication-filters documentation "
. "to learn how to solve this problem.";
die ts($msg);
}
}
if ( @all_repl_filters ) {
my $msg = "Replication filters are set on these hosts:\n";
foreach my $host ( @all_repl_filters ) {
my $filters = $host->{filters};
$msg .= " $host->{name}\n"
. join("\n", map { " $_ = $host->{filters}->{$_}" }
keys %{$host->{filters}})
. "\n";
# #####################################################################
# Make a ReplicaLagWaiter to help wait for slaves after each chunk.
# #####################################################################
my $sleep = sub {
# Don't let the master dbh die while waiting for slaves because we
# may wait a very long time for slaves.
# This is called from within the main TABLE loop, so use the
# master cxn; do not use $master_dbh.
my $dbh = $master_cxn->dbh();
if ( !$dbh || !$dbh->ping() ) {
MKDEBUG && _d('Lost connection to master while waiting for slave lag');
eval { $dbh = $master_cxn->connect() }; # connect or die trying
if ( $EVAL_ERROR ) {
$oktorun = 0; # Fatal error
chomp $EVAL_ERROR;
die "Lost connection to master while waiting for replica lag "
. "($EVAL_ERROR)";
}
}
$msg .= "Please read the --check-replication-filters documentation "
. "to learn how to solve this problem.";
die ts($msg);
}
}
$dbh->do("SELECT 'pt-table-checksum keepalive'");
sleep $o->get('check-interval');
return;
};
my $get_lag = sub {
my ($cxn) = @_;
my $dbh = $cxn->dbh();
if ( !$dbh || !$dbh->ping() ) {
MKDEBUG && _d('Lost connection to slave', $cxn->dsn()->{n},
'while waiting for slave lag');
eval { $dbh = $cxn->connect() }; # connect or die trying
if ( $EVAL_ERROR ) {
$oktorun = 0; # Fatal error
chomp $EVAL_ERROR;
die "Lost connection to replica " . $cxn->dsn()->{n}
. " while attempting to get its lag ($EVAL_ERROR)";
}
}
return $ms->get_slave_lag($dbh);
};
$replica_lag = new ReplicaLagWaiter(
slaves => $slave_lag_cxns,
max_lag => $o->get('max-lag'),
oktorun => sub { return $oktorun },
get_lag => $get_lag,
sleep => $sleep,
);
# #####################################################################
# Prepare statement handles to update the repl table on the master.
# #####################################################################
$fetch_sth = $master_dbh->prepare(
"SELECT this_crc, this_cnt FROM $repl_table "
. "WHERE db = ? AND tbl = ? AND chunk = ?");
$update_sth = $master_dbh->prepare(
"UPDATE $repl_table SET chunk_time = ?, master_crc = ?, master_cnt = ? "
. "WHERE db = ? AND tbl = ? AND chunk = ?");
$delete_sth = $master_dbh->prepare(
"DELETE FROM $repl_table WHERE db = ? AND tbl = ?");
} # !$o->get('explain')
# ########################################################################
# Checksum query statementn and sths to update the checksum table.
# Checksum args and the DMS part of the checksum query for each table.
# ########################################################################
my %crc_args = $rc->get_crc_args(dbh => $dbh);
my %crc_args = $rc->get_crc_args(dbh => $master_dbh);
my $checksum_dms = "REPLACE INTO $repl_table "
. "(db, tbl, chunk, chunk_index,"
. " lower_boundary, upper_boundary, this_cnt, this_crc) "
. "SELECT ?, ?, ?, ?, ?, ?,";
my $fetch_sth = $dbh->prepare(
"SELECT this_crc, this_cnt FROM $repl_table "
. "WHERE db = ? AND tbl = ? AND chunk = ?");
my $update_sth = $dbh->prepare(
"UPDATE $repl_table SET chunk_time = ?, master_crc = ?, master_cnt = ? "
. "WHERE db = ? AND tbl = ? AND chunk = ?");
my $delete_sth = $dbh->prepare(
"DELETE FROM $repl_table WHERE db = ? AND tbl = ?");
# ########################################################################
# Make a ReplicaLagWaiter to help wait for slaves after each chunk.
# ########################################################################
my $sleep = sub {
# Don't let the master dbh die while waiting for slaves because we
# may wait a very long time for slaves.
my $dbh = $master_cxn->dbh();
if ( !$dbh || !$dbh->ping() ) {
MKDEBUG && _d('Lost connection to master while waiting for slave lag');
eval { $dbh = $master_cxn->connect() }; # connect or die trying
if ( $EVAL_ERROR ) {
$oktorun = 0; # Fatal error
chomp $EVAL_ERROR;
die "Lost connection to master while waiting for replica lag "
. "($EVAL_ERROR)";
}
}
$dbh->do("SELECT 'pt-table-checksum keepalive'");
sleep $o->get('check-interval');
return;
};
my $get_lag = sub {
my ($cxn) = @_;
my $dbh = $cxn->dbh();
if ( !$dbh || !$dbh->ping() ) {
MKDEBUG && _d('Lost connection to slave', $cxn->dsn()->{n},
'while waiting for slave lag');
eval { $dbh = $cxn->connect() }; # connect or die trying
if ( $EVAL_ERROR ) {
$oktorun = 0; # Fatal error
chomp $EVAL_ERROR;
die "Lost connection to replica " . $cxn->dsn()->{n}
. " while attempting to get its lag ($EVAL_ERROR)";
}
}
return $ms->get_slave_lag($dbh);
};
my $replica_lag = new ReplicaLagWaiter(
slaves => $slave_lag_cxns,
max_lag => $o->get('max-lag'),
oktorun => sub { return $oktorun },
get_lag => $get_lag,
sleep => $sleep,
);
# ########################################################################
# Get last chunk for --resume.
@@ -5591,7 +5619,7 @@ sub main {
my $last_chunk;
if ( $o->get('resume') ) {
$last_chunk = last_chunk(
dbh => $dbh,
dbh => $master_dbh,
repl_table => $repl_table,
);
}
@@ -5613,17 +5641,27 @@ sub main {
my $callbacks = {
init => sub {
my (%args) = @_;
my $tbl = $args{tbl};
my $tbl = $args{tbl};
my $nibble_iter = $args{NibbleIterator};
my $oktonibble = 1;
my $oktonibble = 1;
if ( $o->get('explain') ) { # dry run (--explain)
# We're not going to checksum the table, just report the
# statements that we would use to checksum it.
print "--\n",
"-- Statements for table $tbl->{db}.$tbl->{tbl}:\n",
"--\n\n";
my $statements = $nibble_iter->statements();
foreach my $sth ( sort keys %$statements ) {
if ( $statements->{$sth} ) {
print $statements->{$sth}->{Statement}, "\n\n";
}
}
if ( $o->get('empty-replicate-table') ) {
MKDEBUG && _d($delete_sth->{Statement});
$delete_sth->execute($tbl->{db}, $tbl->{tbl});
$oktonibble = 0; # don't nibble table; next table
}
elsif ( $last_chunk ) {
my $nibble_iter = $args{NibbleIterator};
my $next_lb = next_lower_boundary(
elsif ( $last_chunk ) { # resuming
my $next_lb = next_lower_boundary(
%args,
last_chunk => $last_chunk,
Quoter => $q,
@@ -5633,7 +5671,7 @@ sub main {
# of a table. So we just start with the next table.
MKDEBUG && _d('Resuming from last chunk in table;',
'getting next table');
$oktonibble = 0; # stop nibbling table
$oktonibble = 0; # don't nibbling table; next table
}
else {
$nibble_iter->set_nibble_number($last_chunk->{chunk});
@@ -5649,6 +5687,10 @@ sub main {
# Just need to call us once to kick-start the resume process.
$last_chunk = undef;
}
elsif ( $o->get('empty-replicate-table') ) {
MKDEBUG && _d($delete_sth->{Statement});
$delete_sth->execute($tbl->{db}, $tbl->{tbl});
}
return $oktonibble; # continue nibbling table?
},
@@ -5871,8 +5913,8 @@ sub main {
while ( $chunks[0] < $max_chunk ) {
for my $i ( 0..$n_slaves ) {
my $slave = $slaves->[$i];
my ($chunk) = $slave->{dbh}->selectrow_array($sql);
MKDEBUG && _d($slave->{dsn}->{n}, 'max chunk:', $chunk);
my ($chunk) = $slave->dbh()->selectrow_array($sql);
MKDEBUG && _d($slave->dsn()->{n}, 'max chunk:', $chunk);
$chunks[$i] = $chunk || 0;
}
@chunks = sort { $a <=> $b } @chunks;
@@ -5889,12 +5931,12 @@ sub main {
foreach my $slave ( @$slaves ) {
my $diffs = $rc->find_replication_differences(
dbh => $slave->{dbh},
dbh => $slave->dbh(),
repl_table => $repl_table,
where => "db='$tbl->{db}' AND tbl='$tbl->{tbl}'",
);
MKDEBUG && _d(scalar @$diffs, 'checksum diffs on',
$slave->{dsn}->{n});
$slave->dsn()->{n});
if ( @$diffs ) {
$tbl->{checksum_results}->{diffs} = scalar @$diffs;
}
@@ -5917,7 +5959,7 @@ sub main {
# Checksum each table.
# ########################################################################
my $schema_iter = new SchemaIterator(
dbh => $dbh,
dbh => $master_dbh,
resume => $last_chunk ? $q->quote(@{$last_chunk}{qw(db tbl)}) : "",
OptionParser => $o,
TableParser => $tp,
@@ -5934,7 +5976,7 @@ sub main {
# USE the correct db while checksumming this table. The "correct"
# db is a complicated subject; see sub for comments.
use_repl_db(
dbh => $dbh,
dbh => $master_cxn->dbh(),
tbl => $tbl,
repl_table => $repl_table,
OptionParser => $o,
@@ -5956,7 +5998,7 @@ sub main {
# if the table has no indexes and is too large to checksum in
# one chunk.
my $checksum_cols = $rc->make_chunk_checksum(
dbh => $dbh,
dbh => $master_cxn->dbh(),
tbl => $tbl,
%crc_args
);
@@ -5993,7 +6035,7 @@ sub main {
# is, etc. But just in case, all tables have a Progress obj.
if ( $o->get('progress') ) {
$tbl->{progress} = table_progress(
dbh => $dbh,
dbh => $master_cxn->dbh(),
tbl => $tbl,
OptionParser => $o,
Quoter => $q,
@@ -6252,7 +6294,7 @@ sub print_checksum_diffs {
}
my ($cxn, $diffs) = @args{@required_args};
print "Differences on $cxn->{dsn}->{n}\n";
print "Differences on ", $cxn->dsn()->{n}, "\n";
print join(' ', map { uc $_ } @headers), "\n";
foreach my $diff ( @$diffs ) {
print join(' ', map { defined $_ ? $_ : '' } @{$diff}{@headers}), "\n";
@@ -6914,6 +6956,12 @@ short form: -e; type: hash; group: Filter
Do only this comma-separated list of storage engines.
=item --explain
group: Output
Show, but do not execute, checksum queries (disables L<"--[no]empty-replicate-table">).
=item --float-precision
type: int

View File

@@ -28,8 +28,8 @@ else {
}
my $output;
my $cnf='/tmp/12345/my.sandbox.cnf';
my @args = ('-F', $cnf, 'h=127.1', qw(-t test.ascii --chunk-column c));
my $cnf ='/tmp/12345/my.sandbox.cnf';
my @args = ("F=$cnf", qw(--lock-wait-timeout 3 --chunk-time 0 --chunk-size-limit 0 --tables test.ascii));
$sb->create_dbs($dbh, ['test']);
$sb->load_file('master', "t/lib/samples/char-chunking/ascii.sql", 'test');