First working NibbleIterator.

This commit is contained in:
Daniel Nichter
2011-09-09 17:29:28 -06:00
parent 7fde36a65b
commit c989bd29dd
2 changed files with 306 additions and 69 deletions

View File

@@ -56,25 +56,34 @@ sub new {
asc_only => 1,
);
# Make SQL statements, prepared on first call to next(). The preamble
# and ORDER BY are the same for all statements. FORCE IDNEX and ORDER BY
# Make SQL statements, prepared on first call to next(). FROM and
# ORDER BY are the same for all statements. FORCE IDNEX and ORDER BY
# are needed to ensure deterministic nibbling.
my $nibble_sql_preamble
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{cols}})
. " FROM " . $q->quote(@{$tbl}{qw(db tbl)})
. " FORCE INDEX(`$index`)";
my $from = " FROM " . $q->quote(@{$tbl}{qw(db tbl)})
. " FORCE INDEX(`$index`)";
my $order_by = "ORDER BY " . join(', ', map {$q->quote($_)} @{$index_cols});
# This statement is only executed once, so it doesn't use a sth.
# These statements are only executed once, so they don't use sths.
my $first_lb_sql
= $nibble_sql_preamble
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$index_cols})
. " $from "
. ($args{where} ? " WHERE $args{where}" : '')
. " $order_by "
. " LIMIT 1"
. " /*first lower boundary*/";
MKDEBUG && _d('First lower boundary statement:', $first_lb_sql);
my $last_ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$index_cols})
. " $from "
. ($args{where} ? " WHERE $args{where}" : '')
. " $order_by DESC "
. " LIMIT 1"
. " /*last upper boundary*/";
MKDEBUG && _d('Last upper boundary statement:', $last_ub_sql);
# Nibbles are inclusive, so for a..z, the nibbles are: a-e, f-j, k-o, p-t,
# u-y, and z. This complicates getting the next upper boundary because
# if we use either (col >= lb AND col < ub) or (col > lb AND col <= ub)
@@ -84,8 +93,10 @@ sub new {
# the upper boundary for the current nibble *and* the lower boundary
# for the next nibble. See _next_boundaries().
my $ub_sql
= $nibble_sql_preamble
. " WHERE (" . $asc->{boundaries}->{'>='} . ")" # lower boundary
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$index_cols})
. " $from "
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. ($args{where} ? " AND ($args{where})" : '')
. " $order_by "
. " LIMIT 2 OFFSET " . (($o->get('chunk-size') || 1) - 1)
@@ -93,23 +104,38 @@ sub new {
MKDEBUG && _d('Next upper boundary statement:', $ub_sql);
my $nibble_sql
= $nibble_sql_preamble
. " WHERE (" . $asc->{boundaries}->{'>='} . ")" # lower boundary
. " AND (" . $asc->{boundaries}->{'<='} . ")" # upper boundary
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{cols}})
. " $from "
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. " AND " . $asc->{boundaries}->{'<='} # upper boundary
. ($args{where} ? " AND ($args{where})" : '')
. " $order_by"
. " /*nibble*/";
MKDEBUG && _d('Nibble statement:', $nibble_sql);
# If the chunk size is >= number of rows in table, then we don't
# need to chunk; we can just select all rows, in order, at once.
my $one_nibble_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{cols}})
. " $from "
. ($args{where} ? " AND ($args{where})" : '')
. " $order_by"
. " /*one nibble*/";
MKDEBUG && _d('One nibble statement:', $one_nibble_sql);
my $self = {
%args,
asc => $asc,
first_lb_sql => $first_lb_sql,
ub_sql => $ub_sql,
nibble_sql => $nibble_sql,
nibbleno => 0,
have_rows => 0,
rowno => 0,
index => $index,
first_lb_sql => $first_lb_sql,
last_ub_sql => $last_ub_sql,
ub_sql => $ub_sql,
nibble_sql => $nibble_sql,
one_nibble_sql => $one_nibble_sql,
nibbleno => 0,
have_rows => 0,
rowno => 0,
};
return bless $self, $class;
@@ -121,9 +147,13 @@ sub next {
# First call, init everything. This could be done in new(), but
# all work is delayed until actually needed.
if ($self->{nibbleno} == 0) {
$self->_can_nibble_once();
$self->_prepare_sths();
$self->_get_first_lb();
$self->_get_bounds();
# $self->_check_index_usage();
if ( my $callback = $self->{callbacks}->{init} ) {
$callback->();
}
}
# Return rows in nibble. sth->{Active} is always true with DBD::mysql v3,
@@ -135,42 +165,105 @@ sub next {
if ( $row ) {
$self->{rowno}++;
MKDEBUG && _d('Row', $self->{rowno}, 'in nibble', $self->{nibbleno});
if ( my $callback = $self->{callbacks}->{before_row} ) {
$callback->();
}
# fetchrow_arraryref re-uses its internal arrayref, so we must copy.
return [ @$row ];
}
MKDEBUG && _d('No more rowso in nibble', $self->{nibbleno});
MKDEBUG && _d('No more rows in nibble', $self->{nibbleno});
if ( my $callback = $self->{callbacks}->{after_nibble} ) {
$callback->();
}
$self->{rowno} = 0;
$self->{have_rows} = 0;
}
# If there's another boundary, fetch the rows within it.
if ( $self->_next_boundaries() ) {
$self->{nibbleno}++;
MKDEBUG && _d($self->{nibble_sth}->{Statement}, 'params:',
join(', ', (@{$self->{lb}}, @{$self->{ub}})));
$self->{nibble_sth}->execute(@{$self->{lb}}, @{$self->{ub}});
$self->{have_rows} = $self->{nibble_sth}->rows();
if ( $self->{have_rows} ) {
$self->{nibbleno}++;
MKDEBUG && _d($self->{have_rows}, 'rows in nibble', $self->{nibbleno});
if ( my $callback = $self->{callbacks}->{before_nibble} ) {
$callback->();
}
return $self->next();
}
}
MKDEBUG && _d('Done nibbling');
if ( my $callback = $self->{callbacks}->{done} ) {
$callback->();
}
return;
}
sub nibble_number {
my ($self) = @_;
return $self->{nibbleno};
}
sub number_of_rows {
my ($self) = @_;
return $self->{have_rows};
}
sub row_number {
my ($self) = @_;
return $self->{nibbleno};
}
sub _can_nibble_once {
my ($self) = @_;
my ($dbh, $tbl, $q) = @{$self}{qw(dbh tbl Quoter)};
my $table_status;
eval {
my $sql = "SHOW TABLE STATUS FROM " . $q->quote($tbl->{db})
. " LIKE " . $q->literal_like($tbl->{tbl});
MKDEBUG && _d($sql);
$table_status = $dbh->selectrow_hashref($sql);
MKDEBUG && _d('Table status:', Dumper($table_status));
};
if ( $EVAL_ERROR ) {
warn $EVAL_ERROR;
return 0;
}
my $n_rows = defined $table_status->{Rows} ? $table_status->{Rows}
: defined $table_status->{rows} ? $table_status->{rows}
: 0;
my $chunk_size = $self->{OptionParser}->get('chunk-size') || 1;
$self->{one_nibble} = $n_rows <= $chunk_size ? 1 : 0;
MKDEBUG && _d('One nibble:', $self->{one_nibble} ? 'yes' : 'no');
return $self->{one_nibble};
}
sub _prepare_sths {
my ($self) = @_;
MKDEBUG && _d('Preparing statement handles');
$self->{ub_sth} = $self->{dbh}->prepare($self->{ub_sql});
$self->{nibble_sth} = $self->{dbh}->prepare($self->{nibble_sql});
if ( $self->{one_nibble} ) {
$self->{nibble_sth} = $self->{dbh}->prepare($self->{one_nibble_sql});
}
else {
$self->{ub_sth} = $self->{dbh}->prepare($self->{ub_sql});
$self->{nibble_sth} = $self->{dbh}->prepare($self->{nibble_sql});
}
}
sub _get_first_lb {
sub _get_bounds {
my ($self) = @_;
return if $self->{one_nibble};
$self->{next_lb} = $self->{dbh}->selectrow_arrayref($self->{first_lb_sql});
MKDEBUG && _d('First lower boundary:', Dumper($self->{lb}));
MKDEBUG && _d('First lower boundary:', Dumper($self->{next_lb}));
$self->{last_ub} = $self->{dbh}->selectrow_arrayref($self->{last_ub_sql});
MKDEBUG && _d('Last upper boundary:', Dumper($self->{last_ub}));
return;
}
@@ -178,40 +271,20 @@ sub _check_index_usage {
my ($self) = @_;
my ($dbh, $tbl, $q) = @{$self}{qw(dbh tbl Quoter)};
my $table_status;
my $explain;
eval {
my $sql = "SHOW TABLE STATUS FROM " . $q->quote($tbl->{db})
. " LIKE " . $q->literal_like($tbl->{tbl});
MKDEBUG && _d($sql);
$table_status = $dbh->selectrow_hashref($sql);
$explain = $dbh->selectall_arrayref("", {Slice => {}});
};
MKDEBUG && $EVAL_ERROR && _d($EVAL_ERROR);
my $small_table;
if ( $table_status ) {
my $n_rows = defined $table_status->{Rows} ? $table_status->{Rows}
: defined $table_status->{rows} ? $table_status->{rows}
: undef;
$small_table = 1 if defined $n_rows && $n_rows <= 100;
if ( $EVAL_ERROR ) {
warn "Cannot check if MySQL is using the chunk index: $EVAL_ERROR";
return;
}
MKDEBUG && _d('Small table:', $small_table);
if ( !$small_table ) {
my $explain;
eval {
$explain = $dbh->selectall_arrayref("", {Slice => {}});
};
if ( $EVAL_ERROR ) {
MKDEBUG && _d($EVAL_ERROR);
return;
}
MKDEBUG && _d('EXPLAIN key:', $explain->[0]->{key});
my $explain_index = lc($explain->[0]->{key} || '');
if ( $explain_index ne lc($self->{asc}->{index}) ) {
die "Cannot nibble table $tbl->{db}.$tbl->{tbl} because MySQL chose "
. ($explain_index ? "the `$explain_index`" : 'no') . ' index'
. " instead of the `$self->{asc}->{index}` index";
}
my $explain_index = lc($explain->[0]->{key} || '');
MKDEBUG && _d('EXPLAIN index:', $explain_index);
if ( $explain_index ne $self->{index} ) {
die "Cannot nibble table $tbl->{db}.$tbl->{tbl} because MySQL chose "
. ($explain_index ? "the `$explain_index`" : 'no') . ' index'
. " instead of the chunk index `$self->{asc}->{index}`";
}
return;
@@ -225,25 +298,37 @@ sub _next_boundaries {
return;
}
if ( $self->{one_nibble} ) {
$self->{lb} = $self->{ub} = [];
$self->{no_more_boundaries} = 1; # for next call
return 1;
}
$self->{lb} = $self->{next_lb};
MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:',
join(', ', @{$self->{lb}}));
$self->{ub_sth}->execute(@{$self->{lb}});
my $boundary = $self->{ub_sth}->fetchall_arrayref();
MKDEBUG && _d('Next boundary:', Dumper($boundary));
if ( $boundary && @$boundary ) {
$self->{ub} = $boundary->[0]; # this nibble
$self->{next_lb} = $boundary->[1]; # next nibble
$self->{ub_sth}->finish();
MKDEBUG && _d('Next upper boundary:', Dumper($self->{ub}));
$self->{ub} = $boundary->[0]; # this nibble
if ( $boundary->[1] ) {
$self->{next_lb} = $boundary->[1]; # next nibble
}
else {
$self->{no_more_boundaries} = 1; # for next call
MKDEBUG && _d('Last upper boundary:', Dumper($boundary->[0]));
}
}
else {
$self->{no_more_boundaries} = 1; # for next call
$self->{ub} = $self->{lb};
$self->{ub} = $self->{last_ub};
MKDEBUG && _d('Last upper boundary:', Dumper($self->{ub}));
}
$self->{ub_sth}->finish();
return 1; # have boundaries
return 1; # have boundary
}
sub _d {