PT-1741 Migrated Go pt-mongo-tools to new driver

This commit is contained in:
Carlos Salguero
2019-08-02 11:53:39 -03:00
parent e3409c720f
commit c388bbc01c
34 changed files with 1791 additions and 1304 deletions

View File

@@ -2,6 +2,7 @@ package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"html/template"
@@ -19,12 +20,12 @@ import (
"github.com/percona/percona-toolkit/src/go/mongolib/util"
"github.com/percona/percona-toolkit/src/go/pt-mongodb-summary/oplog"
"github.com/percona/percona-toolkit/src/go/pt-mongodb-summary/templates"
"github.com/percona/pmgo"
"github.com/pkg/errors"
"github.com/shirou/gopsutil/process"
log "github.com/sirupsen/logrus"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const (
@@ -89,8 +90,8 @@ type procInfo struct {
}
type security struct {
Users int
Roles int
Users int64
Roles int64
Auth string
SSL string
BindIP string
@@ -105,9 +106,9 @@ type databases struct {
// Empty bool `bson:"empty"`
// Shards map[string]int64 `bson:"shards"`
} `bson:"databases"`
TotalSize int64 `bson:"totalSize"`
TotalSizeMb int64 `bson:"totalSizeMb"`
OK bool `bson:"ok"`
TotalSize int64 `bson:"totalSize"`
TotalSizeMb int64 `bson:"totalSizeMb"`
OK float64 `bson:"ok"`
}
type clusterwideInfo struct {
@@ -124,7 +125,7 @@ type clusterwideInfo struct {
Chunks []proto.ChunksByCollection
}
type options struct {
type cliOptions struct {
Help bool
Host string
User string
@@ -153,7 +154,6 @@ type collectedInfo struct {
}
func main() {
opts, err := parseFlags()
if err != nil {
log.Errorf("cannot get parameters: %s", err.Error())
@@ -194,48 +194,30 @@ func main() {
}
}
di := &pmgo.DialInfo{
Username: opts.User,
Password: opts.Password,
Addrs: []string{opts.Host},
FailFast: true,
Source: opts.AuthDB,
SSLCAFile: opts.SSLCAFile,
SSLPEMKeyFile: opts.SSLPEMKeyFile,
}
log.Debugf("Connecting to the db using:\n%+v", di)
dialer := pmgo.NewDialer()
hostnames, err := util.GetHostnames(dialer, di)
log.Debugf("hostnames: %v", hostnames)
session, err := dialer.DialWithInfo(di)
ctx := context.Background()
clientOptions := getClientOptions(opts)
client, err := mongo.NewClient(clientOptions)
if err != nil {
message := fmt.Sprintf("Cannot connect to %q", di.Addrs[0])
if di.Username != "" || di.Password != "" {
message += fmt.Sprintf(" using user: %q", di.Username)
if strings.HasPrefix(di.Password, "=") {
message += " (probably you are using = with -p or -u instead of a blank space)"
}
}
message += fmt.Sprintf(". %s", err.Error())
log.Errorf(message)
os.Exit(1)
log.Fatalf("Cannot get a MongoDB client: %s", err)
}
defer session.Close()
session.SetMode(mgo.Monotonic, true)
if err := client.Connect(ctx); err != nil {
log.Fatalf("Cannot connect to MongoDB: %s", err)
}
defer client.Disconnect(ctx)
hostnames, err := util.GetHostnames(ctx, client)
log.Debugf("hostnames: %v", hostnames)
ci := &collectedInfo{}
ci.HostInfo, err = getHostinfo(session)
ci.HostInfo, err = getHostInfo(ctx, client)
if err != nil {
message := fmt.Sprintf("Cannot get host info for %q: %s", di.Addrs[0], err.Error())
message := fmt.Sprintf("Cannot get host info for %q: %s", opts.Host, err.Error())
log.Errorf(message)
os.Exit(2)
}
if ci.ReplicaMembers, err = util.GetReplicasetMembers(dialer, di); err != nil {
if ci.ReplicaMembers, err = util.GetReplicasetMembers(ctx, clientOptions); err != nil {
log.Warnf("[Error] cannot get replicaset members: %v\n", err)
os.Exit(2)
}
@@ -243,8 +225,7 @@ func main() {
if opts.RunningOpsSamples > 0 && opts.RunningOpsInterval > 0 {
ci.RunningOps, err = getOpCountersStats(
session,
opts.RunningOpsSamples,
ctx, client, opts.RunningOpsSamples,
time.Duration(opts.RunningOpsInterval)*time.Millisecond,
)
if err != nil {
@@ -253,14 +234,14 @@ func main() {
}
if ci.HostInfo != nil {
if ci.SecuritySettings, err = getSecuritySettings(session, ci.HostInfo.Version); err != nil {
if ci.SecuritySettings, err = getSecuritySettings(ctx, client, ci.HostInfo.Version); err != nil {
log.Errorf("[Error] cannot get security settings: %v\n", err)
}
} else {
log.Warn("Cannot check security settings since host info is not available (permissions?)")
}
if ci.OplogInfo, err = oplog.GetOplogInfo(hostnames, di); err != nil {
if ci.OplogInfo, err = oplog.GetOplogInfo(ctx, hostnames, clientOptions); err != nil {
log.Infof("Cannot get Oplog info: %s\n", err)
} else {
if len(ci.OplogInfo) == 0 {
@@ -272,13 +253,13 @@ func main() {
// individual servers won't know about this info
if ci.HostInfo.NodeType == typeMongos {
if ci.ClusterWideInfo, err = getClusterwideInfo(session); err != nil {
if ci.ClusterWideInfo, err = getClusterwideInfo(ctx, client); err != nil {
log.Printf("[Error] cannot get cluster wide info: %v\n", err)
}
}
if ci.HostInfo.NodeType == typeMongos {
if ci.BalancerStats, err = GetBalancerStats(session); err != nil {
if ci.BalancerStats, err = GetBalancerStats(ctx, client); err != nil {
log.Printf("[Error] cannot get balancer stats: %v\n", err)
}
}
@@ -306,48 +287,63 @@ func formatResults(ci *collectedInfo, format string) ([]byte, error) {
buf = new(bytes.Buffer)
t := template.Must(template.New("replicas").Parse(templates.Replicas))
t.Execute(buf, ci.ReplicaMembers)
if err := t.Execute(buf, ci.ReplicaMembers); err != nil {
return nil, errors.Wrap(err, "cannnot parse replicas section of the output template")
}
t = template.Must(template.New("hosttemplateData").Parse(templates.HostInfo))
t.Execute(buf, ci.HostInfo)
if err := t.Execute(buf, ci.HostInfo); err != nil {
return nil, errors.Wrap(err, "cannnot parse hosttemplateData section of the output template")
}
t = template.Must(template.New("runningOps").Parse(templates.RunningOps))
t.Execute(buf, ci.RunningOps)
if err := t.Execute(buf, ci.RunningOps); err != nil {
return nil, errors.Wrap(err, "cannnot parse runningOps section of the output template")
}
t = template.Must(template.New("ssl").Parse(templates.Security))
t.Execute(buf, ci.SecuritySettings)
if err := t.Execute(buf, ci.SecuritySettings); err != nil {
return nil, errors.Wrap(err, "cannnot parse ssl section of the output template")
}
if ci.OplogInfo != nil && len(ci.OplogInfo) > 0 {
t = template.Must(template.New("oplogInfo").Parse(templates.Oplog))
t.Execute(buf, ci.OplogInfo[0])
if err := t.Execute(buf, ci.OplogInfo[0]); err != nil {
return nil, errors.Wrap(err, "cannnot parse oplogInfo section of the output template")
}
}
t = template.Must(template.New("clusterwide").Parse(templates.Clusterwide))
t.Execute(buf, ci.ClusterWideInfo)
if err := t.Execute(buf, ci.ClusterWideInfo); err != nil {
return nil, errors.Wrap(err, "cannnot parse clusterwide section of the output template")
}
t = template.Must(template.New("balancer").Parse(templates.BalancerStats))
t.Execute(buf, ci.BalancerStats)
if err := t.Execute(buf, ci.BalancerStats); err != nil {
return nil, errors.Wrap(err, "cannnot parse balancer section of the output template")
}
}
return buf.Bytes(), nil
}
func getHostinfo(session pmgo.SessionManager) (*hostInfo, error) {
func getHostInfo(ctx context.Context, client *mongo.Client) (*hostInfo, error) {
hi := proto.HostInfo{}
if err := session.Run(bson.M{"hostInfo": 1}, &hi); err != nil {
log.Debugf("run('hostInfo') error: %s", err.Error())
if err := client.Database("admin").RunCommand(ctx, primitive.M{"hostInfo": 1}).Decode(&hi); err != nil {
log.Debugf("run('hostInfo') error: %s", err)
return nil, errors.Wrap(err, "GetHostInfo.hostInfo")
}
cmdOpts := proto.CommandLineOptions{}
err := session.DB("admin").Run(bson.D{{"getCmdLineOpts", 1}, {"recordStats", 1}}, &cmdOpts)
query := primitive.D{{Key: "getCmdLineOpts", Value: 1}, {Key: "recordStats", Value: 1}}
err := client.Database("admin").RunCommand(ctx, query).Decode(&cmdOpts)
if err != nil {
return nil, errors.Wrap(err, "cannot get command line options")
}
ss := proto.ServerStatus{}
if err := session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 1}}, &ss); err != nil {
query = primitive.D{{Key: "serverStatus", Value: 1}, {Key: "recordStats", Value: 1}}
if err := client.Database("admin").RunCommand(ctx, query).Decode(&ss); err != nil {
return nil, errors.Wrap(err, "GetHostInfo.serverStatus")
}
@@ -356,7 +352,7 @@ func getHostinfo(session pmgo.SessionManager) (*hostInfo, error) {
pi.Error = err
}
nodeType, _ := getNodeType(session)
nodeType, _ := getNodeType(ctx, client)
procCount, _ := countMongodProcesses()
i := &hostInfo{
@@ -403,10 +399,10 @@ func countMongodProcesses() (int, error) {
return count, nil
}
func getClusterwideInfo(session pmgo.SessionManager) (*clusterwideInfo, error) {
func getClusterwideInfo(ctx context.Context, client *mongo.Client) (*clusterwideInfo, error) {
var databases databases
err := session.Run(bson.M{"listDatabases": 1}, &databases)
err := client.Database("admin").RunCommand(ctx, primitive.M{"listDatabases": 1}).Decode(&databases)
if err != nil {
return nil, errors.Wrap(err, "getClusterwideInfo.listDatabases ")
}
@@ -416,19 +412,24 @@ func getClusterwideInfo(session pmgo.SessionManager) (*clusterwideInfo, error) {
}
for _, db := range databases.Databases {
collections, err := session.DB(db.Name).CollectionNames()
cursor, err := client.Database(db.Name).ListCollections(ctx, primitive.M{})
if err != nil {
continue
}
cwi.TotalCollectionsCount += len(collections)
for _, collName := range collections {
var collStats proto.CollStats
err := session.DB(db.Name).Run(bson.M{"collStats": collName}, &collStats)
if err != nil {
continue
for cursor.Next(ctx) {
c := proto.CollectionEntry{}
if err := cursor.Decode(&c); err != nil {
return nil, errors.Wrap(err, "cannot decode ListCollections doc")
}
var collStats proto.CollStats
err := client.Database(db.Name).RunCommand(ctx, primitive.M{"collStats": c.Name}).Decode(&collStats)
if err != nil {
return nil, errors.Wrapf(err, "cannot get info for collection %s.%s", db.Name, c.Name)
}
cwi.TotalCollectionsCount++
if collStats.Sharded {
cwi.ShardedDataSize += collStats.Size
cwi.ShardedColsCount++
@@ -438,14 +439,16 @@ func getClusterwideInfo(session pmgo.SessionManager) (*clusterwideInfo, error) {
cwi.UnshardedDataSize += collStats.Size
cwi.UnshardedColsCount++
}
}
cwi.UnshardedColsCount = cwi.TotalCollectionsCount - cwi.ShardedColsCount
cwi.ShardedDataSizeScaled, cwi.ShardedDataSizeScale = sizeAndUnit(cwi.ShardedDataSize)
cwi.UnshardedDataSizeScaled, cwi.UnshardedDataSizeScale = sizeAndUnit(cwi.UnshardedDataSize)
cwi.Chunks, _ = getChunksCount(session)
cwi.Chunks, err = getChunksCount(ctx, client)
if err != nil {
return nil, errors.Wrap(err, "cannot get chunks information")
}
return cwi, nil
}
@@ -462,7 +465,7 @@ func sizeAndUnit(size int64) (float64, string) {
return newSize, unit[idx]
}
func getSecuritySettings(session pmgo.SessionManager, ver string) (*security, error) {
func getSecuritySettings(ctx context.Context, client *mongo.Client, ver string) (*security, error) {
s := security{
Auth: "disabled",
SSL: "disabled",
@@ -476,7 +479,7 @@ func getSecuritySettings(session pmgo.SessionManager, ver string) (*security, er
}
cmdOpts := proto.CommandLineOptions{}
err = session.DB("admin").Run(bson.D{{"getCmdLineOpts", 1}, {"recordStats", 1}}, &cmdOpts)
err = client.Database("admin").RunCommand(ctx, primitive.D{{"getCmdLineOpts", 1}, {"recordStats", 1}}).Decode(&cmdOpts)
if err != nil {
return nil, errors.Wrap(err, "cannot get command line options")
}
@@ -525,17 +528,8 @@ func getSecuritySettings(session pmgo.SessionManager, ver string) (*security, er
}
}
// On some servers, like a mongos with config servers, this fails if session mode is Monotonic
// On some other servers like a secondary in a replica set, this fails if the session mode is Strong.
// Lets try both
newSession := session.Clone()
defer newSession.Close()
newSession.SetMode(mgo.Strong, true)
if s.Users, s.Roles, err = getUserRolesCount(newSession); err != nil {
newSession.SetMode(mgo.Monotonic, true)
if s.Users, s.Roles, err = getUserRolesCount(newSession); err != nil {
if s.Users, s.Roles, err = getUserRolesCount(ctx, client); err != nil {
if s.Users, s.Roles, err = getUserRolesCount(ctx, client); err != nil {
return nil, errors.Wrap(err, "cannot get security settings.")
}
}
@@ -543,23 +537,22 @@ func getSecuritySettings(session pmgo.SessionManager, ver string) (*security, er
return &s, nil
}
func getUserRolesCount(session pmgo.SessionManager) (int, int, error) {
users, err := session.DB("admin").C("system.users").Count()
func getUserRolesCount(ctx context.Context, client *mongo.Client) (int64, int64, error) {
users, err := client.Database("admin").Collection("system.users").CountDocuments(ctx, primitive.M{})
if err != nil {
return 0, 0, errors.Wrap(err, "cannot get users count")
}
roles, err := session.DB("admin").C("system.roles").Count()
roles, err := client.Database("admin").Collection("system.roles").CountDocuments(ctx, primitive.M{})
if err != nil {
return 0, 0, errors.Wrap(err, "cannot get roles count")
}
return users, roles, nil
}
func getNodeType(session pmgo.SessionManager) (string, error) {
func getNodeType(ctx context.Context, client *mongo.Client) (string, error) {
md := proto.MasterDoc{}
err := session.Run("isMaster", &md)
if err != nil {
if err := client.Database("admin").RunCommand(ctx, primitive.M{"isMaster": 1}).Decode(&md); err != nil {
return "", err
}
@@ -573,7 +566,7 @@ func getNodeType(session pmgo.SessionManager) (string, error) {
return "mongod", nil
}
func getOpCountersStats(session pmgo.SessionManager, count int, sleep time.Duration) (*opCounters, error) {
func getOpCountersStats(ctx context.Context, client *mongo.Client, count int, sleep time.Duration) (*opCounters, error) {
oc := &opCounters{}
prevOpCount := &opCounters{}
ss := proto.ServerStatus{}
@@ -585,7 +578,7 @@ func getOpCountersStats(session pmgo.SessionManager, count int, sleep time.Durat
// count + 1 because we need 1st reading to stablish a base to measure variation
for i := 0; i < count+1; i++ {
<-ticker.C
err := session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 1}}, &ss)
err := client.Database("admin").RunCommand(ctx, primitive.D{{"serverStatus", 1}, {"recordStats", 1}}).Decode(&ss)
if err != nil {
return nil, err
}
@@ -729,38 +722,8 @@ func getProcInfo(pid int32, templateData *procInfo) error {
return nil
}
func getDbsAndCollectionsCount(hostnames []string) (int, int, error) {
dbnames := make(map[string]bool)
colnames := make(map[string]bool)
for _, hostname := range hostnames {
session, err := mgo.Dial(hostname)
if err != nil {
continue
}
dbs, err := session.DatabaseNames()
if err != nil {
continue
}
for _, dbname := range dbs {
dbnames[dbname] = true
cols, err := session.DB(dbname).CollectionNames()
if err != nil {
continue
}
for _, colname := range cols {
colnames[dbname+"."+colname] = true
}
}
}
return len(dbnames), len(colnames), nil
}
func GetBalancerStats(session pmgo.SessionManager) (*proto.BalancerStats, error) {
scs, err := GetShardingChangelogStatus(session)
func GetBalancerStats(ctx context.Context, client *mongo.Client) (*proto.BalancerStats, error) {
scs, err := GetShardingChangelogStatus(ctx, client)
if err != nil {
return nil, err
}
@@ -788,16 +751,25 @@ func GetBalancerStats(session pmgo.SessionManager) (*proto.BalancerStats, error)
return s, nil
}
func GetShardingChangelogStatus(session pmgo.SessionManager) (*proto.ShardingChangelogStats, error) {
func GetShardingChangelogStatus(ctx context.Context, client *mongo.Client) (*proto.ShardingChangelogStats, error) {
var qresults []proto.ShardingChangelogSummary
coll := session.DB("config").C("changelog")
match := bson.M{"time": bson.M{"$gt": time.Now().Add(-240 * time.Hour)}}
group := bson.M{"_id": bson.M{"event": "$what", "note": "$details.note"}, "count": bson.M{"$sum": 1}}
coll := client.Database("config").Collection("changelog")
match := primitive.M{"time": primitive.M{"$gt": time.Now().Add(-240 * time.Hour)}}
group := primitive.M{"_id": primitive.M{"event": "$what", "note": "$details.note"}, "count": primitive.M{"$sum": 1}}
err := coll.Pipe([]bson.M{{"$match": match}, {"$group": group}}).All(&qresults)
cursor, err := coll.Aggregate(ctx, []primitive.M{{"$match": match}, {"$group": group}})
if err != nil {
return nil, errors.Wrap(err, "GetShardingChangelogStatus.changelog.find")
}
defer cursor.Close(ctx)
for cursor.Next(ctx) {
res := proto.ShardingChangelogSummary{}
if err := cursor.Decode(&res); err != nil {
return nil, errors.Wrap(err, "cannot decode GetShardingChangelogStatus")
}
qresults = append(qresults, res)
}
return &proto.ShardingChangelogStats{
Items: &qresults,
@@ -863,8 +835,8 @@ func externalIP() (string, error) {
return "", errors.New("are you connected to the network?")
}
func parseFlags() (*options, error) {
opts := &options{
func parseFlags() (*cliOptions, error) {
opts := &cliOptions{
Host: DefaultHost,
LogLevel: DefaultLogLevel,
RunningOpsSamples: DefaultRunningOpsSamples,
@@ -906,6 +878,7 @@ func parseFlags() (*options, error) {
opts.Host = gop.Arg(0)
gop.Parse(gop.Args())
}
if gop.IsSet("password") && opts.Password == "" {
print("Password: ")
pass, err := gopass.GetPasswd()
@@ -914,6 +887,9 @@ func parseFlags() (*options, error) {
}
opts.Password = string(pass)
}
if !strings.HasPrefix(opts.Host, "mongodb://") {
opts.Host = "mongodb://" + opts.Host
}
if opts.Help {
gop.PrintUsage(os.Stdout)
return nil, nil
@@ -925,16 +901,38 @@ func parseFlags() (*options, error) {
return opts, nil
}
func getChunksCount(session pmgo.SessionManager) ([]proto.ChunksByCollection, error) {
func getChunksCount(ctx context.Context, client *mongo.Client) ([]proto.ChunksByCollection, error) {
var result []proto.ChunksByCollection
c := session.DB("config").C("chunks")
query := bson.M{"$group": bson.M{"_id": "$ns", "count": bson.M{"$sum": 1}}}
c := client.Database("config").Collection("chunks")
query := primitive.M{"$group": primitive.M{"_id": "$ns", "count": primitive.M{"$sum": 1}}}
// db.getSiblingDB('config').chunks.aggregate({$group:{_id:"$ns",count:{$sum:1}}})
err := c.Pipe([]bson.M{query}).All(&result)
cursor, err := c.Aggregate(ctx, []primitive.M{query})
if err != nil {
return nil, err
}
for cursor.Next(ctx) {
res := proto.ChunksByCollection{}
if err := cursor.Decode(&res); err != nil {
return nil, errors.Wrap(err, "cannot decode chunks aggregation")
}
result = append(result, res)
}
return result, nil
}
func getClientOptions(opts *cliOptions) *options.ClientOptions {
clientOptions := options.Client().ApplyURI(opts.Host)
credential := options.Credential{}
if opts.User != "" {
credential.Username = opts.User
clientOptions.SetAuth(credential)
}
if opts.Password != "" {
credential.Password = opts.Password
credential.PasswordSet = true
clientOptions.SetAuth(credential)
}
return clientOptions
}