Handle one-chunk tables. Chunk tables without indexes if they're small enough. Check index usage for every chunk. Check the chunk size if the upper boundary (ub) and the next lower boundary (next_lb) are equal.

This commit is contained in:
Daniel Nichter
2011-09-26 11:38:35 -06:00
parent 10e31c1b63
commit c9e8444166
5 changed files with 517 additions and 437 deletions

View File

@@ -2987,37 +2987,7 @@ sub make_row_checksum {
my $q = $self->{Quoter};
my $tbl_struct = $tbl->{tbl_struct};
my $func = $args{func} || uc($o->get('function'));
my $trim = $o->get('trim');
my $float_precision = $o->get('float-precision');
my $sep = $o->get('separator') || '#';
$sep =~ s/'//g;
$sep ||= '#';
my $ignore_col = $o->get('ignore-columns') || {};
my $all_cols = $o->get('columns') || $tbl_struct->{cols};
my %cols = map { lc($_) => 1 } grep { !$ignore_col->{$_} } @$all_cols;
my %seen;
my @cols =
map {
my $type = $tbl_struct->{type_for}->{$_};
my $result = $q->quote($_);
if ( $type eq 'timestamp' ) {
$result .= ' + 0';
}
elsif ( $float_precision && $type =~ m/float|double/ ) {
$result = "ROUND($result, $float_precision)";
}
elsif ( $trim && $type =~ m/varchar/ ) {
$result = "TRIM($result)";
}
$result;
}
grep {
$cols{$_} && !$seen{$_}++
}
@{$tbl_struct->{cols}};
my $cols = $self->get_checksum_columns(%args);
my $query;
if ( !$args{no_cols} ) {
@@ -3033,26 +3003,30 @@ sub make_row_checksum {
$col .= " AS $real_col";
}
$col;
} @cols)
} @{$cols->{select}})
. ', ';
}
if ( uc $func ne 'FNV_64' && uc $func ne 'FNV1A_64' ) {
my @nulls = grep { $cols{$_} } @{$tbl_struct->{null_cols}};
my $sep = $o->get('separator') || '#';
$sep =~ s/'//g;
$sep ||= '#';
my @nulls = grep { $cols->{allowed}->{$_} } @{$tbl_struct->{null_cols}};
if ( @nulls ) {
my $bitmap = "CONCAT("
. join(', ', map { 'ISNULL(' . $q->quote($_) . ')' } @nulls)
. ")";
push @cols, $bitmap;
push @{$cols->{select}}, $bitmap;
}
$query .= @cols > 1
? "$func(CONCAT_WS('$sep', " . join(', ', @cols) . '))'
: "$func($cols[0])";
$query .= @{$cols->{select}} > 1
? "$func(CONCAT_WS('$sep', " . join(', ', @{$cols->{select}}) . '))'
: "$func($cols->{select}->[0])";
}
else {
my $fnv_func = uc $func;
$query .= "$fnv_func(" . join(', ', @cols) . ')';
$query .= "$fnv_func(" . join(', ', @{$cols->{select}}) . ')';
}
MKDEBUG && _d('Row checksum:', $query);
@@ -3098,6 +3072,50 @@ sub make_chunk_checksum {
return $select;
}
# Sub: get_checksum_columns
#   Build the list of column expressions to checksum for a table.
#
# Required Arguments:
#   tbl - Hashref whose tbl_struct key provides cols (arrayref of column
#         names) and type_for (hashref of column name => type)
#
# Returns:
#   Hashref with two keys:
#     select  - Arrayref of quoted column expressions, in tbl_struct->{cols}
#               order, each column at most once.  timestamp columns get
#               " + 0" appended, float/double columns are wrapped in ROUND()
#               when --float-precision is set, and varchar columns are
#               wrapped in TRIM() when --trim is set.
#     allowed - Hashref of lowercased column names taken from --columns (or
#               all table columns) minus --ignore-columns.
sub get_checksum_columns {
    my ($self, %args) = @_;
    for my $required ( qw(tbl) ) {
        die "I need a $required argument" unless $args{$required};
    }
    my $tbl = $args{tbl};

    my $o               = $self->{OptionParser};
    my $q               = $self->{Quoter};
    my $trim            = $o->get('trim');
    my $float_precision = $o->get('float-precision');
    my $tbl_struct      = $tbl->{tbl_struct};
    my $ignored         = $o->get('ignore-columns') || {};
    my $candidates      = $o->get('columns') || $tbl_struct->{cols};

    # Columns the user allows; the ignore list is consulted with the name
    # as given, and the surviving names are stored lowercased.
    my %allowed;
    for my $name ( @$candidates ) {
        next if $ignored->{$name};
        $allowed{ lc($name) } = 1;
    }

    # Walk the table's columns in definition order so the select list is
    # deterministic; %seen guards against listing a column twice.
    my %seen;
    my @select;
    COLUMN:
    for my $name ( @{$tbl_struct->{cols}} ) {
        next COLUMN unless $allowed{$name};
        next COLUMN if $seen{$name}++;

        my $type = $tbl_struct->{type_for}->{$name};
        my $expr = $q->quote($name);
        if ( $type eq 'timestamp' ) {
            $expr .= ' + 0';
        }
        elsif ( $float_precision && $type =~ m/float|double/ ) {
            $expr = "ROUND($expr, $float_precision)";
        }
        elsif ( $trim && $type =~ m/varchar/ ) {
            $expr = "TRIM($expr)";
        }
        push @select, $expr;
    }

    return {
        select  => \@select,
        allowed => \%allowed,
    };
}
sub get_crc_args {
my ($self, %args) = @_;
my $func = $args{func} || $self->_get_hash_func(%args);
@@ -3331,118 +3349,139 @@ sub new {
}
my ($dbh, $tbl, $chunk_size, $o, $q) = @args{@required_args};
my $one_nibble = !defined $args{one_nibble} || $args{one_nibble}
? _can_nibble_once(%args)
: 0;
my $index = _find_best_index(%args);
die "No index to nibble table $tbl->{db}.$tbl->{tbl}" unless $index;
my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols};
if ( !$index && !$one_nibble ) {
die "Cannot chunk table $tbl->{db}.$tbl->{tbl} because there is "
. "no good index and the table is oversized.";
}
my $asc = $args{TableNibbler}->generate_asc_stmt(
%args,
tbl_struct => $tbl->{tbl_struct},
index => $index,
asc_only => 1,
);
MKDEBUG && _d('Ascend params:', Dumper($asc));
my $self;
if ( $one_nibble ) {
my $tbl_struct = $tbl->{tbl_struct};
my $ignore_col = $o->get('ignore-columns') || {};
my $all_cols = $o->get('columns') || $tbl_struct->{cols};
my @cols = grep { !$ignore_col->{$_} } @$all_cols;
my $from = $q->quote(@{$tbl}{qw(db tbl)}) . " FORCE INDEX(`$index`)";
my $order_by = join(', ', map {$q->quote($_)} @{$index_cols});
my $nibble_sql
= ($args{dms} ? "$args{dms} " : "SELECT ")
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @cols))
. " FROM " . $q->quote(@{$tbl}{qw(db tbl)})
. ($args{where} ? " AND ($args{where})" : '')
. " /*one nibble*/";
MKDEBUG && _d('One nibble statement:', $nibble_sql);
my $first_lb_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. ($args{where} ? " WHERE $args{where}" : '')
. " ORDER BY $order_by"
. " LIMIT 1"
. " /*first lower boundary*/";
MKDEBUG && _d('First lower boundary statement:', $first_lb_sql);
my $explain_nibble_sql
= "EXPLAIN SELECT "
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @cols))
. " FROM " . $q->quote(@{$tbl}{qw(db tbl)})
. ($args{where} ? " AND ($args{where})" : '')
. " /*explain one nibble*/";
MKDEBUG && _d('Explain one nibble statement:', $explain_nibble_sql);
my $last_ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. ($args{where} ? " WHERE $args{where}" : '')
. " ORDER BY "
. join(' DESC, ', map {$q->quote($_)} @{$index_cols}) . ' DESC'
. " LIMIT 1"
. " /*last upper boundary*/";
MKDEBUG && _d('Last upper boundary statement:', $last_ub_sql);
$self = {
%args,
one_nibble => 1,
limit => 0,
nibble_sql => $nibble_sql,
explain_nibble_sql => $explain_nibble_sql,
nibbleno => 0,
have_rows => 0,
rowno => 0,
};
}
else {
my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols};
my $ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='}
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " LIMIT ?, 2"
. " /*upper boundary*/";
MKDEBUG && _d('Upper boundary statement:', $ub_sql);
my $asc = $args{TableNibbler}->generate_asc_stmt(
%args,
tbl_struct => $tbl->{tbl_struct},
index => $index,
asc_only => 1,
);
MKDEBUG && _d('Ascend params:', Dumper($asc));
my $nibble_sql
= ($args{dms} ? "$args{dms} " : "SELECT ")
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. " AND " . $asc->{boundaries}->{'<='} # upper boundary
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*nibble*/";
MKDEBUG && _d('Nibble statement:', $nibble_sql);
my $from = $q->quote(@{$tbl}{qw(db tbl)}) . " FORCE INDEX(`$index`)";
my $order_by = join(', ', map {$q->quote($_)} @{$index_cols});
my $explain_nibble_sql
= "EXPLAIN SELECT "
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. " AND " . $asc->{boundaries}->{'<='} # upper boundary
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*explain nibble*/";
MKDEBUG && _d('Explain nibble statement:', $explain_nibble_sql);
my $first_lb_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. ($args{where} ? " WHERE $args{where}" : '')
. " ORDER BY $order_by"
. " LIMIT 1"
. " /*first lower boundary*/";
MKDEBUG && _d('First lower boundary statement:', $first_lb_sql);
my $one_nibble_sql
= ($args{dms} ? "$args{dms} " : "SELECT ")
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*one nibble*/";
MKDEBUG && _d('One nibble statement:', $one_nibble_sql);
my $last_ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. ($args{where} ? " WHERE $args{where}" : '')
. " ORDER BY "
. join(' DESC, ', map {$q->quote($_)} @{$index_cols}) . ' DESC'
. " LIMIT 1"
. " /*last upper boundary*/";
MKDEBUG && _d('Last upper boundary statement:', $last_ub_sql);
my $explain_one_nibble_sql
= "EXPLAIN SELECT "
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*explain one nibble*/";
MKDEBUG && _d('Explain one nibble statement:', $explain_one_nibble_sql);
my $ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='}
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " LIMIT ?, 2"
. " /*upper boundary*/";
MKDEBUG && _d('Upper boundary statement:', $ub_sql);
my $limit = $chunk_size - 1;
MKDEBUG && _d('Initial chunk size (LIMIT):', $limit);
my $nibble_sql
= ($args{dms} ? "$args{dms} " : "SELECT ")
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. " AND " . $asc->{boundaries}->{'<='} # upper boundary
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*nibble*/";
MKDEBUG && _d('Nibble statement:', $nibble_sql);
my $self = {
%args,
asc => $asc,
index => $index,
from => $from,
order_by => $order_by,
limit => $limit,
first_lb_sql => $first_lb_sql,
last_ub_sql => $last_ub_sql,
ub_sql => $ub_sql,
nibble_sql => $nibble_sql,
explain_nibble_sql => $explain_nibble_sql,
one_nibble_sql => $one_nibble_sql,
explain_one_nibble_sql => $explain_one_nibble_sql,
nibbleno => 0,
have_rows => 0,
rowno => 0,
};
my $explain_nibble_sql
= "EXPLAIN SELECT "
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. " AND " . $asc->{boundaries}->{'<='} # upper boundary
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*explain nibble*/";
MKDEBUG && _d('Explain nibble statement:', $explain_nibble_sql);
my $limit = $chunk_size - 1;
MKDEBUG && _d('Initial chunk size (LIMIT):', $limit);
$self = {
%args,
index => $index,
limit => $limit,
first_lb_sql => $first_lb_sql,
last_ub_sql => $last_ub_sql,
ub_sql => $ub_sql,
nibble_sql => $nibble_sql,
explain_nibble_sql => $explain_nibble_sql,
nibbleno => 0,
have_rows => 0,
rowno => 0,
};
}
return bless $self, $class;
}
@@ -3451,7 +3490,6 @@ sub next {
my ($self) = @_;
if ($self->{nibbleno} == 0) {
$self->_can_nibble_once();
$self->_prepare_sths();
$self->_get_bounds();
if ( my $callback = $self->{callbacks}->{init} ) {
@@ -3481,10 +3519,10 @@ sub next {
$self->{nibble_sth}->execute(@{$self->{lb}}, @{$self->{ub}});
$self->{have_rows} = $self->{nibble_sth}->rows();
}
MKDEBUG && _d($self->{have_rows}, 'rows in nibble', $self->{nibbleno});
}
if ( $self->{have_rows} ) {
MKDEBUG && _d($self->{have_rows}, 'rows in nibble', $self->{nibbleno});
my $row = $self->{nibble_sth}->fetchrow_arrayref();
if ( $row ) {
$self->{rowno}++;
@@ -3527,9 +3565,20 @@ sub nibble_index {
return $self->{index};
}
# Return the iterator's stored lb, ub and next_lb values, in that order.
sub boundaries {
    my ($self) = @_;
    return @{$self}{qw(lb ub next_lb)};
}
# Return the iterator's one_nibble flag as stored on the object.
sub one_nibble {
    my $self = shift;
    return $self->{one_nibble};
}
# Sub: set_chunk_size
#   Set a new chunk size for the iterator.
#
# Parameters:
#   $limit - New chunk size; must be a number greater than zero
#
# Dies with "Chunk size must be > 0" if $limit is undefined, zero, or
# negative.  Negative values used to pass the old truthiness-only check
# and produced a negative stored limit.
#
# The value stored in $self->{limit} is $limit - 1.
# NOTE(review): presumably this is because the limit is used as the row
# offset of the upper-boundary query -- confirm against the caller.
sub set_chunk_size {
    my ($self, $limit) = @_;
    MKDEBUG && _d('Setting new chunk size (LIMIT):', $limit);
    die "Chunk size must be > 0" unless $limit && $limit > 0;
    $self->{limit} = $limit - 1;
    return;
}
@@ -3607,38 +3656,28 @@ sub _get_index_cardinality {
return $cardinality;
}
# Stub: unpacks its single argument and does nothing else; no index check
# is implemented in the visible body.
sub _can_nibble_index {
my ($index) = @_;
}
# Decide whether the table is small enough to be read in one nibble, using
# the row count reported by $tp->get_table_status().  Returns 1 or 0.
# NOTE(review): this span appears to contain both the pre- and post-commit
# bodies of the sub, rendered without +/- diff markers (duplicate "my"
# declarations, statements after the first "return").  Confirm against the
# real file before relying on it.
sub _can_nibble_once {
# Method-style variant: state is read from the object.
my ($self) = @_;
my ($dbh, $tbl, $tp) = @{$self}{qw(dbh tbl TableParser)};
# Function-style variant: state is read from a named-argument list.
my (%args) = @_;
my @required_args = qw(dbh tbl chunk_size OptionParser TableParser);
my ($dbh, $tbl, $chunk_size, $o, $tp) = @args{@required_args};
my ($table_status) = $tp->get_table_status($dbh, $tbl->{db}, $tbl->{tbl});
MKDEBUG && _d('TABLE STATUS', Dumper($table_status));
# A missing row count is treated as 0 rows.
my $n_rows = $table_status->{rows} || 0;
# Variant 1: one nibble if rows <= --chunk-size (1 if the option is unset);
# the result is cached on the object.
my $chunk_size = $self->{OptionParser}->get('chunk-size') || 1;
$self->{one_nibble} = $n_rows <= $chunk_size ? 1 : 0;
MKDEBUG && _d('One nibble:', $self->{one_nibble} ? 'yes' : 'no');
return $self->{one_nibble};
# Variant 2: one nibble if rows < chunk_size * --chunk-size-limit; nothing
# is stored, the flag is only returned.
my $limit = $o->get('chunk-size-limit');
my $one_nibble = $n_rows < $chunk_size * $limit ? 1 : 0;
MKDEBUG && _d('One nibble:', $one_nibble ? 'yes' : 'no');
return $one_nibble;
}
# Prepare the statement handles used while nibbling and store them on the
# object as ub_sth, nibble_sth and explain_sth.
# NOTE(review): this span appears to contain both the pre- and post-commit
# bodies of the sub, rendered without +/- diff markers; the braces do not
# balance as shown.  Confirm against the real file before relying on it.
sub _prepare_sths {
my ($self) = @_;
MKDEBUG && _d('Preparing statement handles');
# Variant 1: choose between the one-nibble statements and the regular
# chunking statements, keeping any handle that is already set.
if ( $self->{one_nibble} ) {
$self->{nibble_sth} = $self->{dbh}->prepare($self->{one_nibble_sql})
unless $self->{nibble_sth};
$self->{explain_sth} = $self->{dbh}->prepare($self->{explain_one_nibble_sql})
unless $self->{explain_sth};
}
else {
$self->{ub_sth} = $self->{dbh}->prepare($self->{ub_sql})
unless $self->{ub_sth};
$self->{nibble_sth} = $self->{dbh}->prepare($self->{nibble_sql})
unless $self->{nibble_sth};
$self->{explain_sth} = $self->{dbh}->prepare($self->{explain_nibble_sql})
unless $self->{explain_sth};
# Variant 2: the upper-boundary statement is prepared only when the table
# is being chunked; nibble_sql and explain_nibble_sql are always prepared.
if ( !$self->{one_nibble} ) {
$self->{ub_sth} = $self->{dbh}->prepare($self->{ub_sql});
}
$self->{nibble_sth} = $self->{dbh}->prepare($self->{nibble_sql});
$self->{explain_sth} = $self->{dbh}->prepare($self->{explain_nibble_sql});
return;
}
sub _get_bounds {
@@ -3654,29 +3693,6 @@ sub _get_bounds {
return;
}
# Verify that the index named in the first row of an EXPLAIN-style result
# (its "key" column, lowercased) equals $self->{index}.
#
# Warns and returns if the query itself fails; dies if a different index
# (or no index) was reported.  Returns nothing on success.
#
# NOTE(review): the SQL text passed to selectall_arrayref() is an empty
# string, so this check looks unfinished -- confirm what statement was
# meant to be explained.  The comparison uses $self->{index} while the
# error message prints $self->{asc}->{index}.
sub _check_index_usage {
    my ($self) = @_;
    my $dbh = $self->{dbh};
    my $tbl = $self->{tbl};

    my $plan = eval { $dbh->selectall_arrayref("", {Slice => {}}) };
    if ( $EVAL_ERROR ) {
        warn "Cannot check if MySQL is using the chunk index: $EVAL_ERROR";
        return;
    }

    my $used_index = lc($plan->[0]->{key} || '');
    MKDEBUG && _d('EXPLAIN index:', $used_index);
    return if $used_index eq $self->{index};

    my $chosen = $used_index ? "the `$used_index`" : 'no';
    die "Cannot nibble table $tbl->{db}.$tbl->{tbl} because MySQL chose "
        . $chosen . ' index'
        . " instead of the chunk index `$self->{asc}->{index}`";
}
sub _next_boundaries {
my ($self) = @_;
@@ -3691,6 +3707,23 @@ sub _next_boundaries {
return 1;
}
if ( $self->identical_boundaries($self->{lb}, $self->{next_lb}) ) {
MKDEBUG && _d('Infinite loop detected');
my $tbl = $self->{tbl};
my $index = $tbl->{tbl_struct}->{keys}->{$self->{index}};
my $n_cols = scalar @{$index->{cols}};
my $chunkno = $self->{nibbleno};
die "Possible infinite loop detected! "
. "The lower boundary for chunk $chunkno is "
. "<" . join(', ', @{$self->{lb}}) . "> and the lower "
. "boundary for chunk " . ($chunkno + 1) . " is also "
. "<" . join(', ', @{$self->{next_lb}}) . ">. "
. "This usually happens when using a non-unique single "
. "column index. The current chunk index for table "
. "$tbl->{db}.$tbl->{tbl} is $self->{index} which is"
. ($index->{is_unique} ? '' : ' not') . " unique and covers "
. ($n_cols > 1 ? "$n_cols columns" : "1 column") . ".\n";
}
$self->{lb} = $self->{next_lb};
MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:',
@@ -3701,22 +3734,6 @@ sub _next_boundaries {
if ( $boundary && @$boundary ) {
$self->{ub} = $boundary->[0]; # this nibble
if ( $boundary->[1] ) {
if ( $self->_identical_boundaries($boundary) ) {
my $tbl = $self->{tbl};
my $index = $tbl->{tbl_struct}->{keys}->{$self->{index}};
my $n_cols = scalar @{$index->{cols}};
my $chunkno = $self->{nibbleno} + 1;
die "Possible infinite loop detected! "
. "The upper boundary for chunk $chunkno is "
. "<" . join(', ', @{$boundary->[0]}) . "> and the lower "
. "boundary for chunk " . ($chunkno + 1) . " is also "
. "<" . join(', ', @{$boundary->[1]}) . ">. "
. "This usually happens when using a non-unique single "
. "column index. The current chunk index for table "
. "$tbl->{db}.$tbl->{tbl} is $self->{index} which is"
. ($index->{is_unique} ? '' : ' not') . " unique and covers "
. ($n_cols > 1 ? "$n_cols columns" : "1 column") . ".\n";
}
$self->{next_lb} = $boundary->[1]; # next nibble
}
else {
@@ -3734,16 +3751,19 @@ sub _next_boundaries {
return 1; # have boundary
}
# Compare two boundaries (arrayrefs of values) element by element using
# string inequality (ne).  Returns 1 if every value matches, else 0.
# NOTE(review): this span appears to contain both the pre-commit sub
# (_identical_boundaries, taking one arrayref holding ub and lb) and the
# post-commit sub (identical_boundaries, taking two boundaries), rendered
# without +/- diff markers.  Confirm against the real file.
sub _identical_boundaries {
my ($self, $boundaries) = @_;
my $ub = $boundaries->[0];
my $lb = $boundaries->[1];
# Not identical unless both boundaries are present.
return 0 unless $ub && $lb;
my $n_vals = scalar @$ub;
sub identical_boundaries {
my ($self, $b1, $b2) = @_;
# Exactly one boundary missing: not identical.
return 0 if ($b1 && !$b2) || (!$b1 && $b2);
# Both boundaries missing: treated as identical.
return 1 if !$b1 && !$b2;
die "Boundaries have different numbers of values"
if scalar @$b1 != scalar @$b2; # shouldn't happen
my $n_vals = scalar @$b1;
# Any differing value means the boundaries are not identical.
for my $i ( 0..($n_vals-1) ) {
return 0 if $lb->[$i] ne $ub->[$i];
return 0 if $b1->[$i] ne $b2->[$i]; # diff
}
MKDEBUG && _d('Infinite loop detected');
return 1;
}
@@ -5108,6 +5128,13 @@ sub main {
my $dp = $o->DSNParser();
$dp->prop('set-vars', $o->get('set-vars'));
# Add the --replicate table to --ignore-tables.
my %ignore_tables = (
%{$o->get('ignore-tables')},
$o->get('replicate') => 1,
);
$o->set('ignore-tables', \%ignore_tables);
if ( !$o->get('help') ) {
if ( !@ARGV ) {
$o->save_error("No host specified");
@@ -5339,6 +5366,7 @@ sub main {
my $total_rows = 0;
my $total_time = 0;
my $total_rate = 0;
my $limit = $o->get('chunk-size-limit');
# ########################################################################
# Callbacks for each table's nibble iterator. All checksum work is done
@@ -5347,24 +5375,42 @@ sub main {
my $callbacks = {
exec_nibble => sub {
my (%args) = @_;
my $tbl = $args{tbl};
my $nibble_iter = $args{NibbleIterator};
my $tbl = $args{tbl};
# Count every chunk, even if it's ultimately skipped, etc.
$tbl->{checksum_results}->{n_chunks}++;
# Check if the chunk is too large. If yes, then return 0 to
# skip this chunk and fetch the next boundary.
if ( $tbl->{chunk_size_limit} ) {
my $is_oversize = is_oversize_chunk(
%args,
chunk_size => $tbl->{chunk_size},
limit => $tbl->{chunk_size_limit},
);
if ( $is_oversize ) {
# If the table is being chunked (i.e., it's not small enough to be
# consumed by one nibble), then check index usage and chunk size.
if ( !$nibble_iter->one_nibble() ) {
my $expl = explain_chunk(%args);
my $oversize_chunk
= $limit ? ($expl->{rows} || 0) >= $tbl->{chunk_size} * $limit
: 0;
# Ensure that MySQL is using the chunk index.
if ( ($expl->{key} || '') ne $nibble_iter->nibble_index() ) {
MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table',
"$tbl->{db}.$tbl->{tbl}", 'is too large');
"$tbl->{db}.$tbl->{tbl} not using chunk index, skipping");
$tbl->{checksum_results}->{skipped}++;
$tbl->{nibble_time} = 0;
return 0; # next boundary
}
# Check chunk size limit if the upper boundary (ub) and next lower
# boundary (next_lb) are identical.
if ( $limit ) {
my (undef, $ub, $next_lb) = $nibble_iter->boundaries();
if ( $nibble_iter->identical_boundaries($ub, $next_lb)
&& $oversize_chunk ) {
MKDEBUG && _d('Chunk', $args{nibbleno}, 'of table',
"$tbl->{db}.$tbl->{tbl} is too large, skipping");
$tbl->{checksum_results}->{skipped}++;
$tbl->{nibble_time} = 0;
return 0; # next boundary
}
}
}
# Exec and time the chunk checksum query. If it fails, retry.
@@ -5530,23 +5576,9 @@ sub main {
Quoter => $q,
TableNibbler => $tn,
TableParser => $tp,
RowChecksum => $rc,
);
# If the chunk index is unique, then we'll always get the exact number of
# rows requested (or sometimes fewer for the final chunk), so we disable the
# chunk size limit.
my $chunk_index = $nibble_iter->nibble_index();
if ( $tbl->{tbl_struct}->{keys}->{$chunk_index}->{is_unique} ) {
MKDEBUG && _d('Disabling chunk size limit for table because',
'chunk index', $chunk_index, 'is unique');
$tbl->{chunk_size_limit} = 0;
}
else {
MKDEBUG && _d('Enabling chunk size limit for table because',
'chunk index', $chunk_index, 'is not unique');
$tbl->{chunk_size_limit} = $o->get('chunk-size-limit');
}
# Finally, checksum the table.
# The "1 while" loop is necessary because we're executing REPLACE
# statements which don't return rows and NibbleIterator only
@@ -5866,45 +5898,39 @@ sub create_repl_table {
return;
}
# NOTE(review): this span appears to contain both the pre-commit sub
# (is_oversize_chunk) and the post-commit sub (explain_chunk), including
# their header comments, rendered without +/- diff markers.  Confirm
# against the real file before relying on it.
# Sub: is_oversize_chunk
# Determine if the chunk is oversize.
# Sub: explain_chunk
# EXPLAIN a chunk checksum query.
#
# Required Arguments:
# * tbl - Standard tbl hashref
# * explain_sth - Sth to EXPLAIN the chunking query
# * lb - Arrayref with lower boundary values for explain_sth
# * ub - Arrayref with upper boundary values for explain_sth
# * chunk_size - Chunk size
# * limit - Chunk size limit
#
# Returns:
# True if EXPLAIN rows is >= chunk-size * chunk-size-limit, else false
sub is_oversize_chunk {
# Hashref with EXPLAIN plan.
sub explain_chunk {
my ( %args ) = @_;
my @required_args = qw(tbl explain_sth lb ub chunk_size limit);
my @required_args = qw(tbl explain_sth lb ub);
# Arguments must be defined, not merely true, so 0 is accepted.
foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless defined $args{$arg};
}
my ($tbl, $expl_sth, $lb, $ub, $chunk_size, $limit) = @args{@required_args};
my ($tbl, $expl_sth, $lb, $ub) = @args{@required_args};
return 0 if $limit == 0; # no limit, all chunk sizes allowed
my $expl_res;
my $expl;
# Execute the EXPLAIN statement with the lower then upper boundary values
# and read the first row of the plan.
eval {
MKDEBUG && _d($expl_sth->{Statement});
$expl_sth->execute(@$lb, @$ub);
$expl_res = $expl_sth->fetchrow_hashref();
$expl = $expl_sth->fetchrow_hashref();
$expl_sth->finish();
};
# On failure: warn, count the error in the table's checksum results, and
# return 0.
if ( $EVAL_ERROR ) {
# This shouldn't happen.
warn "Failed to " . $expl_sth->{Statement} . ": $EVAL_ERROR\n";
$tbl->{checksum_results}->{errors}++;
return 0; # assume chunk size is ok
}
MKDEBUG && _d('EXPLAIN result:', Dumper($expl_res));
return ($expl_res->{rows} || 0) >= $chunk_size * $limit ? 1 : 0;
MKDEBUG && _d('EXPLAIN plan:', Dumper($expl));
return $expl;
}
sub print_inconsistent_tbls {
@@ -6609,7 +6635,7 @@ L<"--create-replicate-table"> (MAGIC_create_replicate):
tbl char(64) NOT NULL,
chunk int NOT NULL,
chunk_time float NULL,
chunk_index varchar(200) NOT NULL,
chunk_index varchar(200) NULL,
lower_boundary text NOT NULL,
upper_boundary text NOT NULL,
this_crc char(40) NOT NULL,

View File

@@ -45,6 +45,7 @@ $Data::Dumper::Quotekeys = 0;
#
# Optional Arguments:
# chunk_index - Index to use for nibbling
# one_nibble - Allow one-chunk tables (default yes)
#
# Returns:
# NibbleIterator object
@@ -56,136 +57,157 @@ sub new {
}
my ($dbh, $tbl, $chunk_size, $o, $q) = @args{@required_args};
my $one_nibble = !defined $args{one_nibble} || $args{one_nibble}
? _can_nibble_once(%args)
: 0;
# Get an index to nibble by. We'll order rows by the index's columns.
my $index = _find_best_index(%args);
die "No index to nibble table $tbl->{db}.$tbl->{tbl}" unless $index;
my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols};
if ( !$index && !$one_nibble ) {
die "Cannot chunk table $tbl->{db}.$tbl->{tbl} because there is "
. "no good index and the table is oversized.";
}
# Figure out how to nibble the table with the index.
my $asc = $args{TableNibbler}->generate_asc_stmt(
%args,
tbl_struct => $tbl->{tbl_struct},
index => $index,
asc_only => 1,
);
MKDEBUG && _d('Ascend params:', Dumper($asc));
my $self;
if ( $one_nibble ) {
my $tbl_struct = $tbl->{tbl_struct};
my $ignore_col = $o->get('ignore-columns') || {};
my $all_cols = $o->get('columns') || $tbl_struct->{cols};
my @cols = grep { !$ignore_col->{$_} } @$all_cols;
# Make SQL statements, prepared on first call to next(). FROM and
# ORDER BY are the same for all statements. FORCE INDEX and ORDER BY
# are needed to ensure deterministic nibbling.
my $from = $q->quote(@{$tbl}{qw(db tbl)}) . " FORCE INDEX(`$index`)";
my $order_by = join(', ', map {$q->quote($_)} @{$index_cols});
# If the chunk size is >= number of rows in table, then we don't
# need to chunk; we can just select all rows, in order, at once.
my $nibble_sql
= ($args{dms} ? "$args{dms} " : "SELECT ")
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @cols))
. " FROM " . $q->quote(@{$tbl}{qw(db tbl)})
. ($args{where} ? " AND ($args{where})" : '')
. " /*one nibble*/";
MKDEBUG && _d('One nibble statement:', $nibble_sql);
# These statements are only executed once, so they don't use sths.
my $first_lb_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. ($args{where} ? " WHERE $args{where}" : '')
. " ORDER BY $order_by"
. " LIMIT 1"
. " /*first lower boundary*/";
MKDEBUG && _d('First lower boundary statement:', $first_lb_sql);
my $explain_nibble_sql
= "EXPLAIN SELECT "
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @cols))
. " FROM " . $q->quote(@{$tbl}{qw(db tbl)})
. ($args{where} ? " AND ($args{where})" : '')
. " /*explain one nibble*/";
MKDEBUG && _d('Explain one nibble statement:', $explain_nibble_sql);
my $last_ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. ($args{where} ? " WHERE $args{where}" : '')
. " ORDER BY "
. join(' DESC, ', map {$q->quote($_)} @{$index_cols}) . ' DESC'
. " LIMIT 1"
. " /*last upper boundary*/";
MKDEBUG && _d('Last upper boundary statement:', $last_ub_sql);
$self = {
%args,
one_nibble => 1,
limit => 0,
nibble_sql => $nibble_sql,
explain_nibble_sql => $explain_nibble_sql,
nibbleno => 0,
have_rows => 0,
rowno => 0,
};
}
else {
my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols};
# Nibbles are inclusive, so for a..z, the nibbles are: a-e, f-j, k-o, p-t,
# u-y, and z. This complicates getting the next upper boundary because
# if we use either (col >= lb AND col < ub) or (col > lb AND col <= ub)
# in nibble_sql (below), then that fails for either the last or first
# nibble respectively. E.g. (col >= z AND col < z) doesn't work, nor
# does (col > a AND col <= e). Hence the fancy LIMIT 2 which returns
# the upper boundary for the current nibble *and* the lower boundary
# for the next nibble. See _next_boundaries().
my $ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='}
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " LIMIT ?, 2"
. " /*upper boundary*/";
MKDEBUG && _d('Upper boundary statement:', $ub_sql);
# Figure out how to nibble the table with the index.
my $asc = $args{TableNibbler}->generate_asc_stmt(
%args,
tbl_struct => $tbl->{tbl_struct},
index => $index,
asc_only => 1,
);
MKDEBUG && _d('Ascend params:', Dumper($asc));
# This statement does the actual nibbling work; its rows are returned
# to the caller via next().
my $nibble_sql
= ($args{dms} ? "$args{dms} " : "SELECT ")
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. " AND " . $asc->{boundaries}->{'<='} # upper boundary
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*nibble*/";
MKDEBUG && _d('Nibble statement:', $nibble_sql);
# Make SQL statements, prepared on first call to next(). FROM and
# ORDER BY are the same for all statements. FORCE INDEX and ORDER BY
# are needed to ensure deterministic nibbling.
my $from = $q->quote(@{$tbl}{qw(db tbl)}) . " FORCE INDEX(`$index`)";
my $order_by = join(', ', map {$q->quote($_)} @{$index_cols});
my $explain_nibble_sql
= "EXPLAIN SELECT "
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. " AND " . $asc->{boundaries}->{'<='} # upper boundary
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*explain nibble*/";
MKDEBUG && _d('Explain nibble statement:', $explain_nibble_sql);
# These statements are only executed once, so they don't use sths.
my $first_lb_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. ($args{where} ? " WHERE $args{where}" : '')
. " ORDER BY $order_by"
. " LIMIT 1"
. " /*first lower boundary*/";
MKDEBUG && _d('First lower boundary statement:', $first_lb_sql);
# If the chunk size is >= number of rows in table, then we don't
# need to chunk; we can just select all rows, in order, at once.
my $one_nibble_sql
= ($args{dms} ? "$args{dms} " : "SELECT ")
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*one nibble*/";
MKDEBUG && _d('One nibble statement:', $one_nibble_sql);
my $last_ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. ($args{where} ? " WHERE $args{where}" : '')
. " ORDER BY "
. join(' DESC, ', map {$q->quote($_)} @{$index_cols}) . ' DESC'
. " LIMIT 1"
. " /*last upper boundary*/";
MKDEBUG && _d('Last upper boundary statement:', $last_ub_sql);
my $explain_one_nibble_sql
= "EXPLAIN SELECT "
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*explain one nibble*/";
MKDEBUG && _d('Explain one nibble statement:', $explain_one_nibble_sql);
# Nibbles are inclusive, so for a..z, the nibbles are: a-e, f-j, k-o, p-t,
# u-y, and z. This complicates getting the next upper boundary because
# if we use either (col >= lb AND col < ub) or (col > lb AND col <= ub)
# in nibble_sql (below), then that fails for either the last or first
# nibble respectively. E.g. (col >= z AND col < z) doesn't work, nor
# does (col > a AND col <= e). Hence the fancy LIMIT 2 which returns
# the upper boundary for the current nibble *and* the lower boundary
# for the next nibble. See _next_boundaries().
my $ub_sql
= "SELECT /*!40001 SQL_NO_CACHE */ "
. join(', ', map { $q->quote($_) } @{$asc->{scols}})
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='}
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " LIMIT ?, 2"
. " /*upper boundary*/";
MKDEBUG && _d('Upper boundary statement:', $ub_sql);
my $limit = $chunk_size - 1;
MKDEBUG && _d('Initial chunk size (LIMIT):', $limit);
# This statement does the actual nibbling work; its rows are returned
# to the caller via next().
my $nibble_sql
= ($args{dms} ? "$args{dms} " : "SELECT ")
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. " AND " . $asc->{boundaries}->{'<='} # upper boundary
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*nibble*/";
MKDEBUG && _d('Nibble statement:', $nibble_sql);
my $self = {
%args,
asc => $asc,
index => $index,
from => $from,
order_by => $order_by,
limit => $limit,
first_lb_sql => $first_lb_sql,
last_ub_sql => $last_ub_sql,
ub_sql => $ub_sql,
nibble_sql => $nibble_sql,
explain_nibble_sql => $explain_nibble_sql,
one_nibble_sql => $one_nibble_sql,
explain_one_nibble_sql => $explain_one_nibble_sql,
nibbleno => 0,
have_rows => 0,
rowno => 0,
};
my $explain_nibble_sql
= "EXPLAIN SELECT "
. ($args{select} ? $args{select}
: join(', ', map { $q->quote($_) } @{$asc->{cols}}))
. " FROM $from"
. " WHERE " . $asc->{boundaries}->{'>='} # lower boundary
. " AND " . $asc->{boundaries}->{'<='} # upper boundary
. ($args{where} ? " AND ($args{where})" : '')
. " ORDER BY $order_by"
. " /*explain nibble*/";
MKDEBUG && _d('Explain nibble statement:', $explain_nibble_sql);
my $limit = $chunk_size - 1;
MKDEBUG && _d('Initial chunk size (LIMIT):', $limit);
$self = {
%args,
index => $index,
limit => $limit,
first_lb_sql => $first_lb_sql,
last_ub_sql => $last_ub_sql,
ub_sql => $ub_sql,
nibble_sql => $nibble_sql,
explain_nibble_sql => $explain_nibble_sql,
nibbleno => 0,
have_rows => 0,
rowno => 0,
};
}
return bless $self, $class;
}
@@ -196,7 +218,6 @@ sub next {
# First call, init everything. This could be done in new(), but
# all work is delayed until actually needed.
if ($self->{nibbleno} == 0) {
$self->_can_nibble_once();
$self->_prepare_sths();
$self->_get_bounds();
if ( my $callback = $self->{callbacks}->{init} ) {
@@ -229,11 +250,11 @@ sub next {
$self->{nibble_sth}->execute(@{$self->{lb}}, @{$self->{ub}});
$self->{have_rows} = $self->{nibble_sth}->rows();
}
MKDEBUG && _d($self->{have_rows}, 'rows in nibble', $self->{nibbleno});
}
# Return rows in this nibble.
if ( $self->{have_rows} ) {
MKDEBUG && _d($self->{have_rows}, 'rows in nibble', $self->{nibbleno});
# Return rows in nibble. sth->{Active} is always true with
# DBD::mysql v3, so we track the status manually.
my $row = $self->{nibble_sth}->fetchrow_arrayref();
@@ -373,37 +394,28 @@ sub _get_index_cardinality {
return $cardinality;
}
# Stub: unpacks its single $index argument and contains no other logic yet.
# NOTE(review): there is no explicit return, so callers should not rely on
# the value this sub yields until a body is written.
sub _can_nibble_index {
my ($index) = @_;
}
# Decide whether the whole table can be read as a single nibble by comparing
# the row count reported by TableParser::get_table_status() with a limit.
#
# NOTE(review): this body appears to interleave two revisions of the sub
# (an object-method form reading $self and a named-argument form reading
# %args).  $dbh, $tbl and $tp are declared twice in the same scope, and
# everything after the first "return" is unreachable.  Confirm which form
# is intended before relying on it.
sub _can_nibble_once {
my ($self) = @_;
my ($dbh, $tbl, $tp) = @{$self}{qw(dbh tbl TableParser)};
# @_ was copied above, not shifted, so it still begins with $self when it
# is reinterpreted here as a key/value list.
my (%args) = @_;
my @required_args = qw(dbh tbl chunk_size OptionParser TableParser);
# These declarations mask the $dbh, $tbl and $tp taken from $self above;
# the get_table_status() call below therefore uses the values from %args.
my ($dbh, $tbl, $chunk_size, $o, $tp) = @args{@required_args};
my ($table_status) = $tp->get_table_status($dbh, $tbl->{db}, $tbl->{tbl});
MKDEBUG && _d('TABLE STATUS', Dumper($table_status));
# A missing or false row count is treated as zero rows.
my $n_rows = $table_status->{rows} || 0;
# One nibble suffices when the row count does not exceed $self->{limit}.
# The decision is also stored on the object.
$self->{one_nibble} = $n_rows <= $self->{limit} ? 1 : 0;
MKDEBUG && _d('One nibble:', $self->{one_nibble} ? 'yes' : 'no');
return $self->{one_nibble};
# Unreachable as written: alternative rule comparing the row count with
# chunk_size multiplied by the chunk-size-limit option.
my $limit = $o->get('chunk-size-limit');
my $one_nibble = $n_rows < $chunk_size * $limit ? 1 : 0;
MKDEBUG && _d('One nibble:', $one_nibble ? 'yes' : 'no');
return $one_nibble;
}
sub _prepare_sths {
my ($self) = @_;
MKDEBUG && _d('Preparing statement handles');
if ( $self->{one_nibble} ) {
$self->{nibble_sth} = $self->{dbh}->prepare($self->{one_nibble_sql})
unless $self->{nibble_sth};
$self->{explain_sth} = $self->{dbh}->prepare($self->{explain_one_nibble_sql})
unless $self->{explain_sth};
}
else {
$self->{ub_sth} = $self->{dbh}->prepare($self->{ub_sql})
unless $self->{ub_sth};
$self->{nibble_sth} = $self->{dbh}->prepare($self->{nibble_sql})
unless $self->{nibble_sth};
$self->{explain_sth} = $self->{dbh}->prepare($self->{explain_nibble_sql})
unless $self->{explain_sth};
if ( !$self->{one_nibble} ) {
$self->{ub_sth} = $self->{dbh}->prepare($self->{ub_sql});
}
$self->{nibble_sth} = $self->{dbh}->prepare($self->{nibble_sql});
$self->{explain_sth} = $self->{dbh}->prepare($self->{explain_nibble_sql});
return;
}
sub _get_bounds {

View File

@@ -65,37 +65,7 @@ sub make_row_checksum {
my $q = $self->{Quoter};
my $tbl_struct = $tbl->{tbl_struct};
my $func = $args{func} || uc($o->get('function'));
my $trim = $o->get('trim');
my $float_precision = $o->get('float-precision');
my $sep = $o->get('separator') || '#';
$sep =~ s/'//g;
$sep ||= '#';
my $ignore_col = $o->get('ignore-columns') || {};
my $all_cols = $o->get('columns') || $tbl_struct->{cols};
my %cols = map { lc($_) => 1 } grep { !$ignore_col->{$_} } @$all_cols;
my %seen;
my @cols =
map {
my $type = $tbl_struct->{type_for}->{$_};
my $result = $q->quote($_);
if ( $type eq 'timestamp' ) {
$result .= ' + 0';
}
elsif ( $float_precision && $type =~ m/float|double/ ) {
$result = "ROUND($result, $float_precision)";
}
elsif ( $trim && $type =~ m/varchar/ ) {
$result = "TRIM($result)";
}
$result;
}
grep {
$cols{$_} && !$seen{$_}++
}
@{$tbl_struct->{cols}};
my $cols = $self->get_checksum_columns(%args);
# Prepend columns to query, resulting in "col1, col2, FUNC(..col1, col2...)",
# unless caller says not to. The only caller that says not to is
@@ -118,29 +88,33 @@ sub make_row_checksum {
$col .= " AS $real_col";
}
$col;
} @cols)
} @{$cols->{select}})
. ', ';
}
if ( uc $func ne 'FNV_64' && uc $func ne 'FNV1A_64' ) {
my $sep = $o->get('separator') || '#';
$sep =~ s/'//g;
$sep ||= '#';
# Add a bitmap of which nullable columns are NULL.
my @nulls = grep { $cols{$_} } @{$tbl_struct->{null_cols}};
my @nulls = grep { $cols->{allowed}->{$_} } @{$tbl_struct->{null_cols}};
if ( @nulls ) {
my $bitmap = "CONCAT("
. join(', ', map { 'ISNULL(' . $q->quote($_) . ')' } @nulls)
. ")";
push @cols, $bitmap;
push @{$cols->{select}}, $bitmap;
}
$query .= @cols > 1
? "$func(CONCAT_WS('$sep', " . join(', ', @cols) . '))'
: "$func($cols[0])";
$query .= @{$cols->{select}} > 1
? "$func(CONCAT_WS('$sep', " . join(', ', @{$cols->{select}}) . '))'
: "$func($cols->{select}->[0])";
}
else {
# As a special case, FNV1A_64/FNV_64 doesn't need its arguments
# concatenated, and doesn't need a bitmap of NULLs.
my $fnv_func = uc $func;
$query .= "$fnv_func(" . join(', ', @cols) . ')';
$query .= "$fnv_func(" . join(', ', @{$cols->{select}}) . ')';
}
MKDEBUG && _d('Row checksum:', $query);
@@ -214,6 +188,50 @@ sub make_chunk_checksum {
return $select;
}
# Build the list of column expressions that take part in a row checksum.
#
# Parameters:
#   %args - named arguments; "tbl" is required and must be a hashref whose
#           {tbl_struct} holds {cols} (arrayref of column names in table
#           order) and {type_for} (column name => type).
#
# The sub reads these options from $self->{OptionParser}: trim,
# float-precision, ignore-columns and columns.  Column names are quoted
# with $self->{Quoter}.
#
# Returns a hashref with two keys:
#   select  - arrayref of quoted column expressions, in table order
#   allowed - hashref of lowercased column names permitted by the
#             columns / ignore-columns options
#
# Dies if the tbl argument is missing.
sub get_checksum_columns {
   my ($self, %args) = @_;
   for my $required ( qw(tbl) ) {
      die "I need a $required argument" unless $args{$required};
   }

   my ($o, $q)         = @{$self}{qw(OptionParser Quoter)};
   my $trim            = $o->get('trim');
   my $float_precision = $o->get('float-precision');
   my $tbl_struct      = $args{tbl}->{tbl_struct};
   my $ignore_col      = $o->get('ignore-columns') || {};
   my $all_cols        = $o->get('columns') || $tbl_struct->{cols};

   # Columns the user allows: everything requested that is not ignored.
   # The ignore lookup uses the name as given; the stored key is lowercased.
   my %allowed;
   for my $name ( @$all_cols ) {
      next if $ignore_col->{$name};
      $allowed{ lc($name) } = 1;
   }

   # Walk the table's own column list so the output keeps table order,
   # emitting each allowed column at most once.
   my %seen;
   my @select;
   COLUMN:
   for my $name ( @{ $tbl_struct->{cols} } ) {
      next COLUMN unless $allowed{$name};
      next COLUMN if $seen{$name}++;

      my $type = $tbl_struct->{type_for}->{$name};
      my $expr = $q->quote($name);

      # At most one transformation applies, checked in this priority order.
      if ( $type eq 'timestamp' ) {
         $expr .= ' + 0';
      }
      elsif ( $float_precision && $type =~ m/float|double/ ) {
         $expr = "ROUND($expr, $float_precision)";
      }
      elsif ( $trim && $type =~ m/varchar/ ) {
         $expr = "TRIM($expr)";
      }

      push @select, $expr;
   }

   return {
      select  => \@select,
      allowed => \%allowed,
   };
}
sub get_crc_args {
my ($self, %args) = @_;
my $func = $args{func} || $self->_get_hash_func(%args);

View File

@@ -38,7 +38,7 @@ if ( !$dbh ) {
plan skip_all => 'Cannot connect to sandbox master';
}
else {
plan tests => 25;
plan tests => 26;
}
my $q = new Quoter();
@@ -82,6 +82,7 @@ sub make_nibble_iter {
chunk_size => $o->get('chunk-size'),
callbacks => $args{callbacks},
select => $args{select},
one_nibble => $args{one_nibble},
%common_modules,
);
@@ -365,7 +366,8 @@ SKIP: {
push @expl, $expl_sth->fetchrow_hashref();
return 0;
},
}
},
one_nibble => 0,
);
$ni->next();
$ni->next();
@@ -518,6 +520,28 @@ throws_ok(
'Detects infinite loop'
);
# ############################################################################
# Nibble small tables without indexes.
# ############################################################################
# Create the iterator for test.t with a chunk size of 100.
# NOTE(review): the section title implies that table has fewer rows than the
# chunk size -- confirm against a-z.sql.
$ni = make_nibble_iter(
sql_file => "a-z.sql",
db => 'test',
tbl => 't',
argv => [qw(--databases test --chunk-size 100)],
);
# Drop index c after the iterator exists but before any rows are fetched.
# NOTE(review): presumably c is the table's only index, per the section
# title -- confirm against a-z.sql.
$dbh->do('alter table test.t drop index c');
# Drain the iterator, flattening every returned row's columns into @rows.
@rows = ();
while (my $row = $ni->next()) {
push @rows, @$row;
}
# Every letter a through z must come back exactly once, in order.
is_deeply(
\@rows,
[ ('a'..'z') ],
"Nibble small table without indexes"
);
# #############################################################################
# Done.
# #############################################################################

View File

@@ -269,7 +269,7 @@ $o->get_opts();
# ############################################################################
# make_chunk_checksum
# ############################################################################
@ARGV = qw(--columns film_id --no-optimize-xor);
@ARGV = qw(--columns film_id);
$o->get_opts();
is(
$c->make_chunk_checksum(