Implement and test --check-spool. Make Mock/UserAgent save put and post data in an array. Make Percona/WebAPI/Client accept ready-made text resources, for multi-part resources.

This commit is contained in:
Daniel Nichter
2013-01-08 18:49:42 -07:00
parent d68301f293
commit 526437632c
6 changed files with 435 additions and 108 deletions

View File

@@ -952,12 +952,17 @@ sub _set {
my $res = $args{resources};
my $url = $args{url};
my $content;
my $content = '';
if ( ref($res) eq 'ARRAY' ) {
PTDEBUG && _d('List of resources');
$content = '[' . join(",\n", map { as_json($_) } @$res) . ']';
}
elsif ( -f $res ) {
PTDEBUG && _d('Reading content from file', $res);
elsif ( ref($res) ) {
PTDEBUG && _d('Resource object');
$content = as_json($res);
}
elsif ( $res !~ m/\n/ && -f $res ) {
PTDEBUG && _d('List of resources in file', $res);
$content = '[';
my $data = do {
local $INPUT_RECORD_SEPARATOR = undef;
@@ -969,7 +974,8 @@ sub _set {
$content .= $data;
}
else {
$content = as_json($res);
PTDEBUG && _d('Resource text');
$content = $res;
}
eval {
@@ -4363,6 +4369,7 @@ use POSIX qw(signal_h);
use Time::HiRes qw(sleep time);
use JSON qw(decode_json);
use File::Temp qw(tempfile);
use File::Path;
use Percona::Toolkit;
use Percona::WebAPI::Client;
@@ -4376,6 +4383,7 @@ use Percona::WebAPI::Util;
Percona::Toolkit->import(qw(_d Dumper have_required_args));
Percona::WebAPI::Util->import(qw(resource_diff));
Percona::WebAPI::Representation->import(qw(as_json as_config));
Transformers->import(qw(ts));
use sigtrap 'handler', \&sig_int, 'normal-signals';
@@ -4407,19 +4415,79 @@ sub main {
$o->usage_or_errors();
# ########################################################################
# Check the API key.
# Check the API key and agent ID.
# ########################################################################
my $api_key = $o->get('api-key');
if ( !$api_key ) {
_err("No API key was found or specified. pt-agent requires a "
. "Percona Web Services API key to run. Put your API key "
. "Percona Web Services API key. Put your API key "
. "in a --config file or specify it with --api-key.");
}
my $agent_id = $o->get('agent-id');
if ( ($o->get('check-spool') || $o->get('run-service')) && !$agent_id ) {
_err("No agent ID was found or specified. --check-spool and "
. "--run-service require an agent ID. Run pt-agent without these "
. "options to create and configure the agent, then try again.");
}
# ########################################################################
# Check the config file.
# TODO: only the main proc needs write access
# --run-service
# This runs locally and offline, doesn't need a web API connection.
# ########################################################################
if ( my $service = $o->get('run-service') ) {
$exit_status = run_service(
service => $service,
spool_dir => $o->get('spool'),
lib_dir => $o->get('lib'),
);
_info("Done running $service, exit $exit_status");
exit $exit_status;
}
# ########################################################################
# Connect to the Percona web API.
# ########################################################################
my ($client, $agent) = connect_to_percona(
api_key => $api_key,
agent_id => $agent_id, # optional
);
# ########################################################################
# --check-spool
# ########################################################################
if ( $o->get('check-spool') ) {
# TODO: rewrite Daemon to have args passed in so we can do
# a PID file check for spool procs. Or implement file locking.
check_spool(
client => $client,
agent => $agent,
spool_dir => $o->get('spool'),
);
_info("Done checking spool, exit $exit_status");
exit $exit_status;
}
# ########################################################################
# This is the main pt-agent daemon, a long-running and resilient
# process. Only internal errors should cause it to stop. Else,
# external errors, like Percona web API not responding, should be
# retried forever.
# ########################################################################
# Daemonize first so all output goes to the --log.
my $daemon;
if ( $o->get('daemonize') ) {
$daemon = new Daemon(o=>$o);
$daemon->daemonize();
PTDEBUG && _d('I am a daemon now');
}
elsif ( $o->get('pid') ) {
$daemon = new Daemon(o=>$o);
$daemon->make_PID_file();
}
# Check and init the config file.
my $config_file = get_config_file();
_info("Config file: $config_file");
if ( -f $config_file ) {
@@ -4435,95 +4503,32 @@ sub main {
};
if ( $EVAL_ERROR ) {
chomp $EVAL_ERROR;
_err("$EVAL_ERROR. pt-agent requires write access to "
. "$config_file to run.");
_err($EVAL_ERROR
. "\npt-agent requires write access to $config_file.");
}
}
# ########################################################################
# Check the lib dir.
# ########################################################################
# Wait time between checking for new config and services.
# Use the tool's built-in default until a config is gotten,
# then config->{check-interval} will be pass in.
my $check_interval = $o->get('check-interval');
my $check_wait = sub {
my ($t) = @_;
return unless $oktorun;
$t ||= $check_interval;
_info("Sleeping $t seconds");
sleep $t;
};
# ########################################################################
# Run pt-agent.
# ########################################################################
my $daemon;
if ( my $service = $o->get('run-service') ) {
run_service(
service => $service,
spool_dir => $o->get('spool'),
lib_dir => $o->get('lib'),
);
}
elsif ( $o->get('check-spool') ) {
check_spool(
api_key => $api_key,
spool_dir => $o->get('spool'),
lib_dir => $o->get('lib'),
);
}
else {
# This is the main pt-agent daemon, a long-running and resilient
# process. Only internal errors should cause it to stop. Else,
# external errors, like Percona web API not responding, should be
# retried forever.
if ( $o->get('daemonize') ) {
$daemon = new Daemon(o=>$o);
$daemon->daemonize();
PTDEBUG && _d('I am a daemon now');
}
elsif ( $o->get('pid') ) {
$daemon = new Daemon(o=>$o);
$daemon->make_PID_file();
}
# During initial connection and agent init, wait less time
# than --check-interval between errors.
# TODO: make user-configurable? --reconnect-interval?
my $init_interval = 120;
my $init_wait = sub {
return unless $oktorun;
_info("Sleeping $init_interval seconds");
sleep $init_interval;
};
# Get a connected Percona Web API client.
my $client = get_api_client(
api_key => $api_key,
tries => undef,
interval => $init_wait,
);
# Start or create the agent.
my $agent = init_agent(
client => $client,
interval => $init_wait,
agent_id => $o->get('agent-id'), # optional
);
# Wait time between checking for new config and services.
# Use the tool's built-in default until a config is gotten,
# then config->{check-interval} will be pass in.
my $check_interval = $o->get('check-interval');
my $check_wait = sub {
my ($t) = @_;
return unless $oktorun;
$t ||= $check_interval;
_info("Sleeping $t seconds");
sleep $t;
};
# Run the agent's main loop which doesn't return until the service
# is stopped, killed, or has an internal bug.
run_agent(
agent => $agent,
client => $client,
interval => $check_wait,
config_file => $config_file,
lib_dir => $o->get('lib'),
);
}
# Run the agent's main loop which doesn't return until the service
# is stopped, killed, or has an internal bug.
run_agent(
agent => $agent,
client => $client,
interval => $check_wait,
config_file => $config_file,
lib_dir => $o->get('lib'),
);
_info("pt-agent exit $exit_status, oktorun $oktorun");
@@ -4538,6 +4543,47 @@ sub main {
# Percona Web API subs for agent and spool processes #
# ################################################## #
# Wrapper for code common to main agent and --check-spool process:
# connect to the Percona web API by getting a client and an Agent.
sub connect_to_percona {
my (%args) = @_;
have_required_args(\%args, qw(
api_key
)) or die;
my $api_key = $args{api_key};
my $interval = $args{interval};
# Optional args
my $agent_id = $args{agent_id};
# During initial connection and agent init, wait less time
# than --check-interval between errors.
# TODO: make user-configurable? --reconnect-interval?
my $init_interval = 120;
my $init_wait = sub {
return unless $oktorun;
_info("Sleeping $init_interval seconds");
sleep $init_interval;
};
# Get a connected Percona Web API client.
my $client = get_api_client(
api_key => $api_key,
tries => undef,
interval => $init_wait,
);
# Start or create the agent.
my $agent = init_agent(
client => $client,
interval => $init_wait,
agent_id => $agent_id, # optional
);
return $client, $agent;
}
# Create and connect a Percona Web API client.
sub get_api_client {
my (%args) = @_;
@@ -4573,10 +4619,6 @@ sub get_api_client {
return $client;
}
# ################################ #
# Agent (main daemon) process subs #
# ################################ #
# Initialize the agent, i.e. create and return an Agent resource.
# If there's an agent_id, then its updated (PUT), else a new agent
# is created (POST). Doesn't return until successful.
@@ -4597,6 +4639,7 @@ sub init_agent {
_info('Initializing agent');
# Do a version-check every time the agent starts. If versions
# have changed, this can affect how services are implemented.
$versions ||= get_versions();
@@ -4640,6 +4683,10 @@ sub init_agent {
return $agent;
}
# ################################ #
# Agent (main daemon) process subs #
# ################################ #
# Run the agent, i.e. exec the main loop to check/update the config
# and services. Doesn't return until service stopped or killed.
sub run_agent {
@@ -4705,6 +4752,8 @@ sub run_agent {
_warn($EVAL_ERROR);
}
# TODO: need to schedule a pt-agent --check-spool process.
# Get services only if there's a current, running config.
# Without one, we won't know how to implement services.
if ( $config ) {
@@ -4787,7 +4836,7 @@ sub write_config {
print { $fh } $api_key, "\n"
or die "Error writing to $file: $OS_ERROR";
}
print { $fh } Percona::WebAPI::Representation::as_config($config)
print { $fh } as_config($config)
or die "Error writing to $file: $OS_ERROR";
close $fh
or die "Error closing $file: $OS_ERROR";
@@ -4818,7 +4867,7 @@ sub write_services {
my $action = -f $file ? 'Updated' : 'Created';
open my $fh, '>', $file
or die "Error opening $file: $OS_ERROR";
print { $fh } Percona::WebAPI::Representation::as_json($service)
print { $fh } as_json($service)
or die "Error writing to $file: $OS_ERROR";
close $fh
or die "Error closing $file: $OS_ERROR";
@@ -4828,7 +4877,7 @@ sub write_services {
# Remove old services: one's that still exisit but weren't
# writen ^, so they're no longer implemented.
opendir my $dh, $lib_dir
opendir(my $dh, $lib_dir)
or die "Error opening $lib_dir: $OS_ERROR";
while ( my $file = readdir($dh) ) {
next if -d $file;
@@ -5053,10 +5102,137 @@ sub replace_special_vars {
# Spool process subs #
# ################## #
# Send every file or directory in each service's directory in --spool/.
# E.g. --spool/query-monitor should contain files with pt-query-digest
# output. The per-service dirs are created in run_service().
sub check_spool {
my (%args) = @_;
have_required_args(\%args, qw(
client
agent
spool_dir
)) or die;
my $client = $args{client};
my $agent = $args{agent};
my $spool_dir = $args{spool_dir};
# Iterate through the service dirs in --spool/.
chdir $spool_dir
or die "Error changing dir to $spool_dir: $OS_ERROR";
opendir(my $spool_dh, $spool_dir)
or die "Error opening $spool_dir: $OS_ERROR";
_info("Checking spool directory $spool_dir");
SERVICE:
while ( my $service_dir = readdir($spool_dh) ) {
next unless -d $service_dir && $service_dir !~ m/^\./;
# Need a link for the service to know where to send the data.
# TODO: should pt-agent rm the old service dir?
if ( !$client->links->{$service_dir} ) {
_warn("Ignoring $service_dir because there is no link for "
. "the service. If this agent no longer implements "
. "the service, then remove $spool_dir/$service_dir/.");
next SERVICE;
}
# Iterate through the data files or dirs in this service's dir.
opendir(my $service_dh, $service_dir);
if ( !$service_dh ) {
chomp $EVAL_ERROR;
_warn("Error opening $service_dir: $OS_ERROR");
next SERVICE;
}
DATA:
while ( my $file = readdir($service_dh) ) {
next unless -f "$service_dir/$file";
$file = "$service_dir/$file";
# Send the data to Percona.
eval {
if ( -d $file ) {
# TODO
}
else {
# The file is a file, yay. Just send it as-is.
send_file(
client => $client,
agent => $agent,
file => $file,
url => $client->links->{$service_dir},
);
}
};
if ( $EVAL_ERROR ) {
chomp $EVAL_ERROR;
_warn("Failed to send $file: $EVAL_ERROR");
next DATA;
}
# Remove the data if sent successfully.
eval {
if ( -d $file ) {
# TODO: rmtree
}
else {
unlink $file or die $OS_ERROR;
}
};
if ( $EVAL_ERROR ) {
chomp $EVAL_ERROR;
_warn("Sent $file but failed to remove it: $EVAL_ERROR");
last SERVICE;
}
_info("Sent and removed $file");
} # DATA
closedir $service_dh
or warn "Error closing $service_dir: $OS_ERROR";
} # SERVICE
closedir $spool_dh
or warn "Error closeing $spool_dir: $OS_ERROR";
return;
}
sub send_data {
# Send the Agent and file's contents as-is as a multi-part POST.
sub send_file {
my (%args) = @_;
have_required_args(\%args, qw(
client
agent
file
url
)) or die;
my $client = $args{client};
my $agent = $args{agent};
my $file = $args{file};
my $url = $args{url};
_info("Sending $file to $url");
# Create a multi-part resource: first the Agent, so Percona knows
# from whom this data is coming, then the contents of the file as-is.
# We don't know or care about the file's contents, but Percona will.
my $agent_json = as_json($agent);
my $data = slurp($file);
my $boundary = '--Ym91bmRhcnk='; # "boundary" in base64
my $resource = <<CONTENT;
$agent_json
$boundary
$data
CONTENT
chomp($resource); # remove trailing newline
$client->post(
url => $url,
resources => $resource,
);
return;
}
# ################## #

View File

@@ -31,8 +31,8 @@ sub new {
put => [],
},
content => {
post => undef,
put => undef,
post => [],
put => [],
},
};
return bless $self, $class;
@@ -42,7 +42,7 @@ sub request {
my ($self, $req) = @_;
my $type = lc($req->method);
if ( $type eq 'post' || $type eq 'put' ) {
$self->{content}->{$type} = $req->content;
push @{$self->{content}->{$type}}, $req->content;
}
my $r = shift @{$self->{responses}->{$type}};
my $c = $self->{encode}->($r->{content});

View File

@@ -233,12 +233,17 @@ sub _set {
my $res = $args{resources};
my $url = $args{url};
my $content;
my $content = '';
if ( ref($res) eq 'ARRAY' ) {
PTDEBUG && _d('List of resources');
$content = '[' . join(",\n", map { as_json($_) } @$res) . ']';
}
elsif ( -f $res ) {
PTDEBUG && _d('Reading content from file', $res);
elsif ( ref($res) ) {
PTDEBUG && _d('Resource object');
$content = as_json($res);
}
elsif ( $res !~ m/\n/ && -f $res ) {
PTDEBUG && _d('List of resources in file', $res);
$content = '[';
my $data = do {
local $INPUT_RECORD_SEPARATOR = undef;
@@ -250,7 +255,8 @@ sub _set {
$content .= $data;
}
else {
$content = as_json($res);
PTDEBUG && _d('Resource text');
$content = $res;
}
eval {

129
t/pt-agent/check_spool.t Normal file
View File

@@ -0,0 +1,129 @@
#!/usr/bin/env perl
BEGIN {
die "The PERCONA_TOOLKIT_BRANCH environment variable is not set.\n"
unless $ENV{PERCONA_TOOLKIT_BRANCH} && -d $ENV{PERCONA_TOOLKIT_BRANCH};
unshift @INC, "$ENV{PERCONA_TOOLKIT_BRANCH}/lib";
};
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Test::More;
use JSON;
use File::Temp qw(tempdir);
use Percona::Test;
use Percona::Test::Mock::UserAgent;
require "$trunk/bin/pt-agent";
Percona::Toolkit->import(qw(Dumper have_required_args));
Percona::WebAPI::Representation->import(qw(as_hashref));
my $sample = "t/pt-agent/samples";
# #############################################################################
# Create mock client and Agent
# #############################################################################
# These aren't the real tests yet: to run_agent(), first we need
# a client and Agent, so create mock ones.
my $json = JSON->new;
$json->allow_blessed([]);
$json->convert_blessed([]);
my $ua = Percona::Test::Mock::UserAgent->new(
encode => sub { my $c = shift; return $json->encode($c || {}) },
);
# Create cilent, get entry links
my $links = {
agents => '/agents',
config => '/agents/1/config',
services => '/agents/1/services',
'query-monitor' => '/query-monitor',
};
$ua->{responses}->{get} = [
{
content => $links,
},
];
my $client = eval {
Percona::WebAPI::Client->new(
api_key => '123',
ua => $ua,
);
};
is(
$EVAL_ERROR,
'',
'Create mock client'
) or die;
my $agent = Percona::WebAPI::Resource::Agent->new(
id => '123',
hostname => 'prod1',
);
is_deeply(
as_hashref($agent),
{
id => '123',
hostname => 'prod1',
},
'Create mock Agent'
) or die;
# #############################################################################
# Test check_spool()
# #############################################################################
my $tmpdir = tempdir("/tmp/pt-agent.$PID.XXXXXX", CLEANUP => 1);
mkdir "$tmpdir/query-monitor"
or die "Cannot mkdir $tmpdir/query-monitor: $OS_ERROR";
`cp $trunk/$sample/query-monitor/data001 $tmpdir/query-monitor`;
$ua->{responses}->{post} = [
{
content => $links,
},
];
my $output = output(
sub {
pt_agent::check_spool(
client => $client,
agent => $agent,
spool_dir => $tmpdir,
),
},
stderr => 1,
);
is(
scalar @{$client->ua->{content}->{post}},
1,
"Only sent 1 resource"
) or diag(Dumper($client->ua->{content}->{post}));
ok(
no_diff(
$client->ua->{content}->{post}->[0] || '',
"$sample/query-monitor/data001.send",
cmd_output => 1,
),
"Sent data file as multi-part resource (query-monitor/data001)"
) or diag(Dumper($client->ua->{content}->{post}));
ok(
!-f "$tmpdir/query-monitor/data001",
"Removed data file after sending successfully"
);
# #############################################################################
# Done.
# #############################################################################
done_testing;

View File

@@ -0,0 +1,7 @@
[
{
query_id: 1,
arg: "select * from t where id = 1",
Query_time: 0.123456,
}
]

View File

@@ -0,0 +1,9 @@
{"id":"123","hostname":"prod1"}
--Ym91bmRhcnk=
[
{
query_id: 1,
arg: "select * from t where id = 1",
Query_time: 0.123456,
}
]