Detect infinite loops. Use best non-unique index. Disable chunk size limit if chunk index is unique.

This commit is contained in:
Daniel Nichter
2011-09-23 17:33:23 -06:00
parent 07cb6010a2
commit 9f3e05691c
4 changed files with 220 additions and 30 deletions

View File

@@ -57,10 +57,7 @@ sub new {
my ($dbh, $tbl, $chunk_size, $o, $q) = @args{@required_args};
# Get an index to nibble by. We'll order rows by the index's columns.
my $index = $args{TableParser}->find_best_index(
$tbl->{tbl_struct},
$args{chunk_index},
);
my $index = _find_best_index(%args);
die "No index to nibble table $tbl->{db}.$tbl->{tbl}" unless $index;
my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols};
@@ -290,6 +287,86 @@ sub set_chunk_size {
return;
}
sub _find_best_index {
my (%args) = @_;
my @required_args = qw(tbl TableParser dbh Quoter);
my ($tbl, $tp) = @args{@required_args};
my $tbl_struct = $tbl->{tbl_struct};
my $indexes = $tbl_struct->{keys};
my $best_index;
my @possible_indexes;
if ( my $want_index = $args{chunk_index} ) {
MKDEBUG && _d('Want to use nibble index', $want_index);
if ( $want_index eq 'PRIMARY' || $indexes->{$want_index}->{is_unique} ) {
$best_index = $want_index;
}
else {
push @possible_indexes, $want_index;
}
}
else {
foreach my $index ( $tp->sort_indexes($tbl_struct) ) {
if ( $index eq 'PRIMARY' || $indexes->{$index}->{is_unique} ) {
$best_index = $index;
last;
}
else {
push @possible_indexes, $index;
}
}
}
if ( !$best_index && @possible_indexes ) {
MKDEBUG && _d('No PRIMARY or unique indexes;',
'will use index with highest cardinality');
foreach my $index ( @possible_indexes ) {
$indexes->{$index}->{cardinality} = _get_index_cardinality(
%args,
index => $index,
);
}
@possible_indexes = sort {
# Prefer the index with the highest cardinality.
my $cmp
= $indexes->{$b}->{cardinality} <=> $indexes->{$b}->{cardinality};
if ( $cmp == 0 ) {
# Indexes have the same cardinality; prefer the one with
# more columns.
$cmp = scalar @{$indexes->{$b}->{cols}}
<=> scalar @{$indexes->{$a}->{cols}};
}
$cmp;
} @possible_indexes;
$best_index = $possible_indexes[0];
}
MKDEBUG && _d('Best index:', $best_index);
return $best_index;
}
sub _get_index_cardinality {
my (%args) = @_;
my @required_args = qw(dbh tbl index Quoter);
my ($dbh, $tbl, $index, $q) = @args{@required_args};
my $sql = "SHOW INDEXES FROM " . $q->quote(@{$tbl}{qw(db tbl)})
. " WHERE Key_name = '$index'";
MKDEBUG && _d($sql);
my $cardinality = 1;
my $rows = $dbh->selectall_hashref($sql, 'key_name');
foreach my $row ( values %$rows ) {
$cardinality *= $row->{cardinality} if $row->{cardinality};
}
MKDEBUG && _d('Index', $index, 'cardinality:', $cardinality);
return $cardinality;
}
sub _can_nibble_index {
my ($index) = @_;
}
sub _can_nibble_once {
my ($self) = @_;
my ($dbh, $tbl, $tp) = @{$self}{qw(dbh tbl TableParser)};
@@ -380,6 +457,22 @@ sub _next_boundaries {
if ( $boundary && @$boundary ) {
$self->{ub} = $boundary->[0]; # this nibble
if ( $boundary->[1] ) {
if ( $self->_identical_boundaries($boundary) ) {
my $tbl = $self->{tbl};
my $index = $tbl->{tbl_struct}->{keys}->{$self->{index}};
my $n_cols = scalar @{$index->{cols}};
my $chunkno = $self->{nibbleno} + 1;
die "Possible infinite loop detected! "
. "The upper boundary for chunk $chunkno is "
. "<" . join(', ', @{$boundary->[0]}) . "> and the lower "
. "boundary for chunk " . ($chunkno + 1) . " is also "
. "<" . join(', ', @{$boundary->[1]}) . ">. "
. "This usually happens when using a non-unique single "
. "column index. The current chunk index for table "
. "$tbl->{db}.$tbl->{tbl} is $self->{index} which is"
. ($index->{is_unique} ? '' : ' not') . " unique and covers "
. ($n_cols > 1 ? "$n_cols columns" : "1 column") . ".\n";
}
$self->{next_lb} = $boundary->[1]; # next nibble
}
else {
@@ -397,6 +490,20 @@ sub _next_boundaries {
return 1; # have boundary
}
sub _identical_boundaries {
my ($self, $boundaries) = @_;
my $ub = $boundaries->[0];
my $lb = $boundaries->[1];
return 0 unless $ub && $lb;
my $n_vals = scalar @$ub;
for my $i ( 0..($n_vals-1) ) {
# One diff means the bounds aren't identical.
return 0 if $lb->[$i] ne $ub->[$i];
}
MKDEBUG && _d('Infinite loop detected');
return 1;
}
sub DESTROY {
my ( $self ) = @_;
foreach my $key ( keys %$self ) {