Implement and test running queries for tasks.

This commit is contained in:
Daniel Nichter
2013-03-19 18:17:24 -06:00
parent 846daa6c31
commit cf0342c69e
3 changed files with 235 additions and 27 deletions
+61 -22
View File
@@ -3600,7 +3600,7 @@ sub new {
set => $args{set},
NAME_lc => defined($args{NAME_lc}) ? $args{NAME_lc} : 1,
dbh_set => 0,
ask_pass => $o->get('ask-pass'),
ask_pass => $args{ask_pass},
DSNParser => $dp,
is_cluster_node => undef,
parent => $args{parent},
@@ -4703,10 +4703,16 @@ sub main {
# This runs locally and offline, doesn't need a web API connection.
# ########################################################################
if ( my $service = $o->get('run-service') ) {
my $cxn = Cxn->new(
dsn_string => '',
OptionParser => $o,
DSNParser => $dp,
);
$exit_status = run_service(
service => $service,
spool_dir => $o->get('spool'),
lib_dir => $o->get('lib'),
Cxn => $cxn,
);
_info("Done running $service, exit $exit_status");
exit $exit_status;
@@ -5366,10 +5372,12 @@ sub run_service {
service
spool_dir
lib_dir
Cxn
)) or die;
my $service = $args{service};
my $spool_dir = $args{spool_dir};
my $lib_dir = $args{lib_dir};
my $cxn = $args{Cxn};
# TODO: where should this output go?
_info("Running $service service");
@@ -5378,12 +5386,25 @@ sub run_service {
service => $service,
lib_dir => $lib_dir,
);
my $tasks = $service->tasks;
# Take a quick look through all the tasks to see if any
# will require a MySQL connection. If so, connect now.
if ( grep { $_->query } @$tasks ) {
eval {
$cxn->connect();
};
if ( $EVAL_ERROR ) {
_warn("Cannot connect to MySQL: $EVAL_ERROR");
return 1;
}
}
my @output_files;
my $final_exit_status = 0;
my $spool_file = "$spool_dir/" . $service->name;
my $tasks = $service->tasks;
my $taskno = 0;
TASK:
foreach my $task ( @$tasks ) {
PTDEBUG && _d("Task $taskno:", $task->name);
@@ -5410,33 +5431,51 @@ sub run_service {
}
PTDEBUG && _d("Task $taskno output:", Dumper(\@output_files));
# Create the full command line to execute, replacing any
# special vars like __RUN_N_OUTPUT__, __TMPDIR__, etc.
# TODO: handle query tasks
my $cmd = join(' ',
$task->program,
$task->options,
'>',
$output_file,
);
$cmd = replace_special_vars(
cmd => $cmd,
service => $service,
output_files => \@output_files,
);
_info("Task $taskno command: $cmd");
if ( my $query = $task->query ) {
_info("Task $taskno query: $query");
eval {
$cxn->dbh->do($query);
};
if ( $EVAL_ERROR ) {
_warn("Error executing $query: $EVAL_ERROR");
$final_exit_status |= 1;
last TASK;
}
}
elsif ( my $program = $task->program ) {
# Create the full command line to execute, replacing any
# special vars like __RUN_N_OUTPUT__, __TMPDIR__, etc.
my $cmd = join(' ',
$task->program,
$task->options,
'>',
$output_file,
);
$cmd = replace_special_vars(
cmd => $cmd,
service => $service,
output_files => \@output_files,
);
_info("Task $taskno command: $cmd");
# Execute this run.
system($cmd);
my $exit_status = $CHILD_ERROR >> 8;
_info("Run $taskno: exit $exit_status");
# Execute this run.
system($cmd);
my $exit_status = $CHILD_ERROR >> 8;
$final_exit_status |= $exit_status;
_info("Run $taskno: exit $exit_status");
}
else {
_warn('Invalid Task resource:', Dumper($task));
$final_exit_status |= 1;
last TASK;
}
$final_exit_status |= $exit_status;
$taskno++;
}
# Remove temp output files.
foreach my $file ( @output_files ) {
next unless defined $file;
next if $file eq $spool_file;
unlink $file
or _warn("Error removing $file: $OS_ERROR");