Files
percona-toolkit/lib/LogSplitter.pm

445 lines
14 KiB
Perl

# This program is copyright 2008-2011 Percona Inc.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# LogSplitter package $Revision: 7177 $
# ###########################################################################
# Package: LogSplitter
# LogSplitter splits MySQL query logs by sessions.
{
package LogSplitter;
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
use Data::Dumper;
$Data::Dumper::Indent = 1;
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Quotekeys = 0;
my $oktorun = 1;
sub new {
my ( $class, %args ) = @_;
foreach my $arg ( qw(attribute base_dir parser session_files) ) {
die "I need a $arg argument" unless $args{$arg};
}
# TODO: this is probably problematic on Windows
$args{base_dir} .= '/' if substr($args{base_dir}, -1, 1) ne '/';
if ( $args{split_random} ) {
MKDEBUG && _d('Split random');
$args{attribute} = '_sessionno'; # set round-robin 1..session_files
}
my $self = {
# %args will override these default args if given explicitly.
base_file_name => 'session',
max_dirs => 1_000,
max_files_per_dir => 5_000,
max_sessions => 5_000_000, # max_dirs * max_files_per_dir
merge_sessions => 1,
session_files => 64,
quiet => 0,
verbose => 0,
max_open_files => 1_000,
close_lru_files => 100,
# Override default args above.
%args,
# These args cannot be overridden.
n_dirs_total => 0, # total number of dirs created
n_files_total => 0, # total number of session files created
n_files_this_dir => -1, # number of session files in current dir
session_fhs => [], # filehandles for each session
n_open_fhs => 0, # current number of open session filehandles
n_events_total => 0, # total number of events in log
n_events_saved => 0, # total number of events saved
n_sessions_skipped => 0, # total number of sessions skipped
n_sessions_saved => 0, # number of sessions saved
sessions => {}, # sessions data store
created_dirs => [],
};
MKDEBUG && _d('new LogSplitter final args:', Dumper($self));
return bless $self, $class;
}
sub split {
my ( $self, @logs ) = @_;
$oktorun = 1; # True as long as we haven't created too many
# session files or too many dirs and files
my $callbacks = $self->{callbacks};
my $next_sessionno;
if ( $self->{split_random} ) {
# round-robin iterator
$next_sessionno = make_rr_iter(1, $self->{session_files});
}
if ( @logs == 0 ) {
MKDEBUG && _d('Implicitly reading STDIN because no logs were given');
push @logs, '-';
}
# Split all the log files.
my $lp = $self->{parser};
LOG:
foreach my $log ( @logs ) {
last unless $oktorun;
next unless defined $log;
if ( !-f $log && $log ne '-' ) {
warn "Skipping $log because it is not a file";
next LOG;
}
my $fh;
if ( $log eq '-' ) {
$fh = *STDIN;
}
else {
if ( !open $fh, "<", $log ) {
warn "Cannot open $log: $OS_ERROR\n";
next LOG;
}
}
MKDEBUG && _d('Splitting', $log);
my $event = {};
my $more_events = 1;
my $more_events_sub = sub { $more_events = $_[0]; };
EVENT:
while ( $oktorun ) {
$event = $lp->parse_event(
next_event => sub { return <$fh>; },
tell => sub { return tell $fh; },
oktorun => $more_events_sub,
);
if ( $event ) {
$self->{n_events_total}++;
if ( $self->{split_random} ) {
$event->{_sessionno} = $next_sessionno->();
}
if ( $callbacks ) {
foreach my $callback ( @$callbacks ) {
$event = $callback->($event);
last unless $event;
}
}
$self->_save_event($event) if $event;
}
if ( !$more_events ) {
MKDEBUG && _d('Done parsing', $log);
close $fh;
next LOG;
}
last LOG unless $oktorun;
}
}
# Close session filehandles.
while ( my $fh = pop @{ $self->{session_fhs} } ) {
close $fh->{fh};
}
$self->{n_open_fhs} = 0;
$self->_merge_session_files() if $self->{merge_sessions};
$self->print_split_summary() unless $self->{quiet};
return;
}
sub _save_event {
my ( $self, $event ) = @_;
my ($session, $session_id) = $self->_get_session_ds($event);
return unless $session;
if ( !defined $session->{fh} ) {
$self->{n_sessions_saved}++;
MKDEBUG && _d('New session:', $session_id, ',',
$self->{n_sessions_saved}, 'of', $self->{max_sessions});
my $session_file = $self->_get_next_session_file();
if ( !$session_file ) {
$oktorun = 0;
MKDEBUG && _d('Not oktorun because no _get_next_session_file');
return;
}
# Close Last Recently Used session fhs if opening if this new
# session fh will cause us to have too many open files.
if ( $self->{n_open_fhs} >= $self->{max_open_files} ) {
$self->_close_lru_session()
}
# Open a fh for this session file.
open my $fh, '>', $session_file
or die "Cannot open session file $session_file: $OS_ERROR";
$session->{fh} = $fh;
$self->{n_open_fhs}++;
# Save fh and session file in case we need to open/close it later.
$session->{active} = 1;
$session->{session_file} = $session_file;
push @{$self->{session_fhs}}, { fh => $fh, session_id => $session_id };
MKDEBUG && _d('Created', $session_file, 'for session',
$self->{attribute}, '=', $session_id);
# This special comment lets mk-log-player know when a session begins.
print $fh "-- START SESSION $session_id\n\n";
}
elsif ( !$session->{active} ) {
# Reopen the existing but inactive session. This happens when
# a new session (above) had to close LRU session fhs.
# Again, close Last Recently Used session fhs if reopening if this
# session's fh will cause us to have too many open files.
if ( $self->{n_open_fhs} >= $self->{max_open_files} ) {
$self->_close_lru_session();
}
# Reopen this session's fh.
open $session->{fh}, '>>', $session->{session_file}
or die "Cannot reopen session file "
. "$session->{session_file}: $OS_ERROR";
# Mark this session as active again.
$session->{active} = 1;
$self->{n_open_fhs}++;
MKDEBUG && _d('Reopend', $session->{session_file}, 'for session',
$self->{attribute}, '=', $session_id);
}
else {
MKDEBUG && _d('Event belongs to active session', $session_id);
}
my $session_fh = $session->{fh};
# Print USE db if 1) we haven't done so yet or 2) the db has changed.
my $db = $event->{db} || $event->{Schema};
if ( $db && ( !defined $session->{db} || $session->{db} ne $db ) ) {
print $session_fh "use $db\n\n";
$session->{db} = $db;
}
print $session_fh $self->flatten($event->{arg}), "\n\n";
$self->{n_events_saved}++;
return;
}
# Returns shortcut to session data store and id for the given event.
# The returned session will be undef if no more sessions are allowed.
sub _get_session_ds {
my ( $self, $event ) = @_;
my $attrib = $self->{attribute};
if ( !$event->{ $attrib } ) {
MKDEBUG && _d('No attribute', $attrib, 'in event:', Dumper($event));
return;
}
# This could indicate a problem in parser not parsing
# a log event correctly thereby leaving $event->{arg} undefined.
# Or, it could simply be an event like:
# use db;
# SET NAMES utf8;
return unless $event->{arg};
# Don't print admin commands like quit or ping because these
# cannot be played.
return if ($event->{cmd} || '') eq 'Admin';
my $session;
my $session_id = $event->{ $attrib };
# The following is necessary to prevent Perl from auto-vivifying
# a lot of empty hashes for new sessions that are ignored due to
# already having max_sessions.
if ( $self->{n_sessions_saved} < $self->{max_sessions} ) {
# Will auto-vivify if necessary.
$session = $self->{sessions}->{ $session_id } ||= {};
}
elsif ( exists $self->{sessions}->{ $session_id } ) {
# Use only existing sessions.
$session = $self->{sessions}->{ $session_id };
}
else {
$self->{n_sessions_skipped} += 1;
MKDEBUG && _d('Skipping new session', $session_id,
'because max_sessions is reached');
}
return $session, $session_id;
}
sub _close_lru_session {
my ( $self ) = @_;
my $session_fhs = $self->{session_fhs};
my $lru_n = $self->{n_sessions_saved} - $self->{max_open_files} - 1;
my $close_to_n = $lru_n + $self->{close_lru_files} - 1;
MKDEBUG && _d('Closing session fhs', $lru_n, '..', $close_to_n,
'(',$self->{n_sessions}, 'sessions', $self->{n_open_fhs}, 'open fhs)');
foreach my $session ( @$session_fhs[ $lru_n..$close_to_n ] ) {
close $session->{fh};
$self->{n_open_fhs}--;
$self->{sessions}->{ $session->{session_id} }->{active} = 0;
}
return;
}
# Returns an empty string on failure, or the next session file name on success.
# This will fail if we have opened maxdirs and maxfiles.
sub _get_next_session_file {
my ( $self, $n ) = @_;
return if $self->{n_dirs_total} >= $self->{max_dirs};
# n_files_this_dir will only be < 0 for the first dir and file
# because n_file is set to -1 in new(). This is a hack
# to cause the first dir and file to be created automatically.
if ( ($self->{n_files_this_dir} >= $self->{max_files_per_dir})
|| $self->{n_files_this_dir} < 0 ) {
$self->{n_dirs_total}++;
$self->{n_files_this_dir} = 0;
my $new_dir = "$self->{base_dir}$self->{n_dirs_total}";
if ( !-d $new_dir ) {
my $retval = system("mkdir $new_dir");
if ( ($retval >> 8) != 0 ) {
die "Cannot create new directory $new_dir: $OS_ERROR";
}
MKDEBUG && _d('Created new base_dir', $new_dir);
push @{$self->{created_dirs}}, $new_dir;
}
elsif ( MKDEBUG ) {
_d($new_dir, 'already exists');
}
}
else {
MKDEBUG && _d('No dir created; n_files_this_dir:',
$self->{n_files_this_dir}, 'n_files_total:',
$self->{n_files_total});
}
$self->{n_files_total}++;
$self->{n_files_this_dir}++;
my $dir_n = $self->{n_dirs_total} . '/';
my $session_n = sprintf '%d', $n || $self->{n_sessions_saved};
my $session_file = $self->{base_dir}
. $dir_n
. $self->{base_file_name}."-$session_n.txt";
MKDEBUG && _d('Next session file', $session_file);
return $session_file;
}
# Flattens multiple new-line and spaces to single new-lines and spaces
# and remove /* comment */ blocks.
sub flatten {
my ( $self, $query ) = @_;
return unless $query;
$query =~ s!/\*.*?\*/! !g;
$query =~ s/^\s+//;
$query =~ s/\s{2,}/ /g;
return $query;
}
sub _merge_session_files {
my ( $self ) = @_;
print "Merging session files...\n" unless $self->{quiet};
my @multi_session_files;
for my $i ( 1..$self->{session_files} ) {
push @multi_session_files, $self->{base_dir} ."sessions-$i.txt";
}
my @single_session_files = map {
$_->{session_file};
} values %{$self->{sessions}};
my $i = make_rr_iter(0, $#multi_session_files); # round-robin iterator
foreach my $single_session_file ( @single_session_files ) {
my $multi_session_file = $multi_session_files[ $i->() ];
my $cmd;
if ( $self->{split_random} ) {
$cmd = "mv $single_session_file $multi_session_file";
}
else {
$cmd = "cat $single_session_file >> $multi_session_file";
}
eval { `$cmd`; };
if ( $EVAL_ERROR ) {
warn "Failed to `$cmd`: $OS_ERROR";
}
}
foreach my $created_dir ( @{$self->{created_dirs}} ) {
my $cmd = "rm -rf $created_dir";
eval { `$cmd`; };
if ( $EVAL_ERROR ) {
warn "Failed to `$cmd`: $OS_ERROR";
}
}
return;
}
sub make_rr_iter {
my ( $start, $end ) = @_;
my $current = $start;
return sub {
$current = $start if $current > $end ;
$current++; # For next iteration.
return $current - 1;
};
}
sub print_split_summary {
my ( $self ) = @_;
print "Split summary:\n";
my $fmt = "%-20s %-10s\n";
printf $fmt, 'Total sessions',
$self->{n_sessions_saved} + $self->{n_sessions_skipped};
printf $fmt, 'Sessions saved',
$self->{n_sessions_saved};
printf $fmt, 'Total events', $self->{n_events_total};
printf $fmt, 'Events saved', $self->{n_events_saved};
return;
}
sub _d {
my ($package, undef, $line) = caller 0;
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
map { defined $_ ? $_ : 'undef' }
@_;
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}
1;
}
# ###########################################################################
# End LogSplitter package
# ###########################################################################