Fixes for PT-61 & PT-62

This commit is contained in:
Carlos Salguero
2017-01-30 14:51:16 -03:00
parent cf06b4bd20
commit 774178d3af
9 changed files with 304 additions and 124 deletions

View File

@@ -0,0 +1,6 @@
package proto
// ShardsMap is the decoded reply of the getShardMap command.
type ShardsMap struct {
Map map[string]string `bson:"map"` // Decoded "map" document; values look like "r1/host1:port,host2:port" or "host1:port,host2:port"
OK int `bson:"ok"` // Decoded "ok" field of the reply
}

View File

@@ -1,15 +1,28 @@
package proto
// Replica set member states, using the numeric values MongoDB reports in the
// "state" / "myState" fields of replSetGetStatus.
//
// The values are spelled out instead of using iota because MongoDB does not
// use state 4: a plain iota sequence would number every state from STARTUP2
// onwards one lower than what the server actually sends.
const (
	REPLICA_SET_MEMBER_STARTUP    = 0
	REPLICA_SET_MEMBER_PRIMARY    = 1
	REPLICA_SET_MEMBER_SECONDARY  = 2
	REPLICA_SET_MEMBER_RECOVERING = 3
	// 4 is intentionally unused by MongoDB.
	REPLICA_SET_MEMBER_STARTUP2 = 5
	REPLICA_SET_MEMBER_UNKNOWN  = 6
	REPLICA_SET_MEMBER_ARBITER  = 7
	REPLICA_SET_MEMBER_DOWN     = 8
	REPLICA_SET_MEMBER_ROLLBACK = 9
	// Deprecated: misspelled name kept so existing callers keep compiling;
	// use REPLICA_SET_MEMBER_ROLLBACK instead.
	REPLICA_SET_MEMBER_ROOLBACK = REPLICA_SET_MEMBER_ROLLBACK
	REPLICA_SET_MEMBER_REMOVED  = 10
)
// Optime describes the last oplog operation applied by a replica set member.
type Optime struct {
	Ts float64 `bson:"ts"` // The Timestamp of the last operation applied to this member of the replica set from the oplog.
	T  float64 `bson:"t"`  // The term in which the last applied operation was originally generated on the primary.
}
type Members struct {
Optime *Optime `bson:"optime"` // See Optime struct
OptimeDate string `bson:"optimeDate"` //the last entry from the oplog that this member applied.
OptimeDate string `bson:"optimeDate"` // The last entry from the oplog that this member applied.
InfoMessage string `bson:"infoMessage"` // A message
Id int64 `bson:"_id"` // Server ID
ID int64 `bson:"_id"` // Server ID
Name string `bson:"name"` // server name
Health float64 `bson:"health"` // This field conveys if the member is up (i.e. 1) or down (i.e. 0).
StateStr string `bson:"stateStr"` // A string that describes state.
@@ -26,7 +39,7 @@ type Members struct {
// Struct for replSetGetStatus
type ReplicaSetStatus struct {
Date string `bson:"date"` // Current date
MyState float64 `bson:"myState"` // integer between 0 and 10 that represents the replica state of the current member
MyState float64 `bson:"myState"` // Integer between 0 and 10 that represents the replica state of the current member
Term float64 `bson:"term"` // The election count for the replica set, as known to this replica set member. Mongo 3.2+
HeartbeatIntervalMillis float64 `bson:"heartbeatIntervalMillis"` // The frequency in milliseconds of the heartbeats. 3.2+
Members []Members `bson:"members"` //

View File

@@ -0,0 +1,200 @@
package util
import (
"fmt"
"sort"
"strings"
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
"github.com/percona/pmgo"
"github.com/pkg/errors"
mgo "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
// GetReplicasetMembersNew builds a member list from the serverStatus of every
// host returned by GetHostnames.
//
// Each member's ID is the server's PID and its StateStr is "PRIMARY" or
// "SECONDARY" according to the repl section of serverStatus (left empty when
// neither flag is set). Hosts whose serverStatus cannot be fetched are left
// out of the result. An error is returned only when the host list itself
// cannot be obtained.
func GetReplicasetMembersNew(dialer pmgo.Dialer, di *mgo.DialInfo) ([]proto.Members, error) {
	hostnames, err := GetHostnames(dialer, di)
	if err != nil {
		return nil, err
	}

	replicaMembers := []proto.Members{}
	for _, hostname := range hostnames {
		serverStatus, err := GetServerStatus(dialer, di, hostname)
		if err != nil {
			// Best effort: an unreachable host must not abort the whole listing.
			continue
		}

		m := proto.Members{
			ID:            serverStatus.Pid,
			Name:          hostname,
			StorageEngine: serverStatus.StorageEngine,
			Set:           serverStatus.Repl.SetName,
		}

		// The comma-ok form covers both a nil value and a value that was
		// decoded as something other than bool, instead of panicking on the
		// latter.
		if isMaster, ok := serverStatus.Repl.IsMaster.(bool); ok && isMaster {
			m.StateStr = "PRIMARY"
		}
		if isSecondary, ok := serverStatus.Repl.Secondary.(bool); ok && isSecondary {
			m.StateStr = "SECONDARY"
		}
		replicaMembers = append(replicaMembers, m)
	}

	return replicaMembers, nil
}
// GetReplicasetMembers connects to every host returned by GetHostnames and
// returns the members it can discover, de-duplicated by name and sorted by
// name.
//
// For hosts where replSetGetStatus fails (the original comment mentions
// mongos instances) a single entry is recorded for the host itself, with
// StateStr set to the sharding cluster role from getCmdLineOpts. For hosts
// where it succeeds, every not-yet-seen member of the reported replica set is
// added; when its serverStatus can be read, ID becomes the server PID and
// StateStr is prefixed with "<clusterRole>/".
//
// An error is returned when the host list cannot be obtained or when a host
// cannot be dialed.
func GetReplicasetMembers(dialer pmgo.Dialer, di *mgo.DialInfo) ([]proto.Members, error) {
	hostnames, err := GetHostnames(dialer, di)
	if err != nil {
		return nil, err
	}

	membersMap := make(map[string]proto.Members)

	for _, hostname := range hostnames {
		tmpdi := *di
		tmpdi.Addrs = []string{hostname}
		session, err := dialer.DialWithInfo(&tmpdi)
		if err != nil {
			return nil, errors.Wrapf(err, "getReplicasetMembers. cannot connect to %s", hostname)
		}

		// Best effort: if this command fails cmdOpts stays zero-valued and the
		// cluster role below is simply empty.
		cmdOpts := proto.CommandLineOptions{}
		_ = session.DB("admin").Run(bson.D{{"getCmdLineOpts", 1}, {"recordStats", 1}}, &cmdOpts)

		rss := proto.ReplicaSetStatus{}
		err = session.Run(bson.M{"replSetGetStatus": 1}, &rss)

		// The session is not used past this point. Closing it here (rather
		// than at the end of the loop body) also releases it on the
		// `continue` path below, which previously leaked one session per
		// host that failed replSetGetStatus.
		session.Close()

		if err != nil {
			m := proto.Members{
				Name: hostname,
			}
			m.StateStr = cmdOpts.Parsed.Sharding.ClusterRole

			if serverStatus, err := GetServerStatus(dialer, di, m.Name); err == nil {
				m.ID = serverStatus.Pid
				m.StorageEngine = serverStatus.StorageEngine
			}
			membersMap[m.Name] = m
			continue // If a host is a mongos we cannot get info but is not a real error
		}

		for _, m := range rss.Members {
			if _, ok := membersMap[m.Name]; ok {
				continue // already exists
			}
			m.Set = rss.Set
			if serverStatus, err := GetServerStatus(dialer, di, m.Name); err == nil {
				m.ID = serverStatus.Pid
				m.StorageEngine = serverStatus.StorageEngine
				m.StateStr = cmdOpts.Parsed.Sharding.ClusterRole + "/" + m.StateStr
			}
			membersMap[m.Name] = m
		}
	}

	members := make([]proto.Members, 0, len(membersMap))
	for _, member := range membersMap {
		members = append(members, member)
	}

	// Map iteration order is random; sort so callers get a stable listing.
	sort.Slice(members, func(i, j int) bool { return members[i].Name < members[j].Name })
	return members, nil
}
// GetHostnames returns the distinct host addresses listed in the reply of the
// getShardMap command, sorted alphabetically.
//
// On a dial or command error the returned slice holds only di.Addrs[0]
// together with the error. When the reply contains no usable host, the slice
// also falls back to di.Addrs[0] so callers always have at least the host
// they connected to. An error is returned if di.Addrs is empty.
func GetHostnames(dialer pmgo.Dialer, di *mgo.DialInfo) ([]string, error) {
	if len(di.Addrs) == 0 {
		return nil, errors.New("cannot get hostnames: dial info has no addresses")
	}
	hostnames := []string{di.Addrs[0]}

	session, err := dialer.DialWithInfo(di)
	if err != nil {
		return hostnames, err
	}
	defer session.Close()

	var shardsMap proto.ShardsMap
	if err := session.Run("getShardMap", &shardsMap); err != nil {
		return hostnames, errors.Wrap(err, "cannot list shards")
	}

	/* Example
	mongos> db.getSiblingDB('admin').runCommand('getShardMap')
	{
	  "map" : {
	    "config" : "localhost:19001,localhost:19002,localhost:19003",
	    "localhost:17001" : "r1/localhost:17001,localhost:17002,localhost:17003",
	    "r1" : "r1/localhost:17001,localhost:17002,localhost:17003",
	    "r1/localhost:17001,localhost:17002,localhost:17003" : "r1/localhost:17001,localhost:17002,localhost:17003",
	  },
	  "ok" : 1
	}
	*/

	// The same host shows up under several keys, so collect into a set.
	seen := make(map[string]struct{})
	for _, val := range shardsMap.Map {
		parts := strings.Split(val, "/")
		hostsStr := ""
		switch len(parts) {
		case 1:
			hostsStr = parts[0] // there is no / in the hosts list
		case 2:
			hostsStr = parts[1] // there is a / in the string. Remove the prefix until the / and keep the rest
		}
		for _, host := range strings.Split(hostsStr, ",") {
			if host == "" {
				// Empty or unexpected values must not produce a blank
				// hostname that callers would then try to dial.
				continue
			}
			seen[host] = struct{}{}
		}
	}

	if len(seen) == 0 {
		return hostnames, nil
	}

	hostnames = make([]string, 0, len(seen)) // re-init because it has di.Addrs[0]
	for host := range seen {
		hostnames = append(hostnames, host)
	}
	// Map iteration order is random; sort for a deterministic result.
	sort.Strings(hostnames)

	return hostnames, nil
}
// GetHostnamesOld returns di.Addrs[0] followed by the first host of every
// shard reported by the listShards command.
//
// On a dial or command error the returned slice holds only di.Addrs[0]
// together with the error.
func GetHostnamesOld(dialer pmgo.Dialer, di *mgo.DialInfo) ([]string, error) {
	hostnames := []string{di.Addrs[0]}

	session, err := dialer.DialWithInfo(di)
	if err != nil {
		return hostnames, err
	}
	defer session.Close()

	shardsInfo := &proto.ShardsInfo{}
	if err := session.Run("listShards", shardsInfo); err != nil {
		return hostnames, errors.Wrap(err, "cannot list shards")
	}

	for _, shardInfo := range shardsInfo.Shards {
		// Host is expected to look like "<replset>/<host1>,<host2>,...".
		// When there is no "<replset>/" prefix the whole value is the host
		// list; indexing m[1] unconditionally would panic in that case.
		m := strings.Split(shardInfo.Host, "/")
		hostList := m[0]
		if len(m) > 1 {
			hostList = m[1]
		}
		h := strings.Split(hostList, ",")
		hostnames = append(hostnames, h[0])
	}

	return hostnames, nil
}
// GetServerStatus dials hostname (using a copy of di with Addrs replaced by
// that single host) and returns the decoded result of the serverStatus
// command run against the admin database.
//
// Failures are printed to stdout and returned wrapped; the returned status is
// the zero value when dialing fails.
func GetServerStatus(dialer pmgo.Dialer, di *mgo.DialInfo, hostname string) (proto.ServerStatus, error) {
	var status proto.ServerStatus

	// Work on a copy so the caller's DialInfo keeps its original addresses.
	hostDialInfo := *di
	hostDialInfo.Addrs = []string{hostname}

	session, dialErr := dialer.DialWithInfo(&hostDialInfo)
	if dialErr != nil {
		fmt.Printf("error %s\n", dialErr.Error())
		return status, errors.Wrapf(dialErr, "getReplicasetMembers. cannot connect to %s", hostname)
	}
	defer session.Close()

	cmd := bson.D{{"serverStatus", 1}, {"recordStats", 1}}
	runErr := session.DB("admin").Run(cmd, &status)
	if runErr != nil {
		fmt.Printf("error 2%s\n", runErr.Error())
		return status, errors.Wrap(runErr, "GetHostInfo.serverStatus")
	}

	return status, nil
}

View File

@@ -16,6 +16,8 @@ import (
"github.com/percona/percona-toolkit/src/go/lib/config"
"github.com/percona/percona-toolkit/src/go/lib/versioncheck"
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
"github.com/percona/percona-toolkit/src/go/mongolib/util"
"github.com/percona/pmgo"
log "github.com/sirupsen/logrus"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
@@ -162,29 +164,31 @@ func main() {
os.Exit(2)
}
session, err := mgo.DialWithInfo(di)
dialer := pmgo.NewDialer()
session, err := dialer.DialWithInfo(di)
if err != nil {
log.Printf("error connecting to the db %s", err)
os.Exit(3)
}
var ps proto.ProfilerStatus
if err := session.DB(di.Database).Run(bson.M{"profile": -1}, &ps); err != nil {
isProfilerEnabled, err := isProfilerEnabled(dialer, di)
if err != nil {
log.Errorf("Cannot get profiler status: %s", err.Error())
os.Exit(2)
os.Exit(4)
}
if ps.Was == 0 {
log.Errorf("Profiler is not enabled for the %s database", di.Database)
os.Exit(3)
if isProfilerEnabled == false {
log.Errorf("Cannot get profiler status: %s", err.Error())
os.Exit(5)
}
i := session.DB(di.Database).C("system.profile").Find(bson.M{"op": bson.M{"$nin": []string{"getmore", "delete"}}}).Sort("-$natural").Iter()
queries := sortQueries(getData(i), opts.OrderBy)
pretty.Print(queries)
uptime := uptime(session)
queryTotals := aggregateQueryStats(queries, uptime)
queryTotals := calcTotalQueryStats(queries, uptime)
tt, _ := template.New("query").Funcs(template.FuncMap{
"Format": format,
}).Parse(getTotalsTemplate())
@@ -228,7 +232,7 @@ func format(val float64, size float64) string {
return fmt.Sprintf("%s%s", fval, unit)
}
func uptime(session *mgo.Session) int64 {
func uptime(session pmgo.SessionManager) int64 {
ss := proto.ServerStatus{}
if err := session.Ping(); err != nil {
return 0
@@ -240,7 +244,7 @@ func uptime(session *mgo.Session) int64 {
return ss.Uptime
}
func aggregateQueryStats(queries []stat, uptime int64) queryInfo {
func calcTotalQueryStats(queries []stat, uptime int64) queryInfo {
qi := queryInfo{}
qs := stat{}
_, totalScanned, totalReturned, totalQueryTime, totalBytes := calcTotals(queries)
@@ -251,11 +255,11 @@ func aggregateQueryStats(queries []stat, uptime int64) queryInfo {
qs.ResponseLength = append(qs.ResponseLength, query.ResponseLength...)
qi.Count += query.Count
}
qi.Scanned = calcStats(qs.NScanned)
qi.Returned = calcStats(qs.NReturned)
qi.QueryTime = calcStats(qs.QueryTime)
qi.ResponseLength = calcStats(qs.ResponseLength)
qi.QPS = float64(int64(qs.Count) / uptime)
if totalScanned > 0 {
qi.Scanned.Pct = qi.Scanned.Total * 100 / totalScanned
@@ -292,8 +296,9 @@ func calcQueryStats(queries []stat, uptime int64) []queryInfo {
FirstSeen: query.FirstSeen,
LastSeen: query.LastSeen,
Namespace: query.Namespace,
QPS: float64(int64(query.Count) / uptime),
QPS: float64(query.Count) / float64(uptime),
}
fmt.Printf("QPS>> query.Count: %v, uptime: %v, QPS: %v\n", query.Count, uptime, qi.QPS)
if totalScanned > 0 {
qi.Scanned.Pct = qi.Scanned.Total * 100 / totalScanned
}
@@ -698,3 +703,26 @@ func sortQueries(queries []stat, orderby []string) []stat {
return queries
}
// isProfilerEnabled reports whether the profiler is enabled for di.Database.
//
// It returns false only when the member list contains a PRIMARY and the
// `profile: -1` command succeeds with was == 0. A failure of the profile
// command itself is ignored, so the profiler is then reported as enabled.
// An error is returned when the initial connection or the member discovery
// fails.
//
// NOTE(review): the profile command is always sent through the session dialed
// from di, not to the individual primary being iterated. Confirm whether each
// primary should be queried directly.
func isProfilerEnabled(dialer pmgo.Dialer, di *mgo.DialInfo) (bool, error) {
	session, err := dialer.DialWithInfo(di)
	if err != nil {
		return false, fmt.Errorf("error connecting to the db %s", err)
	}
	// Release the connection on every return path; it was previously leaked.
	defer session.Close()

	var ps proto.ProfilerStatus
	replicaMembers, err := util.GetReplicasetMembers(dialer, di)
	if err != nil {
		return false, err
	}

	for _, member := range replicaMembers {
		if member.State != proto.REPLICA_SET_MEMBER_PRIMARY {
			continue
		}
		if err := session.DB(di.Database).Run(bson.M{"profile": -1}, &ps); err == nil {
			if ps.Was == 0 {
				return false, nil
			}
		}
	}

	return true, nil
}

View File

@@ -12,9 +12,9 @@ import (
"github.com/howeyc/gopass"
"github.com/pborman/getopt"
"github.com/percona/percona-toolkit/src/go/lib/config"
"github.com/percona/percona-toolkit/src/go/lib/util"
"github.com/percona/percona-toolkit/src/go/lib/versioncheck"
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
"github.com/percona/percona-toolkit/src/go/mongolib/util"
"github.com/percona/percona-toolkit/src/go/pt-mongodb-summary/oplog"
"github.com/percona/percona-toolkit/src/go/pt-mongodb-summary/templates"
"github.com/percona/pmgo"
@@ -115,26 +115,42 @@ type clusterwideInfo struct {
}
// options holds the command line settings of the tool.
type options struct {
	Host               string // Defaults to "localhost:27017" in main
	User               string // --user
	Password           string // --password
	AuthDB             string // --authenticationDatabase
	LogLevel           string // --log-level: panic, fatal, error, warn, info, debug
	Version            bool   // --version: show version & exit
	NoVersionCheck     bool   // --no-version-check: don't check for updates
	NoRunningOps       bool
	RunningOpsSamples  int // --running-ops-samples: number of samples to collect for running ops
	RunningOpsInterval int // --running-ops-interval: wait between samples, in milliseconds
}
func main() {
opts := options{Host: "localhost:27017", LogLevel: "error"}
opts := options{
Host: "localhost:27017",
LogLevel: "error",
RunningOpsSamples: 5,
RunningOpsInterval: 1000, // milliseconds
}
help := getopt.BoolLong("help", '?', "Show help")
getopt.BoolVarLong(&opts.Version, "version", 'v', "", "Show version & exit")
getopt.BoolVarLong(&opts.NoVersionCheck, "no-version-check", 'c', "", "Don't check for updates")
getopt.StringVarLong(&opts.User, "user", 'u', "", "User name")
getopt.StringVarLong(&opts.Password, "password", 'p', "", "Password").SetOptional()
getopt.StringVarLong(&opts.AuthDB, "authenticationDatabase", 'a', "admin", "Database used to establish credentials and privileges with a MongoDB server")
getopt.StringVarLong(&opts.AuthDB, "authenticationDatabase", 'a', "admin",
"Database used to establish credentials and privileges with a MongoDB server")
getopt.StringVarLong(&opts.LogLevel, "log-level", 'l', "error", "Log level:, panic, fatal, error, warn, info, debug")
getopt.IntVarLong(&opts.RunningOpsSamples, "running-ops-samples", 's',
fmt.Sprintf("Number of samples to collect for running ops. Default: %d", opts.RunningOpsSamples))
getopt.IntVarLong(&opts.RunningOpsInterval, "running-ops-interval", 'i',
fmt.Sprintf("Interval to wait betwwen running ops samples in milliseconds. Default %d milliseconds", opts.RunningOpsInterval))
getopt.SetParameters("host[:port]")
getopt.Parse()
@@ -195,7 +211,7 @@ func main() {
log.Debugf("Connecting to the db using:\n%+v", di)
dialer := pmgo.NewDialer()
hostnames, err := getHostnames(dialer, di)
hostnames, err := util.GetHostnames(dialer, di)
log.Debugf("hostnames: %v", hostnames)
session, err := dialer.DialWithInfo(di)
@@ -205,7 +221,7 @@ func main() {
}
defer session.Close()
if replicaMembers, err := GetReplicasetMembers(dialer, hostnames, di); err != nil {
if replicaMembers, err := util.GetReplicasetMembers(dialer, di); err != nil {
log.Printf("[Error] cannot get replicaset members: %v\n", err)
} else {
log.Debugf("replicaMembers:\n%+v\n", replicaMembers)
@@ -224,13 +240,13 @@ func main() {
t.Execute(os.Stdout, hostInfo)
}
var sampleCount int64 = 5
var sampleRate time.Duration = 1 * time.Second // in seconds
if rops, err := GetOpCountersStats(session, sampleCount, sampleRate); err != nil {
log.Printf("[Error] cannot get Opcounters stats: %v\n", err)
} else {
t := template.Must(template.New("runningOps").Parse(templates.RunningOps))
t.Execute(os.Stdout, rops)
if opts.RunningOpsSamples > 0 {
if rops, err := GetOpCountersStats(session, opts.RunningOpsSamples, time.Duration(opts.RunningOpsInterval)*time.Millisecond); err != nil {
log.Printf("[Error] cannot get Opcounters stats: %v\n", err)
} else {
t := template.Must(template.New("runningOps").Parse(templates.RunningOps))
t.Execute(os.Stdout, rops)
}
}
if security, err := GetSecuritySettings(session, hostInfo.Version); err != nil {
@@ -335,33 +351,6 @@ func countMongodProcesses() (int, error) {
return count, nil
}
// getHostnames returns di.Addrs[0] followed by the first host of every shard
// reported by the listShards command.
//
// On a dial or command error the returned slice holds only di.Addrs[0]
// together with the error.
func getHostnames(dialer pmgo.Dialer, di *mgo.DialInfo) ([]string, error) {
	hostnames := []string{di.Addrs[0]}

	session, err := dialer.DialWithInfo(di)
	if err != nil {
		return hostnames, err
	}
	defer session.Close()

	shardsInfo := &proto.ShardsInfo{}
	log.Debugf("Running 'listShards' command")
	if err := session.Run("listShards", shardsInfo); err != nil {
		return hostnames, errors.Wrap(err, "cannot list shards")
	}
	log.Debugf("listShards raw response: %+v", util.Pretty(shardsInfo))

	for _, shardInfo := range shardsInfo.Shards {
		// Host is expected to look like "<replset>/<host1>,<host2>,...".
		// When there is no "<replset>/" prefix the whole value is the host
		// list; indexing m[1] unconditionally would panic in that case.
		m := strings.Split(shardInfo.Host, "/")
		hostList := m[0]
		if len(m) > 1 {
			hostList = m[1]
		}
		h := strings.Split(hostList, ",")
		hostnames = append(hostnames, h[0])
	}

	return hostnames, nil
}
func GetClusterwideInfo(session pmgo.SessionManager) (*clusterwideInfo, error) {
var databases databases
@@ -418,63 +407,6 @@ func sizeAndUnit(size int64) (float64, string) {
return newSize, unit[idx]
}
// GetReplicasetMembers connects to each of the given hostnames, runs
// replSetGetStatus and returns every member reported, tagged with the replica
// set name and, when serverStatus can be read, its storage engine.
//
// Hosts where replSetGetStatus fails are skipped. A host that cannot be
// dialed aborts the whole call with an error.
func GetReplicasetMembers(dialer pmgo.Dialer, hostnames []string, di *mgo.DialInfo) ([]proto.Members, error) {
	replicaMembers := []proto.Members{}
	log.Debugf("hostnames: %+v", hostnames)

	for _, hostname := range hostnames {
		tmpdi := *di
		tmpdi.Addrs = []string{hostname}
		log.Debugf("GetReplicasetMembers connecting to %s", hostname)
		session, err := dialer.DialWithInfo(&tmpdi)
		if err != nil {
			log.Debugf("getReplicasetMembers. cannot connect to %s: %s", hostname, err.Error())
			return nil, errors.Wrapf(err, "getReplicasetMembers. cannot connect to %s", hostname)
		}

		rss := proto.ReplicaSetStatus{}
		err = session.Run(bson.M{"replSetGetStatus": 1}, &rss)

		// The session is not used past this point. Closing it here (rather
		// than at the end of the loop body) also releases it on the
		// `continue` path below, which previously leaked one session per
		// host that failed replSetGetStatus.
		session.Close()

		if err != nil {
			log.Debugf("error in replSetGetStatus on host %s: %s", hostname, err.Error())
			continue // If a host is a mongos we cannot get info but is not a real error
		}
		log.Debugf("replSetGetStatus result:\n%#v", rss)

		for _, m := range rss.Members {
			m.Set = rss.Set
			if serverStatus, err := getServerStatus(dialer, di, m.Name); err == nil {
				m.StorageEngine = serverStatus.StorageEngine
			} else {
				log.Warnf("getReplicasetMembers. cannot get server status: %v", err.Error())
			}
			replicaMembers = append(replicaMembers, m)
		}
	}

	return replicaMembers, nil
}
// getServerStatus dials hostname (using a copy of di with Addrs replaced by
// that single host) and returns the decoded result of the serverStatus
// command run against the admin database. Errors are returned wrapped.
func getServerStatus(dialer pmgo.Dialer, di *mgo.DialInfo, hostname string) (proto.ServerStatus, error) {
	var status proto.ServerStatus

	// Work on a copy so the caller's DialInfo keeps its original addresses.
	hostDialInfo := *di
	hostDialInfo.Addrs = []string{hostname}

	log.Debugf("GetReplicasetMembers connecting to %s", hostname)
	session, dialErr := dialer.DialWithInfo(&hostDialInfo)
	if dialErr != nil {
		return status, errors.Wrapf(dialErr, "getReplicasetMembers. cannot connect to %s", hostname)
	}
	defer session.Close()

	cmd := bson.D{{"serverStatus", 1}, {"recordStats", 1}}
	runErr := session.DB("admin").Run(cmd, &status)
	if runErr != nil {
		return status, errors.Wrap(runErr, "GetHostInfo.serverStatus")
	}

	return status, nil
}
func GetSecuritySettings(session pmgo.SessionManager, ver string) (*security, error) {
s := security{
Auth: "disabled",
@@ -559,7 +491,7 @@ func getNodeType(session pmgo.SessionManager) (string, error) {
return "mongod", nil
}
func GetOpCountersStats(session pmgo.SessionManager, count int64, sleep time.Duration) (*opCounters, error) {
func GetOpCountersStats(session pmgo.SessionManager, count int, sleep time.Duration) (*opCounters, error) {
oc := &opCounters{}
prevOpCount := &opCounters{}
ss := proto.ServerStatus{}
@@ -568,7 +500,8 @@ func GetOpCountersStats(session pmgo.SessionManager, count int64, sleep time.Dur
}
ticker := time.NewTicker(sleep)
for i := int64(0); i < count+1; i++ {
// count + 1 because we need 1st reading to stablish a base to measure variation
for i := 0; i < count+1; i++ {
<-ticker.C
err := session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 1}}, &ss)
if err != nil {

View File

@@ -2,10 +2,10 @@ package templates
const Replicas = `
# Instances ##############################################################################################
ID Host Type ReplSet Engine
PID Host Type ReplSet Engine
{{- if . -}}
{{- range . }}
{{printf "% 3d" .Id}} {{printf "%-30s" .Name}} {{printf "%-30s" .StateStr}} {{printf "%10s" .Set }} {{printf "%20s" .StorageEngine.Name -}}
{{printf "% 3d" .ID}} {{printf "%-30s" .Name}} {{printf "%-30s" .StateStr}} {{ if .Set }}{{printf "%-10s" .Set }}{{else}}- {{end}} {{printf "%20s" .StorageEngine.Name -}}
{{end}}
{{else}}
no replica sets found