Added pause to NibbleIterator

This commit is contained in:
Carlos Salguero
2016-11-01 19:05:49 -03:00
parent 340ecfdc00
commit c91350f2eb
4 changed files with 110 additions and 0 deletions

View File

@@ -5529,6 +5529,8 @@ sub new {
$self->{have_rows} = 0; $self->{have_rows} = 0;
$self->{rowno} = 0; $self->{rowno} = 0;
$self->{oktonibble} = 1; $self->{oktonibble} = 1;
$self->{pause_file} = $nibble_params->{pause_file};
$self->{sleep} = $args{sleep} || 60;
return bless $self, $class; return bless $self, $class;
} }
@@ -5567,6 +5569,24 @@ sub next {
NIBBLE: NIBBLE:
while ( $self->{have_rows} || $self->_next_boundaries() ) { while ( $self->{have_rows} || $self->_next_boundaries() ) {
if ($self->{pause_file}) {
while(-f $self->{pause_file}) {
print "Sleeping $self->{sleep} seconds because $self->{pause_file} exists\n";
my $dbh = $self->{Cxn}->dbh();
if ( !$dbh || !$dbh->ping() ) {
eval { $dbh = $self->{Cxn}->connect() }; # connect or die trying
if ( $EVAL_ERROR ) {
chomp $EVAL_ERROR;
die "Lost connection to " . $self->{Cxn}->name() . " while waiting for "
. "replica lag ($EVAL_ERROR)\n";
}
}
$dbh->do("SELECT 'nibble iterator keepalive'");
sleep($self->{sleep});
}
}
if ( !$self->{have_rows} ) { if ( !$self->{have_rows} ) {
$self->{nibbleno}++; $self->{nibbleno}++;
PTDEBUG && _d('Nibble:', $self->{nibble_sth}->{Statement}, 'params:', PTDEBUG && _d('Nibble:', $self->{nibble_sth}->{Statement}, 'params:',
@@ -5596,6 +5616,7 @@ sub next {
} }
$self->{rowno} = 0; $self->{rowno} = 0;
$self->{have_rows} = 0; $self->{have_rows} = 0;
} }
PTDEBUG && _d('Done nibbling'); PTDEBUG && _d('Done nibbling');
@@ -5731,10 +5752,13 @@ sub can_nibble {
die "There is no good index and the table is oversized."; die "There is no good index and the table is oversized.";
} }
my $pause_file = ($o->has('pause-file') && $o->get('pause-file')) || undef;
return { return {
row_est => $row_est, # nibble about this many rows row_est => $row_est, # nibble about this many rows
index => $index, # using this index index => $index, # using this index
one_nibble => $one_nibble, # if the table fits in one nibble/chunk one_nibble => $one_nibble, # if the table fits in one nibble/chunk
pause_file => $pause_file,
}; };
} }
@@ -11723,6 +11747,12 @@ short form: -p; type: string
Password to use when connecting. Password to use when connecting.
If password contains commas they must be escaped with a backslash: "exam\,ple" If password contains commas they must be escaped with a backslash: "exam\,ple"
=item --pause-file
type: string
Execution will be paused while the file specified by this param exists.
=item --pid =item --pid
type: string type: string

View File

@@ -6336,6 +6336,8 @@ sub new {
$self->{have_rows} = 0; $self->{have_rows} = 0;
$self->{rowno} = 0; $self->{rowno} = 0;
$self->{oktonibble} = 1; $self->{oktonibble} = 1;
$self->{pause_file} = $nibble_params->{pause_file};
$self->{sleep} = $args{sleep} || 60;
return bless $self, $class; return bless $self, $class;
} }
@@ -6374,6 +6376,24 @@ sub next {
NIBBLE: NIBBLE:
while ( $self->{have_rows} || $self->_next_boundaries() ) { while ( $self->{have_rows} || $self->_next_boundaries() ) {
if ($self->{pause_file}) {
while(-f $self->{pause_file}) {
print "Sleeping $self->{sleep} seconds because $self->{pause_file} exists\n";
my $dbh = $self->{Cxn}->dbh();
if ( !$dbh || !$dbh->ping() ) {
eval { $dbh = $self->{Cxn}->connect() }; # connect or die trying
if ( $EVAL_ERROR ) {
chomp $EVAL_ERROR;
die "Lost connection to " . $self->{Cxn}->name() . " while waiting for "
. "replica lag ($EVAL_ERROR)\n";
}
}
$dbh->do("SELECT 'nibble iterator keepalive'");
sleep($self->{sleep});
}
}
if ( !$self->{have_rows} ) { if ( !$self->{have_rows} ) {
$self->{nibbleno}++; $self->{nibbleno}++;
PTDEBUG && _d('Nibble:', $self->{nibble_sth}->{Statement}, 'params:', PTDEBUG && _d('Nibble:', $self->{nibble_sth}->{Statement}, 'params:',
@@ -6403,6 +6423,7 @@ sub next {
} }
$self->{rowno} = 0; $self->{rowno} = 0;
$self->{have_rows} = 0; $self->{have_rows} = 0;
} }
PTDEBUG && _d('Done nibbling'); PTDEBUG && _d('Done nibbling');
@@ -6538,10 +6559,13 @@ sub can_nibble {
die "There is no good index and the table is oversized."; die "There is no good index and the table is oversized.";
} }
my $pause_file = ($o->has('pause-file') && $o->get('pause-file')) || undef;
return { return {
row_est => $row_est, # nibble about this many rows row_est => $row_est, # nibble about this many rows
index => $index, # using this index index => $index, # using this index
one_nibble => $one_nibble, # if the table fits in one nibble/chunk one_nibble => $one_nibble, # if the table fits in one nibble/chunk
pause_file => $pause_file,
}; };
} }
@@ -12397,6 +12421,12 @@ short form: -p; type: string; group: Connection
Password to use when connecting. Password to use when connecting.
If password contains commas they must be escaped with a backslash: "exam\,ple" If password contains commas they must be escaped with a backslash: "exam\,ple"
=item --pause-file
type: string
Execution will be paused while the file specified by this param exists.
=item --pid =item --pid
type: string type: string

View File

@@ -263,6 +263,8 @@ sub new {
$self->{have_rows} = 0; $self->{have_rows} = 0;
$self->{rowno} = 0; $self->{rowno} = 0;
$self->{oktonibble} = 1; $self->{oktonibble} = 1;
$self->{pause_file} = $nibble_params->{pause_file};
$self->{sleep} = $args{sleep} || 60;
return bless $self, $class; return bless $self, $class;
} }
@@ -304,6 +306,24 @@ sub next {
# If there's another nibble, fetch the rows within it. # If there's another nibble, fetch the rows within it.
NIBBLE: NIBBLE:
while ( $self->{have_rows} || $self->_next_boundaries() ) { while ( $self->{have_rows} || $self->_next_boundaries() ) {
if ($self->{pause_file}) {
while(-f $self->{pause_file}) {
print "Sleeping $self->{sleep} seconds because $self->{pause_file} exists\n";
my $dbh = $self->{Cxn}->dbh();
if ( !$dbh || !$dbh->ping() ) {
eval { $dbh = $self->{Cxn}->connect() }; # connect or die trying
if ( $EVAL_ERROR ) {
chomp $EVAL_ERROR;
die "Lost connection to " . $self->{Cxn}->name() . " while waiting for "
. "replica lag ($EVAL_ERROR)\n";
}
}
$dbh->do("SELECT 'nibble iterator keepalive'");
sleep($self->{sleep});
}
}
# If no rows, then we just got the next boundaries, which start # If no rows, then we just got the next boundaries, which start
# the next nibble. # the next nibble.
if ( !$self->{have_rows} ) { if ( !$self->{have_rows} ) {
@@ -341,6 +361,7 @@ sub next {
} }
$self->{rowno} = 0; $self->{rowno} = 0;
$self->{have_rows} = 0; $self->{have_rows} = 0;
} }
PTDEBUG && _d('Done nibbling'); PTDEBUG && _d('Done nibbling');
@@ -493,10 +514,13 @@ sub can_nibble {
# The table can be nibbled if this point is reached, else we would have # The table can be nibbled if this point is reached, else we would have
# died earlier. Return some values about nibbling the table. # died earlier. Return some values about nibbling the table.
my $pause_file = ($o->has('pause-file') && $o->get('pause-file')) || undef;
return { return {
row_est => $row_est, # nibble about this many rows row_est => $row_est, # nibble about this many rows
index => $index, # using this index index => $index, # using this index
one_nibble => $one_nibble, # if the table fits in one nibble/chunk one_nibble => $one_nibble, # if the table fits in one nibble/chunk
pause_file => $pause_file,
}; };
} }

View File

@@ -23,6 +23,7 @@ use RowChecksum;
use NibbleIterator; use NibbleIterator;
use Cxn; use Cxn;
use PerconaTest; use PerconaTest;
use File::Temp qw/ tempfile /;
use constant PTDEBUG => $ENV{PTDEBUG} || 0; use constant PTDEBUG => $ENV{PTDEBUG} || 0;
@@ -92,6 +93,8 @@ sub make_nibble_iter {
resume => $args{resume}, resume => $args{resume},
order_by => $args{order_by}, order_by => $args{order_by},
comments => $args{comments}, comments => $args{comments},
pause_file => $o->get('pause-file'),
sleep => $args{sleep} || 60,
%common_modules, %common_modules,
); );
@@ -355,6 +358,29 @@ is(
"init callbacks can stop nibbling" "init callbacks can stop nibbling"
); );
my ($fh, $filename) = tempfile();
system ("sleep 3 && rm $filename &");
$ni = make_nibble_iter(
db => 'sakila',
tbl => 'payment',
sleep => 3,
argv => [qw(--databases sakila --tables payment --chunk-size 100 --pause-file ), $filename],
);
$output = output(
sub {
for (1..8) { $ni->next() }
},
);
like(
$output,
qr/Sleeping 3 seconds because/,
"nibbles paused"
);
# ############################################################################ # ############################################################################
# Nibble a larger table by numeric pk id # Nibble a larger table by numeric pk id
# ############################################################################ # ############################################################################