Almost working pt-agent main process. Clean up HTTP::Micro. Add Percona/WebAPI/Util, and some basic Percona/WebAPI/Representation tests.

This commit is contained in:
Daniel Nichter
2012-12-25 16:51:18 -07:00
parent 6f2d543653
commit 9241c27b7c
7 changed files with 643 additions and 443 deletions

View File

@@ -1,4 +1,4 @@
# This program is copyright 2012 Percona Inc.
# This program is copyright 2012-2013 Percona Inc.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
@@ -15,23 +15,21 @@
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# HTTPMicro package
# HTTP::Micro package
# ###########################################################################
{
# Package: HTTPMicro
# Package: HTTP::Micro
# A stripped down version of HTTP::Tiny; but not a correct HTTP/1.1
# implementation
# implementation.
package HTTP::Micro;
our $VERSION = '0.01';
package HTTPMicro;
BEGIN {
$HTTPMicro::VERSION = '0.001';
}
use strict;
use warnings;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Carp ();
my @attributes;
BEGIN {
@attributes = qw(agent timeout);
@@ -103,7 +101,7 @@ sub _request {
headers => {},
};
my $handle = HTTPMicro::Handle->new(timeout => $self->{timeout});
my $handle = HTTP::Micro::Handle->new(timeout => $self->{timeout});
$handle->connect($scheme, $host, $port);
@@ -169,322 +167,327 @@ sub _split_url {
return ($scheme, $host, $port, $path_query);
}
package
HTTPMicro::Handle; # hide from PAUSE/indexers
use strict;
use warnings;
} # HTTP::Micro
use Carp qw[croak];
use Errno qw[EINTR EPIPE];
use IO::Socket qw[SOCK_STREAM];
{
package HTTP::Micro::Handle;
sub BUFSIZE () { 32768 }
use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
my $Printable = sub {
local $_ = shift;
s/\r/\\r/g;
s/\n/\\n/g;
s/\t/\\t/g;
s/([^\x20-\x7E])/sprintf('\\x%.2X', ord($1))/ge;
$_;
};
use Carp qw(croak);
use Errno qw(EINTR EPIPE);
use IO::Socket qw(SOCK_STREAM);
sub new {
my ($class, %args) = @_;
return bless {
rbuf => '',
timeout => 60,
max_line_size => 16384,
%args
}, $class;
}
sub BUFSIZE () { 32768 }
my $ssl_verify_args = {
check_cn => "when_only",
wildcards_in_alt => "anywhere",
wildcards_in_cn => "anywhere"
};
my $Printable = sub {
local $_ = shift;
s/\r/\\r/g;
s/\n/\\n/g;
s/\t/\\t/g;
s/([^\x20-\x7E])/sprintf('\\x%.2X', ord($1))/ge;
$_;
};
sub connect {
@_ == 4 || croak(q/Usage: $handle->connect(scheme, host, port)/);
my ($self, $scheme, $host, $port) = @_;
sub new {
my ($class, %args) = @_;
return bless {
rbuf => '',
timeout => 60,
max_line_size => 16384,
%args
}, $class;
}
if ( $scheme eq 'https' ) {
eval "require IO::Socket::SSL"
unless exists $INC{'IO/Socket/SSL.pm'};
croak(qq/IO::Socket::SSL must be installed for https support\n/)
unless $INC{'IO/Socket/SSL.pm'};
}
elsif ( $scheme ne 'http' ) {
croak(qq/Unsupported URL scheme '$scheme'\n/);
}
my $ssl_verify_args = {
check_cn => "when_only",
wildcards_in_alt => "anywhere",
wildcards_in_cn => "anywhere"
};
$self->{fh} = 'IO::Socket::INET'->new(
PeerHost => $host,
PeerPort => $port,
Proto => 'tcp',
Type => SOCK_STREAM,
Timeout => $self->{timeout}
) or croak(qq/Could not connect to '$host:$port': $@/);
sub connect {
@_ == 4 || croak(q/Usage: $handle->connect(scheme, host, port)/);
my ($self, $scheme, $host, $port) = @_;
binmode($self->{fh})
or croak(qq/Could not binmode() socket: '$!'/);
if ( $scheme eq 'https' ) {
eval "require IO::Socket::SSL"
unless exists $INC{'IO/Socket/SSL.pm'};
croak(qq/IO::Socket::SSL must be installed for https support\n/)
unless $INC{'IO/Socket/SSL.pm'};
}
elsif ( $scheme ne 'http' ) {
croak(qq/Unsupported URL scheme '$scheme'\n/);
}
if ( $scheme eq 'https') {
IO::Socket::SSL->start_SSL($self->{fh});
ref($self->{fh}) eq 'IO::Socket::SSL'
or die(qq/SSL connection failed for $host\n/);
if ( $self->{fh}->can("verify_hostname") ) {
$self->{fh}->verify_hostname( $host, $ssl_verify_args );
}
else {
# Can't use $self->{fh}->verify_hostname because the IO::Socket::SSL
# that comes from yum doesn't have it, so use our inlined version.
my $fh = $self->{fh};
_verify_hostname_of_cert($host, _peer_certificate($fh), $ssl_verify_args)
or die(qq/SSL certificate not valid for $host\n/);
}
}
$self->{host} = $host;
$self->{port} = $port;
$self->{fh} = IO::Socket::INET->new(
PeerHost => $host,
PeerPort => $port,
Proto => 'tcp',
Type => SOCK_STREAM,
Timeout => $self->{timeout}
) or croak(qq/Could not connect to '$host:$port': $@/);
return $self;
}
binmode($self->{fh})
or croak(qq/Could not binmode() socket: '$!'/);
sub close {
@_ == 1 || croak(q/Usage: $handle->close()/);
my ($self) = @_;
CORE::close($self->{fh})
or croak(qq/Could not close socket: '$!'/);
}
if ( $scheme eq 'https') {
IO::Socket::SSL->start_SSL($self->{fh});
ref($self->{fh}) eq 'IO::Socket::SSL'
or die(qq/SSL connection failed for $host\n/);
if ( $self->{fh}->can("verify_hostname") ) {
$self->{fh}->verify_hostname( $host, $ssl_verify_args );
}
else {
# Can't use $self->{fh}->verify_hostname because the IO::Socket::SSL
# that comes from yum doesn't have it, so use our inlined version.
my $fh = $self->{fh};
_verify_hostname_of_cert($host, _peer_certificate($fh), $ssl_verify_args)
or die(qq/SSL certificate not valid for $host\n/);
}
}
$self->{host} = $host;
$self->{port} = $port;
sub write {
@_ == 2 || croak(q/Usage: $handle->write(buf)/);
my ($self, $buf) = @_;
return $self;
}
my $len = length $buf;
my $off = 0;
sub close {
@_ == 1 || croak(q/Usage: $handle->close()/);
my ($self) = @_;
CORE::close($self->{fh})
or croak(qq/Could not close socket: '$!'/);
}
local $SIG{PIPE} = 'IGNORE';
sub write {
@_ == 2 || croak(q/Usage: $handle->write(buf)/);
my ($self, $buf) = @_;
while () {
$self->can_write
or croak(q/Timed out while waiting for socket to become ready for writing/);
my $r = syswrite($self->{fh}, $buf, $len, $off);
if (defined $r) {
$len -= $r;
$off += $r;
last unless $len > 0;
}
elsif ($! == EPIPE) {
croak(qq/Socket closed by remote server: $!/);
}
elsif ($! != EINTR) {
croak(qq/Could not write to socket: '$!'/);
}
}
return $off;
}
my $len = length $buf;
my $off = 0;
sub read {
@_ == 2 || @_ == 3 || croak(q/Usage: $handle->read(len)/);
my ($self, $len) = @_;
local $SIG{PIPE} = 'IGNORE';
my $buf = '';
my $got = length $self->{rbuf};
while () {
$self->can_write
or croak(q/Timed out while waiting for socket to become ready for writing/);
my $r = syswrite($self->{fh}, $buf, $len, $off);
if (defined $r) {
$len -= $r;
$off += $r;
last unless $len > 0;
}
elsif ($! == EPIPE) {
croak(qq/Socket closed by remote server: $!/);
}
elsif ($! != EINTR) {
croak(qq/Could not write to socket: '$!'/);
}
}
return $off;
}
if ($got) {
my $take = ($got < $len) ? $got : $len;
$buf = substr($self->{rbuf}, 0, $take, '');
$len -= $take;
}
sub read {
@_ == 2 || @_ == 3 || croak(q/Usage: $handle->read(len)/);
my ($self, $len) = @_;
while ($len > 0) {
$self->can_read
or croak(q/Timed out while waiting for socket to become ready for reading/);
my $r = sysread($self->{fh}, $buf, $len, length $buf);
if (defined $r) {
last unless $r;
$len -= $r;
}
elsif ($! != EINTR) {
croak(qq/Could not read from socket: '$!'/);
}
}
if ($len) {
croak(q/Unexpected end of stream/);
}
return $buf;
}
my $buf = '';
my $got = length $self->{rbuf};
sub readline {
@_ == 1 || croak(q/Usage: $handle->readline()/);
my ($self) = @_;
if ($got) {
my $take = ($got < $len) ? $got : $len;
$buf = substr($self->{rbuf}, 0, $take, '');
$len -= $take;
}
while () {
if ($self->{rbuf} =~ s/\A ([^\x0D\x0A]* \x0D?\x0A)//x) {
return $1;
}
$self->can_read
or croak(q/Timed out while waiting for socket to become ready for reading/);
my $r = sysread($self->{fh}, $self->{rbuf}, BUFSIZE, length $self->{rbuf});
if (defined $r) {
last unless $r;
}
elsif ($! != EINTR) {
croak(qq/Could not read from socket: '$!'/);
}
}
croak(q/Unexpected end of stream while looking for line/);
}
while ($len > 0) {
$self->can_read
or croak(q/Timed out while waiting for socket to become ready for reading/);
my $r = sysread($self->{fh}, $buf, $len, length $buf);
if (defined $r) {
last unless $r;
$len -= $r;
}
elsif ($! != EINTR) {
croak(qq/Could not read from socket: '$!'/);
}
}
if ($len) {
croak(q/Unexpected end of stream/);
}
return $buf;
}
sub read_header_lines {
@_ == 1 || @_ == 2 || croak(q/Usage: $handle->read_header_lines([headers])/);
my ($self, $headers) = @_;
$headers ||= {};
my $lines = 0;
my $val;
sub readline {
@_ == 1 || croak(q/Usage: $handle->readline()/);
my ($self) = @_;
while () {
my $line = $self->readline;
while () {
if ($self->{rbuf} =~ s/\A ([^\x0D\x0A]* \x0D?\x0A)//x) {
return $1;
}
$self->can_read
or croak(q/Timed out while waiting for socket to become ready for reading/);
my $r = sysread($self->{fh}, $self->{rbuf}, BUFSIZE, length $self->{rbuf});
if (defined $r) {
last unless $r;
}
elsif ($! != EINTR) {
croak(qq/Could not read from socket: '$!'/);
}
}
croak(q/Unexpected end of stream while looking for line/);
}
if ($line =~ /\A ([^\x00-\x1F\x7F:]+) : [\x09\x20]* ([^\x0D\x0A]*)/x) {
my ($field_name) = lc $1;
$val = \($headers->{$field_name} = $2);
}
elsif ($line =~ /\A [\x09\x20]+ ([^\x0D\x0A]*)/x) {
$val
or croak(q/Unexpected header continuation line/);
next unless length $1;
$$val .= ' ' if length $$val;
$$val .= $1;
}
elsif ($line =~ /\A \x0D?\x0A \z/x) {
last;
}
else {
croak(q/Malformed header line: / . $Printable->($line));
}
}
return $headers;
}
sub read_header_lines {
@_ == 1 || @_ == 2 || croak(q/Usage: $handle->read_header_lines([headers])/);
my ($self, $headers) = @_;
$headers ||= {};
my $lines = 0;
my $val;
sub write_header_lines {
(@_ == 2 && ref $_[1] eq 'HASH') || croak(q/Usage: $handle->write_header_lines(headers)/);
my($self, $headers) = @_;
while () {
my $line = $self->readline;
my $buf = '';
while (my ($k, $v) = each %$headers) {
my $field_name = lc $k;
$field_name =~ /\A [\x21\x23-\x27\x2A\x2B\x2D\x2E\x30-\x39\x41-\x5A\x5E-\x7A\x7C\x7E]+ \z/x
or croak(q/Invalid HTTP header field name: / . $Printable->($field_name));
$field_name =~ s/\b(\w)/\u$1/g;
$buf .= "$field_name: $v\x0D\x0A";
}
$buf .= "\x0D\x0A";
return $self->write($buf);
}
if ($line =~ /\A ([^\x00-\x1F\x7F:]+) : [\x09\x20]* ([^\x0D\x0A]*)/x) {
my ($field_name) = lc $1;
$val = \($headers->{$field_name} = $2);
}
elsif ($line =~ /\A [\x09\x20]+ ([^\x0D\x0A]*)/x) {
$val
or croak(q/Unexpected header continuation line/);
next unless length $1;
$$val .= ' ' if length $$val;
$$val .= $1;
}
elsif ($line =~ /\A \x0D?\x0A \z/x) {
last;
}
else {
croak(q/Malformed header line: / . $Printable->($line));
}
}
return $headers;
}
sub read_content_body {
@_ == 3 || @_ == 4 || croak(q/Usage: $handle->read_content_body(callback, response, [read_length])/);
my ($self, $cb, $response, $len) = @_;
$len ||= $response->{headers}{'content-length'};
sub write_header_lines {
(@_ == 2 && ref $_[1] eq 'HASH') || croak(q/Usage: $handle->write_header_lines(headers)/);
my($self, $headers) = @_;
croak("No content-length in the returned response, and this "
. "UA doesn't implement chunking") unless defined $len;
my $buf = '';
while (my ($k, $v) = each %$headers) {
my $field_name = lc $k;
$field_name =~ /\A [\x21\x23-\x27\x2A\x2B\x2D\x2E\x30-\x39\x41-\x5A\x5E-\x7A\x7C\x7E]+ \z/x
or croak(q/Invalid HTTP header field name: / . $Printable->($field_name));
$field_name =~ s/\b(\w)/\u$1/g;
$buf .= "$field_name: $v\x0D\x0A";
}
$buf .= "\x0D\x0A";
return $self->write($buf);
}
while ($len > 0) {
my $read = ($len > BUFSIZE) ? BUFSIZE : $len;
$cb->($self->read($read), $response);
$len -= $read;
}
sub read_content_body {
@_ == 3 || @_ == 4 || croak(q/Usage: $handle->read_content_body(callback, response, [read_length])/);
my ($self, $cb, $response, $len) = @_;
$len ||= $response->{headers}{'content-length'};
return;
}
croak("No content-length in the returned response, and this "
. "UA doesn't implement chunking") unless defined $len;
sub write_content_body {
@_ == 2 || croak(q/Usage: $handle->write_content_body(request)/);
my ($self, $request) = @_;
my ($len, $content_length) = (0, $request->{headers}{'content-length'});
while ($len > 0) {
my $read = ($len > BUFSIZE) ? BUFSIZE : $len;
$cb->($self->read($read), $response);
$len -= $read;
}
$len += $self->write($request->{content});
return;
}
$len == $content_length
or croak(qq/Content-Length missmatch (got: $len expected: $content_length)/);
sub write_content_body {
@_ == 2 || croak(q/Usage: $handle->write_content_body(request)/);
my ($self, $request) = @_;
my ($len, $content_length) = (0, $request->{headers}{'content-length'});
return $len;
}
$len += $self->write($request->{content});
sub read_response_header {
@_ == 1 || croak(q/Usage: $handle->read_response_header()/);
my ($self) = @_;
$len == $content_length
or croak(qq/Content-Length missmatch (got: $len expected: $content_length)/);
my $line = $self->readline;
return $len;
}
$line =~ /\A (HTTP\/(0*\d+\.0*\d+)) [\x09\x20]+ ([0-9]{3}) [\x09\x20]+ ([^\x0D\x0A]*) \x0D?\x0A/x
or croak(q/Malformed Status-Line: / . $Printable->($line));
sub read_response_header {
@_ == 1 || croak(q/Usage: $handle->read_response_header()/);
my ($self) = @_;
my ($protocol, $version, $status, $reason) = ($1, $2, $3, $4);
my $line = $self->readline;
return {
status => $status,
reason => $reason,
headers => $self->read_header_lines,
protocol => $protocol,
};
}
$line =~ /\A (HTTP\/(0*\d+\.0*\d+)) [\x09\x20]+ ([0-9]{3}) [\x09\x20]+ ([^\x0D\x0A]*) \x0D?\x0A/x
or croak(q/Malformed Status-Line: / . $Printable->($line));
sub write_request_header {
@_ == 4 || croak(q/Usage: $handle->write_request_header(method, request_uri, headers)/);
my ($self, $method, $request_uri, $headers) = @_;
my ($protocol, $version, $status, $reason) = ($1, $2, $3, $4);
return $self->write("$method $request_uri HTTP/1.1\x0D\x0A")
+ $self->write_header_lines($headers);
}
return {
status => $status,
reason => $reason,
headers => $self->read_header_lines,
protocol => $protocol,
};
}
sub _do_timeout {
my ($self, $type, $timeout) = @_;
$timeout = $self->{timeout}
unless defined $timeout && $timeout >= 0;
sub write_request_header {
@_ == 4 || croak(q/Usage: $handle->write_request_header(method, request_uri, headers)/);
my ($self, $method, $request_uri, $headers) = @_;
my $fd = fileno $self->{fh};
defined $fd && $fd >= 0
or croak(q/select(2): 'Bad file descriptor'/);
return $self->write("$method $request_uri HTTP/1.1\x0D\x0A")
+ $self->write_header_lines($headers);
}
my $initial = time;
my $pending = $timeout;
my $nfound;
sub _do_timeout {
my ($self, $type, $timeout) = @_;
$timeout = $self->{timeout}
unless defined $timeout && $timeout >= 0;
vec(my $fdset = '', $fd, 1) = 1;
my $fd = fileno $self->{fh};
defined $fd && $fd >= 0
or croak(q/select(2): 'Bad file descriptor'/);
while () {
$nfound = ($type eq 'read')
? select($fdset, undef, undef, $pending)
: select(undef, $fdset, undef, $pending) ;
if ($nfound == -1) {
$! == EINTR
or croak(qq/select(2): '$!'/);
redo if !$timeout || ($pending = $timeout - (time - $initial)) > 0;
$nfound = 0;
}
last;
}
$! = 0;
return $nfound;
}
my $initial = time;
my $pending = $timeout;
my $nfound;
sub can_read {
@_ == 1 || @_ == 2 || croak(q/Usage: $handle->can_read([timeout])/);
my $self = shift;
return $self->_do_timeout('read', @_)
}
vec(my $fdset = '', $fd, 1) = 1;
sub can_write {
@_ == 1 || @_ == 2 || croak(q/Usage: $handle->can_write([timeout])/);
my $self = shift;
return $self->_do_timeout('write', @_)
}
while () {
$nfound = ($type eq 'read')
? select($fdset, undef, undef, $pending)
: select(undef, $fdset, undef, $pending) ;
if ($nfound == -1) {
$! == EINTR
or croak(qq/select(2): '$!'/);
redo if !$timeout || ($pending = $timeout - (time - $initial)) > 0;
$nfound = 0;
}
last;
}
$! = 0;
return $nfound;
}
sub can_read {
@_ == 1 || @_ == 2 || croak(q/Usage: $handle->can_read([timeout])/);
my $self = shift;
return $self->_do_timeout('read', @_)
}
sub can_write {
@_ == 1 || @_ == 2 || croak(q/Usage: $handle->can_write([timeout])/);
my $self = shift;
return $self->_do_timeout('write', @_)
}
} # HTTP::Micro::Handle
# Partially copy-pasted from IO::Socket::SSL 1.76, with some changes because
# we're forced to use IO::Socket::SSL version 1.01 in yum-based distros
@@ -507,6 +510,7 @@ BEGIN {
}
}
{
use Carp qw(croak);
my %dispatcher = (
issuer => sub { Net::SSLeay::X509_NAME_oneline( Net::SSLeay::X509_get_issuer_name( shift )) },
subject => sub { Net::SSLeay::X509_NAME_oneline( Net::SSLeay::X509_get_subject_name( shift )) },
@@ -703,7 +707,6 @@ if ( $INC{"IO/Socket/SSL.pm"} ) {
}
1;
}
# ###########################################################################
# End HTTPMicro package
# ###########################################################################

View File

@@ -0,0 +1,43 @@
# This program is copyright 2012-2013 Percona Inc.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA 02111-1307 USA.
# ###########################################################################
# Percona::WebAPI::Util package
# ###########################################################################
{
package Percona::WebAPI::Util;
use Digest::MD5 qw(md5_hex);
use Percona::WebAPI::Representation;
require Exporter;
our @ISA = qw(Exporter);
our %EXPORT_TAGS = ();
our @EXPORT_OK = (qw(resource_diff));
our @EXPORT = ();
sub resource_diff {
my ($x, $y) = @_;
return md5_hex(Percona::WebAPI::Representation::as_json($x))
cmp md5_hex(Percona::WebAPI::Representation::as_json($y));
}
1;
}
# ###########################################################################
# End Percona::WebAPI::Util package
# ###########################################################################

View File

@@ -40,7 +40,7 @@ use constant PTDEBUG => $ENV{PTDEBUG} || 0;
local $EVAL_ERROR;
eval {
require Percona::Toolkit;
require Percona::HTTP::Micro;
require HTTP::Micro;
};
my $dir = File::Spec->tmpdir();
@@ -163,7 +163,7 @@ sub pingback {
# Optional args
my ($instances, $ua, $vc) = @args{qw(instances ua VersionCheck)};
$ua ||= HTTPMicro->new( timeout => 5 );
$ua ||= HTTP::Micro->new( timeout => 5 );
$vc ||= VersionCheck->new();
# GET https://upgrade.percona.com, the server will return