Partial --resume implementation (work in progress).

This commit is contained in:
Daniel Nichter
2011-09-28 11:33:12 -06:00
parent 25882d49e7
commit 929000cfdd
3 changed files with 154 additions and 13 deletions

View File

@@ -3552,6 +3552,14 @@ sub nibble_number {
return $self->{nibbleno};
}
sub set_nibble_number {
my ($self, $n) = @_;
die "I need a number" unless $n;
$self->{nibbleno} = $n;
MKDEBUG && _d('Set new nibble number:', $n);
return;
}
sub nibble_index {
my ($self) = @_;
return $self->{index};
@@ -3570,10 +3578,9 @@ sub statements {
sub boundaries {
my ($self) = @_;
return {
first_lower => $self->{first_lb},
lower => $self->{lb},
next_lower => $self->{next_lb},
upper => $self->{ub},
next_lower => $self->{next_lb},
last_upper => $self->{last_ub},
};
}
@@ -3591,9 +3598,9 @@ sub chunk_size {
sub set_chunk_size {
my ($self, $limit) = @_;
return if $self->{one_nibble};
MKDEBUG && _d('Setting new chunk size (LIMIT):', $limit);
die "Chunk size must be > 0" unless $limit;
$self->{limit} = $limit - 1;
MKDEBUG && _d('Set new chunk size (LIMIT):', $limit);
return;
}
@@ -3701,8 +3708,7 @@ sub _get_bounds {
my ($self) = @_;
return if $self->{one_nibble};
$self->{first_lb} = $self->{dbh}->selectrow_arrayref($self->{first_lb_sql});
$self->{next_lb} = $self->{first_lb};
$self->{next_lb} = $self->{dbh}->selectrow_arrayref($self->{first_lb_sql});
MKDEBUG && _d('First lower boundary:', Dumper($self->{next_lb}));
$self->{last_ub} = $self->{dbh}->selectrow_arrayref($self->{last_ub_sql});
@@ -4051,8 +4057,17 @@ sub new {
die "I need either a dbh or file_itr argument"
if (!$dbh && !$file_itr) || ($dbh && $file_itr);
my %resume;
if ( my $table = $args{resume} ) {
MKDEBUG && _d('Will resume from', $table);
my ($db, $tbl) = $args{Quoter}->split_unquote($table);
$resume{db} = $db;
$resume{tbl} = $tbl;
}
my $self = {
%args,
resume => \%resume,
filters => _make_filters(%args),
};
@@ -4113,7 +4128,7 @@ sub _make_filters {
return \%filters;
}
sub next_schema_object {
sub next {
my ( $self ) = @_;
my $schema_obj;
@@ -4163,7 +4178,8 @@ sub _iterate_files {
my $db = $1; # XXX
$db =~ s/^`//; # strip leading `
$db =~ s/`$//; # and trailing `
if ( $self->database_is_allowed($db) ) {
if ( $self->database_is_allowed($db)
&& $self->_resume_from_database($db) ) {
$self->{db} = $db;
}
}
@@ -4176,7 +4192,8 @@ sub _iterate_files {
my ($tbl) = $chunk =~ m/$tbl_name/;
$tbl =~ s/^\s*`//;
$tbl =~ s/`\s*$//;
if ( $self->table_is_allowed($self->{db}, $tbl) ) {
if ( $self->table_is_allowed($self->{db}, $tbl)
&& $self->_resume_from_table($tbl) ) {
my ($ddl) = $chunk =~ m/^(?:$open_comment)?(CREATE TABLE.+?;)$/ms;
if ( !$ddl ) {
warn "Failed to parse CREATE TABLE from\n" . $chunk;
@@ -4220,7 +4237,9 @@ sub _iterate_dbh {
}
if ( !$self->{db} ) {
$self->{db} = shift @{$self->{dbs}};
do {
$self->{db} = shift @{$self->{dbs}};
} until $self->_resume_from_database($self->{db});
MKDEBUG && _d('Next database:', $self->{db});
return unless $self->{db};
}
@@ -4242,6 +4261,7 @@ sub _iterate_dbh {
}
while ( my $tbl = shift @{$self->{tbls}} ) {
next unless $self->_resume_from_table($tbl);
my $engine;
if ( $self->{filters}->{'engines'}
|| $self->{filters}->{'ignore-engines'} ) {
@@ -4386,6 +4406,34 @@ sub engine_is_allowed {
return 1;
}
sub _resume_from_database {
my ($self, $db) = @_;
return 1 unless $self->{resume}->{db};
if ( $db eq $self->{resume}->{db} ) {
MKDEBUG && _d('At resume db', $db);
delete $self->{resume}->{db};
return 1;
}
return 0;
}
sub _resume_from_table {
my ($self, $tbl) = @_;
return 1 unless $self->{resume}->{tbl};
if ( $tbl eq $self->{resume}->{tbl} ) {
MKDEBUG && _d('At resume table', $tbl);
delete $self->{resume}->{tbl};
return 1;
}
return 0;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
@@ -5402,6 +5450,18 @@ sub main {
my $total_rate = 0;
my $limit = $o->get('chunk-size-limit');
# ########################################################################
# Resume
# ########################################################################
my $last_chunk;
if ( $o->get('resume') ) {
$last_chunk = last_chunk(
dbh => $dbh,
repl_table => $repl_table,
Quoter => $q,
);
}
# ########################################################################
# Callbacks for each table's nibble iterator. All checksum work is done
# in these callbacks and the subs that they call.
@@ -5410,11 +5470,38 @@ sub main {
init => sub {
my (%args) = @_;
my $tbl = $args{tbl};
my $oktonibble = 1;
if ( $o->get('empty-replicate-table') ) {
MKDEBUG && _d($delete_sth->{Statement});
$delete_sth->execute($tbl->{db}, $tbl->{tbl});
}
return 1; # continue nibbling
elsif ( $last_chunk ) {
my $nibble_iter = $args{NibbleIterator};
my $next_lb = next_lower_boundary(
%args,
last_chunk => $last_chunk,
);
if ( !$next_lb ) {
# TODO: this will print this table which was already done.
# Need some way to not print table; $tbl->{dont_print}=1?
MKDEBUG && _d('Resuming from last chunk in table;',
'getting next table');
$oktonibble = 0;
}
else {
MKDEBUG && _d('Resuming from chunk', $last_chunk->{chunk});
$nibble_iter->boundaries()->{next_lower} = $next_lb;
$nibble_iter->set_nibble_number($last_chunk->{chunk});
}
# Just need to call us once to kick-start the resume process.
$last_chunk = undef;
}
return $oktonibble; # continue nibbling?
},
next_boundaries => sub {
my (%args) = @_;
@@ -5603,13 +5690,14 @@ sub main {
# ########################################################################
my $schema_iter = new SchemaIterator(
dbh => $dbh,
resume => $last_chunk ? $q->quote(@{$last_chunk}{qw(db tbl)}) : "",
OptionParser => $o,
TableParser => $tp,
Quoter => $q,
);
TABLE:
while ( $oktorun && (my $tbl = $schema_iter->next_schema_object()) ) {
while ( $oktorun && (my $tbl = $schema_iter->next()) ) {
eval {
# Results, stats, and info related to checksuming this table can
# be saved here. print_checksum_results() uses this info.
@@ -6079,6 +6167,51 @@ sub table_progress {
return $pr;
}
sub last_chunk {
my (%args) = @_;
my @required_args = qw(dbh repl_table Quoter);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
my ($dbh, $repl_table, $q) = @args{@required_args};
my $sql = "SELECT MAX(ts) FROM $repl_table";
MKDEBUG && _d($sql);
my ($max_ts) = $dbh->selectrow_array($sql);
if ( !$max_ts ) {
MKDEBUG && _d('Replicate table is empty; will not resume');
return;
}
$sql = "SELECT * FROM $repl_table "
. "WHERE ts='$max_ts' "
. "ORDER BY db DESC, tbl DESC, chunk DESC LIMIT 1";
MKDEBUG && _d($sql);
my $last_chunk = $dbh->selectrow_hashref($sql);
MKDEBUG && _d('Last chunk:', Dumper($last_chunk));
$sql = "SELECT MAX(chunk) FROM $repl_table "
. "WHERE db=" . $q->quote($last_chunk->{db})
. " AND tbl=" . $q->quote($last_chunk->{tbl});
MKDEBUG && _d($sql);
my ($max_chunk) = $dbh->selectrow_array($sql);
if ( ($last_chunk->{chunk} || 0) ne ($max_chunk || 0) ) {
warn "Not resuming from max chunk (" . ($last_chunk->{chunk} || 0)
. " != " . ($max_chunk || 0) > "); resuming may not work correctly.\n";
}
return $last_chunk;
}
sub next_lower_boundary {
my (%args) = @_;
my @required_args = qw();
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
return;
}
# Catches signals so we can exit gracefully.
sub sig_int {
my ( $signal ) = @_;

View File

@@ -287,6 +287,14 @@ sub nibble_number {
return $self->{nibbleno};
}
sub set_nibble_number {
my ($self, $n) = @_;
die "I need a number" unless $n;
$self->{nibbleno} = $n;
MKDEBUG && _d('Set new nibble number:', $n);
return;
}
sub nibble_index {
my ($self) = @_;
return $self->{index};
@@ -325,9 +333,9 @@ sub chunk_size {
sub set_chunk_size {
my ($self, $limit) = @_;
return if $self->{one_nibble};
MKDEBUG && _d('Setting new chunk size (LIMIT):', $limit);
die "Chunk size must be > 0" unless $limit;
$self->{limit} = $limit - 1;
MKDEBUG && _d('Set new chunk size (LIMIT):', $limit);
return;
}

View File

@@ -74,7 +74,7 @@ sub make_nibble_iter {
Schema => $schema,
%common_modules,
);
1 while $si->next_schema_object();
1 while $si->next();
my $ni = new NibbleIterator(
dbh => $dbh,