diff --git a/lib/NibbleIterator.pm b/lib/NibbleIterator.pm
new file mode 100644
index 00000000..87610130
--- /dev/null
+++ b/lib/NibbleIterator.pm
@@ -0,0 +1,340 @@
+# This program is copyright 2011 Percona Inc.
+# Feedback and improvements are welcome.
+#
+# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
+# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
+# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
+# systems, you can issue `man perlgpl' or `man perlartistic' to read these
+# licenses.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+# Place, Suite 330, Boston, MA 02111-1307 USA.
+# ###########################################################################
+# NibbleIterator package
+# ###########################################################################
+{
+# Package: NibbleIterator
+# NibbleIterator nibbles tables.
+package NibbleIterator;
+
+use strict;
+use warnings FATAL => 'all';
+use English qw(-no_match_vars);
+use constant MKDEBUG => $ENV{MKDEBUG} || 0;
+
+use Data::Dumper;
+$Data::Dumper::Indent = 1;
+$Data::Dumper::Sortkeys = 1;
+$Data::Dumper::Quotekeys = 0;
+
+# Sub: new
+#   Build the SQL statements needed to nibble one table. The statements
+#   are not prepared or executed here; that happens on the first next().
+#
+# Parameters:
+#   %args - Arguments
+#
+# Required Arguments:
+#   dbh          - dbh
+#   tbl          - Hashref with db, tbl and tbl_struct keys
+#   OptionParser - OptionParser object; chunk-index and chunk-size are read
+#   Quoter       - Quoter object
+#   TableNibbler - TableNibbler object
+#   TableParser  - TableParser object
+#
+# Optional Arguments:
+#   where - WHERE clause, without the WHERE keyword, added to every statement
+#
+# Returns:
+#   NibbleIterator object. Dies if a required argument is missing or if
+#   no index is found to nibble the table.
+sub new {
+   my ( $class, %args ) = @_;
+   my @required_args = qw(dbh tbl OptionParser Quoter TableNibbler TableParser);
+   foreach my $arg ( @required_args ) {
+      die "I need a $arg argument" unless $args{$arg};
+   }
+   my ($dbh, $tbl, $o, $q) = @args{@required_args};
+
+   # Get an index to nibble by. We'll order rows by the index's columns.
+   my $index = $args{TableParser}->find_best_index(
+      $tbl->{tbl_struct},
+      $o->get('chunk-index'),
+   );
+   die "No index to nibble table $tbl->{db}.$tbl->{tbl}" unless $index;
+   my $index_cols = $tbl->{tbl_struct}->{keys}->{$index}->{cols};
+
+   # Figure out how to nibble the table with the index.
+   my $asc = $args{TableNibbler}->generate_asc_stmt(
+      %args,
+      tbl_struct => $tbl->{tbl_struct},
+      index      => $index,
+      asc_only   => 1,
+   );
+
+   # Make SQL statements, prepared on first call to next(). The preamble
+   # and ORDER BY are the same for all statements. FORCE INDEX and ORDER BY
+   # are needed to ensure deterministic nibbling.
+   my $nibble_sql_preamble
+      = "SELECT /*!40001 SQL_NO_CACHE */ "
+      . join(', ', map { $q->quote($_) } @{$asc->{cols}})
+      . " FROM " . $q->quote(@{$tbl}{qw(db tbl)})
+      . " FORCE INDEX(" . $q->quote($index) . ")";
+   my $order_by = "ORDER BY " . join(', ', map {$q->quote($_)} @{$index_cols});
+
+   # This statement is only executed once, so it doesn't use a sth.
+   my $first_lb_sql
+      = $nibble_sql_preamble
+      . ($args{where} ? " WHERE $args{where}" : '')
+      . " $order_by "
+      . " LIMIT 1"
+      . " /*first lower boundary*/";
+   MKDEBUG && _d('First lower boundary statement:', $first_lb_sql);
+
+   # Nibbles are inclusive, so for a..z, the nibbles are: a-e, f-j, k-o, p-t,
+   # u-y, and z. This complicates getting the next upper boundary because
+   # if we use either (col >= lb AND col < ub) or (col > lb AND col <= ub)
+   # in nibble_sql (below), then that fails for either the last or first
+   # nibble respectively. E.g. (col >= z AND col < z) doesn't work, nor
+   # does (col > a AND col <= e). Hence the fancy LIMIT 2 which returns
+   # the upper boundary for the current nibble *and* the lower boundary
+   # for the next nibble. See _next_boundaries().
+   my $ub_sql
+      = $nibble_sql_preamble
+      . " WHERE (" . $asc->{boundaries}->{'>='} . ")" # lower boundary
+      . ($args{where} ? " AND ($args{where})" : '')
+      . " $order_by "
+      . " LIMIT 2 OFFSET " . (($o->get('chunk-size') || 1) - 1)
+      . " /*upper boundary*/";
+   MKDEBUG && _d('Next upper boundary statement:', $ub_sql);
+
+   # When fewer than chunk-size rows remain, ub_sql returns nothing, so the
+   # last nibble's upper boundary is the last row in index order. Like
+   # first_lb_sql, this is executed at most once, so it doesn't use a sth.
+   my $last_ub_sql
+      = $nibble_sql_preamble
+      . ($args{where} ? " WHERE $args{where}" : '')
+      . " ORDER BY "
+      . join(', ', map { $q->quote($_) . ' DESC' } @{$index_cols})
+      . " LIMIT 1"
+      . " /*last upper boundary*/";
+   MKDEBUG && _d('Last upper boundary statement:', $last_ub_sql);
+
+   my $nibble_sql
+      = $nibble_sql_preamble
+      . " WHERE (" . $asc->{boundaries}->{'>='} . ")" # lower boundary
+      . " AND (" . $asc->{boundaries}->{'<='} . ")" # upper boundary
+      . ($args{where} ? " AND ($args{where})" : '')
+      . " $order_by"
+      . " /*nibble*/";
+   MKDEBUG && _d('Nibble statement:', $nibble_sql);
+
+   my $self = {
+      %args,
+      asc          => $asc,
+      first_lb_sql => $first_lb_sql,
+      ub_sql       => $ub_sql,
+      last_ub_sql  => $last_ub_sql,
+      nibble_sql   => $nibble_sql,
+      nibbleno     => 0,
+      have_rows    => 0,
+      rowno        => 0,
+   };
+
+   return bless $self, $class;
+}
+
+# Sub: next
+#   Return the next row of the table, fetching the next nibble when the
+#   current one has no more rows.
+#
+# Returns:
+#   Arrayref (a copy) of the row's column values, or nothing when done.
+sub next {
+   my ($self) = @_;
+
+   # First call, init everything. This could be done in new(), but
+   # all work is delayed until actually needed.
+   if ($self->{nibbleno} == 0) {
+      $self->_prepare_sths();
+      $self->_get_first_lb();
+      # $self->_check_index_usage();
+   }
+
+   # Return rows in nibble. sth->{Active} is always true with DBD::mysql v3,
+   # so we track the status manually. have_rows will be true if a previous
+   # call got a nibble with rows. When there's no more rows in this nibble,
+   # try to get the next nibble.
+   if ( $self->{have_rows} ) {
+      my $row = $self->{nibble_sth}->fetchrow_arrayref();
+      if ( $row ) {
+         $self->{rowno}++;
+         MKDEBUG && _d('Row', $self->{rowno}, 'in nibble', $self->{nibbleno});
+         # fetchrow_arrayref re-uses its internal arrayref, so we must copy.
+         return [ @$row ];
+      }
+      MKDEBUG && _d('No more rows in nibble', $self->{nibbleno});
+      $self->{rowno} = 0;
+      $self->{have_rows} = 0;
+   }
+
+   # If there's another boundary, fetch the rows within it.
+   if ( $self->_next_boundaries() ) {
+      MKDEBUG && _d($self->{nibble_sth}->{Statement}, 'params:',
+         join(', ', (@{$self->{lb}}, @{$self->{ub}})));
+      $self->{nibble_sth}->execute(@{$self->{lb}}, @{$self->{ub}});
+      # NOTE(review): DBI only guarantees rows() for a SELECT after all rows
+      # have been fetched; this relies on the driver knowing the row count
+      # right after execute() -- confirm for the DBD::mysql settings in use.
+      $self->{have_rows} = $self->{nibble_sth}->rows();
+      if ( $self->{have_rows} ) {
+         $self->{nibbleno}++;
+         MKDEBUG && _d($self->{have_rows}, 'rows in nibble', $self->{nibbleno});
+         return $self->next();
+      }
+   }
+
+   MKDEBUG && _d('Done nibbling');
+   return;
+}
+
+# Sub: _prepare_sths
+#   Prepare the upper boundary and nibble statement handles on the dbh.
+sub _prepare_sths {
+   my ($self) = @_;
+   MKDEBUG && _d('Preparing statement handles');
+   $self->{ub_sth} = $self->{dbh}->prepare($self->{ub_sql});
+   $self->{nibble_sth} = $self->{dbh}->prepare($self->{nibble_sql});
+}
+
+# Sub: _get_first_lb
+#   Fetch the first row in index order and save it as next_lb, the lower
+#   boundary of the first nibble. next_lb is undef if no row is found.
+sub _get_first_lb {
+   my ($self) = @_;
+   $self->{next_lb} = $self->{dbh}->selectrow_arrayref($self->{first_lb_sql});
+   MKDEBUG && _d('First lower boundary:', Dumper($self->{next_lb}));
+   return;
+}
+
+# Sub: _check_index_usage
+#   Die if EXPLAIN reports a different index than the nibble index, unless
+#   SHOW TABLE STATUS reports 100 rows or fewer. Not called yet; see the
+#   commented-out call in next().
+sub _check_index_usage {
+   my ($self) = @_;
+   my ($dbh, $tbl, $q) = @{$self}{qw(dbh tbl Quoter)};
+
+   my $table_status;
+   eval {
+      my $sql = "SHOW TABLE STATUS FROM " . $q->quote($tbl->{db})
+              . " LIKE " . $q->literal_like($tbl->{tbl});
+      MKDEBUG && _d($sql);
+      $table_status = $dbh->selectrow_hashref($sql);
+   };
+   MKDEBUG && $EVAL_ERROR && _d($EVAL_ERROR);
+
+   my $small_table;
+   if ( $table_status ) {
+      my $n_rows = defined $table_status->{Rows} ? $table_status->{Rows}
+                 : defined $table_status->{rows} ? $table_status->{rows}
+                 : undef;
+      $small_table = 1 if defined $n_rows && $n_rows <= 100;
+   }
+   MKDEBUG && _d('Small table:', $small_table);
+
+   if ( !$small_table ) {
+      my $explain;
+      eval {
+         # TODO: the statement to EXPLAIN is still an empty string, so this
+         # check cannot work until a real EXPLAIN statement is put here.
+         $explain = $dbh->selectall_arrayref("", {Slice => {}});
+      };
+      if ( $EVAL_ERROR ) {
+         MKDEBUG && _d($EVAL_ERROR);
+         return;
+      }
+      MKDEBUG && _d('EXPLAIN key:', $explain->[0]->{key});
+      my $explain_index = lc($explain->[0]->{key} || '');
+      if ( $explain_index ne lc($self->{asc}->{index}) ) {
+         die "Cannot nibble table $tbl->{db}.$tbl->{tbl} because MySQL chose "
+            . ($explain_index ? "the `$explain_index`" : 'no') . ' index'
+            . " instead of the `$self->{asc}->{index}` index";
+      }
+   }
+
+   return;
+}
+
+# Sub: _next_boundaries
+#   Set lb and ub, the inclusive boundaries of the next nibble, and next_lb,
+#   the lower boundary of the nibble after that.
+#
+# Returns:
+#   True if lb and ub were set, else nothing (no more nibbles).
+sub _next_boundaries {
+   my ($self) = @_;
+
+   if ( $self->{no_more_boundaries} ) {
+      MKDEBUG && _d('No more boundaries');
+      return;
+   }
+
+   # No first lower boundary means there are no rows to nibble.
+   if ( !$self->{next_lb} ) {
+      MKDEBUG && _d('No lower boundary');
+      $self->{no_more_boundaries} = 1;
+      return;
+   }
+
+   $self->{lb} = $self->{next_lb};
+
+   MKDEBUG && _d($self->{ub_sth}->{Statement}, 'params:',
+      join(', ', @{$self->{lb}}));
+   $self->{ub_sth}->execute(@{$self->{lb}});
+   my $boundary = $self->{ub_sth}->fetchall_arrayref();
+   if ( $boundary && @$boundary ) {
+      $self->{ub} = $boundary->[0]; # this nibble
+      if ( $boundary->[1] ) {
+         $self->{next_lb} = $boundary->[1]; # next nibble
+      }
+      else {
+         # The upper boundary is the last row, so this is the last nibble.
+         $self->{no_more_boundaries} = 1; # for next call
+      }
+      $self->{ub_sth}->finish();
+      MKDEBUG && _d('Next upper boundary:', Dumper($self->{ub}));
+   }
+   else {
+      # Fewer than chunk-size rows are left. The last nibble must extend
+      # to the last row, else the rows after lb would never be returned.
+      $self->{no_more_boundaries} = 1; # for next call
+      $self->{ub} = $self->{dbh}->selectrow_arrayref($self->{last_ub_sql})
+                 || $self->{lb};
+      MKDEBUG && _d('Last upper boundary:', Dumper($self->{ub}));
+   }
+
+   return 1; # have boundaries
+}
+
+# Sub: _d
+#   Print a debug message to STDERR prefixed with the caller's package and
+#   line number and the PID. Undefined arguments are printed as 'undef'.
+sub _d {
+   my ($package, undef, $line) = caller 0;
+   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
+        map { defined $_ ? $_ : 'undef' }
+        @_;
+   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
+}
+
+1;
+}
+# ###########################################################################
+# End NibbleIterator package
+# ###########################################################################
diff --git a/lib/SchemaIterator.pm b/lib/SchemaIterator.pm
index 2d85ec89..33da4e6b 100644
--- a/lib/SchemaIterator.pm
+++ b/lib/SchemaIterator.pm
@@ -209,9 +209,9 @@ sub next_schema_object {
       if ( my $schema = $self->{Schema} ) {
          $schema->add_schema_object($schema_obj);
       }
+      MKDEBUG && _d('Next schema object:', $schema_obj->{db}, $schema_obj->{tbl});
    }
 
