From 929000cfddf63b7b4e5356741d58cc94e9a8c082 Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Wed, 28 Sep 2011 11:33:12 -0600 Subject: [PATCH] Partial --resume implementation (work in progress). --- bin/pt-table-checksum | 155 ++++++++++++++++++++++++++++++++++++++--- lib/NibbleIterator.pm | 10 ++- t/lib/NibbleIterator.t | 2 +- 3 files changed, 154 insertions(+), 13 deletions(-) diff --git a/bin/pt-table-checksum b/bin/pt-table-checksum index 22ea0ecf..0cbd999c 100755 --- a/bin/pt-table-checksum +++ b/bin/pt-table-checksum @@ -3552,6 +3552,14 @@ sub nibble_number { return $self->{nibbleno}; } +sub set_nibble_number { + my ($self, $n) = @_; + die "I need a number" unless $n; + $self->{nibbleno} = $n; + MKDEBUG && _d('Set new nibble number:', $n); + return; +} + sub nibble_index { my ($self) = @_; return $self->{index}; @@ -3570,10 +3578,9 @@ sub statements { sub boundaries { my ($self) = @_; return { - first_lower => $self->{first_lb}, lower => $self->{lb}, - next_lower => $self->{next_lb}, upper => $self->{ub}, + next_lower => $self->{next_lb}, last_upper => $self->{last_ub}, }; } @@ -3591,9 +3598,9 @@ sub chunk_size { sub set_chunk_size { my ($self, $limit) = @_; return if $self->{one_nibble}; - MKDEBUG && _d('Setting new chunk size (LIMIT):', $limit); die "Chunk size must be > 0" unless $limit; $self->{limit} = $limit - 1; + MKDEBUG && _d('Set new chunk size (LIMIT):', $limit); return; } @@ -3701,8 +3708,7 @@ sub _get_bounds { my ($self) = @_; return if $self->{one_nibble}; - $self->{first_lb} = $self->{dbh}->selectrow_arrayref($self->{first_lb_sql}); - $self->{next_lb} = $self->{first_lb}; + $self->{next_lb} = $self->{dbh}->selectrow_arrayref($self->{first_lb_sql}); MKDEBUG && _d('First lower boundary:', Dumper($self->{next_lb})); $self->{last_ub} = $self->{dbh}->selectrow_arrayref($self->{last_ub_sql}); @@ -4051,8 +4057,17 @@ sub new { die "I need either a dbh or file_itr argument" if (!$dbh && !$file_itr) || ($dbh && $file_itr); + my %resume; + if ( my 
$table = $args{resume} ) { + MKDEBUG && _d('Will resume from', $table); + my ($db, $tbl) = $args{Quoter}->split_unquote($table); + $resume{db} = $db; + $resume{tbl} = $tbl; + } + my $self = { %args, + resume => \%resume, filters => _make_filters(%args), }; @@ -4113,7 +4128,7 @@ sub _make_filters { return \%filters; } -sub next_schema_object { +sub next { my ( $self ) = @_; my $schema_obj; @@ -4163,7 +4178,8 @@ sub _iterate_files { my $db = $1; # XXX $db =~ s/^`//; # strip leading ` $db =~ s/`$//; # and trailing ` - if ( $self->database_is_allowed($db) ) { + if ( $self->database_is_allowed($db) + && $self->_resume_from_database($db) ) { $self->{db} = $db; } } @@ -4176,7 +4192,8 @@ sub _iterate_files { my ($tbl) = $chunk =~ m/$tbl_name/; $tbl =~ s/^\s*`//; $tbl =~ s/`\s*$//; - if ( $self->table_is_allowed($self->{db}, $tbl) ) { + if ( $self->table_is_allowed($self->{db}, $tbl) + && $self->_resume_from_table($tbl) ) { my ($ddl) = $chunk =~ m/^(?:$open_comment)?(CREATE TABLE.+?;)$/ms; if ( !$ddl ) { warn "Failed to parse CREATE TABLE from\n" . 
$chunk;
@@ -4220,7 +4237,9 @@ sub _iterate_dbh {
    }
 
    if ( !$self->{db} ) {
-      $self->{db} = shift @{$self->{dbs}};
+      do { # skip dbs before the resume db; stop when the list runs out
+         $self->{db} = shift @{$self->{dbs}};
+      } until !$self->{db} || $self->_resume_from_database($self->{db});
       MKDEBUG && _d('Next database:', $self->{db});
       return unless $self->{db};
    }
@@ -4242,6 +4261,7 @@ sub _iterate_dbh {
    }
 
    while ( my $tbl = shift @{$self->{tbls}} ) {
+      next unless $self->_resume_from_table($tbl);
       my $engine;
       if ( $self->{filters}->{'engines'}
            || $self->{filters}->{'ignore-engines'} ) {
@@ -4386,6 +4406,34 @@ sub engine_is_allowed {
    return 1;
 }
 
+sub _resume_from_database {
+   my ($self, $db) = @_;
+   # No resume db pending (never set, or already reached): allow every db.
+   return 1 unless $self->{resume}->{db};
+   # Reaching the resume db clears it, so this db and all later ones pass.
+   if ( $db eq $self->{resume}->{db} ) {
+      MKDEBUG && _d('At resume db', $db);
+      delete $self->{resume}->{db};
+      return 1;
+   }
+
+   return 0;
+}
+
+sub _resume_from_table {
+   my ($self, $tbl) = @_;
+   # No resume table pending (never set, or already reached): allow all.
+   return 1 unless $self->{resume}->{tbl};
+   # Reaching the resume table clears it, so it and all later tables pass.
+   if ( $tbl eq $self->{resume}->{tbl} ) {
+      MKDEBUG && _d('At resume table', $tbl);
+      delete $self->{resume}->{tbl};
+      return 1;
+   }
+
+   return 0;
+}
+
 sub _d {
    my ($package, undef, $line) = caller 0;
    @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
@@ -5402,6 +5450,18 @@ sub main {
    my $total_rate = 0;
    my $limit = $o->get('chunk-size-limit');
 
+   # ########################################################################
+   # Resume
+   # ########################################################################
+   my $last_chunk;
+   if ( $o->get('resume') ) {
+      $last_chunk = last_chunk(
+         dbh        => $dbh,
+         repl_table => $repl_table,
+         Quoter     => $q,
+      );
+   }
+
    # ########################################################################
    # Callbacks for each table's nibble iterator. All checksum work is done
    # in these callbacks and the subs that they call.
@@ -5410,11 +5470,38 @@ sub main {
       init => sub {
          my (%args) = @_;
          my $tbl = $args{tbl};
+
+         my $oktonibble = 1;
+
          if ( $o->get('empty-replicate-table') ) {
             MKDEBUG && _d($delete_sth->{Statement});
             $delete_sth->execute($tbl->{db}, $tbl->{tbl});
          }
-         return 1; # continue nibbling
+         elsif ( $last_chunk ) {
+            my $nibble_iter = $args{NibbleIterator};
+            my $next_lb = next_lower_boundary(
+               %args,
+               last_chunk => $last_chunk,
+            );
+            if ( !$next_lb ) {
+               # TODO: this will print this table which was already done.
+               # Need some way to not print table; $tbl->{dont_print}=1?
+               MKDEBUG && _d('Resuming from last chunk in table;',
+                  'getting next table');
+               $oktonibble = 0;
+            }
+            else {
+               MKDEBUG && _d('Resuming from chunk', $last_chunk->{chunk});
+               # boundaries() returns a fresh hash; set the field (TODO: setter).
+               $nibble_iter->{next_lb} = $next_lb;
+               $nibble_iter->set_nibble_number($last_chunk->{chunk});
+            }
+
+            # Just need to call us once to kick-start the resume process.
+            $last_chunk = undef;
+         }
+
+         return $oktonibble; # continue nibbling?
       },
       next_boundaries => sub {
          my (%args) = @_;
@@ -5603,13 +5690,14 @@ sub main {
    # ########################################################################
    my $schema_iter = new SchemaIterator(
       dbh => $dbh,
+      resume => $last_chunk ? $q->quote(@{$last_chunk}{qw(db tbl)}) : "",
       OptionParser => $o,
       TableParser => $tp,
       Quoter => $q,
    );
 
    TABLE:
-   while ( $oktorun && (my $tbl = $schema_iter->next_schema_object()) ) {
+   while ( $oktorun && (my $tbl = $schema_iter->next()) ) {
       eval {
          # Results, stats, and info related to checksuming this table can
          # be saved here. print_checksum_results() uses this info.
@@ -6079,6 +6167,62 @@ sub table_progress {
    return $pr;
 }
 
+# Find the chunk that a previous run checksummed last, to resume after it.
+# Required args: dbh, repl_table (interpolated verbatim into the SQL), and
+# Quoter (still required for compatibility; no longer used here).
+# Returns the replicate-table row with the max ts (ties broken by db, tbl,
+# chunk descending) as a hashref, or nothing if no such row is found.
+sub last_chunk {
+   my (%args) = @_;
+   my @required_args = qw(dbh repl_table Quoter);
+   foreach my $arg ( @required_args ) {
+      die "I need a $arg argument" unless $args{$arg};
+   }
+   my ($dbh, $repl_table) = @args{qw(dbh repl_table)};
+
+   my $sql = "SELECT MAX(ts) FROM $repl_table";
+   MKDEBUG && _d($sql);
+   my ($max_ts) = $dbh->selectrow_array($sql);
+   if ( !$max_ts ) {
+      MKDEBUG && _d('Replicate table is empty; will not resume');
+      return;
+   }
+
+   # Bind the values: Quoter::quote() is used elsewhere in this file to build
+   # db.tbl identifiers, so it is not suitable for quoting string values.
+   $sql = "SELECT * FROM $repl_table WHERE ts=? "
+        . "ORDER BY db DESC, tbl DESC, chunk DESC LIMIT 1";
+   MKDEBUG && _d($sql, $max_ts);
+   my $last_chunk = $dbh->selectrow_hashref($sql, undef, $max_ts);
+   MKDEBUG && _d('Last chunk:', Dumper($last_chunk));
+   if ( !$last_chunk ) {
+      MKDEBUG && _d('No row found for max ts; will not resume');
+      return;
+   }
+
+   $sql = "SELECT MAX(chunk) FROM $repl_table WHERE db=? AND tbl=?";
+   MKDEBUG && _d($sql, @{$last_chunk}{qw(db tbl)});
+   my ($max_chunk)
+      = $dbh->selectrow_array($sql, undef, @{$last_chunk}{qw(db tbl)});
+   if ( ($last_chunk->{chunk} || 0) != ($max_chunk || 0) ) {
+      warn "Not resuming from max chunk (" . ($last_chunk->{chunk} || 0)
+         . " != " . ($max_chunk || 0)
+         . "); resuming may not work correctly.\n";
+   }
+
+   return $last_chunk;
+}
+
+# Stub: requires no arguments yet and always returns nothing.
+sub next_lower_boundary {
+   my (%args) = @_;
+   my @required_args = qw();
+   foreach my $arg ( @required_args ) {
+      die "I need a $arg argument" unless $args{$arg};
+   }
+   return;
+}
+
 # Catches signals so we can exit gracefully.
sub sig_int { my ( $signal ) = @_; diff --git a/lib/NibbleIterator.pm b/lib/NibbleIterator.pm index 5b506df6..403eb4d2 100644 --- a/lib/NibbleIterator.pm +++ b/lib/NibbleIterator.pm @@ -287,6 +287,14 @@ sub nibble_number { return $self->{nibbleno}; } +sub set_nibble_number { + my ($self, $n) = @_; + die "I need a number" unless $n; + $self->{nibbleno} = $n; + MKDEBUG && _d('Set new nibble number:', $n); + return; +} + sub nibble_index { my ($self) = @_; return $self->{index}; @@ -325,9 +333,9 @@ sub chunk_size { sub set_chunk_size { my ($self, $limit) = @_; return if $self->{one_nibble}; - MKDEBUG && _d('Setting new chunk size (LIMIT):', $limit); die "Chunk size must be > 0" unless $limit; $self->{limit} = $limit - 1; + MKDEBUG && _d('Set new chunk size (LIMIT):', $limit); return; } diff --git a/t/lib/NibbleIterator.t b/t/lib/NibbleIterator.t index 335ade4f..36314edd 100644 --- a/t/lib/NibbleIterator.t +++ b/t/lib/NibbleIterator.t @@ -74,7 +74,7 @@ sub make_nibble_iter { Schema => $schema, %common_modules, ); - 1 while $si->next_schema_object(); + 1 while $si->next(); my $ni = new NibbleIterator( dbh => $dbh,