Remove code to handle metadata; instead, let tasks handle metadata and simply concat and send any service.meta.* files/values. Prefix run files with a timestamp to allow for concurrent runs of the same service.

This commit is contained in:
Daniel Nichter
2013-05-01 20:19:42 -07:00
parent 89db22894e
commit fc420ea88e
6 changed files with 139 additions and 135 deletions

View File

@@ -5085,15 +5085,18 @@ sub init_lib_dir {
# Optional args
my $verify = $args{verify};
my $quiet = $args{quiet};
_info(($verify ? 'Verify' : 'Initializing') . " --lib $lib_dir");
_info(($verify ? 'Verify' : 'Initializing') . " --lib $lib_dir")
unless $quiet;
if ( ! -d $lib_dir ) {
if ( $verify ) {
die "$lib_dir does not exist\n";
}
else {
_info("$lib_dir does not exist, creating");
_info("$lib_dir does not exist, creating")
unless $quiet;
_safe_mkdir($lib_dir);
}
}
@@ -5108,7 +5111,8 @@ sub init_lib_dir {
die "$dir does not exist\n";
}
else {
_info("$dir does not exist, creating");
_info("$dir does not exist, creating")
unless $quiet;
_safe_mkdir($dir);
}
}
@@ -6039,19 +6043,8 @@ sub run_service {
my $cxn = $args{Cxn};
# Optional args
my $json = $args{json}; # for testing
my $suffix = $args{suffix} || '.' . int(time); # for testing
my $curr_ts = $args{curr_ts} || ts(time, 1); # 1=UTC
# The seconds are :01 or :02 because cron runs commands about 1 or 2
# seconds after the minute mark. Unfortunately this means that interval
# 00:00:01 is not the same as 00:00::02 to MySQL, but for our purposes
# they are the same interval. This matters because, not only should data
# intervals align, but intervals across all agents should align too. So
# agent 1 data for :00-:05 should align with agent 2 data for the same
# interval, i.e. SELECT * FROM query_samples WHERE start_ts='00:00:00'
# should get data for that interval for all agents.
$curr_ts =~ s/\d\d.\d+$/00/; # :01.123456 -> :00
my $json = $args{json}; # for testing
my $prefix = $args{prefix} || int(time); # for testing
# Can't do anything with the lib dir. Since we haven't started
# logging yet, cron should capture this error and email the user.
@@ -6116,47 +6109,12 @@ sub run_service {
}
}
# Setup the interval meta file which contains the UTC timestamp
# of the last time this service ran ($prev_ts). If there's no
# previous ts, then this is the first run of this service, so
# since we don't know when the service started, and it probably
# didn't start on an interval boundary, we throw away the results
# have the current ts ($curr_ts) which is on an interval boundary
# (because services are ran via cron) and wait until the next
# time we're ran, then we'll have collected from one interval
# boundary (this one) to another (the next one).
my $meta_file = "$lib_dir/meta/" . $service->name . ".interval";
my $metadata = {
start_ts => undef,
end_ts => $curr_ts,
tasks => [],
};
if ( $use_spool && !$service->run_once ) {
my $prev_ts;
if ( -f $meta_file ) {
$prev_ts = slurp($meta_file);
chomp($prev_ts) if $prev_ts;
if ( !$prev_ts ) {
_info($meta_file, ' exists but is empty; waiting until next '
. 'interval to collect a complete sample');
}
else {
_info("Interval: $prev_ts to $curr_ts");
}
}
else {
_info('First run, waiting until next interval to collect '
. 'a complete sample');
}
$metadata->{start_ts} = $prev_ts;
}
# Run the tasks, spool any data.
my @output_files;
my $data_file = $service->name . ($service->run_once ? '' : $suffix);
my $data_file = $prefix . '.' . $service->name;
my $tmp_data_file = "$tmp_dir/$data_file";
my $have_data_file = 0;
my $taskno = 0;
my $metadata = {};
TASK:
foreach my $task ( @$tasks ) {
PTDEBUG && _d("Task $taskno:", $task->name);
@@ -6164,26 +6122,26 @@ sub run_service {
# Set up the output file, i.e. where this run puts its results.
# Runs can access each other's output files. E.g. run0 may
# write to fileX, then subsequent tasks can access that file
# with the special var __RUN_N_OUTPUT__ where N=0.
# with the special var __RUN_N_OUTPUT__ where N=0. Output files
# have this format: (prefix.)service.type(.n), where prefix is
# an optional unique ID for this run (usually a Unix ts); service
# is the service name; type is "data", "tmp", "meta", etc.; and
# n is an optional ID or instance of the type. The .data is the
# only file required: it's the data sent by send_data().
my $output_file;
my $output = $task->output || '';
if ( $output eq 'spool' ) {
if ( $have_data_file++ ) {
die "Invalid service: two tasks have output=spool: "
. Dumper($service);
}
# TODO: save snapshot of meta/service.* before running spool task
$output_file = $tmp_data_file;
push @output_files, $output_file;
}
elsif ( $output eq 'tmp' ) {
my ($fh, $file) = tempfile();
close $fh;
my $file = "$tmp_dir/$prefix." . $service->name . ".tmp.$taskno";
$output_file = $file;
push @output_files, $file;
}
elsif ( $output =~ m/^meta:(\S+)/ ) {
$output_file = "$lib_dir/meta/" . $service->name . ".$1";
my $attrib = $1;
$output_file = "$lib_dir/meta/" . $service->name . ".meta.$attrib";
push @output_files, undef;
}
else {
@@ -6207,7 +6165,6 @@ sub run_service {
# special vars like __RUN_N_OUTPUT__, __TMPDIR__, etc.
my $cmd = join(' ',
$task->program,
($task->options || ''),
'>',
$output_file,
);
@@ -6216,6 +6173,8 @@ sub run_service {
output_files => \@output_files,
service => $service->name, # __SERVICE__
lib_dir => $lib_dir, # __LIB__
meta_dir => "$lib_dir/meta", # __META__
stage_dir => $tmp_dir, # __STAGE__
);
_info("Task $taskno command: $cmd");
@@ -6233,6 +6192,11 @@ sub run_service {
run_time => sprintf('%.6f', $t1 - $t0),
exit_status => $cmd_exit_status,
};
if ( $cmd_exit_status != 0 ) {
_info($task->name . ' exit status not zero, stopping');
last TASK;
}
}
else {
_warn('Invalid Task resource:', Dumper($task));
@@ -6243,14 +6207,10 @@ sub run_service {
}
# Move the spool file from --spool/.tmp/ to --spool/<service>/
# if 1) the service spools data and 2) there is data and 3) this
# is not the first run of the service.
# if 1) the service spools data and 2) there is data.
my $file_size = -s $tmp_data_file;
_info("$tmp_data_file size: " . ($file_size || 0) . " bytes");
if ( $use_spool
&& $file_size
&& ($metadata->{start_ts} || $service->run_once) )
{
if ( $use_spool && $file_size ) {
# Save metadata about this sample _first_, because --send-data looks
# for the data file first, then for a corresponding .meta file. If
# we write the data file first, then we create a race condition: while
@@ -6259,6 +6219,12 @@ sub run_service {
# file first guarantees that if --send-data sees a data file, the
# .meta already exists. (And there's no race condition on writing
# the data file because we use a quasi-atomic system mv.)
read_metadata(
service => $service->name,
prefix => $prefix,
metadata => $metadata,
stage_dir => $tmp_dir,
);
write_to_file(
data => as_json($metadata, json => $json),
file => "$data_dir/$data_file.meta",
@@ -6269,7 +6235,7 @@ sub run_service {
# simply move the inode, _not_ copy the file. A system mv on
# the same filesystem is pretty much guaranteed to do an optimized,
# i.e. quasi-atomic, move.
my $cmd = "mv $tmp_data_file $data_dir/$data_file";
my $cmd = "mv $tmp_data_file $data_dir/$data_file.data";
_info($cmd);
system($cmd);
my $cmd_exit_status = $CHILD_ERROR >> 8;
@@ -6277,16 +6243,6 @@ sub run_service {
$exit_status |= $cmd_exit_status;
}
# Always update the meta file if spooling, even if there was no data
# or this is the first run of the service (in which case this is the
# first interval boundary for the service).
if ( $use_spool && !$service->run_once ) {
write_to_file(
data => "$curr_ts\n",
file => $meta_file,
);
}
# Remove tmp output files.
foreach my $file ( @output_files ) {
next unless defined $file && -f $file;
@@ -6294,7 +6250,7 @@ sub run_service {
or _warn("Error removing $file: $OS_ERROR");
}
return;
return $exit_status; # returning global var for testing
}
sub load_service {
@@ -6326,11 +6282,15 @@ sub replace_special_vars {
output_files
service
lib_dir
meta_dir
stage_dir
)) or die;
my $cmd = $args{cmd};
my $output_files = $args{output_files};
my $service = $args{service};
my $lib_dir = $args{lib_dir};
my $meta_dir = $args{meta_dir};
my $stage_dir = $args{stage_dir};
my $ts_micro = sprintf('%.6f', time);
@@ -6347,6 +6307,8 @@ sub replace_special_vars {
}
$word =~ s/__TS_MICRO__/$ts_micro/g;
$word =~ s/__LIB__/$lib_dir/g;
$word =~ s/__META__/$meta_dir/g;
$word =~ s/__STAGE__/$stage_dir/g;
$word =~ s/__SERVICE__/$service/g;
$word;
}
@@ -6366,9 +6328,11 @@ sub init_spool_dir {
# Optional args
my $service = $args{service};
my $quiet = $args{quiet};
if ( !-d $spool_dir ) {
_info("$spool_dir does not exist, creating");
_info("$spool_dir does not exist, creating")
unless $quiet;
_safe_mkdir($spool_dir);
}
elsif ( !-w $spool_dir ) {
@@ -6379,7 +6343,8 @@ sub init_spool_dir {
next unless $subdir; # service may be undef
my $dir = "$spool_dir/$subdir";
if ( ! -d $dir ) {
_info("$dir does not exist, creating");
_info("$dir does not exist, creating")
unless $quiet;
_safe_mkdir($dir);
}
elsif ( !-w $dir ) {
@@ -6393,6 +6358,33 @@ sub init_spool_dir {
return $data_dir, $tmp_dir;
}
sub read_metadata {
    my (%args) = @_;
    have_required_args(\%args, qw(
        service
        prefix
        metadata
        stage_dir
    )) or die;
    # Unpack the required args in one slice instead of one-per-line.
    my ($service, $prefix, $metadata, $stage_dir)
        = @args{qw(service prefix metadata stage_dir)};

    # Collect every staged meta file for this run of the service.
    # Example filename: 123456.query-history.meta.stop_offset
    my @meta_files = glob "$stage_dir/$prefix.$service.meta.*";
    for my $meta_file ( @meta_files ) {
        PTDEBUG && _d('metadata file:', $meta_file);
        # The attrib name is everything after ".meta." in the filename.
        my ($attrib) = $meta_file =~ m/\.meta\.(\S+)$/;
        my $value = slurp($meta_file);
        chomp($value) if $value;
        PTDEBUG && _d('metadata', $attrib, '=', $value);
        # Fold the file's value into the caller's metadata hashref.
        $metadata->{$attrib} = $value;
    }

    return;
}
# ######################## #
# --send-data process subs #
# ######################## #
@@ -6487,12 +6479,10 @@ sub send_data {
# Only iterator over data files because run_service() writes
# them last to avoid a race condition with us. See the code
# comment about writing the .meta file first in run_service().
next if $file =~ m/\.meta$/;
next unless $file =~ m/\.data$/;
my $data_file = "$service_dir/$file";
next unless -f $data_file;
my $meta_file = "$service_dir/$file.meta";
(my $meta_file = $data_file) =~ s/\.data/.meta/;
eval {
# Send the file as-is. The --run-service process should

View File

@@ -34,6 +34,8 @@ sub test_replace {
output_files => \@output_files,
service => 'service-name',
lib_dir => '/var/lib/pt-agent',
meta_dir => '/var/lib/pt-agent/meta',
stage_dir => '/var/spool/.tmp',
);
is(

View File

@@ -36,12 +36,16 @@ my $sample = "t/pt-agent/samples";
# Create fake spool and lib dirs. Service-related subs in pt-agent
# automatically add "/services" to the lib dir, but the spool dir is
# used as-is.
my $tmpdir = tempdir("/tmp/pt-agent.$PID.XXXXXX", CLEANUP => 0);
my $tmpdir = tempdir("/tmp/pt-agent.$PID.XXXXXX", CLEANUP => 1);
output(
sub { pt_agent::init_lib_dir(lib_dir => $tmpdir) }
);
my $spool_dir = "$tmpdir/spool";
my $json = JSON->new->canonical([1])->pretty;
$json->allow_blessed([]);
$json->convert_blessed([]);
sub write_svc_files {
my (%args) = @_;
have_required_args(\%args, qw(
@@ -52,8 +56,8 @@ sub write_svc_files {
my $output = output(
sub {
pt_agent::write_services(
services => $services,
lib_dir => $tmpdir,
sorted_services => { added => $services },
lib_dir => $tmpdir,
);
},
stderr => 1,
@@ -68,8 +72,7 @@ sub write_svc_files {
my $run0 = Percona::WebAPI::Resource::Task->new(
name => 'query-history',
number => '0',
program => "$trunk/bin/pt-query-digest",
options => "--output json $trunk/t/lib/samples/slowlogs/slow008.txt",
program => "$trunk/bin/pt-query-digest --output json $trunk/t/lib/samples/slowlogs/slow008.txt",
output => 'spool',
);
@@ -93,26 +96,25 @@ my $output = output(
lib_dir => $tmpdir,
spool_dir => $spool_dir,
Cxn => '',
suffix => '', # optional, for testing
prefix => '1', # optional, for testing
json => $json, # optional, for testing
);
},
stderr => 1,
debug =>1,
);
ok(
no_diff(
"cat $tmpdir/spool/query-history/query-history",
"cat $tmpdir/spool/query-history/1.query-history.data",
"$sample/query-history/data001.json",
),
"1 run: spool data (query-history/data001.json)"
) or diag(`ls -l $tmpdir/spool/`);
) or diag(`ls -l $tmpdir/spool/query-history/`, `cat $tmpdir/logs/query-history.run`);
chomp(my $n_files = `ls -1 $spool_dir | wc -l | awk '{print \$1}'`);
chomp(my $n_files = `ls -1 $spool_dir/query-history/*.data | wc -l | awk '{print \$1}'`);
is(
$n_files,
1,
"1 run: only wrote spool data (query-history/data001.json)"
"1 run: only wrote spool data"
) or diag(`ls -l $spool_dir`);
is(
@@ -120,7 +122,12 @@ is(
0,
"1 run: exit 0"
);
exit;
ok(
-f "$tmpdir/spool/query-history/1.query-history.meta",
"1 run: .meta file exists"
);
# #############################################################################
# Service with two tasks, both using a program.
# #############################################################################
@@ -135,16 +142,14 @@ diag(`rm -rf $tmpdir/spool/* $tmpdir/services/*`);
$run0 = Percona::WebAPI::Resource::Task->new(
name => 'cat-slow-log',
number => '0',
program => "cat",
options => "$trunk/t/lib/samples/slowlogs/slow008.txt",
program => "cat $trunk/t/lib/samples/slowlogs/slow008.txt",
output => 'tmp',
);
my $run1 = Percona::WebAPI::Resource::Task->new(
name => 'query-history',
number => '1',
program => "$trunk/bin/pt-query-digest",
options => "--output json __RUN_0_OUTPUT__",
program => "$trunk/bin/pt-query-digest --output json __RUN_0_OUTPUT__",
output => 'spool',
);
@@ -167,21 +172,21 @@ $output = output(
spool_dir => $spool_dir,
lib_dir => $tmpdir,
Cxn => '',
suffix => '', # optional, for testing
json => $json, # optional, for testing
prefix => '2', # optional, for testing
);
},
stderr => 1,
);
ok(
no_diff(
"cat $tmpdir/spool/query-history",
"cat $tmpdir/spool/query-history/2.query-history.data",
"$sample/query-history/data001.json",
),
"2 runs: spool data"
"2 runs: spool data (query-history/data001.json)"
);
chomp($n_files = `ls -1 $spool_dir | wc -l | awk '{print \$1}'`);
chomp($n_files = `ls -1 $spool_dir/query-history/*.data | wc -l | awk '{print \$1}'`);
is(
$n_files,
1,
@@ -194,13 +199,10 @@ is(
"2 runs: exit 0"
);
# Get the temp file created by pt-agent by matching it from
# the output line like:
# 2013-01-08T13:14:23.627040 INFO Run 0: cat /Users/daniel/p/pt-agent/t/lib/samples/slowlogs/slow008.txt > /var/folders/To/ToaPSttnFbqvgRqcHPY7qk+++TI/-Tmp-/q1EnzzlDoL
my ($tmpfile) = $output =~ m/cat \S+ > (\S+)/;
ok(
! -f $tmpfile,
my @tmp_files = glob "$tmpdir/spool/.tmp/*";
is_deeply(
\@tmp_files,
[],
"2 runs: temp file removed"
);
@@ -246,8 +248,7 @@ SKIP: {
my $task10 = Percona::WebAPI::Resource::Task->new(
name => 'query-history',
number => '1',
program => "$trunk/bin/pt-query-digest",
options => "--output json --type genlog $new_genlog",
program => "$trunk/bin/pt-query-digest --output json --type genlog $new_genlog",
output => 'spool',
);
my $svc1 = Percona::WebAPI::Resource::Service->new(
@@ -300,10 +301,10 @@ SKIP: {
spool_dir => $spool_dir,
lib_dir => $tmpdir,
Cxn => $cxn,
suffix => '', # optional, for testing
json => $json, # optional, for testing
prefix => '3', # optional, for testing
);
},
stderr => 1,
);
my (undef, $genlog) = $dbh->selectrow_array("SHOW VARIABLES LIKE 'general_log_file'");
@@ -328,10 +329,10 @@ SKIP: {
spool_dir => $spool_dir,
lib_dir => $tmpdir,
Cxn => $cxn,
suffix => '', # optional, for testing
json => $json, # optional, for testing
prefix => '4', # optional, for testing
);
},
stderr => 1,
);
`cp $new_genlog $tmpdir/genlog-after`;
@@ -352,10 +353,10 @@ SKIP: {
spool_dir => $spool_dir,
lib_dir => $tmpdir,
Cxn => $cxn,
suffix => '', # optional, for testing
json => $json, # optional, for testing
prefix => '5', # optional, for testing
);
},
stderr => 1,
);
(undef, $genlog) = $dbh->selectrow_array("SHOW VARIABLES LIKE 'general_log_file'");

View File

@@ -5,6 +5,10 @@ Content-Disposition: form-data; name="agent"
"hostname" : "prod1",
"uuid" : "123"
}
--Ym91bmRhcnk
Content-Disposition: form-data; name="meta"
--Ym91bmRhcnk
Content-Disposition: form-data; name="data"

View File

@@ -14,5 +14,6 @@
"output" : "spool",
"program" : "pt-query-digest"
}
]
],
"ts" : "123456"
}

View File

@@ -82,12 +82,17 @@ is_deeply(
# #############################################################################
my $tmpdir = tempdir("/tmp/pt-agent.$PID.XXXXXX", CLEANUP => 1);
mkdir "$tmpdir/query-history"
or die "Cannot mkdir $tmpdir/query-history: $OS_ERROR";
mkdir "$tmpdir/services"
or die "Cannot mkdir $tmpdir/services: $OS_ERROR";
pt_agent::init_lib_dir(
lib_dir => $tmpdir,
quiet => 1,
);
pt_agent::init_spool_dir(
spool_dir => $tmpdir,
service => 'query-history',
quiet => 1,
);
`cp $trunk/$sample/query-history/data001.json $tmpdir/query-history/`;
`cp $trunk/$sample/query-history/data001.json $tmpdir/query-history/1.data001.data`;
`cp $trunk/$sample/service001 $tmpdir/services/query-history`;
$ua->{responses}->{post} = [
@@ -99,15 +104,16 @@ $ua->{responses}->{post} = [
my $output = output(
sub {
pt_agent::send_data(
client => $client,
agent => $agent,
api_key => '123',
service => 'query-history',
lib_dir => $tmpdir,
spool_dir => $tmpdir,
json => $json, # optional, for testing
# optional, for testing:
client => $client,
agent => $agent,
json => $json,
),
},
stderr => 1,
);
is(
@@ -134,7 +140,7 @@ ok(
) or diag(Dumper($client->ua->{content}->{post}));
ok(
!-f "$tmpdir/query-history/data001.json",
!-f "$tmpdir/query-history/1.data001.data",
"Removed data file after sending successfully"
);