-   MKDEBUG && _d('Next schema object:', $schema_obj->{db}, $schema_obj->{tbl});
    return $schema_obj;
 }
 
diff --git a/lib/TableNibbler.pm b/lib/TableNibbler.pm
index 43112120..21fba668 100644
--- a/lib/TableNibbler.pm
+++ b/lib/TableNibbler.pm
@@ -65,28 +65,26 @@ sub generate_asc_stmt {
       die "I need a $arg argument" unless defined $args{$arg};
    }
    my ($tbl_struct, $index) = @args{@required_args};
-   my @cols       = $args{cols} ? @{$args{cols}} : @{$tbl_struct->{cols}};
+   my @cols = $args{cols} ? @{$args{cols}} : @{$tbl_struct->{cols}};
    my $q = $self->{Quoter};
 
    # This shouldn't happen. TableSyncNibble shouldn't call us with
    # a nonexistent index.
    die "Index '$index' does not exist in table"
       unless exists $tbl_struct->{keys}->{$index};
-
-   my @asc_cols = @{$tbl_struct->{keys}->{$index}->{cols}};
-   my @asc_slice;
+   MKDEBUG && _d('Will ascend index', $index);
 
    # These are the columns we'll ascend.
-   @asc_cols = @{$tbl_struct->{keys}->{$index}->{cols}};
-   MKDEBUG && _d('Will ascend index', $index);
-   MKDEBUG && _d('Will ascend columns', join(', ', @asc_cols));
+   my @asc_cols = @{$tbl_struct->{keys}->{$index}->{cols}};
    if ( $args{asc_first} ) {
       @asc_cols = $asc_cols[0];
       MKDEBUG && _d('Ascending only first column');
    }
