Implement table progress. Adjust rate and chunk size only if the nibble selected > 0 rows. Always use --chunk-size if --chunk-time=0.

This commit is contained in:
Daniel Nichter
2011-09-23 08:54:58 -06:00
parent 361c958a66
commit b783470aaa

View File

@@ -4163,6 +4163,10 @@ sub table_is_allowed {
my $filter = $self->{filters}; my $filter = $self->{filters};
if ( $db eq 'mysql' && ($tbl eq 'general_log' || $tbl eq 'slow_log') ) {
return 0;
}
if ( $filter->{'ignore-tables'}->{$tbl} if ( $filter->{'ignore-tables'}->{$tbl}
&& ($filter->{'ignore-tables'}->{$tbl} eq '*' && ($filter->{'ignore-tables'}->{$tbl} eq '*'
|| $filter->{'ignore-tables'}->{$tbl} eq $db) ) { || $filter->{'ignore-tables'}->{$tbl} eq $db) ) {
@@ -4320,6 +4324,291 @@ sub _d {
# End Retry package # End Retry package
# ########################################################################### # ###########################################################################
# ###########################################################################
# Transformers package
# ###########################################################################
{
# Package: Transformers
# Transformers exports subroutines that convert and beautify values.
package Transformers;
use strict;
# Every warning raised inside this package is promoted to a fatal error.
use warnings FATAL => 'all';
# Provides long names for punctuation variables; _d() uses $PID.
use English qw(-no_match_vars);
# Debug switch taken from the MKDEBUG environment variable (0 when unset).
# The subs below guard their _d() calls with it.
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
# timegm/timelocal are used by unix_timestamp().
use Time::Local qw(timegm timelocal);
# md5_hex is used by make_checksum().
use Digest::MD5 qw(md5_hex);
require Exporter;
our @ISA = qw(Exporter);
our %EXPORT_TAGS = ();
# Nothing is exported by default; callers must request names explicitly
# from the @EXPORT_OK list below.
our @EXPORT = ();
our @EXPORT_OK = qw(
micro_t
percentage_of
secs_to_time
time_to_secs
shorten
ts
parse_timestamp
unix_timestamp
any_unix_timestamp
make_checksum
crc32
);
# MySQL slow-log style timestamp such as "071015 21:43:52".  Captures two
# digits each for year, month and day, then hours, minutes, seconds and an
# optional fractional part (including its leading dot).
our $mysql_ts = qr/(\d\d)(\d\d)(\d\d) +(\d+):(\d+):(\d+)(\.\d+)?/;
# Fully written timestamp such as "2007-10-15 01:43:52" or with a "T"
# separator.  Captures year, month, day, hours, minutes, seconds and an
# optional fractional part (including its leading dot).
our $proper_ts = qr/(\d\d\d\d)-(\d\d)-(\d\d)[T ](\d\d):(\d\d):(\d\d)(\.\d+)?/;
# Relative time: a number followed by an optional unit letter s, m, h or d.
our $n_ts = qr/(\d{1,5})([shmd]?)/; # Limit \d{1,5} because \d{6} looks
# like a MySQL YYMMDD without hh:mm:ss.
# Formats a duration given in seconds as a short string with a unit:
# "us" for microseconds, "ms" for milliseconds and "s" for seconds.
# Negative durations are treated as zero, and zero comes back as a bare 0
# without any unit.
#
# Optional named args:
#   p_ms - decimal places kept for millisecond values (default 0)
#   p_s  - decimal places kept for second values (default 0)
sub micro_t {
   my ( $t, %args ) = @_;
   my $ms_places = defined $args{p_ms} ? $args{p_ms} : 0;
   my $s_places  = defined $args{p_s}  ? $args{p_s}  : 0;

   $t = 0 if $t < 0;

   # Expand scientific notation first; otherwise the truncation below
   # would turn 6.123456e+18 into 6.123456.
   $t = sprintf('%.17f', $t) if $t =~ /e/;

   # Keep at most 6 decimal places by cutting, not rounding, so that
   # 0.9999997 does not become 1 when sprintf() rounds later on.
   $t =~ s/\.(\d{1,6})\d*/\.$1/;

   if ( $t > 0 && $t <= 0.000999 ) {
      return ($t * 1000000) . 'us';
   }

   if ( $t >= 0.001000 && $t <= 0.999999 ) {
      my $ms = sprintf("%.${ms_places}f", $t * 1000);
      return ($ms * 1) . 'ms';   # * 1 drops insignificant zeros
   }

   if ( $t >= 1 ) {
      my $s = sprintf("%.${s_places}f", $t);
      return ($s * 1) . 's';     # * 1 drops insignificant zeros
   }

   return 0;   # $t should be 0 at this point
}
# Returns $is expressed as a percentage of $of.  A false $of (0, undef,
# empty string) is replaced by 1 so the division cannot fail.
#
# Optional named args:
#   p - number of decimal places; when false the result is formatted with
#       "%d" (an integer)
sub percentage_of {
   my ( $is, $of, %args ) = @_;
   my $places = $args{p} || 0;

   $of ||= 1;
   my $pct = ($is * 100) / $of;

   return $places ? sprintf("%.${places}f", $pct)
                  : sprintf("%d", $pct);
}
# Formats a number of seconds as a clock-style string.
#
# $fmt selects the layout:
#   'd' - D+HH:MM:SS
#   'h' - HH:MM:SS
#   anything else - MM:SS
# When $fmt is not given it is chosen from the size of $secs: 'd' from one
# day up, 'h' from one hour up, otherwise 'm'.  A false $secs yields
# '00:00' regardless of $fmt.
sub secs_to_time {
   my ( $secs, $fmt ) = @_;
   $secs ||= 0;
   return '00:00' unless $secs;

   # Pick a layout when the caller did not ask for one.
   if ( !$fmt ) {
      $fmt = $secs >= 86_400 ? 'd'
           : $secs >= 3_600  ? 'h'
           :                   'm';
   }

   my $mm = int(($secs % 3_600) / 60);
   my $ss = $secs % 60;

   if ( $fmt eq 'd' ) {
      my $days = int($secs / 86_400);
      my $hh   = int(($secs % 86_400) / 3_600);
      return sprintf("%d+%02d:%02d:%02d", $days, $hh, $mm, $ss);
   }

   if ( $fmt eq 'h' ) {
      my $hh = int(($secs % 86_400) / 3_600);
      return sprintf("%02d:%02d:%02d", $hh, $mm, $ss);
   }

   return sprintf("%02d:%02d", $mm, $ss);
}
# Converts a time value with an optional unit suffix to a number of seconds:
# 1s = 1, 1m = 60, 1h = 3600, 1d = 86400.
#
# Args:
#   $val            - value such as "10", "5m", "-2h"; an optional leading
#                     sign is honoured
#   $default_suffix - unit used when $val has none (defaults to 's')
#
# Returns the number of seconds (negative when $val has a "-" prefix).
# Dies when $val is undefined, contains no parsable number, or when the
# effective suffix is not exactly one of s, m, h, d.
sub time_to_secs {
   my ( $val, $default_suffix ) = @_;
   die "I need a val argument" unless defined $val;

   my ( $prefix, $num, $suffix ) = $val =~ m/([+-]?)(\d+)([a-z])?$/;

   # Without this check an unparsable value would reach the arithmetic
   # below as undef and fail with an unhelpful "uninitialized" message.
   die "Invalid time value: $val" unless defined $num;

   $suffix = $suffix || $default_suffix || 's';

   # Exact lookup instead of a character-class match: a multi-character
   # default suffix such as "ms" must be rejected, not treated as days.
   my %secs_per = (
      s => 1,       # Seconds
      m => 60,      # Minutes
      h => 3600,    # Hours
      d => 86400,   # Days
   );
   die "Invalid suffix for $val: $suffix" unless exists $secs_per{$suffix};

   my $t = $num * $secs_per{$suffix};
   $t *= -1 if $prefix && $prefix eq '-';
   return $t;
}
# Shortens a number by repeatedly dividing it by a divisor and appending
# the matching unit letter (k, M, G, T, P, E, Z, Y).
#
# Optional named args:
#   p - decimal places for scaled or fractional values (default 2)
#   d - divisor (default 1_024)
#
# Returns a string.  A whole number smaller than the divisor is returned
# as a plain integer; any other value is formatted with p decimal places
# followed by its unit (which is empty when no scaling happened).
sub shorten {
   my ( $num, %args ) = @_;
   my $p = defined $args{p} ? $args{p} : 2;     # float precision
   my $d = defined $args{d} ? $args{d} : 1_024; # divisor
   my $n = 0;
   my @units = ('', qw(k M G T P E Z Y));

   # Stop at the last unit even if the number is still larger than $d.
   while ( $num >= $d && $n < @units - 1 ) {
      $num /= $d;
      ++$n;
   }

   if ( $num =~ m/\./ || $n ) {
      return sprintf("%.${p}f%s", $num, $units[$n]);
   }

   # Pass exactly one argument here: handing the unused unit to a '%d'
   # format raises "Redundant argument in sprintf" on newer perls, which
   # is fatal when warnings are fatal.
   return sprintf('%d', $num);
}
# Turns a unix timestamp into an ISO8601 formatted date and time
# (YYYY-MM-DDTHH:MM:SS).  A true $gmt makes the result relative to GMT
# instead of local time, for test determinism.
#
# When $time ends in a fractional part, it is appended with exactly six
# decimal places.
sub ts {
   my ( $time, $gmt ) = @_;
   my ( $sec, $min, $hour, $mday, $mon, $year )
      = $gmt ? gmtime($time) : localtime($time);
   $mon  += 1;      # month comes back 0-based
   $year += 1900;   # year comes back as an offset from 1900
   my $val = sprintf("%d-%02d-%02dT%02d:%02d:%02d",
      $year, $mon, $mday, $hour, $min, $sec);
   if ( my ($us) = $time =~ m/(\.\d+)$/ ) {
      $us = sprintf("%.6f", $us);
      # Rounding to six places can carry into the integer part
      # (.9999999 becomes 1.000000).  The seconds field is already
      # printed, so clamp rather than append a stray "1.000000".
      $us = '0.999999' if $us >= 1;
      $us =~ s/^0\././;
      $val .= $us;
   }
   return $val;
}
# Rewrites a MySQL-style timestamp such as "071015 21:43:52" as
# "2007-10-15 21:43:52".  The two-digit year is taken to be 20YY.  When the
# input carries fractional seconds, the seconds are printed with six
# decimal places.  Values that do not match $mysql_ts are returned as given.
sub parse_timestamp {
   my ( $val ) = @_;

   my ( $yy, $mon, $day, $hour, $min, $sec, $frac ) = $val =~ m/^$mysql_ts$/
      or return $val;

   my $has_frac = defined $frac;
   my $fmt      = "%d-%02d-%02d %02d:%02d:" . ($has_frac ? '%09.6f' : '%02d');
   my $seconds  = $has_frac ? $sec + $frac : $sec;

   return sprintf $fmt, $yy + 2000, $mon, $day, $hour, $min, $seconds;
}
# Turns a properly formatted timestamp like 2007-10-15 01:43:52 into
# seconds since the epoch.  When the input has fractional seconds they are
# appended with six decimal places.  A true $gmt interprets the timestamp
# as GMT instead of local time (to make tests deterministic).  Values that
# do not match $proper_ts are returned unchanged.
sub unix_timestamp {
   my ( $val, $gmt ) = @_;
   if ( my($y, $m, $d, $h, $i, $s, $us) = $val =~ m/^$proper_ts$/ ) {
      # timegm/timelocal expect a 0-based month.
      $val = $gmt
         ? timegm($s, $i, $h, $d, $m - 1, $y)
         : timelocal($s, $i, $h, $d, $m - 1, $y);
      if ( defined $us ) {
         $us = sprintf('%.6f', $us);
         # Rounding to six places can carry into the integer part
         # (.9999999 becomes 1.000000).  The whole seconds are already
         # fixed, so clamp rather than append a stray "1.000000".
         $us = '0.999999' if $us >= 1;
         $us =~ s/^0\././;
         $val .= $us;
      }
   }
   return $val;
}
# Turns several different kinds of timestamps into a unix timestamp.
# The kind is auto-detected, in this order:
#   * N[shmd]               Now - N seconds/minutes/hours/days
#                           (no suffix means seconds)
#   * 9 or more digits      Already a unix timestamp; returned as given
#   * 071015 [21:43:52]     MySQL slow log timestamp, optional HH:MM:SS
#   * 2009-07-01 [3:43:01]  Proper timestamp, optional HH:MM:SS
#   * anything else         Taken to be a MySQL time expression like NOW()
# For the last kind the $callback coderef is needed.  It is passed the
# given value/expression and whatever it returns is returned.  Without a
# coderef callback nothing is returned.
sub any_unix_timestamp {
   my ( $val, $callback ) = @_;

   if ( my ($n, $unit) = $val =~ m/^$n_ts$/ ) {
      # Seconds (or no unit at all) need no scaling.
      my %secs_in = (
         m => 60,      # Minutes
         h => 3600,    # Hours
         d => 86400,   # Days
      );
      $n *= $secs_in{$unit} if exists $secs_in{$unit};
      MKDEBUG && _d('ts is now - N[shmd]:', $n);
      return time - $n;
   }

   if ( $val =~ m/^\d{9,}/ ) {
      # unix timestamp 100000000 is roughly March, 1973, so older
      # dates won't be caught here; they'll probably be mistaken
      # for a MySQL slow log timestamp.
      MKDEBUG && _d('ts is already a unix timestamp');
      return $val;
   }

   if ( my (undef, $hms) = $val =~ m/^(\d{6})(?:\s+(\d+:\d+:\d+))?/ ) {
      MKDEBUG && _d('ts is MySQL slow log timestamp');
      $val .= ' 00:00:00' unless $hms;
      return unix_timestamp(parse_timestamp($val));
   }

   if ( my (undef, $hms) = $val =~ m/^(\d{4}-\d\d-\d\d)(?:[T ](\d+:\d+:\d+))?/) {
      MKDEBUG && _d('ts is properly formatted timestamp');
      $val .= ' 00:00:00' unless $hms;
      return unix_timestamp($val);
   }

   # Nothing matched, so let the caller's callback evaluate it.
   MKDEBUG && _d('ts is MySQL expression');
   return $callback->($val) if $callback && ref $callback eq 'CODE';

   MKDEBUG && _d('Unknown ts type:', $val);
   return;
}
# Returns the last 16 hex digits (the rightmost 64 bits) of the MD5
# checksum of $val, in upper case.
sub make_checksum {
   my ( $val ) = @_;
   my $digest   = md5_hex($val);
   my $checksum = uc substr($digest, -16);
   MKDEBUG && _d($checksum, 'checksum for', $val);
   return $checksum;
}
# Perl implementation of CRC32, ripped off from Digest::Crc32.  The results
# ought to match what you get from any standard CRC32 implementation, such
# as that inside MySQL.
#
# Returns the checksum as an unsigned integer, or nothing when $string is
# undefined or empty.
sub crc32 {
   my ( $string ) = @_;
   # Test for definedness and length, not truth: the string "0" is false
   # in Perl but is perfectly valid input with its own checksum.
   return unless defined $string && length $string;
   my $poly = 0xEDB88320;   # reflected CRC-32 polynomial
   my $crc  = 0xFFFFFFFF;   # initial register value
   foreach my $char ( split(//, $string) ) {
      # Fold the next character into the low byte, then run it through
      # eight shift/xor rounds.
      my $comp = ($crc ^ ord($char)) & 0xFF;
      for ( 1 .. 8 ) {
         $comp = $comp & 1 ? $poly ^ ($comp >> 1) : $comp >> 1;
      }
      $crc = (($crc >> 8) & 0x00FFFFFF) ^ $comp;
   }
   return $crc ^ 0xFFFFFFFF;   # final inversion
}
# Debug printer.  Writes one line to STDERR made of "# ", the calling
# package and line number, the process id, and the arguments joined by
# spaces.  Undefined arguments are shown as "undef", and embedded newlines
# are followed by "# " so continuation lines stay commented.
sub _d {
   my ($package, undef, $line) = caller 0;

   my @parts;
   foreach my $arg ( @_ ) {
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @parts, $text;
   }

   print STDERR "# $package:$line $PID ", join(' ', @parts), "\n";
}
1;
}
# ###########################################################################
# End Transformers package
# ###########################################################################
# ########################################################################### # ###########################################################################
# Progress package # Progress package
# This package is a copy without comments from the original. The original # This package is a copy without comments from the original. The original
@@ -4970,8 +5259,6 @@ sub main {
my $total_rows = 0; my $total_rows = 0;
my $total_time = 0; my $total_time = 0;
my $total_rate = 0; my $total_rate = 0;
my $chunk_size = $o->get('chunk-size');
my $chunk_time = $o->get('chunk-time');
# ######################################################################## # ########################################################################
# Callbacks for each table's nibble iterator. All checksum work is done # Callbacks for each table's nibble iterator. All checksum work is done
@@ -4987,7 +5274,7 @@ sub main {
# skip this chunk and fetch the next boundary. # skip this chunk and fetch the next boundary.
my $is_oversize = is_oversize_chunk( my $is_oversize = is_oversize_chunk(
%args, %args,
chunk_size => $chunk_size, chunk_size => $tbl->{chunk_size},
chunk_size_limit => $o->get('chunk-size-limit'), chunk_size_limit => $o->get('chunk-size-limit'),
); );
if ( $is_oversize ) { if ( $is_oversize ) {
@@ -5026,42 +5313,51 @@ sub main {
$fetch_sth->finish(); $fetch_sth->finish();
$update_sth->finish(); $update_sth->finish();
# Update the rate of rows per second for the entire server. # Update rate, chunk size, and progress if the nibble actually
# This is used for the initial chunk size of the next table. # selected some rows.
$total_rows += $cnt; if ( ($cnt || 0) > 0 ) {
$total_time += $tbl->{nibble_time}; # Update the rate of rows per second for the entire server.
$total_rate = int($total_rows / $total_time); # This is used for the initial chunk size of the next table.
MKDEBUG && _d('Total avg rate:', $total_rate); $total_rows += $cnt;
$total_time += $tbl->{nibble_time};
$total_rate = int($total_rows / $total_time);
MKDEBUG && _d('Total avg rate:', $total_rate);
# Adjust chunk size. This affects the next chunk. # Adjust chunk size. This affects the next chunk.
if ( $o->get('chunk-time') ) { if ( $o->get('chunk-time') ) {
$chunk_size = $tbl->{rate}->update($cnt, $tbl->{nibble_time}); $tbl->{chunk_size}
if ( $chunk_size < 1 ) { = $tbl->{rate}->update($cnt, $tbl->{nibble_time});
# This shouldn't happen. WeightedAvgRate::update() may return if ( $tbl->{chunk_size} < 1 ) {
# a value < 1, but minimum chunk size is 1. # This shouldn't happen. WeightedAvgRate::update() may return
warn "Checksums are executing very slowly. --chunk-size " # a value < 1, but minimum chunk size is 1.
. "has been automatically reduced to 1. Check that the " warn "Checksums are executing very slowly. --chunk-size "
. "server is not being overloaded, or increase " . "has been automatically reduced to 1. Check that the "
. "--chunk-time. The last chunk, number $args{nibbleno} " . "server is not being overloaded, or increase "
. "of table $tbl->{db}.$tbl->{tbl}, selected $cnt rows " . "--chunk-time. The last chunk, number $args{nibbleno} "
. "and took " . "of table $tbl->{db}.$tbl->{tbl}, selected $cnt rows "
. sprintf('%.3f', $tbl->{nibble_time}) . "and took "
. " seconds to execute.\n"; . sprintf('%.3f', $tbl->{nibble_time})
$chunk_size = 1; . " seconds to execute.\n";
$tbl->{chunk_size} = 1;
}
$args{NibbleIterator}->set_chunk_size($tbl->{chunk_size});
}
if ( my $tbl_pr = $tbl->{progress} ) {
$tbl_pr->update(sub {return $tbl->{checksum_results}->{n_rows}});
} }
$args{NibbleIterator}->set_chunk_size($chunk_size);
} }
# Wait forever for slaves to catch up. # Wait forever for slaves to catch up.
my $pr; my $lag_pr;
if ( $o->get('progress') ) { if ( $o->get('progress') ) {
$pr = new Progress( $lag_pr = new Progress(
jobsize => scalar @$slaves, jobsize => scalar @$slaves,
spec => $o->get('progress'), spec => $o->get('progress'),
name => "Waiting for replicas to catch up", name => "Waiting for replicas to catch up",
); );
} }
$replica_lag->wait(Progress => $pr); $replica_lag->wait(Progress => $lag_pr);
return; return;
}, },
@@ -5088,6 +5384,8 @@ sub main {
# be saved here. print_checksum_results() uses this info. # be saved here. print_checksum_results() uses this info.
$tbl->{checksum_results} = {}; $tbl->{checksum_results} = {};
# USE the correct db while checksumming this table. The "correct"
# db is a complicated subject; see sub for comments.
use_repl_db( use_repl_db(
dbh => $dbh, dbh => $dbh,
tbl => $tbl, tbl => $tbl,
@@ -5096,16 +5394,42 @@ sub main {
Quoter => $q, Quoter => $q,
); );
# Set table's initial chunk size. If this is the first table,
# then total rate will be zero, so use --chunk-size. Or, if
# --chunk-time=0, then only use --chunk-size for every table.
# Else, the initial chunk size is based on the total rates of
# rows/s from all previous tables.
my $chunk_time = $o->get('chunk-time');
my $chunk_size = $chunk_time && $total_rate
? int($total_rate * $chunk_time)
: $o->get('chunk-size');
$tbl->{chunk_size} = $chunk_size;
# Init a new weighted avg rate calculator for the table.
$tbl->{rate} = new WeightedAvgRate(target_t => $chunk_time);
# Make a Progress obj for this table. It may not be used;
# depends on how many rows, chunk size, how fast the server
# is, etc. But just in case, all tables have a Progress obj.
if ( $o->get('progress') ) {
$tbl->{progress} = table_progress(
dbh => $dbh,
tbl => $tbl,
OptionParser => $o,
Quoter => $q,
);
}
# Make a nibble iterator for this table.
my $checksum_cols = $rc->make_chunk_checksum( my $checksum_cols = $rc->make_chunk_checksum(
dbh => $dbh, dbh => $dbh,
tbl => $tbl, tbl => $tbl,
%crc_args %crc_args
); );
my $nibble_iter = new NibbleIterator( my $nibble_iter = new NibbleIterator(
dbh => $dbh, dbh => $dbh,
tbl => $tbl, tbl => $tbl,
chunk_size => $total_rate ? int($total_rate * $chunk_time) chunk_size => $tbl->{chunk_size},
: $o->get('chunk-size'),
chunk_index => $o->get('chunk-index'), chunk_index => $o->get('chunk-index'),
dms => $checksum_dms, dms => $checksum_dms,
select => $checksum_cols, select => $checksum_cols,
@@ -5116,9 +5440,7 @@ sub main {
TableParser => $tp, TableParser => $tp,
); );
# Init a new weighted avg rate calculator for the table. # Finally, checksum the table.
$tbl->{rate} = new WeightedAvgRate(target_t => $o->get('chunk-time'));
# The "1 while" loop is necessary because we're executing REPLACE # The "1 while" loop is necessary because we're executing REPLACE
# statements which don't return rows and NibbleIterator only # statements which don't return rows and NibbleIterator only
# returns if it has rows to return. So all the work is done via # returns if it has rows to return. So all the work is done via
@@ -5137,6 +5459,7 @@ sub main {
$exit_status |= 1 if $tbl->{checksum_results}->{errors}; $exit_status |= 1 if $tbl->{checksum_results}->{errors};
} }
MKDEBUG && _d('Exit status', $exit_status, 'oktorun', $oktorun);
return $exit_status; return $exit_status;
} }
@@ -5318,7 +5641,6 @@ sub check_repl_table {
return; return;
} }
# Sub: use_repl_db # Sub: use_repl_db
# USE the correct database for the --replicate table. # USE the correct database for the --replicate table.
# This sub must be called before any work is done with the --replicate # This sub must be called before any work is done with the --replicate
@@ -5499,45 +5821,29 @@ sub print_inconsistent_tbls {
return; return;
} }
sub table_progress {
# Sub: _explain my (%args) = @_;
# EXPLAIN a chunk or table. my @required_args = qw(dbh tbl OptionParser Quoter);
#
# Parameters:
# %args - Arguments
#
# Required Arguments:
# * dbh - dbh
# * db - db name, not quoted
# * tbl - tbl name, not quoted
# * Quoter - <Quoter> object
#
# Optional Arguments:
# * where - Arrayref of WHERE clauses added to chunk
# * index_hint - FORCE INDEX clause
#
# Returns:
# Hashref of first EXPLAIN row
sub _explain {
my ( %args ) = @_;
my @required_args = qw(dbh db tbl Quoter);
foreach my $arg ( @required_args ) { foreach my $arg ( @required_args ) {
die "I need a $arg argument" unless $args{$arg}; die "I need a $arg argument" unless $args{$arg};
} }
my ($dbh, $db, $tbl, $q) = @args{@required_args}; my ($dbh, $tbl, $o, $q) = @args{@required_args};
my $db_tbl = $q->quote($db, $tbl);
my $where;
if ( $args{where} && @{$args{where}} ) {
$where = join(" AND ", map { "($_)" } grep { defined } @{$args{where}});
}
my $sql = "EXPLAIN SELECT * FROM $db_tbl"
. ($args{index_hint} ? " $args{index_hint}" : "")
. ($args{where} ? " WHERE $where" : "");
MKDEBUG && _d($dbh, $sql);
my $table = $q->quote(@{$tbl}{qw(db tbl)});
my $sql = "EXPLAIN SELECT COUNT(*) FROM $table"
. ($args{where} ? " WHERE $args{where}" : '');
MKDEBUG && _d($sql);
my $expl = $dbh->selectrow_hashref($sql); my $expl = $dbh->selectrow_hashref($sql);
return $expl; my $rows = $expl->{rows} || 0;
my $pr;
if ( $rows ) {
$pr = new Progress(
jobsize => $rows,
spec => $o->get('progress'),
name => "Checksumming $tbl->{db}.$tbl->{tbl}",
);
}
return $pr;
} }
# Catches signals so we can exit gracefully. # Catches signals so we can exit gracefully.