Mirror of https://github.com/percona/percona-toolkit.git (synced 2025-09-11 13:40:07 +00:00)
PMM-6494 Topology labels (#463)
* PMM-6494 Topology labels: added common functions needed for topology labels in the mongo exporter and pt-mongodb-summary.
* PMM-5723 Fix reviewdog.

Co-authored-by: Nurlan Moldomurov <nurlan.moldomurov@percona.com>
@@ -1,11 +1,16 @@
package testutils

import (
    "context"
    "net"
    "os"
    "os/exec"
    "path/filepath"
    "strings"
    "time"

    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

const (
@@ -143,30 +148,70 @@ func BaseDir() string {
    }

    basedir = strings.TrimSpace(string(out))

    return basedir
}

// GetMongoDBAddr returns the address of an instance by replicaset name and instance type like
-// (rs1, primary) or (rs1, secondary1)
+// (rs1, primary) or (rs1, secondary1).
func GetMongoDBAddr(rs, name string) string {
    if _, ok := hosts[rs]; !ok {
        return ""
    }

    replset := hosts[rs]

    if host, ok := replset[name]; ok {
        return host
    }

    return ""
}

-// GetMongoDBReplsetAddrs return the addresses of all instances for a replicaset name
+// GetMongoDBReplsetAddrs return the addresses of all instances for a replicaset name.
func GetMongoDBReplsetAddrs(rs string) []string {
    addrs := []string{}

    if _, ok := hosts[rs]; !ok {
        return addrs
    }

    for _, host := range hosts[rs] {
        addrs = append(addrs, host)
    }

    return addrs
}

// TestClient returns a new MongoDB connection to the specified server port.
func TestClient(ctx context.Context, port string) (*mongo.Client, error) {
    if port == "" {
        port = MongoDBShard1PrimaryPort
    }

    hostname := "127.0.0.1"
    direct := true
    to := time.Second
    co := &options.ClientOptions{
        ConnectTimeout: &to,
        Hosts:          []string{net.JoinHostPort(hostname, port)},
        Direct:         &direct,
        Auth: &options.Credential{
            Username:    MongoDBUser,
            Password:    MongoDBPassword,
            PasswordSet: true,
        },
    }

    client, err := mongo.Connect(ctx, co)
    if err != nil {
        return nil, err
    }

    err = client.Ping(ctx, nil)
    if err != nil {
        return nil, err
    }

    return client, nil
}

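A minimal sketch of how a test could exercise these helpers follows; it is not part of the commit. The "rs1"/"primary" keys follow the naming convention from the GetMongoDBAddr comment, and the user/port constants are the ones referenced above, so treat the exact values as assumptions about the test sandbox.

package example

import (
    "context"
    "testing"
    "time"

    tu "github.com/percona/percona-toolkit/src/go/internal/testutils"
)

func TestTopologyHelpers(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Connect directly to the shard 1 primary of the test sandbox.
    client, err := tu.TestClient(ctx, tu.MongoDBShard1PrimaryPort)
    if err != nil {
        t.Fatalf("cannot connect to test MongoDB: %s", err)
    }
    defer client.Disconnect(ctx) //nolint:errcheck

    // Resolve a single member by replica set and role name.
    if addr := tu.GetMongoDBAddr("rs1", "primary"); addr == "" {
        t.Error("rs1 primary address not found")
    }

    // Or list every member of the replica set.
    if addrs := tu.GetMongoDBReplsetAddrs("rs1"); len(addrs) == 0 {
        t.Error("no addresses found for rs1")
    }
}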
src/go/mongolib/proto/procinfo.go (new file, 30 lines)
@@ -0,0 +1,30 @@
package proto

import "time"

type ProcInfo struct {
    CreateTime time.Time
    Path       string
    UserName   string
    Error      error
}

type GetHostInfo struct {
    Hostname          string
    HostOsType        string
    HostSystemCPUArch string
    HostDatabases     int
    HostCollections   int
    DBPath            string

    ProcPath         string
    ProcUserName     string
    ProcCreateTime   time.Time
    ProcProcessCount int

    // Server Status
    ProcessName    string
    ReplicasetName string
    Version        string
    NodeType       string
}

@@ -1,6 +1,9 @@
package proto

-import "go.mongodb.org/mongo-driver/bson/primitive"
+import (
+    "go.mongodb.org/mongo-driver/bson/primitive"
+    "gopkg.in/mgo.v2/bson"
+)

const (
    REPLICA_SET_MEMBER_STARTUP = iota
@@ -48,3 +51,83 @@ type ReplicaSetStatus struct {
    Ok  float64 `bson:"ok"`  //
    Set string  `bson:"set"` // Replica set name
}

type Member struct {
    Host         string  `bson:"host"`
    Votes        int32   `bson:"votes"`
    ID           int32   `bson:"_id"`
    SlaveDelay   int64   `bson:"slaveDelay"`
    Priority     float64 `bson:"priority"`
    BuildIndexes bool    `bson:"buildIndexes"`
    ArbiterOnly  bool    `bson:"arbiterOnly"`
    Hidden       bool    `bson:"hidden"`
    Tags         bson.M  `bson:"tags"`
}

type RSConfig struct {
    ID                                 string     `bson:"_id"`
    ConfigServer                       bool       `bson:"configsvr"`
    WriteConcernMajorityJournalDefault bool       `bson:"writeConcernMajorityJournalDefault"`
    Version                            int32      `bson:"version"`
    ProtocolVersion                    int64      `bson:"protocolVersion"`
    Settings                           RSSettings `bson:"settings"`
    Members                            []Member   `bson:"members"`
}

type LastErrorDefaults struct {
    W        int32 `bson:"w"`
    WTimeout int32 `bson:"wtimeout"`
}

type RSSettings struct {
    HeartbeatTimeoutSecs       int32              `bson:"heartbeatTimeoutSecs"`
    ElectionTimeoutMillis      int32              `bson:"electionTimeoutMillis"`
    CatchUpTimeoutMillis       int32              `bson:"catchUpTimeoutMillis"`
    GetLastErrorModes          bson.M             `bson:"getLastErrorModes"`
    ChainingAllowed            bool               `bson:"chainingAllowed"`
    HeartbeatIntervalMillis    int32              `bson:"heartbeatIntervalMillis"`
    CatchUpTakeoverDelayMillis int32              `bson:"catchUpTakeoverDelayMillis"`
    GetLastErrorDefaults       LastErrorDefaults  `bson:"getLastErrorDefaults"`
    ReplicaSetID               primitive.ObjectID `bson:"replicaSetId"`
}

type Signature struct {
    Hash  primitive.Binary `bson:"hash"`
    KeyID int64            `bson:"keyId"`
}

type ClusterTime struct {
    ClusterTime primitive.Timestamp `bson:"clusterTime"`
    Signature   Signature           `bson:"signature"`
}

type ReplicasetConfig struct {
    Config              RSConfig            `bson:"config"`
    OK                  float64             `bson:"ok"`
    LastCommittedOpTime primitive.Timestamp `bson:"lastCommittedOpTime"`
    ClusterTime         ClusterTime         `bson:"$clusterTime"`
    OperationTime       primitive.Timestamp `bson:"operationTime"`
}

type ConfigVersion struct {
    ID                   int32              `bson:"_id"`
    MinCompatibleVersion int32              `bson:"minCompatibleVersion"`
    CurrentVersion       int32              `bson:"currentVersion"`
    ClusterID            primitive.ObjectID `bson:"clusterId"`
}

type ShardIdentity struct {
    ID                        string             `bson:"_id"`
    ShardName                 string             `bson:"shardName"`
    ClusterID                 primitive.ObjectID `bson:"clusterId"`
    ConfigsvrConnectionString string             `bson:"configsvrConnectionString"`
}

// MyState is a subset of getDiagnosticData result used to tag metrics in the MongoDB exporter
type MyState struct {
    Data struct {
        ReplicasetGetStatus struct {
            MyState int `bson:"myState"`
        } `bson:"replSetGetStatus"`
    } `bson:"data"`
}

@@ -2,23 +2,39 @@ package util

import (
    "context"
    "fmt"
    "sort"
    "strings"
    "time"

    "github.com/percona/percona-toolkit/src/go/mongolib/proto"
    "github.com/pkg/errors"
    "github.com/prometheus/common/log"
    "github.com/shirou/gopsutil/process"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "gopkg.in/mgo.v2/bson"
)

const (
    TypeIsDBGrid    = "isdbgrid"
    TypeMongos      = "mongos"
    TypeMongod      = "mongod"
    TypeShardServer = "shardsvr"

    milliToSeconds              = 1000
    shardingNotEnabledErrorCode = 203
    ErrNotYetInitialized        = int32(94)
    ErrNoReplicationEnabled     = int32(76)
)

var (
    CannotGetQueryError     = errors.New("cannot get query field from the profile document (it is not a map)")
    ShardingNotEnabledError = errors.New("sharding not enabled")

    ErrCannotGetProcess   = fmt.Errorf("cannot get process")
    ErrCannotGetClusterID = fmt.Errorf("cannot get cluster ID")
)

func GetReplicasetMembers(ctx context.Context, clientOptions *options.ClientOptions) ([]proto.Members, error) {
@@ -26,6 +42,7 @@ func GetReplicasetMembers(ctx context.Context, clientOptions *options.ClientOpti
    if err != nil {
        return nil, errors.Wrap(err, "cannot get a new client for GetReplicasetMembers")
    }

    if err := client.Connect(ctx); err != nil {
        return nil, errors.Wrap(err, "cannot connect to MongoDB")
    }
@@ -34,6 +51,7 @@ func GetReplicasetMembers(ctx context.Context, clientOptions *options.ClientOpti
    if err != nil {
        return nil, err
    }

    if err := client.Disconnect(ctx); err != nil {
        return nil, errors.Wrapf(err, "cannot disconnect from %v", clientOptions.Hosts)
    }
@@ -54,7 +72,10 @@ func GetReplicasetMembers(ctx context.Context, clientOptions *options.ClientOpti
    cmdOpts := proto.CommandLineOptions{}
    // Not always we can get this info. For examples, we cannot get this for hidden hosts so
    // if there is an error, just ignore it
-    res := client.Database("admin").RunCommand(ctx, primitive.D{{"getCmdLineOpts", 1}, {"recordStats", 1}})
+    res := client.Database("admin").RunCommand(ctx, primitive.D{
+        {Key: "getCmdLineOpts", Value: 1},
+        {Key: "recordStats", Value: 1},
+    })
    if res.Err() == nil {
        if err := res.Decode(&cmdOpts); err != nil {
            return nil, errors.Wrapf(err, "cannot decode getCmdLineOpts response for host %s", hostname)
@@ -62,6 +83,7 @@ func GetReplicasetMembers(ctx context.Context, clientOptions *options.ClientOpti
    }

    rss := proto.ReplicaSetStatus{}

    res = client.Database("admin").RunCommand(ctx, primitive.M{"replSetGetStatus": 1})
    if res.Err() != nil {
        m := proto.Members{
@@ -73,26 +95,34 @@ func GetReplicasetMembers(ctx context.Context, clientOptions *options.ClientOpti
            m.ID = serverStatus.Pid
            m.StorageEngine = serverStatus.StorageEngine
        }

        membersMap[m.Name] = m

        continue // If a host is a mongos we cannot get info but is not a real error
    }

    if err := res.Decode(&rss); err != nil {
        return nil, errors.Wrap(err, "cannot decode replSetGetStatus response")
    }

    for _, m := range rss.Members {
        if _, ok := membersMap[m.Name]; ok {
            continue // already exists
        }

        m.Set = rss.Set

        if serverStatus, err := GetServerStatus(ctx, client); err == nil {
            m.ID = serverStatus.Pid
            m.StorageEngine = serverStatus.StorageEngine

            if cmdOpts.Parsed.Sharding.ClusterRole != "" {
                m.StateStr = cmdOpts.Parsed.Sharding.ClusterRole + "/" + m.StateStr
            }

            m.StateStr = strings.ToUpper(m.StateStr)
        }

        membersMap[m.Name] = m
    }

@@ -104,30 +134,36 @@ func GetReplicasetMembers(ctx context.Context, clientOptions *options.ClientOpti
    }

    sort.Slice(members, func(i, j int) bool { return members[i].Name < members[j].Name })

    return members, nil
}

func GetHostnames(ctx context.Context, client *mongo.Client) ([]string, error) {
    // Probably we are connected to an individual member of a replica set
    rss := proto.ReplicaSetStatus{}

    res := client.Database("admin").RunCommand(ctx, primitive.M{"replSetGetStatus": 1})
    if res.Err() == nil {
        if err := res.Decode(&rss); err != nil {
            return nil, errors.Wrap(err, "cannot decode replSetGetStatus response for GetHostnames")
        }

        return buildHostsListFromReplStatus(rss), nil
    }

    // Try getShardMap first. If we are connected to a mongos it will return
    // all hosts, including config hosts
    var shardsMap proto.ShardsMap

    smRes := client.Database("admin").RunCommand(ctx, primitive.M{"getShardMap": 1})
    if smRes.Err() != nil {
        if e, ok := smRes.Err().(mongo.CommandError); ok && e.Code == shardingNotEnabledErrorCode {
            return nil, ShardingNotEnabledError // standalone instance
        }

        return nil, errors.Wrap(smRes.Err(), "cannot getShardMap for GetHostnames")
    }

    if err := smRes.Decode(&shardsMap); err != nil {
        return nil, errors.Wrap(err, "cannot decode getShardMap result for GetHostnames")
    }
@@ -145,48 +181,47 @@ func GetHostnames(ctx context.Context, client *mongo.Client) ([]string, error) {
    return nil, nil // standalone instance
}

-/*
-    "members" : [
-        {
-            "_id" : 0,
-            "name" : "localhost:17001",
-            "health" : 1,
-            "state" : 1,
-            "stateStr" : "PRIMARY",
-            "uptime" : 4700,
-            "optime" : Timestamp(1486554836, 1),
-            "optimeDate" : ISODate("2017-02-08T11:53:56Z"),
-            "electionTime" : Timestamp(1486651810, 1),
-            "electionDate" : ISODate("2017-02-09T14:50:10Z"),
-            "configVersion" : 1,
-            "self" : true
-        },
-*/
func buildHostsListFromReplStatus(replStatus proto.ReplicaSetStatus) []string {
+    /*
+        "members" : [
+            {
+                "_id" : 0,
+                "name" : "localhost:17001",
+                "health" : 1,
+                "state" : 1,
+                "stateStr" : "PRIMARY",
+                "uptime" : 4700,
+                "optime" : Timestamp(1486554836, 1),
+                "optimeDate" : ISODate("2017-02-08T11:53:56Z"),
+                "electionTime" : Timestamp(1486651810, 1),
+                "electionDate" : ISODate("2017-02-09T14:50:10Z"),
+                "configVersion" : 1,
+                "self" : true
+            },
+    */

    hostnames := []string{}
    for _, member := range replStatus.Members {
        hostnames = append(hostnames, member.Name)
    }

    sort.Strings(hostnames) // to make testing easier

    return hostnames
}

-/* Example
-    mongos> db.getSiblingDB('admin').runCommand('getShardMap')
-    {
-        "map" : {
-            "config" : "localhost:19001,localhost:19002,localhost:19003",
-            "localhost:17001" : "r1/localhost:17001,localhost:17002,localhost:17003",
-            "r1" : "r1/localhost:17001,localhost:17002,localhost:17003",
-            "r1/localhost:17001,localhost:17002,localhost:17003" : "r1/localhost:17001,localhost:17002,localhost:17003",
-        },
-        "ok" : 1
-    }.
-*/
func buildHostsListFromShardMap(shardsMap proto.ShardsMap) []string {
+    /* Example
+        mongos> db.getSiblingDB('admin').runCommand('getShardMap')
+        {
+            "map" : {
+                "config" : "localhost:19001,localhost:19002,localhost:19003",
+                "localhost:17001" : "r1/localhost:17001,localhost:17002,localhost:17003",
+                "r1" : "r1/localhost:17001,localhost:17002,localhost:17003",
+                "r1/localhost:17001,localhost:17002,localhost:17003" : "r1/localhost:17001,localhost:17002,localhost:17003",
+            },
+            "ok" : 1
+        }
+    */

    hostnames := []string{}
    hm := make(map[string]bool)

@@ -197,10 +232,11 @@ func buildHostsListFromShardMap(shardsMap proto.ShardsMap) []string {
        for _, val := range shardsMap.Map {
            m := strings.Split(val, "/")
            hostsStr := ""

            switch len(m) {
            case 1:
                hostsStr = m[0] // there is no / in the hosts list
-            case 2:
+            case 2: //nolint
                hostsStr = m[1] // there is a / in the string. Remove the prefix until the / and keep the rest
            }
            // since there is no Sets in Go, build a map where the value is the map key
@@ -209,33 +245,39 @@ func buildHostsListFromShardMap(shardsMap proto.ShardsMap) []string {
                hm[host] = false
            }
        }

        for host := range hm {
            hostnames = append(hostnames, host)
        }
    }

    sort.Strings(hostnames)

    return hostnames
}

// GetShardedHosts is like GetHostnames but it uses listShards instead of getShardMap
-// so it won't include config servers in the returned list
+// so it won't include config servers in the returned list.
func GetShardedHosts(ctx context.Context, client *mongo.Client) ([]string, error) {
    shardsInfo := &proto.ShardsInfo{}

    res := client.Database("admin").RunCommand(ctx, primitive.M{"listShards": 1})
    if res.Err() != nil {
        return nil, errors.Wrap(res.Err(), "cannot list shards")
    }

    if err := res.Decode(&shardsInfo); err != nil {
        return nil, errors.Wrap(err, "cannot decode listShards response")
    }

    hostnames := []string{}

    for _, shardInfo := range shardsInfo.Shards {
        m := strings.Split(shardInfo.Host, "/")
        h := strings.Split(m[1], ",")
        hostnames = append(hostnames, h[0])
    }

    return hostnames, nil
}

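As an aside, here is a small sketch, not part of the commit, of how the two discovery helpers differ when a client is connected to a mongos. It is written as if it were a standalone file in the same util package, so it can call the functions above directly; the function name is invented for the example.

package util

import (
    "context"
    "fmt"

    "go.mongodb.org/mongo-driver/mongo"
)

// discoverHosts contrasts the two host-discovery helpers defined above.
func discoverHosts(ctx context.Context, client *mongo.Client) error {
    // getShardMap-based discovery: every host known to the cluster,
    // config servers included.
    allHosts, err := GetHostnames(ctx, client)
    if err != nil {
        return err
    }

    // listShards-based discovery: one host per shard, no config servers.
    shardHosts, err := GetShardedHosts(ctx, client)
    if err != nil {
        return err
    }

    fmt.Printf("all hosts: %v\nshard hosts: %v\n", allHosts, shardHosts)

    return nil
}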
@@ -248,6 +290,7 @@ func GetServerStatus(ctx context.Context, client *mongo.Client) (proto.ServerSta
        {Key: "recordStats", Value: 1},
    }
    res := client.Database("admin").RunCommand(ctx, query)

    if res.Err() != nil {
        return ss, errors.Wrap(res.Err(), "GetHostInfo.serverStatus")
    }
@@ -265,14 +308,17 @@ func GetQueryField(doc proto.SystemProfile) (primitive.M, error) {
    // however MongoDB 3.0 doesn't have that field
    // so we need to detect protocol by looking at actual data.
    query := doc.Query
-    if len(doc.Command) > 0 {
+
+    if len(doc.Command) > 0 { //nolint
        query = doc.Command

        if doc.Op == "update" || doc.Op == "remove" {
            if squery, ok := query.Map()["q"]; ok {
                // just an extra check to ensure this type assertion won't fail
                if ssquery, ok := squery.(primitive.M); ok {
                    return ssquery, nil
                }

                return nil, CannotGetQueryError
            }
        }
@@ -309,6 +355,7 @@ func GetQueryField(doc proto.SystemProfile) (primitive.M, error) {
        if ssquery, ok := squery.(primitive.M); ok {
            return ssquery, nil
        }

        return nil, CannotGetQueryError
    }

@@ -317,6 +364,7 @@ func GetQueryField(doc proto.SystemProfile) (primitive.M, error) {
        if ssquery, ok := squery.(primitive.M); ok {
            return ssquery, nil
        }

        return nil, CannotGetQueryError
    }

@@ -333,5 +381,173 @@ func GetQueryField(doc proto.SystemProfile) (primitive.M, error) {
func GetClientForHost(co *options.ClientOptions, newHost string) (*mongo.Client, error) {
    newOptions := options.MergeClientOptions(co, &options.ClientOptions{Hosts: []string{newHost}})
    newOptions.SetDirect(true)

    return mongo.NewClient(newOptions)
}

func GetHostInfo(ctx context.Context, client *mongo.Client) (*proto.GetHostInfo, error) {
    hi := proto.HostInfo{}
    if err := client.Database("admin").RunCommand(ctx, primitive.M{"hostInfo": 1}).Decode(&hi); err != nil {
        log.Debugf("run('hostInfo') error: %s", err)
        return nil, errors.Wrap(err, "GetHostInfo.hostInfo")
    }

    cmdOpts := proto.CommandLineOptions{}
    query := primitive.D{{Key: "getCmdLineOpts", Value: 1}, {Key: "recordStats", Value: 1}}

    err := client.Database("admin").RunCommand(ctx, query).Decode(&cmdOpts)
    if err != nil {
        return nil, errors.Wrap(err, "cannot get command line options")
    }

    ss := proto.ServerStatus{}
    query = primitive.D{{Key: "serverStatus", Value: 1}, {Key: "recordStats", Value: 1}}

    if err := client.Database("admin").RunCommand(ctx, query).Decode(&ss); err != nil {
        return nil, errors.Wrap(err, "GetHostInfo.serverStatus")
    }

    pi := proto.ProcInfo{}
    if err := fillProcInfo(int32(ss.Pid), &pi); err != nil {
        pi.Error = err
    }

    nodeType, _ := getNodeType(ctx, client)
    procCount, _ := countMongodProcesses()

    i := &proto.GetHostInfo{
        Hostname:          hi.System.Hostname,
        HostOsType:        hi.Os.Type,
        HostSystemCPUArch: hi.System.CpuArch,
        DBPath:            "", // Sets default. It will be overridden later if necessary

        ProcessName:      ss.Process,
        ProcProcessCount: procCount,
        Version:          ss.Version,
        NodeType:         nodeType,

        ProcPath:       pi.Path,
        ProcUserName:   pi.UserName,
        ProcCreateTime: pi.CreateTime,
    }
    if ss.Repl != nil {
        i.ReplicasetName = ss.Repl.SetName
    }

    if cmdOpts.Parsed.Storage.DbPath != "" {
        i.DBPath = cmdOpts.Parsed.Storage.DbPath
    }

    return i, nil
}

func ClusterID(ctx context.Context, client *mongo.Client) (string, error) {
    var cv proto.ConfigVersion
    if err := client.Database("config").Collection("version").FindOne(ctx, bson.M{}).Decode(&cv); err == nil {
        return cv.ClusterID.Hex(), nil
    }

    var si proto.ShardIdentity

    filter := bson.M{"_id": "shardIdentity"}

    if err := client.Database("admin").Collection("system.version").FindOne(ctx, filter).Decode(&si); err == nil {
        return si.ClusterID.Hex(), nil
    }

    rc, err := ReplicasetConfig(ctx, client)
    if err != nil {
        if e, ok := err.(mongo.CommandError); ok && IsReplicationNotEnabledError(e) {
            return "", nil
        }

        return "", err
    }

    return rc.Config.Settings.ReplicaSetID.Hex(), nil
}

func IsReplicationNotEnabledError(err mongo.CommandError) bool {
    return err.Code == ErrNotYetInitialized || err.Code == ErrNoReplicationEnabled
}

func MyState(ctx context.Context, client *mongo.Client) (int, error) {
    var ms proto.MyState
    if err := client.Database("admin").RunCommand(ctx, bson.M{"getDiagnosticData": 1}).Decode(&ms); err != nil {
        return 0, err
    }

    return ms.Data.ReplicasetGetStatus.MyState, nil
}

func ReplicasetConfig(ctx context.Context, client *mongo.Client) (*proto.ReplicasetConfig, error) {
    var rs proto.ReplicasetConfig
    if err := client.Database("admin").RunCommand(ctx, bson.M{"replSetGetConfig": 1}).Decode(&rs); err != nil {
        return nil, err
    }

    return &rs, nil
}

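For context, a hedged sketch, not part of the commit, of how an exporter might fold ClusterID and MyState into topology labels. The label names are invented for the example; the myState values follow the replSetGetStatus convention exercised by the tests below (1 for a primary, 2 for a secondary, 0 when no replica set status is available). It assumes it lives in the same util package as the functions above.

package util

import (
    "context"
    "strconv"

    "go.mongodb.org/mongo-driver/mongo"
)

// topologyLabels combines the new helpers into a label map.
func topologyLabels(ctx context.Context, client *mongo.Client) (map[string]string, error) {
    labels := map[string]string{}

    // Cluster-wide identifier, empty for a standalone instance.
    id, err := ClusterID(ctx, client)
    if err != nil {
        return nil, err
    }
    labels["cluster_id"] = id

    // Member state taken from getDiagnosticData (replSetGetStatus.myState).
    state, err := MyState(ctx, client)
    if err != nil {
        return nil, err
    }
    labels["member_state"] = strconv.Itoa(state)

    return labels, nil
}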
func fillProcInfo(pid int32, procInfo *proto.ProcInfo) error {
    proc, err := process.NewProcess(pid)
    if err != nil {
        return errors.Wrapf(ErrCannotGetProcess, "%s", err)
    }

    ct, err := proc.CreateTime()
    if err != nil {
        return err
    }

    procInfo.CreateTime = time.Unix(ct/milliToSeconds, 0)

    if procInfo.Path, err = proc.Exe(); err != nil {
        return err
    }

    if procInfo.UserName, err = proc.Username(); err != nil {
        return err
    }

    return nil
}

func getNodeType(ctx context.Context, client *mongo.Client) (string, error) {
    md := proto.MasterDoc{}
    if err := client.Database("admin").RunCommand(ctx, primitive.M{"isMaster": 1}).Decode(&md); err != nil {
        return "", err
    }

    if md.SetName != nil || md.Hosts != nil {
        return TypeShardServer, nil
    } else if md.Msg == TypeIsDBGrid {
        // isdbgrid is always the msg value when calling isMaster on a mongos
        // see http://docs.mongodb.org/manual/core/sharded-cluster-query-router/
        return TypeMongos, nil
    }

    return TypeMongod, nil
}

func countMongodProcesses() (int, error) {
    pids, err := process.Pids()
    if err != nil {
        return 0, err
    }

    count := 0

    for _, pid := range pids {
        p, err := process.NewProcess(pid)
        if err != nil {
            continue
        }

        if name, _ := p.Name(); name == TypeMongod || name == TypeMongos {
            count++
        }
    }

    return count, nil
}

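Finally, a sketch, not part of the commit, of how pt-mongodb-summary-style code might consume GetHostInfo; only fields defined in the new GetHostInfo struct are used, and the function name is invented for the example. Like the earlier sketches, it assumes it sits in the same util package.

package util

import (
    "context"
    "fmt"

    "go.mongodb.org/mongo-driver/mongo"
)

// printHostSummary prints a few of the topology fields gathered by GetHostInfo.
func printHostSummary(ctx context.Context, client *mongo.Client) error {
    hi, err := GetHostInfo(ctx, client)
    if err != nil {
        return err
    }

    fmt.Printf("host: %s (%s, %s)\n", hi.Hostname, hi.HostOsType, hi.HostSystemCPUArch)
    fmt.Printf("node type: %s, replica set: %q, version: %s\n", hi.NodeType, hi.ReplicasetName, hi.Version)
    fmt.Printf("dbpath: %s, mongod/mongos processes: %d\n", hi.DBPath, hi.ProcProcessCount)

    return nil
}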
@@ -8,6 +8,7 @@ import (
    "time"

    tu "github.com/percona/percona-toolkit/src/go/internal/testutils"
    "github.com/stretchr/testify/assert"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)
@@ -20,34 +21,58 @@ func TestGetHostnames(t *testing.T) {
        wantError bool
    }{
        {
-            name:      "from_mongos",
-            uri:       fmt.Sprintf("mongodb://%s:%s@%s:%s", tu.MongoDBUser, tu.MongoDBPassword, tu.MongoDBHost, tu.MongoDBMongosPort),
+            name: "from_mongos",
+            uri: fmt.Sprintf("mongodb://%s:%s@%s:%s",
+                tu.MongoDBUser,
+                tu.MongoDBPassword,
+                tu.MongoDBHost,
+                tu.MongoDBMongosPort,
+            ),
            want:      []string{"127.0.0.1:17001", "127.0.0.1:17002", "127.0.0.1:17004", "127.0.0.1:17005", "127.0.0.1:17007"},
            wantError: false,
        },
        {
-            name:      "from_mongod",
-            uri:       fmt.Sprintf("mongodb://%s:%s@%s:%s", tu.MongoDBUser, tu.MongoDBPassword, tu.MongoDBHost, tu.MongoDBShard1PrimaryPort),
+            name: "from_mongod",
+            uri: fmt.Sprintf("mongodb://%s:%s@%s:%s",
+                tu.MongoDBUser,
+                tu.MongoDBPassword,
+                tu.MongoDBHost,
+                tu.MongoDBShard1PrimaryPort,
+            ),
            want:      []string{"127.0.0.1:17001", "127.0.0.1:17002", "127.0.0.1:17003"},
            wantError: false,
        },
        {
-            name:      "from_non_sharded",
-            uri:       fmt.Sprintf("mongodb://%s:%s@%s:%s", tu.MongoDBUser, tu.MongoDBPassword, tu.MongoDBHost, tu.MongoDBShard3PrimaryPort),
+            name: "from_non_sharded",
+            uri: fmt.Sprintf("mongodb://%s:%s@%s:%s",
+                tu.MongoDBUser,
+                tu.MongoDBPassword,
+                tu.MongoDBHost,
+                tu.MongoDBShard3PrimaryPort,
+            ),
            want:      []string{"127.0.0.1:17021", "127.0.0.1:17022", "127.0.0.1:17023"},
            wantError: false,
        },
        {
-            name:      "from_standalone",
-            uri:       fmt.Sprintf("mongodb://%s:%s@%s:%s", tu.MongoDBUser, tu.MongoDBPassword, tu.MongoDBHost, tu.MongoDBStandalonePort),
+            name: "from_standalone",
+            uri: fmt.Sprintf("mongodb://%s:%s@%s:%s",
+                tu.MongoDBUser,
+                tu.MongoDBPassword,
+                tu.MongoDBHost,
+                tu.MongoDBStandalonePort,
+            ),
            want:      nil,
            wantError: true,
        },
    }

    for _, test := range testCases {
        uri := test.uri
        want := test.want
        wantError := test.wantError

        t.Run(test.name, func(t *testing.T) {
-            client, err := mongo.NewClient(options.Client().ApplyURI(test.uri))
+            client, err := mongo.NewClient(options.Client().ApplyURI(uri))
            if err != nil {
                t.Fatalf("cannot get a new MongoDB client: %s", err)
            }
@@ -59,12 +84,12 @@ func TestGetHostnames(t *testing.T) {
            }

            hostnames, err := GetHostnames(ctx, client)
-            if err != nil && !test.wantError {
+            if err != nil && !wantError {
                t.Errorf("Expecting error=nil, got: %v", err)
            }

-            if !reflect.DeepEqual(hostnames, test.want) {
-                t.Errorf("Invalid hostnames. Got: %+v, want %+v", hostnames, test.want)
+            if !reflect.DeepEqual(hostnames, want) {
+                t.Errorf("Invalid hostnames. Got: %+v, want %+v", hostnames, want)
            }
        })
    }
@@ -194,3 +219,139 @@ func TestGetShardedHosts(t *testing.T) {
        })
    }
}

func TestReplicasetConfig(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    tcs := []struct {
        port             string
        wantID           string
        wantConfigServer bool
        wantError        error
    }{
        {
            port:             tu.MongoDBStandalonePort,
            wantID:           "",
            wantConfigServer: false,
            wantError: mongo.CommandError{
                Code:    76,
                Message: "not running with --replSet",
                Labels:  []string(nil),
                Name:    "NoReplicationEnabled",
                Wrapped: error(nil),
            },
        },
        {
            port:             tu.MongoDBMongosPort,
            wantID:           "",
            wantConfigServer: false,
            wantError: mongo.CommandError{
                Code:    59,
                Message: "no such cmd: replSetGetConfig",
                Labels:  []string(nil),
                Name:    "CommandNotFound",
                Wrapped: error(nil),
            },
        },
        {
            port:             tu.MongoDBShard1PrimaryPort,
            wantID:           "rs1",
            wantConfigServer: false,
        },
        {
            port:             tu.MongoDBConfigsvr1Port,
            wantID:           "csReplSet",
            wantConfigServer: true,
        },
    }

    for _, tc := range tcs {
        client, err := tu.TestClient(ctx, tc.port)
        assert.NoError(t, err)

        rs, err := ReplicasetConfig(ctx, client)
        assert.Equal(t, tc.wantError, err, fmt.Sprintf("%v", tc.port))
        if tc.wantError != nil {
            continue
        }
        assert.Equal(t, tc.wantID, rs.Config.ID)
        assert.Equal(t, tc.wantConfigServer, rs.Config.ConfigServer)
        assert.NotEmpty(t, rs.Config.Settings.ReplicaSetID.Hex())
    }
}

func TestClusterID(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    tcs := []struct {
        port    string
        emptyID bool
    }{
        {
            port:    tu.MongoDBMongosPort,
            emptyID: false,
        },
        {
            port:    tu.MongoDBShard1PrimaryPort,
            emptyID: false,
        },
        {
            port:    tu.MongoDBShard1Secondary1Port,
            emptyID: false,
        },
        {
            port:    tu.MongoDBConfigsvr1Port,
            emptyID: false,
        },
        {
            port:    tu.MongoDBStandalonePort,
            emptyID: true,
        },
    }

    for _, tc := range tcs {
        client, err := tu.TestClient(ctx, tc.port)
        assert.NoError(t, err)
        cid, err := ClusterID(ctx, client)
        assert.NoError(t, err, fmt.Sprintf("port: %v", tc.port))
        assert.Equal(t, cid == "", tc.emptyID)
    }
}

func TestMyState(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    tcs := []struct {
        port string
        want int
    }{
        {
            port: tu.MongoDBShard1PrimaryPort,
            want: 1,
        },
        {
            port: tu.MongoDBShard1Secondary1Port,
            want: 2,
        },
        {
            port: tu.MongoDBMongosPort,
            want: 0,
        },
        {
            port: tu.MongoDBStandalonePort,
            want: 0,
        },
    }

    for _, tc := range tcs {
        client, err := tu.TestClient(ctx, tc.port)
        assert.NoError(t, err)

        state, err := MyState(ctx, client)
        assert.NoError(t, err)
        assert.Equal(t, tc.want, state, fmt.Sprintf("port: %v", tc.port))
    }
}

@@ -6,12 +6,12 @@ cd $BASEDIR

source ${BASEDIR}/src/go/setenv.sh

-for dir in $(ls -d ./src/go/pt-*)
+for dir in $(ls -d ./src/go/pt-* ) ./src/go/mongolib
do
    echo "Running tests at $BASEDIR/$dir"
    cd $BASEDIR/$dir
    go get ./...
-    go test -v -coverprofile=coverage.out
+    go test -v -coverprofile=coverage.out ./...
    if [ -f coverage.out ]
    then
        go tool cover -func=coverage.out