diff --git a/src/go/mongolib/profiler/profiler.go b/src/go/mongolib/profiler/profiler.go index 9181a4cd..f920a28c 100644 --- a/src/go/mongolib/profiler/profiler.go +++ b/src/go/mongolib/profiler/profiler.go @@ -1,141 +1,63 @@ package profiler import ( - "crypto/md5" - "encoding/json" - "errors" - "fmt" "sync" "time" - "github.com/montanaflynn/stats" - "github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter" "github.com/percona/percona-toolkit/src/go/mongolib/proto" - "github.com/percona/percona-toolkit/src/go/mongolib/util" + "github.com/percona/percona-toolkit/src/go/mongolib/stats" "github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter" "github.com/percona/pmgo" ) var ( - // MaxDepthLevel Max recursion level for the fingerprinter - MaxDepthLevel = 10 // DocsBufferSize is the buffer size to store documents from the MongoDB profiler DocsBufferSize = 100 - // ErrCannotGetQuery is the error returned if we cannot find a query into the profiler document - ErrCannotGetQuery = errors.New("cannot get query field from the profile document (it is not a map)") ) -// Times is an array of time.Time that implements the Sorter interface -type Times []time.Time - -func (a Times) Len() int { return len(a) } -func (a Times) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a Times) Less(i, j int) bool { return a[i].Before(a[j]) } - -type StatsGroupKey struct { - Operation string - Fingerprint string - Namespace string -} - -type totalCounters struct { - Count int - Scanned float64 - Returned float64 - QueryTime float64 - Bytes float64 -} - type Profiler interface { GetLastError() error - QueriesChan() chan []QueryInfoAndCounters + QueriesChan() chan stats.Queries TimeoutsChan() <-chan time.Time - ProcessDoc(proto.SystemProfile, map[StatsGroupKey]*QueryInfoAndCounters) error Start() Stop() } type Profile struct { - filters []filter.Filter - iterator pmgo.IterManager - ticker <-chan time.Time - queriesChan chan []QueryInfoAndCounters + // dependencies + iterator pmgo.IterManager + filters []filter.Filter + ticker <-chan time.Time + stats Stats + + // internal + queriesChan chan stats.Queries stopChan chan bool docsChan chan proto.SystemProfile timeoutsChan chan time.Time // For the moment ProcessDoc is exportable to it could be called from the "outside" // For that reason, we need a mutex to make it thread safe. In the future this func // will be unexported - countersMapLock sync.Mutex - queriesInfoAndCounters map[StatsGroupKey]*QueryInfoAndCounters - keyFilters []string - fingerprinter fingerprinter.Fingerprinter - lock sync.Mutex - running bool - lastError error - stopWaitGroup sync.WaitGroup + countersMapLock sync.Mutex + keyFilters []string + lock sync.Mutex + running bool + lastError error + stopWaitGroup sync.WaitGroup } -type QueryStats struct { - ID string - Namespace string - Operation string - Query string - Fingerprint string - FirstSeen time.Time - LastSeen time.Time - - Count int - QPS float64 - Rank int - Ratio float64 - QueryTime Statistics - ResponseLength Statistics - Returned Statistics - Scanned Statistics -} - -type QueryInfoAndCounters struct { - ID string - Namespace string - Operation string - Query map[string]interface{} - Fingerprint string - FirstSeen time.Time - LastSeen time.Time - TableScan bool - - Count int - BlockedTime Times - LockTime Times - NReturned []float64 - NScanned []float64 - QueryTime []float64 // in milliseconds - ResponseLength []float64 -} - -type Statistics struct { - Pct float64 - Total float64 - Min float64 - Max float64 - Avg float64 - Pct95 float64 - StdDev float64 - Median float64 -} - -func NewProfiler(iterator pmgo.IterManager, filters []filter.Filter, ticker <-chan time.Time, fp fingerprinter.Fingerprinter) Profiler { +func NewProfiler(iterator pmgo.IterManager, filters []filter.Filter, ticker <-chan time.Time, stats Stats) Profiler { return &Profile{ - filters: filters, - fingerprinter: fp, - iterator: iterator, - ticker: ticker, - queriesChan: make(chan []QueryInfoAndCounters), - docsChan: make(chan proto.SystemProfile, DocsBufferSize), - timeoutsChan: nil, - queriesInfoAndCounters: make(map[StatsGroupKey]*QueryInfoAndCounters), - keyFilters: []string{"^shardVersion$", "^\\$"}, + // dependencies + iterator: iterator, + filters: filters, + ticker: ticker, + stats: stats, + + // internal + docsChan: make(chan proto.SystemProfile, DocsBufferSize), + timeoutsChan: nil, + keyFilters: []string{"^shardVersion$", "^\\$"}, } } @@ -143,7 +65,7 @@ func (p *Profile) GetLastError() error { return p.lastError } -func (p *Profile) QueriesChan() chan []QueryInfoAndCounters { +func (p *Profile) QueriesChan() chan stats.Queries { return p.queriesChan } @@ -152,6 +74,7 @@ func (p *Profile) Start() { defer p.lock.Unlock() if !p.running { p.running = true + p.queriesChan = make(chan stats.Queries) p.stopChan = make(chan bool) go p.getData() } @@ -185,8 +108,8 @@ MAIN_GETDATA_LOOP: for { select { case <-p.ticker: - p.queriesChan <- mapToArray(p.queriesInfoAndCounters) - p.queriesInfoAndCounters = make(map[StatsGroupKey]*QueryInfoAndCounters) // Reset stats + p.queriesChan <- p.stats.Queries() + p.stats.Reset() case <-p.stopChan: // Close the iterator to break the loop on getDocs p.iterator.Close() @@ -217,163 +140,9 @@ func (p *Profile) getDocs() { continue } if len(doc.Query) > 0 { - p.ProcessDoc(doc, p.queriesInfoAndCounters) + p.stats.Add(doc) } } - p.queriesChan <- mapToArray(p.queriesInfoAndCounters) + p.queriesChan <- p.stats.Queries() p.Stop() } - -func (p *Profile) ProcessDoc(doc proto.SystemProfile, stats map[StatsGroupKey]*QueryInfoAndCounters) error { - - fp, err := p.fingerprinter.Fingerprint(doc.Query) - if err != nil { - return fmt.Errorf("cannot get fingerprint: %s", err.Error()) - } - var s *QueryInfoAndCounters - var ok bool - p.countersMapLock.Lock() - defer p.countersMapLock.Unlock() - - key := StatsGroupKey{ - Operation: doc.Op, - Fingerprint: fp, - Namespace: doc.Ns, - } - if s, ok = stats[key]; !ok { - realQuery, _ := util.GetQueryField(doc.Query) - s = &QueryInfoAndCounters{ - ID: fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s", key)))), - Operation: doc.Op, - Fingerprint: fp, - Namespace: doc.Ns, - TableScan: false, - Query: realQuery, - } - stats[key] = s - } - s.Count++ - s.NScanned = append(s.NScanned, float64(doc.DocsExamined)) - s.NReturned = append(s.NReturned, float64(doc.Nreturned)) - s.QueryTime = append(s.QueryTime, float64(doc.Millis)) - s.ResponseLength = append(s.ResponseLength, float64(doc.ResponseLength)) - var zeroTime time.Time - if s.FirstSeen == zeroTime || s.FirstSeen.After(doc.Ts) { - s.FirstSeen = doc.Ts - } - if s.LastSeen == zeroTime || s.LastSeen.Before(doc.Ts) { - s.LastSeen = doc.Ts - } - - return nil - -} - -func CalcQueriesStats(queries []QueryInfoAndCounters, uptime int64) []QueryStats { - stats := []QueryStats{} - tc := calcTotalCounters(queries) - - for _, query := range queries { - queryStats := CountersToStats(query, uptime, tc) - stats = append(stats, queryStats) - } - - return stats -} - -func CalcTotalQueriesStats(queries []QueryInfoAndCounters, uptime int64) QueryStats { - tc := calcTotalCounters(queries) - - totalQueryInfoAndCounters := aggregateCounters(queries) - totalStats := CountersToStats(totalQueryInfoAndCounters, uptime, tc) - - return totalStats -} - -func CountersToStats(query QueryInfoAndCounters, uptime int64, tc totalCounters) QueryStats { - buf, _ := json.Marshal(query.Query) - queryStats := QueryStats{ - Count: query.Count, - ID: query.ID, - Operation: query.Operation, - Query: string(buf), - Fingerprint: query.Fingerprint, - Scanned: calcStats(query.NScanned), - Returned: calcStats(query.NReturned), - QueryTime: calcStats(query.QueryTime), - ResponseLength: calcStats(query.ResponseLength), - FirstSeen: query.FirstSeen, - LastSeen: query.LastSeen, - Namespace: query.Namespace, - QPS: float64(query.Count) / float64(uptime), - } - if tc.Scanned > 0 { - queryStats.Scanned.Pct = queryStats.Scanned.Total * 100 / tc.Scanned - } - if tc.Returned > 0 { - queryStats.Returned.Pct = queryStats.Returned.Total * 100 / tc.Returned - } - if tc.QueryTime > 0 { - queryStats.QueryTime.Pct = queryStats.QueryTime.Total * 100 / tc.QueryTime - } - if tc.Bytes > 0 { - queryStats.ResponseLength.Pct = queryStats.ResponseLength.Total / tc.Bytes - } - if queryStats.Returned.Total > 0 { - queryStats.Ratio = queryStats.Scanned.Total / queryStats.Returned.Total - } - - return queryStats -} - -func aggregateCounters(queries []QueryInfoAndCounters) QueryInfoAndCounters { - qt := QueryInfoAndCounters{} - for _, query := range queries { - qt.NScanned = append(qt.NScanned, query.NScanned...) - qt.NReturned = append(qt.NReturned, query.NReturned...) - qt.QueryTime = append(qt.QueryTime, query.QueryTime...) - qt.ResponseLength = append(qt.ResponseLength, query.ResponseLength...) - } - return qt -} - -func calcTotalCounters(queries []QueryInfoAndCounters) totalCounters { - tc := totalCounters{} - - for _, query := range queries { - tc.Count += query.Count - - scanned, _ := stats.Sum(query.NScanned) - tc.Scanned += scanned - - returned, _ := stats.Sum(query.NReturned) - tc.Returned += returned - - queryTime, _ := stats.Sum(query.QueryTime) - tc.QueryTime += queryTime - - bytes, _ := stats.Sum(query.ResponseLength) - tc.Bytes += bytes - } - return tc -} - -func calcStats(samples []float64) Statistics { - var s Statistics - s.Total, _ = stats.Sum(samples) - s.Min, _ = stats.Min(samples) - s.Max, _ = stats.Max(samples) - s.Avg, _ = stats.Mean(samples) - s.Pct95, _ = stats.PercentileNearestRank(samples, 95) - s.StdDev, _ = stats.StandardDeviation(samples) - s.Median, _ = stats.Median(samples) - return s -} - -func mapToArray(stats map[StatsGroupKey]*QueryInfoAndCounters) []QueryInfoAndCounters { - sa := []QueryInfoAndCounters{} - for _, s := range stats { - sa = append(sa, *s) - } - return sa -} diff --git a/src/go/mongolib/profiler/profiler_test.go b/src/go/mongolib/profiler/profiler_test.go index 5182e82f..c3289396 100644 --- a/src/go/mongolib/profiler/profiler_test.go +++ b/src/go/mongolib/profiler/profiler_test.go @@ -12,6 +12,7 @@ import ( "github.com/percona/percona-toolkit/src/go/lib/tutil" "github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter" "github.com/percona/percona-toolkit/src/go/mongolib/proto" + "github.com/percona/percona-toolkit/src/go/mongolib/stats" "github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter" "github.com/percona/pmgo/pmgomock" ) @@ -62,12 +63,13 @@ func TestRegularIterator(t *testing.T) { ) filters := []filter.Filter{} fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS) - prof := NewProfiler(iter, filters, nil, fp) + s := stats.New(fp) + prof := NewProfiler(iter, filters, nil, s) firstSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914+00:00") lastSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:20.214+00:00") - want := []QueryInfoAndCounters{ - QueryInfoAndCounters{ + want := stats.Queries{ + { ID: "c6466139b21c392acd0699e863b50d81", Namespace: "samples.col1", Operation: "query", @@ -80,8 +82,6 @@ func TestRegularIterator(t *testing.T) { LastSeen: lastSeen, TableScan: false, Count: 2, - BlockedTime: Times(nil), - LockTime: Times(nil), NReturned: []float64{50, 75}, NScanned: []float64{100, 75}, QueryTime: []float64{0, 1}, @@ -123,12 +123,13 @@ func TestIteratorTimeout(t *testing.T) { filters := []filter.Filter{} fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS) - prof := NewProfiler(iter, filters, nil, fp) + s := stats.New(fp) + prof := NewProfiler(iter, filters, nil, s) firstSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914+00:00") lastSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914+00:00") - want := []QueryInfoAndCounters{ - QueryInfoAndCounters{ + want := stats.Queries{ + { ID: "c6466139b21c392acd0699e863b50d81", Namespace: "samples.col1", Operation: "query", @@ -141,8 +142,6 @@ func TestIteratorTimeout(t *testing.T) { LastSeen: lastSeen, TableScan: false, Count: 1, - BlockedTime: Times(nil), - LockTime: Times(nil), NReturned: []float64{75}, NScanned: []float64{75}, QueryTime: []float64{1}, @@ -210,10 +209,11 @@ func TestTailIterator(t *testing.T) { filters := []filter.Filter{} ticker := time.NewTicker(time.Second) fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS) - prof := NewProfiler(iter, filters, ticker.C, fp) + s := stats.New(fp) + prof := NewProfiler(iter, filters, ticker.C, s) - want := []QueryInfoAndCounters{ - QueryInfoAndCounters{ + want := stats.Queries{ + { ID: "c6466139b21c392acd0699e863b50d81", Namespace: "samples.col1", Operation: "query", @@ -226,14 +226,12 @@ func TestTailIterator(t *testing.T) { LastSeen: parseDate("2017-04-01T23:01:20.214+00:00"), TableScan: false, Count: 1, - BlockedTime: Times(nil), - LockTime: Times(nil), NReturned: []float64{50}, NScanned: []float64{100}, QueryTime: []float64{0}, ResponseLength: []float64{1.06123e+06}, }, - QueryInfoAndCounters{ + { ID: "c6466139b21c392acd0699e863b50d81", Namespace: "samples.col1", Operation: "query", @@ -246,8 +244,6 @@ func TestTailIterator(t *testing.T) { LastSeen: parseDate("2017-04-01T23:01:19.914+00:00"), TableScan: false, Count: 1, - BlockedTime: Times(nil), - LockTime: Times(nil), NReturned: []float64{75}, NScanned: []float64{75}, QueryTime: []float64{1}, @@ -261,7 +257,7 @@ func TestTailIterator(t *testing.T) { for index < 2 { select { case queries := <-prof.QueriesChan(): - if !reflect.DeepEqual(queries, []QueryInfoAndCounters{want[index]}) { + if !reflect.DeepEqual(queries, stats.Queries{want[index]}) { t.Errorf("invalid queries. \nGot: %#v,\nWant: %#v\n", queries, want) } index++ @@ -279,7 +275,7 @@ func TestCalcStats(t *testing.T) { t.Fatalf("cannot load samples: %s", err.Error()) } - want := []QueryStats{} + want := []stats.QueryStats{} err = tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.want.json", &want) if err != nil { t.Fatalf("cannot load expected results: %s", err.Error()) @@ -300,17 +296,18 @@ func TestCalcStats(t *testing.T) { filters := []filter.Filter{} fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS) - prof := NewProfiler(iter, filters, nil, fp) + s := stats.New(fp) + prof := NewProfiler(iter, filters, nil, s) prof.Start() select { case queries := <-prof.QueriesChan(): - stats := CalcQueriesStats(queries, 1) + s := queries.CalcQueriesStats(1) if os.Getenv("UPDATE_SAMPLES") != "" { - tutil.WriteJson(vars.RootPath+samples+"profiler_docs_stats.want.json", stats) + tutil.WriteJson(vars.RootPath+samples+"profiler_docs_stats.want.json", s) } - if !reflect.DeepEqual(stats, want) { - t.Errorf("Invalid stats.\nGot:%#v\nWant: %#v\n", stats, want) + if !reflect.DeepEqual(s, want) { + t.Errorf("Invalid stats.\nGot:%#v\nWant: %#v\n", s, want) } case <-time.After(2 * time.Second): t.Error("Didn't get any query") @@ -327,7 +324,7 @@ func TestCalcTotalStats(t *testing.T) { t.Fatalf("cannot load samples: %s", err.Error()) } - want := QueryStats{} + want := stats.QueryStats{} err = tutil.LoadJson(vars.RootPath+samples+"profiler_docs_total_stats.want.json", &want) if err != nil && !tutil.ShouldUpdateSamples() { t.Fatalf("cannot load expected results: %s", err.Error()) @@ -348,206 +345,24 @@ func TestCalcTotalStats(t *testing.T) { filters := []filter.Filter{} fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS) - prof := NewProfiler(iter, filters, nil, fp) + s := stats.New(fp) + prof := NewProfiler(iter, filters, nil, s) prof.Start() select { case queries := <-prof.QueriesChan(): - stats := CalcTotalQueriesStats(queries, 1) + s := queries.CalcTotalQueriesStats(1) if os.Getenv("UPDATE_SAMPLES") != "" { fmt.Println("Updating samples: " + vars.RootPath + samples + "profiler_docs_total_stats.want.json") - err := tutil.WriteJson(vars.RootPath+samples+"profiler_docs_total_stats.want.json", stats) + err := tutil.WriteJson(vars.RootPath+samples+"profiler_docs_total_stats.want.json", s) if err != nil { fmt.Printf("cannot update samples: %s", err.Error()) } } - if !reflect.DeepEqual(stats, want) { - t.Errorf("Invalid stats.\nGot:%#v\nWant: %#v\n", stats, want) + if !reflect.DeepEqual(s, want) { + t.Errorf("Invalid stats.\nGot:%#v\nWant: %#v\n", s, want) } case <-time.After(2 * time.Second): t.Error("Didn't get any query") } } - -func TestCalcTotalCounters(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - docs := []proto.SystemProfile{} - err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.json", &docs) - if err != nil { - t.Fatalf("cannot load samples: %s", err.Error()) - } - - want := totalCounters{} - err = tutil.LoadJson(vars.RootPath+samples+"profiler_docs_total_counters.want.json", &want) - if err != nil && !tutil.ShouldUpdateSamples() { - t.Fatalf("cannot load expected results: %s", err.Error()) - } - - iter := pmgomock.NewMockIterManager(ctrl) - gomock.InOrder( - iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[0]).Return(true), - iter.EXPECT().Timeout().Return(false), - iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[1]).Return(true), - iter.EXPECT().Timeout().Return(false), - iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[2]).Return(true), - iter.EXPECT().Timeout().Return(false), - iter.EXPECT().Next(gomock.Any()).Return(false), - iter.EXPECT().Timeout().Return(false), - iter.EXPECT().Close(), - ) - - filters := []filter.Filter{} - fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS) - prof := NewProfiler(iter, filters, nil, fp) - - prof.Start() - select { - case queries := <-prof.QueriesChan(): - counters := calcTotalCounters(queries) - if tutil.ShouldUpdateSamples() { - fmt.Println("Updating samples: " + vars.RootPath + samples + "profiler_docs_total_counters.want.json") - err := tutil.WriteJson(vars.RootPath+samples+"profiler_docs_total_counters.want.json", counters) - if err != nil { - fmt.Printf("cannot update samples: %s", err.Error()) - } - } - if !reflect.DeepEqual(counters, want) { - t.Errorf("Invalid counters.\nGot:%#v\nWant: %#v\n", counters, want) - } - case <-time.After(2 * time.Second): - t.Error("Didn't get any query") - } -} - -func TestProcessDoc(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - docs := []proto.SystemProfile{} - err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.json", &docs) - if err != nil { - t.Fatalf("cannot load samples: %s", err.Error()) - } - - iter := pmgomock.NewMockIterManager(ctrl) - filters := []filter.Filter{} - fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS) - prof := NewProfiler(iter, filters, nil, fp) - - var stats = make(map[StatsGroupKey]*QueryInfoAndCounters) - - err = prof.ProcessDoc(docs[1], stats) - if err != nil { - t.Errorf("Error processing doc: %s\n", err.Error()) - } - statsKey := StatsGroupKey{Operation: "query", Fingerprint: "s2", Namespace: "samples.col1"} - statsVal := &QueryInfoAndCounters{ - ID: "84e09ef6a3dc35f472df05fa98eee7d3", - Namespace: "samples.col1", - Operation: "query", - Query: map[string]interface{}{"s2": map[string]interface{}{"$gte": "41991", "$lt": "33754"}}, - Fingerprint: "s2", - FirstSeen: parseDate("2017-04-10T13:15:53.532-03:00"), - LastSeen: parseDate("2017-04-10T13:15:53.532-03:00"), - TableScan: false, - Count: 1, - BlockedTime: nil, - LockTime: nil, - NReturned: []float64{0}, - NScanned: []float64{10000}, - QueryTime: []float64{7}, - ResponseLength: []float64{215}, - } - - want := map[StatsGroupKey]*QueryInfoAndCounters{statsKey: statsVal} - - if !reflect.DeepEqual(stats, want) { - t.Errorf("Error in ProcessDoc.\nGot:%#v\nWant: %#v\n", stats, want) - } -} - -func TestTimesLen(t *testing.T) { - tests := []struct { - name string - a times - want int - }{ - { - name: "Times.Len", - a: []time.Time{time.Now()}, - want: 1, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.a.Len(); got != tt.want { - t.Errorf("times.Len() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestTimesSwap(t *testing.T) { - type args struct { - i int - j int - } - t1 := time.Now() - t2 := t1.Add(1 * time.Minute) - tests := []struct { - name string - a times - args args - }{ - { - name: "Times.Swap", - a: times{t1, t2}, - args: args{i: 0, j: 1}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.a.Swap(tt.args.i, tt.args.j) - if tt.a[0] != t2 || tt.a[1] != t1 { - t.Errorf("%s has (%v, %v) want (%v, %v)", tt.name, tt.a[0], tt.a[1], t2, t1) - } - }) - } -} - -func TestTimesLess(t *testing.T) { - type args struct { - i int - j int - } - t1 := time.Now() - t2 := t1.Add(1 * time.Minute) - tests := []struct { - name string - a times - args args - want bool - }{ - { - name: "Times.Swap", - a: times{t1, t2}, - args: args{i: 0, j: 1}, - want: true, - }, - { - name: "Times.Swap", - a: times{t2, t1}, - args: args{i: 0, j: 1}, - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.a.Less(tt.args.i, tt.args.j); got != tt.want { - t.Errorf("times.Less() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/src/go/mongolib/profiler/stats.go b/src/go/mongolib/profiler/stats.go new file mode 100644 index 00000000..cf665ce2 --- /dev/null +++ b/src/go/mongolib/profiler/stats.go @@ -0,0 +1,12 @@ +package profiler + +import ( + "github.com/percona/percona-toolkit/src/go/mongolib/proto" + "github.com/percona/percona-toolkit/src/go/mongolib/stats" +) + +type Stats interface { + Reset() + Add(doc proto.SystemProfile) error + Queries() stats.Queries +} diff --git a/src/go/mongolib/stats/stats.go b/src/go/mongolib/stats/stats.go new file mode 100644 index 00000000..a8db0461 --- /dev/null +++ b/src/go/mongolib/stats/stats.go @@ -0,0 +1,268 @@ +package stats + +import ( + "crypto/md5" + "encoding/json" + "fmt" + "time" + + "github.com/montanaflynn/stats" + "github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter" + "github.com/percona/percona-toolkit/src/go/mongolib/proto" + "github.com/percona/percona-toolkit/src/go/mongolib/util" +) + +// New creates new instance of stats with given fingerprinter +func New(fingerprinter fingerprinter.Fingerprinter) *Stats { + s := &Stats{ + fingerprinter: fingerprinter, + } + + s.Reset() + return s +} + +// Stats is a collection of MongoDB statistics +type Stats struct { + // dependencies + fingerprinter fingerprinter.Fingerprinter + + // internal + queryInfoAndCounters map[GroupKey]*QueryInfoAndCounters +} + +// Reset clears the collection of statistics +func (s *Stats) Reset() { + s.queryInfoAndCounters = make(map[GroupKey]*QueryInfoAndCounters) +} + +// Add adds proto.SystemProfile to the collection of statistics +func (s *Stats) Add(doc proto.SystemProfile) error { + fp, err := s.fingerprinter.Fingerprint(doc.Query) + if err != nil { + return fmt.Errorf("cannot get fingerprint: %s", err) + } + var qiac *QueryInfoAndCounters + var ok bool + + key := GroupKey{ + Operation: doc.Op, + Fingerprint: fp, + Namespace: doc.Ns, + } + if qiac, ok = s.queryInfoAndCounters[key]; !ok { + realQuery, _ := util.GetQueryField(doc.Query) + qiac = &QueryInfoAndCounters{ + ID: fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s", key)))), + Operation: doc.Op, + Fingerprint: fp, + Namespace: doc.Ns, + TableScan: false, + Query: realQuery, + } + s.queryInfoAndCounters[key] = qiac + } + qiac.Count++ + qiac.NScanned = append(qiac.NScanned, float64(doc.DocsExamined)) + qiac.NReturned = append(qiac.NReturned, float64(doc.Nreturned)) + qiac.QueryTime = append(qiac.QueryTime, float64(doc.Millis)) + qiac.ResponseLength = append(qiac.ResponseLength, float64(doc.ResponseLength)) + var zeroTime time.Time + if qiac.FirstSeen == zeroTime || qiac.FirstSeen.After(doc.Ts) { + qiac.FirstSeen = doc.Ts + } + if qiac.LastSeen == zeroTime || qiac.LastSeen.Before(doc.Ts) { + qiac.LastSeen = doc.Ts + } + + return nil +} + +// Queries returns all collected statistics +func (s *Stats) Queries() Queries { + return mapToArray(s.queryInfoAndCounters) +} + +// Queries is a slice of MongoDB statistics +type Queries []QueryInfoAndCounters + +// CalcQueriesStats calculates QueryStats for given uptime +func (q Queries) CalcQueriesStats(uptime int64) []QueryStats { + qs := []QueryStats{} + tc := calcTotalCounters(q) + + for _, query := range q { + queryStats := countersToStats(query, uptime, tc) + qs = append(qs, queryStats) + } + + return qs +} + +// CalcTotalQueriesStats calculates total QueryStats for given uptime +func (q Queries) CalcTotalQueriesStats(uptime int64) QueryStats { + tc := calcTotalCounters(q) + + totalQueryInfoAndCounters := aggregateCounters(q) + totalStats := countersToStats(totalQueryInfoAndCounters, uptime, tc) + + return totalStats +} + +// times is an array of time.Time that implements the Sorter interface +type Times []time.Time + +func (a Times) Len() int { return len(a) } +func (a Times) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a Times) Less(i, j int) bool { return a[i].Before(a[j]) } + +type GroupKey struct { + Operation string + Fingerprint string + Namespace string +} + +type totalCounters struct { + Count int + Scanned float64 + Returned float64 + QueryTime float64 + Bytes float64 +} + +type QueryStats struct { + ID string + Namespace string + Operation string + Query string + Fingerprint string + FirstSeen time.Time + LastSeen time.Time + + Count int + QPS float64 + Rank int + Ratio float64 + QueryTime Statistics + ResponseLength Statistics + Returned Statistics + Scanned Statistics +} + +type QueryInfoAndCounters struct { + ID string + Namespace string + Operation string + Query map[string]interface{} + Fingerprint string + FirstSeen time.Time + LastSeen time.Time + TableScan bool + + Count int + BlockedTime Times + LockTime Times + NReturned []float64 + NScanned []float64 + QueryTime []float64 // in milliseconds + ResponseLength []float64 +} + +type Statistics struct { + Pct float64 + Total float64 + Min float64 + Max float64 + Avg float64 + Pct95 float64 + StdDev float64 + Median float64 +} + +func countersToStats(query QueryInfoAndCounters, uptime int64, tc totalCounters) QueryStats { + buf, _ := json.Marshal(query.Query) + queryStats := QueryStats{ + Count: query.Count, + ID: query.ID, + Operation: query.Operation, + Query: string(buf), + Fingerprint: query.Fingerprint, + Scanned: calcStats(query.NScanned), + Returned: calcStats(query.NReturned), + QueryTime: calcStats(query.QueryTime), + ResponseLength: calcStats(query.ResponseLength), + FirstSeen: query.FirstSeen, + LastSeen: query.LastSeen, + Namespace: query.Namespace, + QPS: float64(query.Count) / float64(uptime), + } + if tc.Scanned > 0 { + queryStats.Scanned.Pct = queryStats.Scanned.Total * 100 / tc.Scanned + } + if tc.Returned > 0 { + queryStats.Returned.Pct = queryStats.Returned.Total * 100 / tc.Returned + } + if tc.QueryTime > 0 { + queryStats.QueryTime.Pct = queryStats.QueryTime.Total * 100 / tc.QueryTime + } + if tc.Bytes > 0 { + queryStats.ResponseLength.Pct = queryStats.ResponseLength.Total / tc.Bytes + } + if queryStats.Returned.Total > 0 { + queryStats.Ratio = queryStats.Scanned.Total / queryStats.Returned.Total + } + + return queryStats +} + +func aggregateCounters(queries []QueryInfoAndCounters) QueryInfoAndCounters { + qt := QueryInfoAndCounters{} + for _, query := range queries { + qt.NScanned = append(qt.NScanned, query.NScanned...) + qt.NReturned = append(qt.NReturned, query.NReturned...) + qt.QueryTime = append(qt.QueryTime, query.QueryTime...) + qt.ResponseLength = append(qt.ResponseLength, query.ResponseLength...) + } + return qt +} + +func calcTotalCounters(queries []QueryInfoAndCounters) totalCounters { + tc := totalCounters{} + + for _, query := range queries { + tc.Count += query.Count + + scanned, _ := stats.Sum(query.NScanned) + tc.Scanned += scanned + + returned, _ := stats.Sum(query.NReturned) + tc.Returned += returned + + queryTime, _ := stats.Sum(query.QueryTime) + tc.QueryTime += queryTime + + bytes, _ := stats.Sum(query.ResponseLength) + tc.Bytes += bytes + } + return tc +} + +func calcStats(samples []float64) Statistics { + var s Statistics + s.Total, _ = stats.Sum(samples) + s.Min, _ = stats.Min(samples) + s.Max, _ = stats.Max(samples) + s.Avg, _ = stats.Mean(samples) + s.Pct95, _ = stats.PercentileNearestRank(samples, 95) + s.StdDev, _ = stats.StandardDeviation(samples) + s.Median, _ = stats.Median(samples) + return s +} + +func mapToArray(stats map[GroupKey]*QueryInfoAndCounters) []QueryInfoAndCounters { + sa := []QueryInfoAndCounters{} + for _, s := range stats { + sa = append(sa, *s) + } + return sa +} diff --git a/src/go/mongolib/stats/stats_test.go b/src/go/mongolib/stats/stats_test.go new file mode 100644 index 00000000..d1e4b3f8 --- /dev/null +++ b/src/go/mongolib/stats/stats_test.go @@ -0,0 +1,166 @@ +package stats + +import ( + "github.com/golang/mock/gomock" + "github.com/percona/percona-toolkit/src/go/lib/tutil" + "github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter" + "github.com/percona/percona-toolkit/src/go/mongolib/proto" + "log" + "os" + "reflect" + "testing" + "time" +) + +const ( + samples = "/src/go/tests/" +) + +type testVars struct { + RootPath string +} + +var vars testVars + +func parseDate(dateStr string) time.Time { + date, _ := time.Parse(time.RFC3339Nano, dateStr) + return date +} + +func TestMain(m *testing.M) { + var err error + if vars.RootPath, err = tutil.RootPath(); err != nil { + log.Printf("cannot get root path: %s", err.Error()) + os.Exit(1) + } + os.Exit(m.Run()) +} + +func TestTimesLen(t *testing.T) { + tests := []struct { + name string + a Times + want int + }{ + { + name: "Times.Len", + a: []time.Time{time.Now()}, + want: 1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.a.Len(); got != tt.want { + t.Errorf("times.Len() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestTimesSwap(t *testing.T) { + type args struct { + i int + j int + } + t1 := time.Now() + t2 := t1.Add(1 * time.Minute) + tests := []struct { + name string + a Times + args args + }{ + { + name: "Times.Swap", + a: Times{t1, t2}, + args: args{i: 0, j: 1}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.a.Swap(tt.args.i, tt.args.j) + if tt.a[0] != t2 || tt.a[1] != t1 { + t.Errorf("%s has (%v, %v) want (%v, %v)", tt.name, tt.a[0], tt.a[1], t2, t1) + } + }) + } +} + +func TestTimesLess(t *testing.T) { + type args struct { + i int + j int + } + t1 := time.Now() + t2 := t1.Add(1 * time.Minute) + tests := []struct { + name string + a Times + args args + want bool + }{ + { + name: "Times.Swap", + a: Times{t1, t2}, + args: args{i: 0, j: 1}, + want: true, + }, + { + name: "Times.Swap", + a: Times{t2, t1}, + args: args{i: 0, j: 1}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.a.Less(tt.args.i, tt.args.j); got != tt.want { + t.Errorf("times.Less() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestStats(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + docs := []proto.SystemProfile{} + err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.json", &docs) + if err != nil { + t.Fatalf("cannot load samples: %s", err.Error()) + } + + fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS) + s := New(fp) + + err = s.Add(docs[1]) + if err != nil { + t.Errorf("Error processing doc: %s\n", err.Error()) + } + statsVal := QueryInfoAndCounters{ + ID: "84e09ef6a3dc35f472df05fa98eee7d3", + Namespace: "samples.col1", + Operation: "query", + Query: map[string]interface{}{"s2": map[string]interface{}{"$gte": "41991", "$lt": "33754"}}, + Fingerprint: "s2", + FirstSeen: parseDate("2017-04-10T13:15:53.532-03:00"), + LastSeen: parseDate("2017-04-10T13:15:53.532-03:00"), + TableScan: false, + Count: 1, + BlockedTime: nil, + LockTime: nil, + NReturned: []float64{0}, + NScanned: []float64{10000}, + QueryTime: []float64{7}, + ResponseLength: []float64{215}, + } + + want := Queries{ + statsVal, + } + got := s.Queries() + + if !reflect.DeepEqual(got, want) { + t.Errorf("Error \nGot:%#v\nWant: %#v\n", got, want) + } +} diff --git a/src/go/mongolib/util/main_test.go b/src/go/mongolib/util/main_test.go index 9521e6fb..60de575e 100644 --- a/src/go/mongolib/util/main_test.go +++ b/src/go/mongolib/util/main_test.go @@ -4,13 +4,13 @@ import ( "reflect" "testing" - mgo "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" - "github.com/golang/mock/gomock" "github.com/percona/percona-toolkit/src/go/lib/tutil" "github.com/percona/percona-toolkit/src/go/mongolib/proto" + "github.com/percona/pmgo" "github.com/percona/pmgo/pmgomock" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" ) // OK @@ -145,9 +145,7 @@ func TestGetReplicasetMembers(t *testing.T) { database.EXPECT().Run(bson.D{{"serverStatus", 1}, {"recordStats", 1}}, gomock.Any()).SetArg(1, ss) session.EXPECT().Close() - session.EXPECT().Close() - - di := &mgo.DialInfo{Addrs: []string{"localhost"}} + di := &pmgo.DialInfo{Addrs: []string{"localhost"}} rss, err := GetReplicasetMembers(dialer, di) if err != nil { t.Errorf("getReplicasetMembers: %v", err) @@ -166,26 +164,67 @@ func TestGetHostnames(t *testing.T) { dialer := pmgomock.NewMockDialer(ctrl) session := pmgomock.NewMockSessionManager(ctrl) - mockShardsInfo := proto.ShardsInfo{ - Shards: []proto.Shard{ - proto.Shard{ - ID: "r1", - Host: "r1/localhost:17001,localhost:17002,localhost:17003", - }, - proto.Shard{ - ID: "r2", - Host: "r2/localhost:18001,localhost:18002,localhost:18003", - }, - }, - OK: 1, + mockrss := proto.ReplicaSetStatus{ + Date: "", + MyState: 1, + Term: 0, + HeartbeatIntervalMillis: 0, + Members: []proto.Members{ + proto.Members{ + Optime: nil, + OptimeDate: "", + InfoMessage: "", + ID: 0, + Name: "localhost:17001", + Health: 1, + StateStr: "PRIMARY", + Uptime: 113287, + ConfigVersion: 1, + Self: true, + State: 1, + ElectionTime: 6340960613392449537, + ElectionDate: "", + Set: ""}, + proto.Members{ + Optime: nil, + OptimeDate: "", + InfoMessage: "", + ID: 1, + Name: "localhost:17002", + Health: 1, + StateStr: "SECONDARY", + Uptime: 113031, + ConfigVersion: 1, + Self: false, + State: 2, + ElectionTime: 0, + ElectionDate: "", + Set: ""}, + proto.Members{ + Optime: nil, + OptimeDate: "", + InfoMessage: "", + ID: 2, + Name: "localhost:17003", + Health: 1, + StateStr: "SECONDARY", + Uptime: 113031, + ConfigVersion: 1, + Self: false, + State: 2, + ElectionTime: 0, + ElectionDate: "", + Set: ""}}, + Ok: 1, + Set: "r1", } dialer.EXPECT().DialWithInfo(gomock.Any()).Return(session, nil) - session.EXPECT().Run("getShardMap", gomock.Any()).SetArg(1, mockShardsInfo) - session.EXPECT().Close() + session.EXPECT().SetMode(mgo.Monotonic, true) + session.EXPECT().Run(bson.M{"replSetGetStatus": 1}, gomock.Any()).SetArg(1, mockrss) - expect := []string{"localhost", "localhost:17001", "localhost:18001"} - di := &mgo.DialInfo{Addrs: []string{"localhost"}} + expect := []string{"localhost:17001", "localhost:17002", "localhost:17003"} + di := &pmgo.DialInfo{Addrs: []string{"localhost"}} rss, err := GetHostnames(dialer, di) if err != nil { t.Errorf("getHostnames: %v", err) diff --git a/src/go/pt-mongodb-query-digest/main.go b/src/go/pt-mongodb-query-digest/main.go index 77806d9f..d6548fe1 100644 --- a/src/go/pt-mongodb-query-digest/main.go +++ b/src/go/pt-mongodb-query-digest/main.go @@ -15,6 +15,7 @@ import ( "github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter" "github.com/percona/percona-toolkit/src/go/mongolib/profiler" "github.com/percona/percona-toolkit/src/go/mongolib/proto" + "github.com/percona/percona-toolkit/src/go/mongolib/stats" "github.com/percona/percona-toolkit/src/go/mongolib/util" "github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter" "github.com/percona/pmgo" @@ -136,18 +137,19 @@ func main() { i := session.DB(di.Database).C("system.profile").Find(query).Sort("-$natural").Iter() fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS) - prof := profiler.NewProfiler(i, filters, nil, fp) + s := stats.New(fp) + prof := profiler.NewProfiler(i, filters, nil, s) prof.Start() queries := <-prof.QueriesChan() uptime := uptime(session) - queriesStats := profiler.CalcQueriesStats(queries, uptime) + queriesStats := queries.CalcQueriesStats(uptime) sortedQueryStats := sortQueries(queriesStats, opts.OrderBy) printHeader(opts) - queryTotals := profiler.CalcTotalQueriesStats(queries, uptime) + queryTotals := queries.CalcTotalQueriesStats(uptime) tt, _ := template.New("query").Funcs(template.FuncMap{ "Format": format, }).Parse(getTotalsTemplate()) @@ -347,15 +349,15 @@ func getTotalsTemplate() string { return t } -type lessFunc func(p1, p2 *profiler.QueryStats) bool +type lessFunc func(p1, p2 *stats.QueryStats) bool type multiSorter struct { - queries []profiler.QueryStats + queries []stats.QueryStats less []lessFunc } // Sort sorts the argument slice according to the less functions passed to OrderedBy. -func (ms *multiSorter) Sort(queries []profiler.QueryStats) { +func (ms *multiSorter) Sort(queries []stats.QueryStats) { ms.queries = queries sort.Sort(ms) } @@ -404,29 +406,29 @@ func (ms *multiSorter) Less(i, j int) bool { return ms.less[k](p, q) } -func sortQueries(queries []profiler.QueryStats, orderby []string) []profiler.QueryStats { +func sortQueries(queries []stats.QueryStats, orderby []string) []stats.QueryStats { sortFuncs := []lessFunc{} for _, field := range orderby { var f lessFunc switch field { // case "count": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { return c1.Count < c2.Count } case "-count": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { return c1.Count > c2.Count } case "ratio": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { ratio1 := c1.Scanned.Max / c1.Returned.Max ratio2 := c2.Scanned.Max / c2.Returned.Max return ratio1 < ratio2 } case "-ratio": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { ratio1 := c1.Scanned.Max / c1.Returned.Max ratio2 := c2.Scanned.Max / c2.Returned.Max return ratio1 > ratio2 @@ -434,31 +436,31 @@ func sortQueries(queries []profiler.QueryStats, orderby []string) []profiler.Que // case "query-time": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { return c1.QueryTime.Max < c2.QueryTime.Max } case "-query-time": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { return c1.QueryTime.Max > c2.QueryTime.Max } // case "docs-scanned": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { return c1.Scanned.Max < c2.Scanned.Max } case "-docs-scanned": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { return c1.Scanned.Max > c2.Scanned.Max } // case "docs-returned": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { return c1.Returned.Max < c2.Scanned.Max } case "-docs-returned": - f = func(c1, c2 *profiler.QueryStats) bool { + f = func(c1, c2 *stats.QueryStats) bool { return c1.Returned.Max > c2.Scanned.Max } }