Add new output types: join, store:output, and store:key_value_tuple. Save run_time for query tasks. Call run_services_once() before run_services(). Change meta.ts to meta.data_ts. Call replace_special_vars() for queries.

This commit is contained in:
Daniel Nichter
2013-06-12 20:09:36 -07:00
parent e51c7a9c8c
commit c97d3a9fbc

View File

@@ -6304,6 +6304,17 @@ sub get_services {
# start-query-history is ran before query-history is scheduled
# and starts running.
# Run services with the run_once flag. Unlike run_services(),
# this call runs the service directly, whether it's meta or not,
# then it removes it from the services hashref so there's no
# chance of running it again unless it's received again.
run_services_once(
services => $sorted_services->{services},
lib_dir => $lib_dir,
bin_dir => $args{bin_dir}, # optional, for testing
exec_cmd => $args{exec_cmd}, # optional, for testing
);
# Start new services.
my $started_ok = run_services(
action => 'start',
@@ -6322,17 +6333,6 @@ sub get_services {
exec_cmd => $args{exec_cmd}, # optional, for testing
);
# Run services with the run_once flag. Unlike run_services(),
# this call runs the service directly, whether it's meta or not,
# then it removes it from the services hashref so there's no
# chance of running it again unless it's received again.
my $ran_once_ok = run_services_once(
services => $sorted_services->{services},
lib_dir => $lib_dir,
bin_dir => $args{bin_dir}, # optional, for testing
exec_cmd => $args{exec_cmd}, # optional, for testing
);
# Schedule any services with a run_schedule or spool_schedule.
# This must be called last, after write_services() and
# run_services() because, for example, a service schedule
@@ -6341,14 +6341,12 @@ sub get_services {
# ready to go at this point.
if ( scalar @$removed_ok
|| scalar @$started_ok
|| scalar @$restarted_ok
|| scalar @$ran_once_ok )
|| scalar @$restarted_ok )
{
schedule_services(
services => [
@$started_ok,
@$restarted_ok,
@$ran_once_ok,
@{$sorted_services->{unchanged}},
],
lib_dir => $lib_dir,
@@ -6678,7 +6676,6 @@ sub run_services_once {
my $lib_dir = $args{lib_dir};
# Optional args
my $restart = $args{restart};
my $bin_dir = defined $args{bin_dir} ? $args{bin_dir}
: "$FindBin::Bin/";
my $exec_cmd = $args{exec_cmd} || sub { return system(@_) };
@@ -6688,7 +6685,7 @@ sub run_services_once {
my $cmd_fmt = ($env_vars ? "$env_vars " : '')
. $bin_dir . "pt-agent --run-service %s >> $log 2>&1";
my @started_ok;
my @ran_ok;
SERVICE:
foreach my $name ( sort keys %$services ) {
my $service = $services->{$name};
@@ -6696,19 +6693,19 @@ sub run_services_once {
delete $services->{$name};
my $cmd = sprintf $cmd_fmt, "start-$name";
my $cmd = sprintf $cmd_fmt, $name;
$logger->info("Running $name: $cmd");
my $cmd_exit_status = $exec_cmd->($cmd);
if ( $cmd_exit_status != 0 ) {
$logger->warning("Error starting $name, check $log and "
$logger->error("Error running $name, check $log and "
."$lib_dir/logs/$name.run");
next SERVICE;
}
push @started_ok, $service;
push @ran_ok, $service;
$logger->info("Ran $name successfully");
}
return \@started_ok;
return \@ran_ok;
}
# ########################## #
@@ -6856,7 +6853,7 @@ sub run_service {
my $data_file = $prefix . '.' . $service->name . '.data';
my $tmp_data_file = "$tmp_dir/$data_file";
my $taskno = 0;
my $metadata = { ts => $prefix };
my $metadata = { data_ts => $prefix };
TASK:
foreach my $task ( @$tasks ) {
@@ -6871,7 +6868,11 @@ sub run_service {
# is the service name; type is "data", "tmp", "meta", etc.; and
# n is an optional ID or instance of the type. The .data is the
# only file required: it's the data sent by send_data().
my $task_output_file = "$tmp_dir/$prefix." . $service->name . ".$taskno.output";
my $output_file;
my $join_char;
my $store = {};
my ($store_output, $store_key_value_tuple);
my $output = $task->output || '';
if ( $output eq 'spool' ) {
$output_file = $tmp_data_file;
@@ -6887,6 +6888,24 @@ sub run_service {
$output_file = "$lib_dir/meta/" . $service->name . ".meta.$attrib";
push @output_files, $output_file;
}
elsif ( $output =~ m/^join:(.)$/ ) {
# TODO
$join_char = $1;
$output_file = $task_output_file;
push @output_files, $output_file;
}
elsif ( $output eq 'store:key_value_tuple' ) {
$store_key_value_tuple = 1;
$output_file = '/dev/null';
push @output_files, undef;
}
elsif ( $output eq 'store:output' ) {
# TODO
$store_output = 1;
$output_file = $task_output_file;
push @output_files, $output_file;
}
else {
$output_file = '/dev/null';
push @output_files, undef;
@@ -6894,14 +6913,60 @@ sub run_service {
PTDEBUG && _d("Task $taskno output:", Dumper(\@output_files));
if ( my $query = $task->query ) {
$query = replace_special_vars(
cmd => $query,
output_files => \@output_files, # __RUN_n_OUTPUT__
service => $service->name, # __SERVICE__
lib_dir => $lib_dir, # __LIB__
meta_dir => "$lib_dir/meta", # __META__
stage_dir => $tmp_dir, # __STAGE__
store => $store, # __STORE_key__
ts => $prefix,
);
$logger->info("Task $taskno query: $query");
my $rows;
my $t0 = time;
eval {
$cxn->dbh->do($query);
if ( $join_char || $store_key_value_tuple ) {
$rows = $cxn->dbh->selectall_arrayref($query);
}
else {
$cxn->dbh->do($query);
}
};
if ( $EVAL_ERROR ) {
$logger->warning("Error executing $query: $EVAL_ERROR");
$logger->error("Error executing $query: $EVAL_ERROR");
last TASK;
}
if ( $rows ) {
$logger->info('Query returned ' . scalar @$rows . ' rows');
if ( $join_char ) {
my $fh;
if ( !open($fh, '>', $output_file) ) {
$logger->error("Cannot open $output_file: $OS_ERROR");
last TASK;
}
foreach my $row ( @$rows ) {
print { $fh } join($join_char, map { defined $_ ? $_ : 'NULL' } @$row), "\n"
or $logger->error("Cannot write to $output_file: $OS_ERROR");
}
close $fh
or $logger->warning("Cannot close $output_file: $OS_ERROR");
}
elsif ( $store_key_value_tuple ) {
foreach my $row ( @$rows ) {
$store->{$row->[0]} = defined $row->[1] ? $row->[1] : 'NULL';
}
}
}
my $t1 = time;
push @{$metadata->{tasks}}, {
start_ts => ts($t0, 1),
end_ts => ts($t1, 1),
run_time => sprintf('%.6f', $t1 - $t0),
};
}
elsif ( my $program = $task->program ) {
# Create the full command line to execute, replacing any
@@ -6913,11 +6978,12 @@ sub run_service {
);
$cmd = replace_special_vars(
cmd => $cmd,
output_files => \@output_files,
output_files => \@output_files, # __RUN_n_OUTPUT__
service => $service->name, # __SERVICE__
lib_dir => $lib_dir, # __LIB__
meta_dir => "$lib_dir/meta", # __META__
stage_dir => $tmp_dir, # __STAGE__
store => $store, # __STORE_key__
ts => $prefix,
);
$logger->info("Task $taskno command: $cmd");
@@ -6925,6 +6991,9 @@ sub run_service {
# Execute this run.
my $t0 = time;
system($cmd);
if ( $store_output ) {
$store->{$taskno} = slurp($output_file);
}
my $t1 = time;
my $cmd_exit_status = $CHILD_ERROR >> 8;
$logger->info("Task $taskno: exit $cmd_exit_status");
@@ -7036,6 +7105,7 @@ sub replace_special_vars {
lib_dir
meta_dir
stage_dir
store
ts
)) or die;
my $cmd = $args{cmd};
@@ -7044,6 +7114,7 @@ sub replace_special_vars {
my $lib_dir = $args{lib_dir};
my $meta_dir = $args{meta_dir};
my $stage_dir = $args{stage_dir};
my $store = $args{store};
my $ts = $args{ts};
my $new_cmd = join(' ',
@@ -7057,6 +7128,9 @@ sub replace_special_vars {
die "Run$runno has no output for $word to access.\n";
}
}
if ( my ($key) = $word =~ m/__STORE_([\w.-]+)__/ ) {
$word = $store->{$key};
}
$word =~ s/__TS__/$ts/g;
$word =~ s/__LIB__/$lib_dir/g;
$word =~ s/__META__/$meta_dir/g;