+   MKDEBUG && _d('Will ascend columns', join(', ', @asc_cols));
 
    # We found the columns by name, now find their positions for use as
    # array slices, and make sure they are included in the SELECT list.
+   my @asc_slice;
    my %col_posn = do { my $i = 0; map { $_ => $i++ } @cols };
    foreach my $col ( @asc_cols ) {
       if ( !exists $col_posn{$col} ) {
diff --git a/t/lib/NibbleIterator.t b/t/lib/NibbleIterator.t
new file mode 100644
index 00000000..025f21f9
--- /dev/null
+++ b/t/lib/NibbleIterator.t
@@ -0,0 +1,200 @@
+#!/usr/bin/perl
+
+BEGIN {
+   die "The PERCONA_TOOLKIT_BRANCH environment variable is not set.\n"
+      unless $ENV{PERCONA_TOOLKIT_BRANCH} && -d $ENV{PERCONA_TOOLKIT_BRANCH};
+   unshift @INC, "$ENV{PERCONA_TOOLKIT_BRANCH}/lib";
+};
+
+use strict;
+use warnings FATAL => 'all';
+use English qw(-no_match_vars);
+use Test::More;
+
+use Schema;
+use SchemaIterator;
+use Quoter;
+use DSNParser;
+use Sandbox;
+use OptionParser;
+use MySQLDump;
+use TableParser;
+use TableNibbler;
+use NibbleIterator;
+use PerconaTest;
+
+use constant MKDEBUG => $ENV{MKDEBUG} || 0;
+
+use Data::Dumper;
+$Data::Dumper::Indent = 1;
+$Data::Dumper::Sortkeys = 1;
+$Data::Dumper::Quotekeys = 0;
+
+my $dp = new DSNParser(opts=>$dsn_opts);
+my $sb = new Sandbox(basedir => '/tmp', DSNParser => $dp);
+my $dbh = $sb->get_dbh_for('master');
+
+if ( !$dbh ) {
+   plan skip_all => 'Cannot connect to sandbox master';
+}
+else {
+   plan tests => 8;
+}
+
+
+my $q = new Quoter();
+my $tp = new TableParser(Quoter=>$q);
+my $du = new MySQLDump();
+my $nb = new TableNibbler(TableParser=>$tp, Quoter=>$q);
+my $o = new OptionParser(description => 'NibbleIterator');
+
+$o->get_specs("$trunk/bin/pt-table-checksum");
+
+my %common_modules = (
+   Quoter       => $q,
+   TableParser  => $tp,
+   MySQLDump    => $du,
+   TableNibbler => $nb,
+   OptionParser => $o,
+);
+my $in = "/t/lib/samples/NibbleIterator/";
+
+sub make_nibble_iter {
+   my (%args) = @_;
+
+   if (my $file = $args{sql_file}) {
+      $sb->load_file('master', "$in/$file");
+   }
+
+   @ARGV = $args{argv} ? @{$args{argv}} : ();
+   $o->get_opts();
+
+   my $schema = new Schema();
+   my $si = new SchemaIterator(
+      dbh      => $dbh,
+      keep_ddl => 1,
+      Schema   => $schema,
+      %common_modules,
+   );
+   1 while $si->next_schema_object();
+
+   my $ni = new NibbleIterator(
+      dbh => $dbh,
+      tbl => $schema->get_table($args{db}, $args{tbl}),
+      %common_modules,
+   );
+
+   return $ni;
+}
+
+my $ni = make_nibble_iter(
+   sql_file => "a-z.sql",
+   db       => 'test',
+   tbl      => 't',
+   argv     => [qw(--databases test --chunk-size 5)],
+);
+
+my @rows = ();
+for (1..5) {
+   push @rows, $ni->next();
+}
+is_deeply(
+   \@rows,
+   [['a'],['b'],['c'],['d'],['e']],
+   'a-z nibble 1'
+) or print Dumper(\@rows);
+
+@rows = ();
+for (1..5) {
+   push @rows, $ni->next();
+}
+is_deeply(
+   \@rows,
+   [['f'],['g'],['h'],['i'],['j']],
+   'a-z nibble 2'
+) or print Dumper(\@rows);
+
+@rows = ();
+for (1..5) {
+   push @rows, $ni->next();
+}
+is_deeply(
+   \@rows,
+   [['k'],['l'],['m'],['n'],['o']],
+   'a-z nibble 3'
+) or print Dumper(\@rows);
+
+@rows = ();
+for (1..5) {
+   push @rows, $ni->next();
+}
+is_deeply(
+   \@rows,
+   [['p'],['q'],['r'],['s'],['t']],
+   'a-z nibble 4'
+) or print Dumper(\@rows);
+
+@rows = ();
+for (1..5) {
+   push @rows, $ni->next();
+}
+is_deeply(
+   \@rows,
+   [['u'],['v'],['w'],['x'],['y']],
+   'a-z nibble 5'
+) or print Dumper(\@rows);
+
+# There's only 1 row left but extra calls shouldn't return anything or crash.
+@rows = ();
+for (1..5) {
+   push @rows, $ni->next();
+}
+is_deeply(
+   \@rows,
+   [['z']],
+   'a-z nibble 6'
+) or print Dumper(\@rows);
+
+# With chunk-size 13 the second nibble's upper boundary is the last row, so
+# there is no next lower boundary. All rows should be returned exactly once
+# and extra calls shouldn't return anything or crash.
+$ni = make_nibble_iter(
+   sql_file => "a-z.sql",
+   db       => 'test',
+   tbl      => 't',
+   argv     => [qw(--databases test --chunk-size 13)],
+);
+
+@rows = ();
+for (1..30) {
+   push @rows, $ni->next();
+}
+is_deeply(
+   \@rows,
+   [ map { [$_] } 'a'..'z' ],
+   'a-z, last nibble is full (chunk size 13)'
+) or print Dumper(\@rows);
+
+# With chunk-size 4 the last nibble has 2 rows, y and z.
+$ni = make_nibble_iter(
+   sql_file => "a-z.sql",
+   db       => 'test',
+   tbl      => 't',
+   argv     => [qw(--databases test --chunk-size 4)],
+);
+
+@rows = ();
+for (1..30) {
+   push @rows, $ni->next();
+}
+is_deeply(
+   \@rows,
+   [ map { [$_] } 'a'..'z' ],
+   'a-z, last nibble is partial (chunk size 4)'
+) or print Dumper(\@rows);
+
+# #############################################################################
+# Done.
+# #############################################################################
+$sb->wipe_clean($dbh);
+exit;
diff --git a/t/lib/samples/NibbleIterator/a-z.sql b/t/lib/samples/NibbleIterator/a-z.sql
new file mode 100644
index 00000000..e7e485e4
--- /dev/null
+++ b/t/lib/samples/NibbleIterator/a-z.sql
@@ -0,0 +1,10 @@
+DROP DATABASE IF EXISTS test;
+CREATE DATABASE test;
+USE test;
+
+CREATE TABLE t (
+   c varchar(16) not null,
+   index (c)
+);
+
+INSERT INTO t VALUES ('a'), ('b'), ('c'), ('d'), ('e'), ('f'), ('g'), ('h'), ('i'), ('j'), ('k'), ('l'), ('m'), ('n'), ('o'), ('p'), ('q'), ('r'), ('s'), ('t'), ('u'), ('v'), ('w'), ('x'), ('y'), ('z');