mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-01 18:25:59 +00:00
179 lines
5.1 KiB
Perl
179 lines
5.1 KiB
Perl
# This program is copyright 2007-2011 Baron Schwartz, 2011 Percona Ireland Ltd.
|
|
# Feedback and improvements are welcome.
|
|
#
|
|
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
|
|
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
|
|
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
|
|
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
|
|
# licenses.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
|
|
# Place, Suite 330, Boston, MA 02111-1307 USA.
|
|
# ###########################################################################
|
|
# TableSyncGroupBy package
|
|
# ###########################################################################
|
|
{
|
|
# Package: TableSyncGroupBy
|
|
# TableSyncGroupBy is a table sync algo that uses GROUP BY.
|
|
# This package syncs tables without primary keys by doing an all-columns GROUP
|
|
# BY with a count, and then streaming through the results to see how many of
|
|
# each group exist.
|
|
package TableSyncGroupBy;
|
|
|
|
use strict;
|
|
use warnings FATAL => 'all';
|
|
use English qw(-no_match_vars);
|
|
use constant PTDEBUG => $ENV{PTDEBUG} || 0;
|
|
|
|
sub new {
|
|
my ( $class, %args ) = @_;
|
|
foreach my $arg ( qw(Quoter) ) {
|
|
die "I need a $arg argument" unless $args{$arg};
|
|
}
|
|
my $self = { %args };
|
|
return bless $self, $class;
|
|
}
|
|
|
|
sub name {
|
|
return 'GroupBy';
|
|
}
|
|
|
|
sub can_sync {
|
|
return 1; # We can sync anything.
|
|
}
|
|
|
|
sub prepare_to_sync {
|
|
my ( $self, %args ) = @_;
|
|
my @required_args = qw(tbl_struct cols ChangeHandler);
|
|
foreach my $arg ( @required_args ) {
|
|
die "I need a $arg argument" unless defined $args{$arg};
|
|
}
|
|
|
|
$self->{cols} = $args{cols};
|
|
$self->{buffer_in_mysql} = $args{buffer_in_mysql};
|
|
$self->{ChangeHandler} = $args{ChangeHandler};
|
|
|
|
$self->{count_col} = '__maatkit_count';
|
|
while ( $args{tbl_struct}->{is_col}->{$self->{count_col}} ) {
|
|
# Prepend more _ until not a column.
|
|
$self->{count_col} = "_$self->{count_col}";
|
|
}
|
|
PTDEBUG && _d('COUNT column will be named', $self->{count_col});
|
|
|
|
$self->{done} = 0;
|
|
|
|
return;
|
|
}
|
|
|
|
sub uses_checksum {
|
|
return 0; # We don't need checksum queries.
|
|
}
|
|
|
|
sub set_checksum_queries {
|
|
return; # This shouldn't be called, but just in case.
|
|
}
|
|
|
|
sub prepare_sync_cycle {
|
|
my ( $self, $host ) = @_;
|
|
return;
|
|
}
|
|
|
|
sub get_sql {
|
|
my ( $self, %args ) = @_;
|
|
my $cols = join(', ', map { $self->{Quoter}->quote($_) } @{$self->{cols}});
|
|
return "SELECT"
|
|
. ($self->{buffer_in_mysql} ? ' SQL_BUFFER_RESULT' : '')
|
|
. " $cols, COUNT(*) AS $self->{count_col}"
|
|
. ' FROM ' . $self->{Quoter}->quote(@args{qw(database table)})
|
|
. ' WHERE ' . ( $args{where} || '1=1' )
|
|
. " GROUP BY $cols ORDER BY $cols";
|
|
}
|
|
|
|
# The same row means that the key columns are equal, so there are rows with the
|
|
# same columns in both tables; but there are different numbers of rows. So we
|
|
# must either delete or insert the required number of rows to the table.
|
|
sub same_row {
|
|
my ( $self, %args ) = @_;
|
|
my ($lr, $rr) = @args{qw(lr rr)};
|
|
my $cc = $self->{count_col};
|
|
my $lc = $lr->{$cc};
|
|
my $rc = $rr->{$cc};
|
|
my $diff = abs($lc - $rc);
|
|
return unless $diff;
|
|
$lr = { %$lr };
|
|
delete $lr->{$cc};
|
|
$rr = { %$rr };
|
|
delete $rr->{$cc};
|
|
foreach my $i ( 1 .. $diff ) {
|
|
if ( $lc > $rc ) {
|
|
$self->{ChangeHandler}->change('INSERT', $lr, $self->key_cols());
|
|
}
|
|
else {
|
|
$self->{ChangeHandler}->change('DELETE', $rr, $self->key_cols());
|
|
}
|
|
}
|
|
}
|
|
|
|
# Insert into the table the specified number of times.
|
|
sub not_in_right {
|
|
my ( $self, %args ) = @_;
|
|
my $lr = $args{lr};
|
|
$lr = { %$lr };
|
|
my $cnt = delete $lr->{$self->{count_col}};
|
|
foreach my $i ( 1 .. $cnt ) {
|
|
$self->{ChangeHandler}->change('INSERT', $lr, $self->key_cols());
|
|
}
|
|
}
|
|
|
|
# Delete from the table the specified number of times.
|
|
sub not_in_left {
|
|
my ( $self, %args ) = @_;
|
|
my $rr = $args{rr};
|
|
$rr = { %$rr };
|
|
my $cnt = delete $rr->{$self->{count_col}};
|
|
foreach my $i ( 1 .. $cnt ) {
|
|
$self->{ChangeHandler}->change('DELETE', $rr, $self->key_cols());
|
|
}
|
|
}
|
|
|
|
sub done_with_rows {
|
|
my ( $self ) = @_;
|
|
$self->{done} = 1;
|
|
}
|
|
|
|
sub done {
|
|
my ( $self ) = @_;
|
|
return $self->{done};
|
|
}
|
|
|
|
sub key_cols {
|
|
my ( $self ) = @_;
|
|
return $self->{cols};
|
|
}
|
|
|
|
# Return 1 if you have changes yet to make and you don't want the TableSyncer to
|
|
# commit your transaction or release your locks.
|
|
sub pending_changes {
|
|
my ( $self ) = @_;
|
|
return;
|
|
}
|
|
|
|
sub _d {
|
|
my ($package, undef, $line) = caller 0;
|
|
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
|
|
map { defined $_ ? $_ : 'undef' }
|
|
@_;
|
|
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
|
|
}
|
|
|
|
1;
|
|
}
|
|
# ###########################################################################
|
|
# End TableSyncGroupBy package
|
|
# ###########################################################################
|