mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-12 14:18:32 +00:00
PMM-720: encapsulate stats, separate them from profiler
This commit is contained in:
@@ -1,141 +1,63 @@
|
||||
package profiler
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/montanaflynn/stats"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/util"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/stats"
|
||||
"github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter"
|
||||
"github.com/percona/pmgo"
|
||||
)
|
||||
|
||||
var (
|
||||
// MaxDepthLevel Max recursion level for the fingerprinter
|
||||
MaxDepthLevel = 10
|
||||
// DocsBufferSize is the buffer size to store documents from the MongoDB profiler
|
||||
DocsBufferSize = 100
|
||||
// ErrCannotGetQuery is the error returned if we cannot find a query into the profiler document
|
||||
ErrCannotGetQuery = errors.New("cannot get query field from the profile document (it is not a map)")
|
||||
)
|
||||
|
||||
// Times is an array of time.Time that implements the Sorter interface
|
||||
type Times []time.Time
|
||||
|
||||
func (a Times) Len() int { return len(a) }
|
||||
func (a Times) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a Times) Less(i, j int) bool { return a[i].Before(a[j]) }
|
||||
|
||||
type StatsGroupKey struct {
|
||||
Operation string
|
||||
Fingerprint string
|
||||
Namespace string
|
||||
}
|
||||
|
||||
type totalCounters struct {
|
||||
Count int
|
||||
Scanned float64
|
||||
Returned float64
|
||||
QueryTime float64
|
||||
Bytes float64
|
||||
}
|
||||
|
||||
type Profiler interface {
|
||||
GetLastError() error
|
||||
QueriesChan() chan []QueryInfoAndCounters
|
||||
QueriesChan() chan stats.Queries
|
||||
TimeoutsChan() <-chan time.Time
|
||||
ProcessDoc(proto.SystemProfile, map[StatsGroupKey]*QueryInfoAndCounters) error
|
||||
Start()
|
||||
Stop()
|
||||
}
|
||||
|
||||
type Profile struct {
|
||||
filters []filter.Filter
|
||||
iterator pmgo.IterManager
|
||||
ticker <-chan time.Time
|
||||
queriesChan chan []QueryInfoAndCounters
|
||||
// dependencies
|
||||
iterator pmgo.IterManager
|
||||
filters []filter.Filter
|
||||
ticker <-chan time.Time
|
||||
stats Stats
|
||||
|
||||
// internal
|
||||
queriesChan chan stats.Queries
|
||||
stopChan chan bool
|
||||
docsChan chan proto.SystemProfile
|
||||
timeoutsChan chan time.Time
|
||||
// For the moment ProcessDoc is exportable to it could be called from the "outside"
|
||||
// For that reason, we need a mutex to make it thread safe. In the future this func
|
||||
// will be unexported
|
||||
countersMapLock sync.Mutex
|
||||
queriesInfoAndCounters map[StatsGroupKey]*QueryInfoAndCounters
|
||||
keyFilters []string
|
||||
fingerprinter fingerprinter.Fingerprinter
|
||||
lock sync.Mutex
|
||||
running bool
|
||||
lastError error
|
||||
stopWaitGroup sync.WaitGroup
|
||||
countersMapLock sync.Mutex
|
||||
keyFilters []string
|
||||
lock sync.Mutex
|
||||
running bool
|
||||
lastError error
|
||||
stopWaitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
type QueryStats struct {
|
||||
ID string
|
||||
Namespace string
|
||||
Operation string
|
||||
Query string
|
||||
Fingerprint string
|
||||
FirstSeen time.Time
|
||||
LastSeen time.Time
|
||||
|
||||
Count int
|
||||
QPS float64
|
||||
Rank int
|
||||
Ratio float64
|
||||
QueryTime Statistics
|
||||
ResponseLength Statistics
|
||||
Returned Statistics
|
||||
Scanned Statistics
|
||||
}
|
||||
|
||||
type QueryInfoAndCounters struct {
|
||||
ID string
|
||||
Namespace string
|
||||
Operation string
|
||||
Query map[string]interface{}
|
||||
Fingerprint string
|
||||
FirstSeen time.Time
|
||||
LastSeen time.Time
|
||||
TableScan bool
|
||||
|
||||
Count int
|
||||
BlockedTime Times
|
||||
LockTime Times
|
||||
NReturned []float64
|
||||
NScanned []float64
|
||||
QueryTime []float64 // in milliseconds
|
||||
ResponseLength []float64
|
||||
}
|
||||
|
||||
type Statistics struct {
|
||||
Pct float64
|
||||
Total float64
|
||||
Min float64
|
||||
Max float64
|
||||
Avg float64
|
||||
Pct95 float64
|
||||
StdDev float64
|
||||
Median float64
|
||||
}
|
||||
|
||||
func NewProfiler(iterator pmgo.IterManager, filters []filter.Filter, ticker <-chan time.Time, fp fingerprinter.Fingerprinter) Profiler {
|
||||
func NewProfiler(iterator pmgo.IterManager, filters []filter.Filter, ticker <-chan time.Time, stats Stats) Profiler {
|
||||
return &Profile{
|
||||
filters: filters,
|
||||
fingerprinter: fp,
|
||||
iterator: iterator,
|
||||
ticker: ticker,
|
||||
queriesChan: make(chan []QueryInfoAndCounters),
|
||||
docsChan: make(chan proto.SystemProfile, DocsBufferSize),
|
||||
timeoutsChan: nil,
|
||||
queriesInfoAndCounters: make(map[StatsGroupKey]*QueryInfoAndCounters),
|
||||
keyFilters: []string{"^shardVersion$", "^\\$"},
|
||||
// dependencies
|
||||
iterator: iterator,
|
||||
filters: filters,
|
||||
ticker: ticker,
|
||||
stats: stats,
|
||||
|
||||
// internal
|
||||
docsChan: make(chan proto.SystemProfile, DocsBufferSize),
|
||||
timeoutsChan: nil,
|
||||
keyFilters: []string{"^shardVersion$", "^\\$"},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,7 +65,7 @@ func (p *Profile) GetLastError() error {
|
||||
return p.lastError
|
||||
}
|
||||
|
||||
func (p *Profile) QueriesChan() chan []QueryInfoAndCounters {
|
||||
func (p *Profile) QueriesChan() chan stats.Queries {
|
||||
return p.queriesChan
|
||||
}
|
||||
|
||||
@@ -152,6 +74,7 @@ func (p *Profile) Start() {
|
||||
defer p.lock.Unlock()
|
||||
if !p.running {
|
||||
p.running = true
|
||||
p.queriesChan = make(chan stats.Queries)
|
||||
p.stopChan = make(chan bool)
|
||||
go p.getData()
|
||||
}
|
||||
@@ -185,8 +108,8 @@ MAIN_GETDATA_LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-p.ticker:
|
||||
p.queriesChan <- mapToArray(p.queriesInfoAndCounters)
|
||||
p.queriesInfoAndCounters = make(map[StatsGroupKey]*QueryInfoAndCounters) // Reset stats
|
||||
p.queriesChan <- p.stats.Queries()
|
||||
p.stats.Reset()
|
||||
case <-p.stopChan:
|
||||
// Close the iterator to break the loop on getDocs
|
||||
p.iterator.Close()
|
||||
@@ -217,163 +140,9 @@ func (p *Profile) getDocs() {
|
||||
continue
|
||||
}
|
||||
if len(doc.Query) > 0 {
|
||||
p.ProcessDoc(doc, p.queriesInfoAndCounters)
|
||||
p.stats.Add(doc)
|
||||
}
|
||||
}
|
||||
p.queriesChan <- mapToArray(p.queriesInfoAndCounters)
|
||||
p.queriesChan <- p.stats.Queries()
|
||||
p.Stop()
|
||||
}
|
||||
|
||||
func (p *Profile) ProcessDoc(doc proto.SystemProfile, stats map[StatsGroupKey]*QueryInfoAndCounters) error {
|
||||
|
||||
fp, err := p.fingerprinter.Fingerprint(doc.Query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get fingerprint: %s", err.Error())
|
||||
}
|
||||
var s *QueryInfoAndCounters
|
||||
var ok bool
|
||||
p.countersMapLock.Lock()
|
||||
defer p.countersMapLock.Unlock()
|
||||
|
||||
key := StatsGroupKey{
|
||||
Operation: doc.Op,
|
||||
Fingerprint: fp,
|
||||
Namespace: doc.Ns,
|
||||
}
|
||||
if s, ok = stats[key]; !ok {
|
||||
realQuery, _ := util.GetQueryField(doc.Query)
|
||||
s = &QueryInfoAndCounters{
|
||||
ID: fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s", key)))),
|
||||
Operation: doc.Op,
|
||||
Fingerprint: fp,
|
||||
Namespace: doc.Ns,
|
||||
TableScan: false,
|
||||
Query: realQuery,
|
||||
}
|
||||
stats[key] = s
|
||||
}
|
||||
s.Count++
|
||||
s.NScanned = append(s.NScanned, float64(doc.DocsExamined))
|
||||
s.NReturned = append(s.NReturned, float64(doc.Nreturned))
|
||||
s.QueryTime = append(s.QueryTime, float64(doc.Millis))
|
||||
s.ResponseLength = append(s.ResponseLength, float64(doc.ResponseLength))
|
||||
var zeroTime time.Time
|
||||
if s.FirstSeen == zeroTime || s.FirstSeen.After(doc.Ts) {
|
||||
s.FirstSeen = doc.Ts
|
||||
}
|
||||
if s.LastSeen == zeroTime || s.LastSeen.Before(doc.Ts) {
|
||||
s.LastSeen = doc.Ts
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func CalcQueriesStats(queries []QueryInfoAndCounters, uptime int64) []QueryStats {
|
||||
stats := []QueryStats{}
|
||||
tc := calcTotalCounters(queries)
|
||||
|
||||
for _, query := range queries {
|
||||
queryStats := CountersToStats(query, uptime, tc)
|
||||
stats = append(stats, queryStats)
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
func CalcTotalQueriesStats(queries []QueryInfoAndCounters, uptime int64) QueryStats {
|
||||
tc := calcTotalCounters(queries)
|
||||
|
||||
totalQueryInfoAndCounters := aggregateCounters(queries)
|
||||
totalStats := CountersToStats(totalQueryInfoAndCounters, uptime, tc)
|
||||
|
||||
return totalStats
|
||||
}
|
||||
|
||||
func CountersToStats(query QueryInfoAndCounters, uptime int64, tc totalCounters) QueryStats {
|
||||
buf, _ := json.Marshal(query.Query)
|
||||
queryStats := QueryStats{
|
||||
Count: query.Count,
|
||||
ID: query.ID,
|
||||
Operation: query.Operation,
|
||||
Query: string(buf),
|
||||
Fingerprint: query.Fingerprint,
|
||||
Scanned: calcStats(query.NScanned),
|
||||
Returned: calcStats(query.NReturned),
|
||||
QueryTime: calcStats(query.QueryTime),
|
||||
ResponseLength: calcStats(query.ResponseLength),
|
||||
FirstSeen: query.FirstSeen,
|
||||
LastSeen: query.LastSeen,
|
||||
Namespace: query.Namespace,
|
||||
QPS: float64(query.Count) / float64(uptime),
|
||||
}
|
||||
if tc.Scanned > 0 {
|
||||
queryStats.Scanned.Pct = queryStats.Scanned.Total * 100 / tc.Scanned
|
||||
}
|
||||
if tc.Returned > 0 {
|
||||
queryStats.Returned.Pct = queryStats.Returned.Total * 100 / tc.Returned
|
||||
}
|
||||
if tc.QueryTime > 0 {
|
||||
queryStats.QueryTime.Pct = queryStats.QueryTime.Total * 100 / tc.QueryTime
|
||||
}
|
||||
if tc.Bytes > 0 {
|
||||
queryStats.ResponseLength.Pct = queryStats.ResponseLength.Total / tc.Bytes
|
||||
}
|
||||
if queryStats.Returned.Total > 0 {
|
||||
queryStats.Ratio = queryStats.Scanned.Total / queryStats.Returned.Total
|
||||
}
|
||||
|
||||
return queryStats
|
||||
}
|
||||
|
||||
func aggregateCounters(queries []QueryInfoAndCounters) QueryInfoAndCounters {
|
||||
qt := QueryInfoAndCounters{}
|
||||
for _, query := range queries {
|
||||
qt.NScanned = append(qt.NScanned, query.NScanned...)
|
||||
qt.NReturned = append(qt.NReturned, query.NReturned...)
|
||||
qt.QueryTime = append(qt.QueryTime, query.QueryTime...)
|
||||
qt.ResponseLength = append(qt.ResponseLength, query.ResponseLength...)
|
||||
}
|
||||
return qt
|
||||
}
|
||||
|
||||
func calcTotalCounters(queries []QueryInfoAndCounters) totalCounters {
|
||||
tc := totalCounters{}
|
||||
|
||||
for _, query := range queries {
|
||||
tc.Count += query.Count
|
||||
|
||||
scanned, _ := stats.Sum(query.NScanned)
|
||||
tc.Scanned += scanned
|
||||
|
||||
returned, _ := stats.Sum(query.NReturned)
|
||||
tc.Returned += returned
|
||||
|
||||
queryTime, _ := stats.Sum(query.QueryTime)
|
||||
tc.QueryTime += queryTime
|
||||
|
||||
bytes, _ := stats.Sum(query.ResponseLength)
|
||||
tc.Bytes += bytes
|
||||
}
|
||||
return tc
|
||||
}
|
||||
|
||||
func calcStats(samples []float64) Statistics {
|
||||
var s Statistics
|
||||
s.Total, _ = stats.Sum(samples)
|
||||
s.Min, _ = stats.Min(samples)
|
||||
s.Max, _ = stats.Max(samples)
|
||||
s.Avg, _ = stats.Mean(samples)
|
||||
s.Pct95, _ = stats.PercentileNearestRank(samples, 95)
|
||||
s.StdDev, _ = stats.StandardDeviation(samples)
|
||||
s.Median, _ = stats.Median(samples)
|
||||
return s
|
||||
}
|
||||
|
||||
func mapToArray(stats map[StatsGroupKey]*QueryInfoAndCounters) []QueryInfoAndCounters {
|
||||
sa := []QueryInfoAndCounters{}
|
||||
for _, s := range stats {
|
||||
sa = append(sa, *s)
|
||||
}
|
||||
return sa
|
||||
}
|
||||
|
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/percona/percona-toolkit/src/go/lib/tutil"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/stats"
|
||||
"github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter"
|
||||
"github.com/percona/pmgo/pmgomock"
|
||||
)
|
||||
@@ -62,12 +63,13 @@ func TestRegularIterator(t *testing.T) {
|
||||
)
|
||||
filters := []filter.Filter{}
|
||||
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
|
||||
prof := NewProfiler(iter, filters, nil, fp)
|
||||
s := stats.New(fp)
|
||||
prof := NewProfiler(iter, filters, nil, s)
|
||||
|
||||
firstSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914+00:00")
|
||||
lastSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:20.214+00:00")
|
||||
want := []QueryInfoAndCounters{
|
||||
QueryInfoAndCounters{
|
||||
want := stats.Queries{
|
||||
{
|
||||
ID: "c6466139b21c392acd0699e863b50d81",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
@@ -80,8 +82,6 @@ func TestRegularIterator(t *testing.T) {
|
||||
LastSeen: lastSeen,
|
||||
TableScan: false,
|
||||
Count: 2,
|
||||
BlockedTime: Times(nil),
|
||||
LockTime: Times(nil),
|
||||
NReturned: []float64{50, 75},
|
||||
NScanned: []float64{100, 75},
|
||||
QueryTime: []float64{0, 1},
|
||||
@@ -123,12 +123,13 @@ func TestIteratorTimeout(t *testing.T) {
|
||||
filters := []filter.Filter{}
|
||||
|
||||
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
|
||||
prof := NewProfiler(iter, filters, nil, fp)
|
||||
s := stats.New(fp)
|
||||
prof := NewProfiler(iter, filters, nil, s)
|
||||
|
||||
firstSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914+00:00")
|
||||
lastSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914+00:00")
|
||||
want := []QueryInfoAndCounters{
|
||||
QueryInfoAndCounters{
|
||||
want := stats.Queries{
|
||||
{
|
||||
ID: "c6466139b21c392acd0699e863b50d81",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
@@ -141,8 +142,6 @@ func TestIteratorTimeout(t *testing.T) {
|
||||
LastSeen: lastSeen,
|
||||
TableScan: false,
|
||||
Count: 1,
|
||||
BlockedTime: Times(nil),
|
||||
LockTime: Times(nil),
|
||||
NReturned: []float64{75},
|
||||
NScanned: []float64{75},
|
||||
QueryTime: []float64{1},
|
||||
@@ -210,10 +209,11 @@ func TestTailIterator(t *testing.T) {
|
||||
filters := []filter.Filter{}
|
||||
ticker := time.NewTicker(time.Second)
|
||||
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
|
||||
prof := NewProfiler(iter, filters, ticker.C, fp)
|
||||
s := stats.New(fp)
|
||||
prof := NewProfiler(iter, filters, ticker.C, s)
|
||||
|
||||
want := []QueryInfoAndCounters{
|
||||
QueryInfoAndCounters{
|
||||
want := stats.Queries{
|
||||
{
|
||||
ID: "c6466139b21c392acd0699e863b50d81",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
@@ -226,14 +226,12 @@ func TestTailIterator(t *testing.T) {
|
||||
LastSeen: parseDate("2017-04-01T23:01:20.214+00:00"),
|
||||
TableScan: false,
|
||||
Count: 1,
|
||||
BlockedTime: Times(nil),
|
||||
LockTime: Times(nil),
|
||||
NReturned: []float64{50},
|
||||
NScanned: []float64{100},
|
||||
QueryTime: []float64{0},
|
||||
ResponseLength: []float64{1.06123e+06},
|
||||
},
|
||||
QueryInfoAndCounters{
|
||||
{
|
||||
ID: "c6466139b21c392acd0699e863b50d81",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
@@ -246,8 +244,6 @@ func TestTailIterator(t *testing.T) {
|
||||
LastSeen: parseDate("2017-04-01T23:01:19.914+00:00"),
|
||||
TableScan: false,
|
||||
Count: 1,
|
||||
BlockedTime: Times(nil),
|
||||
LockTime: Times(nil),
|
||||
NReturned: []float64{75},
|
||||
NScanned: []float64{75},
|
||||
QueryTime: []float64{1},
|
||||
@@ -261,7 +257,7 @@ func TestTailIterator(t *testing.T) {
|
||||
for index < 2 {
|
||||
select {
|
||||
case queries := <-prof.QueriesChan():
|
||||
if !reflect.DeepEqual(queries, []QueryInfoAndCounters{want[index]}) {
|
||||
if !reflect.DeepEqual(queries, stats.Queries{want[index]}) {
|
||||
t.Errorf("invalid queries. \nGot: %#v,\nWant: %#v\n", queries, want)
|
||||
}
|
||||
index++
|
||||
@@ -279,7 +275,7 @@ func TestCalcStats(t *testing.T) {
|
||||
t.Fatalf("cannot load samples: %s", err.Error())
|
||||
}
|
||||
|
||||
want := []QueryStats{}
|
||||
want := []stats.QueryStats{}
|
||||
err = tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.want.json", &want)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load expected results: %s", err.Error())
|
||||
@@ -300,17 +296,18 @@ func TestCalcStats(t *testing.T) {
|
||||
|
||||
filters := []filter.Filter{}
|
||||
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
|
||||
prof := NewProfiler(iter, filters, nil, fp)
|
||||
s := stats.New(fp)
|
||||
prof := NewProfiler(iter, filters, nil, s)
|
||||
|
||||
prof.Start()
|
||||
select {
|
||||
case queries := <-prof.QueriesChan():
|
||||
stats := CalcQueriesStats(queries, 1)
|
||||
s := queries.CalcQueriesStats(1)
|
||||
if os.Getenv("UPDATE_SAMPLES") != "" {
|
||||
tutil.WriteJson(vars.RootPath+samples+"profiler_docs_stats.want.json", stats)
|
||||
tutil.WriteJson(vars.RootPath+samples+"profiler_docs_stats.want.json", s)
|
||||
}
|
||||
if !reflect.DeepEqual(stats, want) {
|
||||
t.Errorf("Invalid stats.\nGot:%#v\nWant: %#v\n", stats, want)
|
||||
if !reflect.DeepEqual(s, want) {
|
||||
t.Errorf("Invalid stats.\nGot:%#v\nWant: %#v\n", s, want)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("Didn't get any query")
|
||||
@@ -327,7 +324,7 @@ func TestCalcTotalStats(t *testing.T) {
|
||||
t.Fatalf("cannot load samples: %s", err.Error())
|
||||
}
|
||||
|
||||
want := QueryStats{}
|
||||
want := stats.QueryStats{}
|
||||
err = tutil.LoadJson(vars.RootPath+samples+"profiler_docs_total_stats.want.json", &want)
|
||||
if err != nil && !tutil.ShouldUpdateSamples() {
|
||||
t.Fatalf("cannot load expected results: %s", err.Error())
|
||||
@@ -348,206 +345,24 @@ func TestCalcTotalStats(t *testing.T) {
|
||||
|
||||
filters := []filter.Filter{}
|
||||
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
|
||||
prof := NewProfiler(iter, filters, nil, fp)
|
||||
s := stats.New(fp)
|
||||
prof := NewProfiler(iter, filters, nil, s)
|
||||
|
||||
prof.Start()
|
||||
select {
|
||||
case queries := <-prof.QueriesChan():
|
||||
stats := CalcTotalQueriesStats(queries, 1)
|
||||
s := queries.CalcTotalQueriesStats(1)
|
||||
if os.Getenv("UPDATE_SAMPLES") != "" {
|
||||
fmt.Println("Updating samples: " + vars.RootPath + samples + "profiler_docs_total_stats.want.json")
|
||||
err := tutil.WriteJson(vars.RootPath+samples+"profiler_docs_total_stats.want.json", stats)
|
||||
err := tutil.WriteJson(vars.RootPath+samples+"profiler_docs_total_stats.want.json", s)
|
||||
if err != nil {
|
||||
fmt.Printf("cannot update samples: %s", err.Error())
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(stats, want) {
|
||||
t.Errorf("Invalid stats.\nGot:%#v\nWant: %#v\n", stats, want)
|
||||
if !reflect.DeepEqual(s, want) {
|
||||
t.Errorf("Invalid stats.\nGot:%#v\nWant: %#v\n", s, want)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("Didn't get any query")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCalcTotalCounters(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
docs := []proto.SystemProfile{}
|
||||
err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.json", &docs)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load samples: %s", err.Error())
|
||||
}
|
||||
|
||||
want := totalCounters{}
|
||||
err = tutil.LoadJson(vars.RootPath+samples+"profiler_docs_total_counters.want.json", &want)
|
||||
if err != nil && !tutil.ShouldUpdateSamples() {
|
||||
t.Fatalf("cannot load expected results: %s", err.Error())
|
||||
}
|
||||
|
||||
iter := pmgomock.NewMockIterManager(ctrl)
|
||||
gomock.InOrder(
|
||||
iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[0]).Return(true),
|
||||
iter.EXPECT().Timeout().Return(false),
|
||||
iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[1]).Return(true),
|
||||
iter.EXPECT().Timeout().Return(false),
|
||||
iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[2]).Return(true),
|
||||
iter.EXPECT().Timeout().Return(false),
|
||||
iter.EXPECT().Next(gomock.Any()).Return(false),
|
||||
iter.EXPECT().Timeout().Return(false),
|
||||
iter.EXPECT().Close(),
|
||||
)
|
||||
|
||||
filters := []filter.Filter{}
|
||||
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
|
||||
prof := NewProfiler(iter, filters, nil, fp)
|
||||
|
||||
prof.Start()
|
||||
select {
|
||||
case queries := <-prof.QueriesChan():
|
||||
counters := calcTotalCounters(queries)
|
||||
if tutil.ShouldUpdateSamples() {
|
||||
fmt.Println("Updating samples: " + vars.RootPath + samples + "profiler_docs_total_counters.want.json")
|
||||
err := tutil.WriteJson(vars.RootPath+samples+"profiler_docs_total_counters.want.json", counters)
|
||||
if err != nil {
|
||||
fmt.Printf("cannot update samples: %s", err.Error())
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(counters, want) {
|
||||
t.Errorf("Invalid counters.\nGot:%#v\nWant: %#v\n", counters, want)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("Didn't get any query")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessDoc(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
docs := []proto.SystemProfile{}
|
||||
err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.json", &docs)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load samples: %s", err.Error())
|
||||
}
|
||||
|
||||
iter := pmgomock.NewMockIterManager(ctrl)
|
||||
filters := []filter.Filter{}
|
||||
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
|
||||
prof := NewProfiler(iter, filters, nil, fp)
|
||||
|
||||
var stats = make(map[StatsGroupKey]*QueryInfoAndCounters)
|
||||
|
||||
err = prof.ProcessDoc(docs[1], stats)
|
||||
if err != nil {
|
||||
t.Errorf("Error processing doc: %s\n", err.Error())
|
||||
}
|
||||
statsKey := StatsGroupKey{Operation: "query", Fingerprint: "s2", Namespace: "samples.col1"}
|
||||
statsVal := &QueryInfoAndCounters{
|
||||
ID: "84e09ef6a3dc35f472df05fa98eee7d3",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
Query: map[string]interface{}{"s2": map[string]interface{}{"$gte": "41991", "$lt": "33754"}},
|
||||
Fingerprint: "s2",
|
||||
FirstSeen: parseDate("2017-04-10T13:15:53.532-03:00"),
|
||||
LastSeen: parseDate("2017-04-10T13:15:53.532-03:00"),
|
||||
TableScan: false,
|
||||
Count: 1,
|
||||
BlockedTime: nil,
|
||||
LockTime: nil,
|
||||
NReturned: []float64{0},
|
||||
NScanned: []float64{10000},
|
||||
QueryTime: []float64{7},
|
||||
ResponseLength: []float64{215},
|
||||
}
|
||||
|
||||
want := map[StatsGroupKey]*QueryInfoAndCounters{statsKey: statsVal}
|
||||
|
||||
if !reflect.DeepEqual(stats, want) {
|
||||
t.Errorf("Error in ProcessDoc.\nGot:%#v\nWant: %#v\n", stats, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTimesLen(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
a times
|
||||
want int
|
||||
}{
|
||||
{
|
||||
name: "Times.Len",
|
||||
a: []time.Time{time.Now()},
|
||||
want: 1,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := tt.a.Len(); got != tt.want {
|
||||
t.Errorf("times.Len() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTimesSwap(t *testing.T) {
|
||||
type args struct {
|
||||
i int
|
||||
j int
|
||||
}
|
||||
t1 := time.Now()
|
||||
t2 := t1.Add(1 * time.Minute)
|
||||
tests := []struct {
|
||||
name string
|
||||
a times
|
||||
args args
|
||||
}{
|
||||
{
|
||||
name: "Times.Swap",
|
||||
a: times{t1, t2},
|
||||
args: args{i: 0, j: 1},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
tt.a.Swap(tt.args.i, tt.args.j)
|
||||
if tt.a[0] != t2 || tt.a[1] != t1 {
|
||||
t.Errorf("%s has (%v, %v) want (%v, %v)", tt.name, tt.a[0], tt.a[1], t2, t1)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTimesLess(t *testing.T) {
|
||||
type args struct {
|
||||
i int
|
||||
j int
|
||||
}
|
||||
t1 := time.Now()
|
||||
t2 := t1.Add(1 * time.Minute)
|
||||
tests := []struct {
|
||||
name string
|
||||
a times
|
||||
args args
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "Times.Swap",
|
||||
a: times{t1, t2},
|
||||
args: args{i: 0, j: 1},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "Times.Swap",
|
||||
a: times{t2, t1},
|
||||
args: args{i: 0, j: 1},
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := tt.a.Less(tt.args.i, tt.args.j); got != tt.want {
|
||||
t.Errorf("times.Less() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
12
src/go/mongolib/profiler/stats.go
Normal file
12
src/go/mongolib/profiler/stats.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package profiler
|
||||
|
||||
import (
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/stats"
|
||||
)
|
||||
|
||||
type Stats interface {
|
||||
Reset()
|
||||
Add(doc proto.SystemProfile) error
|
||||
Queries() stats.Queries
|
||||
}
|
268
src/go/mongolib/stats/stats.go
Normal file
268
src/go/mongolib/stats/stats.go
Normal file
@@ -0,0 +1,268 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/montanaflynn/stats"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/util"
|
||||
)
|
||||
|
||||
// New creates new instance of stats with given fingerprinter
|
||||
func New(fingerprinter fingerprinter.Fingerprinter) *Stats {
|
||||
s := &Stats{
|
||||
fingerprinter: fingerprinter,
|
||||
}
|
||||
|
||||
s.Reset()
|
||||
return s
|
||||
}
|
||||
|
||||
// Stats is a collection of MongoDB statistics
|
||||
type Stats struct {
|
||||
// dependencies
|
||||
fingerprinter fingerprinter.Fingerprinter
|
||||
|
||||
// internal
|
||||
queryInfoAndCounters map[GroupKey]*QueryInfoAndCounters
|
||||
}
|
||||
|
||||
// Reset clears the collection of statistics
|
||||
func (s *Stats) Reset() {
|
||||
s.queryInfoAndCounters = make(map[GroupKey]*QueryInfoAndCounters)
|
||||
}
|
||||
|
||||
// Add adds proto.SystemProfile to the collection of statistics
|
||||
func (s *Stats) Add(doc proto.SystemProfile) error {
|
||||
fp, err := s.fingerprinter.Fingerprint(doc.Query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get fingerprint: %s", err)
|
||||
}
|
||||
var qiac *QueryInfoAndCounters
|
||||
var ok bool
|
||||
|
||||
key := GroupKey{
|
||||
Operation: doc.Op,
|
||||
Fingerprint: fp,
|
||||
Namespace: doc.Ns,
|
||||
}
|
||||
if qiac, ok = s.queryInfoAndCounters[key]; !ok {
|
||||
realQuery, _ := util.GetQueryField(doc.Query)
|
||||
qiac = &QueryInfoAndCounters{
|
||||
ID: fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s", key)))),
|
||||
Operation: doc.Op,
|
||||
Fingerprint: fp,
|
||||
Namespace: doc.Ns,
|
||||
TableScan: false,
|
||||
Query: realQuery,
|
||||
}
|
||||
s.queryInfoAndCounters[key] = qiac
|
||||
}
|
||||
qiac.Count++
|
||||
qiac.NScanned = append(qiac.NScanned, float64(doc.DocsExamined))
|
||||
qiac.NReturned = append(qiac.NReturned, float64(doc.Nreturned))
|
||||
qiac.QueryTime = append(qiac.QueryTime, float64(doc.Millis))
|
||||
qiac.ResponseLength = append(qiac.ResponseLength, float64(doc.ResponseLength))
|
||||
var zeroTime time.Time
|
||||
if qiac.FirstSeen == zeroTime || qiac.FirstSeen.After(doc.Ts) {
|
||||
qiac.FirstSeen = doc.Ts
|
||||
}
|
||||
if qiac.LastSeen == zeroTime || qiac.LastSeen.Before(doc.Ts) {
|
||||
qiac.LastSeen = doc.Ts
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Queries returns all collected statistics
|
||||
func (s *Stats) Queries() Queries {
|
||||
return mapToArray(s.queryInfoAndCounters)
|
||||
}
|
||||
|
||||
// Queries is a slice of MongoDB statistics
|
||||
type Queries []QueryInfoAndCounters
|
||||
|
||||
// CalcQueriesStats calculates QueryStats for given uptime
|
||||
func (q Queries) CalcQueriesStats(uptime int64) []QueryStats {
|
||||
qs := []QueryStats{}
|
||||
tc := calcTotalCounters(q)
|
||||
|
||||
for _, query := range q {
|
||||
queryStats := countersToStats(query, uptime, tc)
|
||||
qs = append(qs, queryStats)
|
||||
}
|
||||
|
||||
return qs
|
||||
}
|
||||
|
||||
// CalcTotalQueriesStats calculates total QueryStats for given uptime
|
||||
func (q Queries) CalcTotalQueriesStats(uptime int64) QueryStats {
|
||||
tc := calcTotalCounters(q)
|
||||
|
||||
totalQueryInfoAndCounters := aggregateCounters(q)
|
||||
totalStats := countersToStats(totalQueryInfoAndCounters, uptime, tc)
|
||||
|
||||
return totalStats
|
||||
}
|
||||
|
||||
// times is an array of time.Time that implements the Sorter interface
|
||||
type Times []time.Time
|
||||
|
||||
func (a Times) Len() int { return len(a) }
|
||||
func (a Times) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a Times) Less(i, j int) bool { return a[i].Before(a[j]) }
|
||||
|
||||
type GroupKey struct {
|
||||
Operation string
|
||||
Fingerprint string
|
||||
Namespace string
|
||||
}
|
||||
|
||||
type totalCounters struct {
|
||||
Count int
|
||||
Scanned float64
|
||||
Returned float64
|
||||
QueryTime float64
|
||||
Bytes float64
|
||||
}
|
||||
|
||||
type QueryStats struct {
|
||||
ID string
|
||||
Namespace string
|
||||
Operation string
|
||||
Query string
|
||||
Fingerprint string
|
||||
FirstSeen time.Time
|
||||
LastSeen time.Time
|
||||
|
||||
Count int
|
||||
QPS float64
|
||||
Rank int
|
||||
Ratio float64
|
||||
QueryTime Statistics
|
||||
ResponseLength Statistics
|
||||
Returned Statistics
|
||||
Scanned Statistics
|
||||
}
|
||||
|
||||
type QueryInfoAndCounters struct {
|
||||
ID string
|
||||
Namespace string
|
||||
Operation string
|
||||
Query map[string]interface{}
|
||||
Fingerprint string
|
||||
FirstSeen time.Time
|
||||
LastSeen time.Time
|
||||
TableScan bool
|
||||
|
||||
Count int
|
||||
BlockedTime Times
|
||||
LockTime Times
|
||||
NReturned []float64
|
||||
NScanned []float64
|
||||
QueryTime []float64 // in milliseconds
|
||||
ResponseLength []float64
|
||||
}
|
||||
|
||||
type Statistics struct {
|
||||
Pct float64
|
||||
Total float64
|
||||
Min float64
|
||||
Max float64
|
||||
Avg float64
|
||||
Pct95 float64
|
||||
StdDev float64
|
||||
Median float64
|
||||
}
|
||||
|
||||
func countersToStats(query QueryInfoAndCounters, uptime int64, tc totalCounters) QueryStats {
|
||||
buf, _ := json.Marshal(query.Query)
|
||||
queryStats := QueryStats{
|
||||
Count: query.Count,
|
||||
ID: query.ID,
|
||||
Operation: query.Operation,
|
||||
Query: string(buf),
|
||||
Fingerprint: query.Fingerprint,
|
||||
Scanned: calcStats(query.NScanned),
|
||||
Returned: calcStats(query.NReturned),
|
||||
QueryTime: calcStats(query.QueryTime),
|
||||
ResponseLength: calcStats(query.ResponseLength),
|
||||
FirstSeen: query.FirstSeen,
|
||||
LastSeen: query.LastSeen,
|
||||
Namespace: query.Namespace,
|
||||
QPS: float64(query.Count) / float64(uptime),
|
||||
}
|
||||
if tc.Scanned > 0 {
|
||||
queryStats.Scanned.Pct = queryStats.Scanned.Total * 100 / tc.Scanned
|
||||
}
|
||||
if tc.Returned > 0 {
|
||||
queryStats.Returned.Pct = queryStats.Returned.Total * 100 / tc.Returned
|
||||
}
|
||||
if tc.QueryTime > 0 {
|
||||
queryStats.QueryTime.Pct = queryStats.QueryTime.Total * 100 / tc.QueryTime
|
||||
}
|
||||
if tc.Bytes > 0 {
|
||||
queryStats.ResponseLength.Pct = queryStats.ResponseLength.Total / tc.Bytes
|
||||
}
|
||||
if queryStats.Returned.Total > 0 {
|
||||
queryStats.Ratio = queryStats.Scanned.Total / queryStats.Returned.Total
|
||||
}
|
||||
|
||||
return queryStats
|
||||
}
|
||||
|
||||
func aggregateCounters(queries []QueryInfoAndCounters) QueryInfoAndCounters {
|
||||
qt := QueryInfoAndCounters{}
|
||||
for _, query := range queries {
|
||||
qt.NScanned = append(qt.NScanned, query.NScanned...)
|
||||
qt.NReturned = append(qt.NReturned, query.NReturned...)
|
||||
qt.QueryTime = append(qt.QueryTime, query.QueryTime...)
|
||||
qt.ResponseLength = append(qt.ResponseLength, query.ResponseLength...)
|
||||
}
|
||||
return qt
|
||||
}
|
||||
|
||||
func calcTotalCounters(queries []QueryInfoAndCounters) totalCounters {
|
||||
tc := totalCounters{}
|
||||
|
||||
for _, query := range queries {
|
||||
tc.Count += query.Count
|
||||
|
||||
scanned, _ := stats.Sum(query.NScanned)
|
||||
tc.Scanned += scanned
|
||||
|
||||
returned, _ := stats.Sum(query.NReturned)
|
||||
tc.Returned += returned
|
||||
|
||||
queryTime, _ := stats.Sum(query.QueryTime)
|
||||
tc.QueryTime += queryTime
|
||||
|
||||
bytes, _ := stats.Sum(query.ResponseLength)
|
||||
tc.Bytes += bytes
|
||||
}
|
||||
return tc
|
||||
}
|
||||
|
||||
func calcStats(samples []float64) Statistics {
|
||||
var s Statistics
|
||||
s.Total, _ = stats.Sum(samples)
|
||||
s.Min, _ = stats.Min(samples)
|
||||
s.Max, _ = stats.Max(samples)
|
||||
s.Avg, _ = stats.Mean(samples)
|
||||
s.Pct95, _ = stats.PercentileNearestRank(samples, 95)
|
||||
s.StdDev, _ = stats.StandardDeviation(samples)
|
||||
s.Median, _ = stats.Median(samples)
|
||||
return s
|
||||
}
|
||||
|
||||
func mapToArray(stats map[GroupKey]*QueryInfoAndCounters) []QueryInfoAndCounters {
|
||||
sa := []QueryInfoAndCounters{}
|
||||
for _, s := range stats {
|
||||
sa = append(sa, *s)
|
||||
}
|
||||
return sa
|
||||
}
|
166
src/go/mongolib/stats/stats_test.go
Normal file
166
src/go/mongolib/stats/stats_test.go
Normal file
@@ -0,0 +1,166 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/percona/percona-toolkit/src/go/lib/tutil"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
|
||||
"log"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
samples = "/src/go/tests/"
|
||||
)
|
||||
|
||||
type testVars struct {
|
||||
RootPath string
|
||||
}
|
||||
|
||||
var vars testVars
|
||||
|
||||
func parseDate(dateStr string) time.Time {
|
||||
date, _ := time.Parse(time.RFC3339Nano, dateStr)
|
||||
return date
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
var err error
|
||||
if vars.RootPath, err = tutil.RootPath(); err != nil {
|
||||
log.Printf("cannot get root path: %s", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
func TestTimesLen(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
a Times
|
||||
want int
|
||||
}{
|
||||
{
|
||||
name: "Times.Len",
|
||||
a: []time.Time{time.Now()},
|
||||
want: 1,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := tt.a.Len(); got != tt.want {
|
||||
t.Errorf("times.Len() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTimesSwap(t *testing.T) {
|
||||
type args struct {
|
||||
i int
|
||||
j int
|
||||
}
|
||||
t1 := time.Now()
|
||||
t2 := t1.Add(1 * time.Minute)
|
||||
tests := []struct {
|
||||
name string
|
||||
a Times
|
||||
args args
|
||||
}{
|
||||
{
|
||||
name: "Times.Swap",
|
||||
a: Times{t1, t2},
|
||||
args: args{i: 0, j: 1},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
tt.a.Swap(tt.args.i, tt.args.j)
|
||||
if tt.a[0] != t2 || tt.a[1] != t1 {
|
||||
t.Errorf("%s has (%v, %v) want (%v, %v)", tt.name, tt.a[0], tt.a[1], t2, t1)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTimesLess(t *testing.T) {
|
||||
type args struct {
|
||||
i int
|
||||
j int
|
||||
}
|
||||
t1 := time.Now()
|
||||
t2 := t1.Add(1 * time.Minute)
|
||||
tests := []struct {
|
||||
name string
|
||||
a Times
|
||||
args args
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "Times.Swap",
|
||||
a: Times{t1, t2},
|
||||
args: args{i: 0, j: 1},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "Times.Swap",
|
||||
a: Times{t2, t1},
|
||||
args: args{i: 0, j: 1},
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := tt.a.Less(tt.args.i, tt.args.j); got != tt.want {
|
||||
t.Errorf("times.Less() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStats(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
docs := []proto.SystemProfile{}
|
||||
err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.json", &docs)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load samples: %s", err.Error())
|
||||
}
|
||||
|
||||
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
|
||||
s := New(fp)
|
||||
|
||||
err = s.Add(docs[1])
|
||||
if err != nil {
|
||||
t.Errorf("Error processing doc: %s\n", err.Error())
|
||||
}
|
||||
statsVal := QueryInfoAndCounters{
|
||||
ID: "84e09ef6a3dc35f472df05fa98eee7d3",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
Query: map[string]interface{}{"s2": map[string]interface{}{"$gte": "41991", "$lt": "33754"}},
|
||||
Fingerprint: "s2",
|
||||
FirstSeen: parseDate("2017-04-10T13:15:53.532-03:00"),
|
||||
LastSeen: parseDate("2017-04-10T13:15:53.532-03:00"),
|
||||
TableScan: false,
|
||||
Count: 1,
|
||||
BlockedTime: nil,
|
||||
LockTime: nil,
|
||||
NReturned: []float64{0},
|
||||
NScanned: []float64{10000},
|
||||
QueryTime: []float64{7},
|
||||
ResponseLength: []float64{215},
|
||||
}
|
||||
|
||||
want := Queries{
|
||||
statsVal,
|
||||
}
|
||||
got := s.Queries()
|
||||
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("Error \nGot:%#v\nWant: %#v\n", got, want)
|
||||
}
|
||||
}
|
@@ -4,13 +4,13 @@ import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
mgo "gopkg.in/mgo.v2"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/percona/percona-toolkit/src/go/lib/tutil"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
|
||||
"github.com/percona/pmgo"
|
||||
"github.com/percona/pmgo/pmgomock"
|
||||
"gopkg.in/mgo.v2"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
// OK
|
||||
@@ -145,9 +145,7 @@ func TestGetReplicasetMembers(t *testing.T) {
|
||||
database.EXPECT().Run(bson.D{{"serverStatus", 1}, {"recordStats", 1}}, gomock.Any()).SetArg(1, ss)
|
||||
session.EXPECT().Close()
|
||||
|
||||
session.EXPECT().Close()
|
||||
|
||||
di := &mgo.DialInfo{Addrs: []string{"localhost"}}
|
||||
di := &pmgo.DialInfo{Addrs: []string{"localhost"}}
|
||||
rss, err := GetReplicasetMembers(dialer, di)
|
||||
if err != nil {
|
||||
t.Errorf("getReplicasetMembers: %v", err)
|
||||
@@ -166,26 +164,67 @@ func TestGetHostnames(t *testing.T) {
|
||||
dialer := pmgomock.NewMockDialer(ctrl)
|
||||
session := pmgomock.NewMockSessionManager(ctrl)
|
||||
|
||||
mockShardsInfo := proto.ShardsInfo{
|
||||
Shards: []proto.Shard{
|
||||
proto.Shard{
|
||||
ID: "r1",
|
||||
Host: "r1/localhost:17001,localhost:17002,localhost:17003",
|
||||
},
|
||||
proto.Shard{
|
||||
ID: "r2",
|
||||
Host: "r2/localhost:18001,localhost:18002,localhost:18003",
|
||||
},
|
||||
},
|
||||
OK: 1,
|
||||
mockrss := proto.ReplicaSetStatus{
|
||||
Date: "",
|
||||
MyState: 1,
|
||||
Term: 0,
|
||||
HeartbeatIntervalMillis: 0,
|
||||
Members: []proto.Members{
|
||||
proto.Members{
|
||||
Optime: nil,
|
||||
OptimeDate: "",
|
||||
InfoMessage: "",
|
||||
ID: 0,
|
||||
Name: "localhost:17001",
|
||||
Health: 1,
|
||||
StateStr: "PRIMARY",
|
||||
Uptime: 113287,
|
||||
ConfigVersion: 1,
|
||||
Self: true,
|
||||
State: 1,
|
||||
ElectionTime: 6340960613392449537,
|
||||
ElectionDate: "",
|
||||
Set: ""},
|
||||
proto.Members{
|
||||
Optime: nil,
|
||||
OptimeDate: "",
|
||||
InfoMessage: "",
|
||||
ID: 1,
|
||||
Name: "localhost:17002",
|
||||
Health: 1,
|
||||
StateStr: "SECONDARY",
|
||||
Uptime: 113031,
|
||||
ConfigVersion: 1,
|
||||
Self: false,
|
||||
State: 2,
|
||||
ElectionTime: 0,
|
||||
ElectionDate: "",
|
||||
Set: ""},
|
||||
proto.Members{
|
||||
Optime: nil,
|
||||
OptimeDate: "",
|
||||
InfoMessage: "",
|
||||
ID: 2,
|
||||
Name: "localhost:17003",
|
||||
Health: 1,
|
||||
StateStr: "SECONDARY",
|
||||
Uptime: 113031,
|
||||
ConfigVersion: 1,
|
||||
Self: false,
|
||||
State: 2,
|
||||
ElectionTime: 0,
|
||||
ElectionDate: "",
|
||||
Set: ""}},
|
||||
Ok: 1,
|
||||
Set: "r1",
|
||||
}
|
||||
|
||||
dialer.EXPECT().DialWithInfo(gomock.Any()).Return(session, nil)
|
||||
session.EXPECT().Run("getShardMap", gomock.Any()).SetArg(1, mockShardsInfo)
|
||||
session.EXPECT().Close()
|
||||
session.EXPECT().SetMode(mgo.Monotonic, true)
|
||||
session.EXPECT().Run(bson.M{"replSetGetStatus": 1}, gomock.Any()).SetArg(1, mockrss)
|
||||
|
||||
expect := []string{"localhost", "localhost:17001", "localhost:18001"}
|
||||
di := &mgo.DialInfo{Addrs: []string{"localhost"}}
|
||||
expect := []string{"localhost:17001", "localhost:17002", "localhost:17003"}
|
||||
di := &pmgo.DialInfo{Addrs: []string{"localhost"}}
|
||||
rss, err := GetHostnames(dialer, di)
|
||||
if err != nil {
|
||||
t.Errorf("getHostnames: %v", err)
|
||||
|
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/profiler"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/stats"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/util"
|
||||
"github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter"
|
||||
"github.com/percona/pmgo"
|
||||
@@ -136,18 +137,19 @@ func main() {
|
||||
i := session.DB(di.Database).C("system.profile").Find(query).Sort("-$natural").Iter()
|
||||
|
||||
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
|
||||
prof := profiler.NewProfiler(i, filters, nil, fp)
|
||||
s := stats.New(fp)
|
||||
prof := profiler.NewProfiler(i, filters, nil, s)
|
||||
prof.Start()
|
||||
queries := <-prof.QueriesChan()
|
||||
|
||||
uptime := uptime(session)
|
||||
|
||||
queriesStats := profiler.CalcQueriesStats(queries, uptime)
|
||||
queriesStats := queries.CalcQueriesStats(uptime)
|
||||
sortedQueryStats := sortQueries(queriesStats, opts.OrderBy)
|
||||
|
||||
printHeader(opts)
|
||||
|
||||
queryTotals := profiler.CalcTotalQueriesStats(queries, uptime)
|
||||
queryTotals := queries.CalcTotalQueriesStats(uptime)
|
||||
tt, _ := template.New("query").Funcs(template.FuncMap{
|
||||
"Format": format,
|
||||
}).Parse(getTotalsTemplate())
|
||||
@@ -347,15 +349,15 @@ func getTotalsTemplate() string {
|
||||
return t
|
||||
}
|
||||
|
||||
type lessFunc func(p1, p2 *profiler.QueryStats) bool
|
||||
type lessFunc func(p1, p2 *stats.QueryStats) bool
|
||||
|
||||
type multiSorter struct {
|
||||
queries []profiler.QueryStats
|
||||
queries []stats.QueryStats
|
||||
less []lessFunc
|
||||
}
|
||||
|
||||
// Sort sorts the argument slice according to the less functions passed to OrderedBy.
|
||||
func (ms *multiSorter) Sort(queries []profiler.QueryStats) {
|
||||
func (ms *multiSorter) Sort(queries []stats.QueryStats) {
|
||||
ms.queries = queries
|
||||
sort.Sort(ms)
|
||||
}
|
||||
@@ -404,29 +406,29 @@ func (ms *multiSorter) Less(i, j int) bool {
|
||||
return ms.less[k](p, q)
|
||||
}
|
||||
|
||||
func sortQueries(queries []profiler.QueryStats, orderby []string) []profiler.QueryStats {
|
||||
func sortQueries(queries []stats.QueryStats, orderby []string) []stats.QueryStats {
|
||||
sortFuncs := []lessFunc{}
|
||||
for _, field := range orderby {
|
||||
var f lessFunc
|
||||
switch field {
|
||||
//
|
||||
case "count":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
return c1.Count < c2.Count
|
||||
}
|
||||
case "-count":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
return c1.Count > c2.Count
|
||||
}
|
||||
|
||||
case "ratio":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
ratio1 := c1.Scanned.Max / c1.Returned.Max
|
||||
ratio2 := c2.Scanned.Max / c2.Returned.Max
|
||||
return ratio1 < ratio2
|
||||
}
|
||||
case "-ratio":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
ratio1 := c1.Scanned.Max / c1.Returned.Max
|
||||
ratio2 := c2.Scanned.Max / c2.Returned.Max
|
||||
return ratio1 > ratio2
|
||||
@@ -434,31 +436,31 @@ func sortQueries(queries []profiler.QueryStats, orderby []string) []profiler.Que
|
||||
|
||||
//
|
||||
case "query-time":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
return c1.QueryTime.Max < c2.QueryTime.Max
|
||||
}
|
||||
case "-query-time":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
return c1.QueryTime.Max > c2.QueryTime.Max
|
||||
}
|
||||
|
||||
//
|
||||
case "docs-scanned":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
return c1.Scanned.Max < c2.Scanned.Max
|
||||
}
|
||||
case "-docs-scanned":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
return c1.Scanned.Max > c2.Scanned.Max
|
||||
}
|
||||
|
||||
//
|
||||
case "docs-returned":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
return c1.Returned.Max < c2.Scanned.Max
|
||||
}
|
||||
case "-docs-returned":
|
||||
f = func(c1, c2 *profiler.QueryStats) bool {
|
||||
f = func(c1, c2 *stats.QueryStats) bool {
|
||||
return c1.Returned.Max > c2.Scanned.Max
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user