Working --resume.

This commit is contained in:
Daniel Nichter
2011-09-29 09:17:36 -06:00
parent e2739d8a25
commit 206ac93214

View File

@@ -3474,6 +3474,13 @@ sub new {
nibble_sql => $nibble_sql,
explain_ub_sql => "EXPLAIN $ub_sql",
explain_nibble_sql => $explain_nibble_sql,
sql => {
columns => $asc->{scols},
from => $from,
where => $args{where},
boundaries => $asc->{boundaries},
order_by => $order_by,
},
};
}
@@ -3511,12 +3518,12 @@ sub next {
if ( !$self->{have_rows} ) {
$self->{nibbleno}++;
MKDEBUG && _d($self->{nibble_sth}->{Statement}, 'params:',
join(', ', (@{$self->{lb}}, @{$self->{ub}})));
join(', ', (@{$self->{lower}}, @{$self->{upper}})));
if ( my $callback = $self->{callbacks}->{exec_nibble} ) {
$self->{have_rows} = $callback->(%callback_args);
}
else {
$self->{nibble_sth}->execute(@{$self->{lb}}, @{$self->{ub}});
$self->{nibble_sth}->execute(@{$self->{lower}}, @{$self->{upper}});
$self->{have_rows} = $self->{nibble_sth}->rows();
}
MKDEBUG && _d($self->{have_rows}, 'rows in nibble', $self->{nibbleno});
@@ -3578,13 +3585,26 @@ sub statements {
sub boundaries {
my ($self) = @_;
return {
lower => $self->{lb},
upper => $self->{ub},
next_lower => $self->{next_lb},
last_upper => $self->{last_ub},
lower => $self->{lower},
upper => $self->{upper},
next_lower => $self->{next_lower},
last_upper => $self->{last_upper},
};
}
sub set_boundary {
my ($self, $boundary, $values) = @_;
die "I need a boundary parameter"
unless $boundary;
die "Invalid boundary: $boundary"
unless $boundary =~ m/^(?:lower|upper|next_lower|last_upper)$/;
die "I need a values arrayref parameter"
unless $values && ref $values eq 'ARRAY';
$self->{$boundary} = $values;
MKDEBUG && _d('Set new', $boundary, 'boundary:', Dumper($values));
return;
}
sub one_nibble {
my ($self) = @_;
return $self->{one_nibble};
@@ -3604,6 +3624,11 @@ sub set_chunk_size {
return;
}
sub sql {
my ($self) = @_;
return $self->{sql};
}
sub _find_best_index {
my (%args) = @_;
my @required_args = qw(tbl TableParser dbh Quoter);
@@ -3708,12 +3733,12 @@ sub _get_bounds {
my ($self) = @_;
return if $self->{one_nibble};
$self->{next_lb} = $self->{dbh}->selectrow_arrayref($self->{first_lb_sql});
MKDEBUG && _d('First lower boundary:', Dumper($self->{next_lb}));
$self->{last_ub} = $self->{dbh}->selectrow_arrayref($self->{last_ub_sql});
MKDEBUG && _d('Last upper boundary:', Dumper($self->{last_ub}));
$self->{next_lower} = $self->{dbh}->selectrow_arrayref($self->{first_lb_sql});
MKDEBUG && _d('First lower boundary:', Dumper($self->{next_lower}));
$self->{last_upper} = $self->{dbh}->selectrow_arrayref($self->{last_ub_sql});
MKDEBUG && _d('Last upper boundary:', Dumper($self->{last_upper}));
return;
}
@@ -3726,12 +3751,12 @@ sub _next_boundaries {
}
if ( $self->{one_nibble} ) {
$self->{lb} = $self->{ub} = [];
$self->{lower} = $self->{upper} = [];
$self->{no_more_boundaries} = 1; # for next call
return 1; # continue nibbling
}
if ( $self->identical_boundaries($self->{lb}, $self->{next_lb}) ) {
if ( $self->identical_boundaries($self->{lower}, $self->{next_lower}) ) {
MKDEBUG && _d('Infinite loop detected');
my $tbl = $self->{tbl};
my $index = $tbl->{tbl_struct}->{keys}->{$self->{index}};
@@ -3739,17 +3764,16 @@ sub _next_boundaries {
my $chunkno = $self->{nibbleno};
die "Possible infinite loop detected! "
. "The lower boundary for chunk $chunkno is "
. "<" . join(', ', @{$self->{lb}}) . "> and the lower "
. "<" . join(', ', @{$self->{lower}}) . "> and the lower "
. "boundary for chunk " . ($chunkno + 1) . " is also "
. "<" . join(', ', @{$self->{next_lb}}) . ">. "
. "<" . join(', ', @{$self->{next_lower}}) . ">. "
. "This usually happens when using a non-unique single "
. "column index. The current chunk index for table "
. "$tbl->{db}.$tbl->{tbl} is $self->{index} which is"
. ($index->{is_unique} ? '' : ' not') . " unique and covers "
. ($n_cols > 1 ? "$n_cols columns" : "1 column") . ".\n";
}
$self->{lb} = $self->{next_lb};
$self->{lower} = $self->{next_lower};
if ( my $callback = $self->{callbacks}->{next_boundaries} ) {
my $oktonibble = $callback->(
@@ -3765,14 +3789,14 @@ sub _next_boundaries {
}
MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:',
join(', ', @{$self->{lb}}), $self->{limit});
$self->{ub_sth}->execute(@{$self->{lb}}, $self->{limit});
join(', ', @{$self->{lower}}), $self->{limit});
$self->{ub_sth}->execute(@{$self->{lower}}, $self->{limit});
my $boundary = $self->{ub_sth}->fetchall_arrayref();
MKDEBUG && _d('Next boundary:', Dumper($boundary));
if ( $boundary && @$boundary ) {
$self->{ub} = $boundary->[0]; # this nibble
$self->{upper} = $boundary->[0]; # this nibble
if ( $boundary->[1] ) {
$self->{next_lb} = $boundary->[1]; # next nibble
$self->{next_lower} = $boundary->[1]; # next nibble
}
else {
$self->{no_more_boundaries} = 1; # for next call
@@ -3781,8 +3805,8 @@ sub _next_boundaries {
}
else {
$self->{no_more_boundaries} = 1; # for next call
$self->{ub} = $self->{last_ub};
MKDEBUG && _d('Last upper boundary:', Dumper($self->{ub}));
$self->{upper} = $self->{last_upper};
MKDEBUG && _d('Last upper boundary:', Dumper($self->{upper}));
}
$self->{ub_sth}->finish();
@@ -5458,7 +5482,6 @@ sub main {
$last_chunk = last_chunk(
dbh => $dbh,
repl_table => $repl_table,
Quoter => $q,
);
}
@@ -5479,22 +5502,20 @@ sub main {
}
elsif ( $last_chunk ) {
my $nibble_iter = $args{NibbleIterator};
my $next_lb = next_lower_boundary(
my $next_lb = next_lower_boundary(
%args,
last_chunk => $last_chunk,
Quoter => $q,
);
if ( !$next_lb ) {
# TODO: this will print this table which was already done.
# Need some way to not print table; $tbl->{dont_print}=1?
MKDEBUG && _d('Resuming from last chunk in table;',
'getting next table');
$oktonibble = 0;
}
else {
MKDEBUG && _d('Resuming from chunk', $last_chunk->{chunk});
$nibble_iter->boundaries()->{next_lower} = $next_lb;
$nibble_iter->set_nibble_number($last_chunk->{chunk});
$nibble_iter->set_boundary('next_lower', $next_lb);
}
# Just need to call us once to kick-start the resume process.
@@ -5808,8 +5829,8 @@ sub exec_nibble {
my $sth = $nibble_iter->statements();
my $boundary = $nibble_iter->boundaries();
my $lb_quoted = join(',', map { $q->quote_val($_) } @{$boundary->{lower}});
my $ub_quoted = join(',', map { $q->quote_val($_) } @{$boundary->{upper}});
my $lb_quoted = join(',', @{$boundary->{lower}});
my $ub_quoted = join(',', @{$boundary->{upper}});
my $chunk = $nibble_iter->nibble_number();
my $chunk_index = $nibble_iter->nibble_index();
@@ -6169,7 +6190,7 @@ sub table_progress {
sub last_chunk {
my (%args) = @_;
my @required_args = qw(dbh repl_table Quoter);
my @required_args = qw(dbh repl_table);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
@@ -6191,8 +6212,7 @@ sub last_chunk {
MKDEBUG && _d('Last chunk:', Dumper($last_chunk));
$sql = "SELECT MAX(chunk) FROM $repl_table "
. "WHERE db=" . $q->quote($last_chunk->{db})
. " AND tbl=" . $q->quote($last_chunk->{tbl});
. "WHERE db='$last_chunk->{db}' AND tbl='$last_chunk->{tbl}'";
MKDEBUG && _d($sql);
my ($max_chunk) = $dbh->selectrow_array($sql);
if ( ($last_chunk->{chunk} || 0) ne ($max_chunk || 0) ) {
@@ -6205,11 +6225,40 @@ sub last_chunk {
sub next_lower_boundary {
my (%args) = @_;
my @required_args = qw();
my @required_args = qw(dbh tbl last_chunk NibbleIterator Quoter);
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg};
}
return;
my ($dbh, $tbl, $last_chunk, $nibble_iter, $q) = @args{@required_args};
if ( $nibble_iter->nibble_index() ne ($last_chunk->{chunk_index} || '') ) {
warn "Cannot resume from table $tbl->{db}.$tbl->{tbl} chunk "
. "$last_chunk->{chunk} because the chunk index are different: "
. "$last_chunk->{chunk_index} was used originally but "
. $nibble_iter->nibble_index() . " is used now.\n";
$tbl->{checksum_results}->{skipped}++;
return;
}
my $sql = $nibble_iter->sql();
my $next_lb_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$sql->{columns}})
. " FROM $sql->{from}"
. " WHERE " . $sql->{boundaries}->{'>'}
. ($sql->{where} ? " AND ($sql->{where})" : '')
. " ORDER BY $sql->{order_by}"
. " LIMIT 1"
. " /*resume next lower boundary*/";
MKDEBUG && _d($next_lb_sql);
my $sth = $dbh->prepare($next_lb_sql);
my @ub = split ',', $last_chunk->{upper_boundary};
MKDEBUG && _d($sth->{Statement}, 'params:', @ub);
$sth->execute(@ub);
my $next_lb = $sth->fetchrow_arrayref();
$sth->finish();
return $next_lb;
}
# Catches signals so we can exit gracefully.