New profiler and tests

This commit is contained in:
Carlos Salguero
2017-04-09 23:21:27 -03:00
parent 25efaef471
commit 64a5c6d8b6
5 changed files with 735 additions and 722 deletions

View File

@@ -2,23 +2,30 @@ package profiler
import (
"crypto/md5"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/montanaflynn/stats"
"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
"github.com/percona/percona-toolkit/src/go/mongolib/util"
"github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter"
"github.com/prometheus/common/log"
"github.com/percona/pmgo"
)
var (
MAX_DEPTH_LEVEL = 10
DOCS_BUFFER_SIZE = 100
CANNOT_GET_QUERY_ERROR = errors.New("cannot get query field from the profile document (it is not a map)")
// MaxDepthLevel Max recursion level for the fingerprinter
MaxDepthLevel = 10
// DocsBufferSize is the buffer size to store documents from the MongoDB profiler
DocsBufferSize = 100
// ErrCannotGetQuery is the error returned when the query field cannot be extracted from a profiler document
ErrCannotGetQuery = errors.New("cannot get query field from the profile document (it is not a map)")
)
// Times is an array of time.Time that implements the Sorter interface
type Times []time.Time
func (a Times) Len() int { return len(a) }
@@ -31,64 +38,98 @@ type StatsGroupKey struct {
Namespace string
}
type Stat struct {
BlockedTime Times
Count int
Fingerprint string
FirstSeen time.Time
ID string
LastSeen time.Time
LockTime Times
NReturned []float64
NScanned []float64
Namespace string
Operation string
Query map[string]interface{}
QueryTime []float64 // in milliseconds
ResponseLength []float64
TableScan bool
}
type Iter interface {
All(result interface{}) error
Close() error
Err() error
For(result interface{}, f func() error) (err error)
Next(result interface{}) bool
Timeout() bool
// totalCounters holds sums taken across a whole set of queries. It is built
// by calcTotalCounters and used by CountersToStats as the denominator when
// computing each query's share of the totals.
type totalCounters struct {
// Count is the sum of the Count field of every query.
Count int
// Scanned is the sum of all NScanned samples.
Scanned float64
// Returned is the sum of all NReturned samples.
Returned float64
// QueryTime is the sum of all QueryTime samples.
QueryTime float64
// Bytes is the sum of all ResponseLength samples.
Bytes float64
}
type Profiler interface {
GetLastError() error
StatsChan() chan []Stat
QueriesChan() chan []QueryInfoAndCounters
TimeoutsChan() <-chan time.Time
Start()
Stop()
}
type Profile struct {
filters []filter.Filter
iterator Iter
ticker chan time.Time
statsChan chan []Stat
stopChan chan bool
docsChan chan proto.SystemProfile
rawStats map[StatsGroupKey]*Stat
keyFilters []string
fingerprinter fingerprinter.Fingerprinter
running bool
lastError error
filters []filter.Filter
iterator pmgo.IterManager
ticker <-chan time.Time
queriesChan chan []QueryInfoAndCounters
stopChan chan bool
docsChan chan proto.SystemProfile
timeoutsChan chan time.Time
queriesInfoAndCounters map[StatsGroupKey]*QueryInfoAndCounters
keyFilters []string
fingerprinter fingerprinter.Fingerprinter
running bool
lastError error
stopWaitGroup sync.WaitGroup
}
func NewProfiler(iterator Iter, filters []filter.Filter, ticker chan time.Time, fp fingerprinter.Fingerprinter) Profiler {
// QueryStats is the summarized, report-ready view of one group of queries.
// It is produced by CountersToStats from a QueryInfoAndCounters value.
type QueryStats struct {
// ID, Namespace, Operation and Fingerprint are copied from the source counters.
ID string
Namespace string
Operation string
// Query is the JSON encoding of the sample query document.
Query string
Fingerprint string
FirstSeen time.Time
LastSeen time.Time
// Count is the number of documents that were folded into this group.
Count int
// QPS is Count divided by the uptime passed to CountersToStats.
QPS float64
// Rank is not assigned by any code visible in this file.
Rank int
// Ratio is Scanned.Total / Returned.Total; it stays 0 when nothing was returned.
Ratio float64
// The following fields summarize the raw per-document samples.
QueryTime Statistics
ResponseLength Statistics
Returned Statistics
Scanned Statistics
}
// QueryInfoAndCounters accumulates the raw samples of every profiler document
// that shares the same operation, fingerprint and namespace. ProcessDoc
// creates and updates these values; CountersToStats summarizes them.
type QueryInfoAndCounters struct {
// ID is the hex-encoded MD5 of the formatted grouping key.
ID string
Namespace string
Operation string
// Query is the query document of the first profiler entry seen for this group.
Query map[string]interface{}
Fingerprint string
// FirstSeen and LastSeen are the earliest and latest document timestamps seen.
FirstSeen time.Time
LastSeen time.Time
// TableScan is initialized to false by ProcessDoc and not updated there.
TableScan bool
// Count is incremented once per processed document.
Count int
// BlockedTime and LockTime are not populated by the code visible in this file.
BlockedTime Times
LockTime Times
// One sample is appended to each of the following slices per processed document.
NReturned []float64
NScanned []float64
QueryTime []float64 // in milliseconds
ResponseLength []float64
}
// Statistics is the set of summary values computed by calcStats for one
// slice of samples.
type Statistics struct {
// Pct is this group's share of the corresponding total across all queries.
// It is filled in by CountersToStats, not by calcStats.
Pct float64
// Total is the sum of the samples.
Total float64
Min float64
Max float64
// Avg is the arithmetic mean of the samples.
Avg float64
// Pct95 is the 95th percentile, computed with the nearest-rank method.
Pct95 float64
StdDev float64
Median float64
}
func NewProfiler(iterator pmgo.IterManager, filters []filter.Filter, ticker <-chan time.Time, fp fingerprinter.Fingerprinter) Profiler {
return &Profile{
filters: filters,
fingerprinter: fp,
iterator: iterator,
ticker: ticker,
statsChan: make(chan []Stat),
docsChan: make(chan proto.SystemProfile, DOCS_BUFFER_SIZE),
rawStats: make(map[StatsGroupKey]*Stat),
keyFilters: []string{"^shardVersion$", "^\\$"},
filters: filters,
fingerprinter: fp,
iterator: iterator,
ticker: ticker,
queriesChan: make(chan []QueryInfoAndCounters),
docsChan: make(chan proto.SystemProfile, DocsBufferSize),
timeoutsChan: nil,
queriesInfoAndCounters: make(map[StatsGroupKey]*QueryInfoAndCounters),
keyFilters: []string{"^shardVersion$", "^\\$"},
}
}
@@ -96,37 +137,54 @@ func (p *Profile) GetLastError() error {
return p.lastError
}
func (p *Profile) StatsChan() chan []Stat {
return p.statsChan
// QueriesChan returns the channel on which the profiler publishes each batch
// of accumulated query counters.
func (p *Profile) QueriesChan() chan []QueryInfoAndCounters {
return p.queriesChan
}
// Start launches the background data collection goroutine. Calling it while
// the profiler is already marked as running does nothing.
func (p *Profile) Start() {
	if p.running {
		return
	}
	p.running = true
	p.stopChan = make(chan bool)
	go p.getData()
}
// Stop asks the background goroutine to finish, waits for it, and then closes
// the iterator and the profiler's channels. It is a no-op when the profiler
// is not running, so calling it more than once is safe.
func (p *Profile) Stop() {
	if !p.running {
		return
	}
	// Clear the flag up front so a repeated Stop cannot close the channels twice.
	p.running = false

	// Non-blocking send: getData may already have left its loop.
	select {
	case p.stopChan <- true:
	default:
	}

	// timeoutsChan is created lazily by TimeoutsChan(); closing a nil channel
	// panics, so only close it when a caller actually requested it.
	if p.timeoutsChan != nil {
		close(p.timeoutsChan)
		p.timeoutsChan = nil
	}

	// Wait for getData to receive the stop signal
	p.stopWaitGroup.Wait()
	p.iterator.Close()
	close(p.stopChan)
}
// TimeoutsChan returns the receive side of the channel used to report
// iterator timeouts. The channel is created on the first call; later calls
// return the same channel.
func (p *Profile) TimeoutsChan() <-chan time.Time {
	if p.timeoutsChan != nil {
		return p.timeoutsChan
	}
	p.timeoutsChan = make(chan time.Time)
	return p.timeoutsChan
}
func (p *Profile) getData() {
go p.getDocs()
p.stopWaitGroup.Add(1)
MAIN_GETDATA_LOOP:
for {
select {
case <-p.ticker:
p.statsChan <- statsToArray(p.rawStats)
p.rawStats = make(map[StatsGroupKey]*Stat) // Reset stats
p.queriesChan <- mapToArray(p.queriesInfoAndCounters)
p.queriesInfoAndCounters = make(map[StatsGroupKey]*QueryInfoAndCounters) // Reset stats
case <-p.stopChan:
p.iterator.Close()
break MAIN_GETDATA_LOOP
}
}
p.stopWaitGroup.Done()
}
func (p *Profile) getDocs() {
@@ -134,6 +192,9 @@ func (p *Profile) getDocs() {
for p.iterator.Next(&doc) || p.iterator.Timeout() {
if p.iterator.Timeout() {
if p.timeoutsChan != nil {
p.timeoutsChan <- time.Now().UTC()
}
continue
}
valid := true
@@ -147,52 +208,161 @@ func (p *Profile) getDocs() {
continue
}
if len(doc.Query) > 0 {
fp, err := p.fingerprinter.Fingerprint(doc.Query)
if err != nil {
log.Errorf("cannot get fingerprint: %s", err.Error())
continue
}
var s *Stat
var ok bool
key := StatsGroupKey{
Operation: doc.Op,
Fingerprint: fp,
Namespace: doc.Ns,
}
if s, ok = p.rawStats[key]; !ok {
realQuery, _ := util.GetQueryField(doc.Query)
s = &Stat{
ID: fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s", key)))),
Operation: doc.Op,
Fingerprint: fp,
Namespace: doc.Ns,
TableScan: false,
Query: realQuery,
}
p.rawStats[key] = s
}
s.Count++
s.NScanned = append(s.NScanned, float64(doc.DocsExamined))
s.NReturned = append(s.NReturned, float64(doc.Nreturned))
s.QueryTime = append(s.QueryTime, float64(doc.Millis))
s.ResponseLength = append(s.ResponseLength, float64(doc.ResponseLength))
var zeroTime time.Time
if s.FirstSeen == zeroTime || s.FirstSeen.After(doc.Ts) {
s.FirstSeen = doc.Ts
}
if s.LastSeen == zeroTime || s.LastSeen.Before(doc.Ts) {
s.LastSeen = doc.Ts
}
p.ProcessDoc(doc, p.queriesInfoAndCounters)
}
}
p.statsChan <- statsToArray(p.rawStats)
p.running = false
p.stopChan <- true
p.queriesChan <- mapToArray(p.queriesInfoAndCounters)
select {
case p.stopChan <- true:
default:
}
}
func statsToArray(stats map[StatsGroupKey]*Stat) []Stat {
sa := []Stat{}
// ProcessDoc fingerprints one profiler document and folds its counters into
// the matching entry of stats. The entry is created the first time a given
// operation/fingerprint/namespace combination is seen.
//
// When stats is nil the profiler's own map is used, which keeps the previous
// behavior for callers that relied on it.
//
// It returns an error only when the fingerprint cannot be computed; in that
// case stats is left untouched.
func (p *Profile) ProcessDoc(doc proto.SystemProfile, stats map[StatsGroupKey]*QueryInfoAndCounters) error {
	fp, err := p.fingerprinter.Fingerprint(doc.Query)
	if err != nil {
		return fmt.Errorf("cannot get fingerprint: %s", err.Error())
	}

	if stats == nil {
		stats = p.queriesInfoAndCounters
	}

	key := StatsGroupKey{
		Operation:   doc.Op,
		Fingerprint: fp,
		Namespace:   doc.Ns,
	}

	s, ok := stats[key]
	if !ok {
		// The error is deliberately ignored: if the query field cannot be
		// extracted the group is still tracked, just without a sample query.
		realQuery, _ := util.GetQueryField(doc.Query)
		s = &QueryInfoAndCounters{
			// The ID is derived from the "%s" rendering of the key; the format
			// must not change or existing IDs would change with it.
			ID:          fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s", key)))),
			Operation:   doc.Op,
			Fingerprint: fp,
			Namespace:   doc.Ns,
			TableScan:   false,
			Query:       realQuery,
		}
		stats[key] = s
	}

	s.Count++
	s.NScanned = append(s.NScanned, float64(doc.DocsExamined))
	s.NReturned = append(s.NReturned, float64(doc.Nreturned))
	s.QueryTime = append(s.QueryTime, float64(doc.Millis))
	s.ResponseLength = append(s.ResponseLength, float64(doc.ResponseLength))

	// Widen the [FirstSeen, LastSeen] window to include this document.
	if s.FirstSeen.IsZero() || s.FirstSeen.After(doc.Ts) {
		s.FirstSeen = doc.Ts
	}
	if s.LastSeen.IsZero() || s.LastSeen.Before(doc.Ts) {
		s.LastSeen = doc.Ts
	}

	return nil
}
// CalcQueriesStats converts every entry of queries into its summarized
// QueryStats form. Percentages are computed against the totals of the whole
// input set. The result is always non-nil and has one element per query, in
// input order.
func CalcQueriesStats(queries []QueryInfoAndCounters, uptime int64) []QueryStats {
	result := make([]QueryStats, 0, len(queries))
	totals := calcTotalCounters(queries)

	for i := range queries {
		result = append(result, CountersToStats(queries[i], uptime, totals))
	}

	return result
}
// CalcTotalQueriesStats merges the samples of all queries into a single
// group and returns the statistics for that combined group.
func CalcTotalQueriesStats(queries []QueryInfoAndCounters, uptime int64) QueryStats {
	totals := calcTotalCounters(queries)
	merged := aggregateCounters(queries)

	return CountersToStats(merged, uptime, totals)
}
// CountersToStats summarizes the raw samples of one query group.
//
// uptime is the divisor used for QPS; when it is not positive QPS is left at
// 0 instead of becoming Inf/NaN. tc holds the totals across all queries and
// is used to express this group's totals as percentages (0-100). A
// percentage whose total is not positive is left at 0.
func CountersToStats(query QueryInfoAndCounters, uptime int64, tc totalCounters) QueryStats {
	// A marshalling failure only leaves the Query string empty; the numeric
	// statistics are still worth returning, so the error is ignored on purpose.
	buf, _ := json.Marshal(query.Query)

	queryStats := QueryStats{
		Count:          query.Count,
		ID:             query.ID,
		Operation:      query.Operation,
		Query:          string(buf),
		Fingerprint:    query.Fingerprint,
		Scanned:        calcStats(query.NScanned),
		Returned:       calcStats(query.NReturned),
		QueryTime:      calcStats(query.QueryTime),
		ResponseLength: calcStats(query.ResponseLength),
		FirstSeen:      query.FirstSeen,
		LastSeen:       query.LastSeen,
		Namespace:      query.Namespace,
	}

	if uptime > 0 {
		queryStats.QPS = float64(query.Count) / float64(uptime)
	}

	if tc.Scanned > 0 {
		queryStats.Scanned.Pct = queryStats.Scanned.Total * 100 / tc.Scanned
	}
	if tc.Returned > 0 {
		queryStats.Returned.Pct = queryStats.Returned.Total * 100 / tc.Returned
	}
	if tc.QueryTime > 0 {
		queryStats.QueryTime.Pct = queryStats.QueryTime.Total * 100 / tc.QueryTime
	}
	if tc.Bytes > 0 {
		// Scaled by 100 like the other percentages.
		queryStats.ResponseLength.Pct = queryStats.ResponseLength.Total * 100 / tc.Bytes
	}

	// Scanned-to-returned ratio; undefined (left at 0) when nothing was returned.
	if queryStats.Returned.Total > 0 {
		queryStats.Ratio = queryStats.Scanned.Total / queryStats.Returned.Total
	}

	return queryStats
}
// aggregateCounters merges all queries into one QueryInfoAndCounters value:
// the sample slices are concatenated and the document counts are added up.
// Identification fields (ID, namespace, fingerprint, timestamps) are left at
// their zero values because the result does not describe a single query.
func aggregateCounters(queries []QueryInfoAndCounters) QueryInfoAndCounters {
	qt := QueryInfoAndCounters{}
	for _, query := range queries {
		// Without the count the totals row would report Count 0 and QPS 0.
		qt.Count += query.Count
		qt.NScanned = append(qt.NScanned, query.NScanned...)
		qt.NReturned = append(qt.NReturned, query.NReturned...)
		qt.QueryTime = append(qt.QueryTime, query.QueryTime...)
		qt.ResponseLength = append(qt.ResponseLength, query.ResponseLength...)
	}
	return qt
}
// calcTotalCounters adds up the document count and the sample sums of every
// query. Sample sets for which stats.Sum reports an error (for example an
// empty slice) are skipped so that one bad set cannot corrupt the totals.
func calcTotalCounters(queries []QueryInfoAndCounters) totalCounters {
	tc := totalCounters{}

	// accumulate adds the sum of samples to *dst when the sum can be computed.
	accumulate := func(dst *float64, samples []float64) {
		if sum, err := stats.Sum(samples); err == nil {
			*dst += sum
		}
	}

	for _, query := range queries {
		tc.Count += query.Count
		accumulate(&tc.Scanned, query.NScanned)
		accumulate(&tc.Returned, query.NReturned)
		accumulate(&tc.QueryTime, query.QueryTime)
		accumulate(&tc.Bytes, query.ResponseLength)
	}

	return tc
}
// calcStats computes the summary statistics for samples. Pct is not set
// here. An empty sample set yields the zero Statistics value rather than the
// error results the stats helpers return for empty input.
func calcStats(samples []float64) Statistics {
	var s Statistics
	if len(samples) == 0 {
		return s
	}

	// With a non-empty input the remaining error cases of these helpers are
	// not expected, so their errors are intentionally ignored.
	s.Total, _ = stats.Sum(samples)
	s.Min, _ = stats.Min(samples)
	s.Max, _ = stats.Max(samples)
	s.Avg, _ = stats.Mean(samples)
	s.Pct95, _ = stats.PercentileNearestRank(samples, 95)
	s.StdDev, _ = stats.StandardDeviation(samples)
	s.Median, _ = stats.Median(samples)

	return s
}
func mapToArray(stats map[StatsGroupKey]*QueryInfoAndCounters) []QueryInfoAndCounters {
sa := []QueryInfoAndCounters{}
for _, s := range stats {
sa = append(sa, *s)
}

View File

@@ -0,0 +1,273 @@
package profiler
import (
"log"
"os"
"reflect"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/percona/percona-toolkit/src/go/lib/tutil"
"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
"github.com/percona/percona-toolkit/src/go/pt-mongodb-query-digest/filter"
"github.com/percona/pmgo/pmgomock"
)
const (
samples = "/src/go/tests/"
)
// testVars groups values shared by all tests in this package.
type testVars struct {
// RootPath is the path returned by tutil.RootPath; sample files are located relative to it.
RootPath string
}
// vars is populated once by TestMain before any test runs.
var vars testVars
// TestMain resolves the root path used to locate sample files and then runs
// the tests. The process exits with status 1 if the path cannot be resolved.
func TestMain(m *testing.M) {
	rootPath, err := tutil.RootPath()
	vars.RootPath = rootPath
	if err != nil {
		log.Printf("cannot get root path: %s", err.Error())
		os.Exit(1)
	}

	code := m.Run()
	os.Exit(code)
}
// TestRegularIterator feeds two sample documents through a mocked iterator
// that never times out and checks that both are folded into a single
// QueryInfoAndCounters entry.
func TestRegularIterator(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
docs := []proto.SystemProfile{}
err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs.json", &docs)
if err != nil {
t.Fatalf("cannot load samples: %s", err.Error())
}
iter := pmgomock.NewMockIterManager(ctrl)
// Expected call sequence: two documents, then end of iteration, then one Close.
gomock.InOrder(
iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[0]).Return(true),
iter.EXPECT().Timeout().Return(false),
iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[1]).Return(true),
iter.EXPECT().Timeout().Return(false),
iter.EXPECT().Next(gomock.Any()).Return(false),
iter.EXPECT().Timeout().Return(false),
iter.EXPECT().Close(),
)
filters := []filter.Filter{}
fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
// A nil ticker never fires, so the only batch arrives when iteration ends.
prof := NewProfiler(iter, filters, nil, fp)
// The literals below are valid RFC3339 timestamps, so the parse errors are ignored.
firstSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914-03:00")
lastSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:20.214-03:00")
want := []QueryInfoAndCounters{
QueryInfoAndCounters{
ID: "c6466139b21c392acd0699e863b50d81",
Namespace: "samples.col1",
Operation: "query",
Query: map[string]interface{}{
"find": "col1",
"shardVersion": []interface{}{float64(0), "000000000000000000000000"},
},
Fingerprint: "find",
FirstSeen: firstSeen,
LastSeen: lastSeen,
TableScan: false,
Count: 2,
BlockedTime: Times(nil),
LockTime: Times(nil),
NReturned: []float64{50, 75},
NScanned: []float64{100, 75},
QueryTime: []float64{0, 1},
ResponseLength: []float64{1.06123e+06, 1.06123e+06},
},
}
prof.Start()
// Fail instead of hanging if no batch is published within two seconds.
select {
case queries := <-prof.QueriesChan():
if !reflect.DeepEqual(queries, want) {
t.Errorf("invalid queries. \nGot: %#v,\nWant: %#v\n", queries, want)
}
case <-time.After(2 * time.Second):
t.Error("Didn't get any query")
}
}
// TestIteratorTimeout checks that an iterator timeout is reported on the
// timeouts channel and that documents returned after the timeout are still
// processed and published.
func TestIteratorTimeout(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	docs := []proto.SystemProfile{}
	err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs.json", &docs)
	if err != nil {
		t.Fatalf("cannot load samples: %s", err.Error())
	}

	iter := pmgomock.NewMockIterManager(ctrl)
	gomock.InOrder(
		iter.EXPECT().Next(gomock.Any()).Return(true),
		iter.EXPECT().Timeout().Return(true),
		iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[1]).Return(true),
		iter.EXPECT().Timeout().Return(false),
		iter.EXPECT().Next(gomock.Any()).Return(false),
		iter.EXPECT().Timeout().Return(false),
		// When there are no more docs, iterator will close
		iter.EXPECT().Close(),
		// And we are closing it again (to force the getData go-routine to end)
		// at the profiler.Stop() method
		iter.EXPECT().Close(),
	)

	filters := []filter.Filter{}
	fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
	prof := NewProfiler(iter, filters, nil, fp)

	// The literals below are valid RFC3339 timestamps, so the parse errors are ignored.
	firstSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914-03:00")
	lastSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914-03:00")
	want := []QueryInfoAndCounters{
		{
			ID:        "c6466139b21c392acd0699e863b50d81",
			Namespace: "samples.col1",
			Operation: "query",
			Query: map[string]interface{}{
				"find":         "col1",
				"shardVersion": []interface{}{float64(0), "000000000000000000000000"},
			},
			Fingerprint:    "find",
			FirstSeen:      firstSeen,
			LastSeen:       lastSeen,
			TableScan:      false,
			Count:          1,
			BlockedTime:    Times(nil),
			LockTime:       Times(nil),
			NReturned:      []float64{75},
			NScanned:       []float64{75},
			QueryTime:      []float64{1},
			ResponseLength: []float64{1.06123e+06},
		},
	}

	// The timeouts channel is created lazily and the profiler only reports
	// timeouts when it exists. Request it before Start so the background
	// goroutine cannot observe a nil channel and silently drop the timeout.
	timeouts := prof.TimeoutsChan()

	prof.Start()

	gotTimeout := false

	// Get a timeout
	select {
	case <-timeouts:
		gotTimeout = true
	case <-prof.QueriesChan():
		t.Error("Got queries before timeout")
	case <-time.After(2 * time.Second):
		t.Error("Timeout checking timeout")
	}

	if !gotTimeout {
		t.Error("Didn't get a timeout")
	}

	// After the first document returned a timeout, we should still receive the second document
	select {
	case queries := <-prof.QueriesChan():
		if !reflect.DeepEqual(queries, want) {
			t.Errorf("invalid queries. \nGot: %#v,\nWant: %#v\n", queries, want)
		}
	case <-time.After(2 * time.Second):
		t.Error("Didn't get any query after 2 seconds")
	}

	prof.Stop()
}
// TestTailIterator simulates a tailing iterator that blocks between
// documents. With a one-second ticker and a 1.5 second pause before the
// second document, each document is expected to arrive in its own batch.
func TestTailIterator(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	docs := []proto.SystemProfile{}
	err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs.json", &docs)
	if err != nil {
		t.Fatalf("cannot load samples: %s", err.Error())
	}

	sleep := func(param interface{}) {
		time.Sleep(1500 * time.Millisecond)
	}

	iter := pmgomock.NewMockIterManager(ctrl)
	gomock.InOrder(
		iter.EXPECT().Next(gomock.Any()).SetArg(0, docs[0]).Return(true),
		iter.EXPECT().Timeout().Return(false),
		// A Tail iterator will wait if there are no available docs.
		// Do a 1500 ms sleep before returning the second doc to simulate a tail wait
		// and to let the ticker tick
		iter.EXPECT().Next(gomock.Any()).Do(sleep).SetArg(0, docs[1]).Return(true),
		iter.EXPECT().Timeout().Return(false),
		iter.EXPECT().Next(gomock.Any()).Return(false),
		iter.EXPECT().Timeout().Return(false),
		iter.EXPECT().Close(),
	)

	filters := []filter.Filter{}
	ticker := time.NewTicker(time.Second)
	// Release the ticker's resources when the test ends.
	defer ticker.Stop()
	fp := fingerprinter.NewFingerprinter(fingerprinter.DEFAULT_KEY_FILTERS)
	prof := NewProfiler(iter, filters, ticker.C, fp)

	// The literals below are valid RFC3339 timestamps, so the parse errors are ignored.
	firstSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:20.214-03:00")
	lastSeen, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:20.214-03:00")

	firstSeen2, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914-03:00")
	lastSeen2, _ := time.Parse(time.RFC3339Nano, "2017-04-01T23:01:19.914-03:00")

	want := []QueryInfoAndCounters{
		{
			ID:        "c6466139b21c392acd0699e863b50d81",
			Namespace: "samples.col1",
			Operation: "query",
			Query: map[string]interface{}{
				"find":         "col1",
				"shardVersion": []interface{}{float64(0), "000000000000000000000000"},
			},
			Fingerprint:    "find",
			FirstSeen:      firstSeen,
			LastSeen:       lastSeen,
			TableScan:      false,
			Count:          1,
			BlockedTime:    Times(nil),
			LockTime:       Times(nil),
			NReturned:      []float64{50},
			NScanned:       []float64{100},
			QueryTime:      []float64{0},
			ResponseLength: []float64{1.06123e+06},
		},
		{
			ID:        "c6466139b21c392acd0699e863b50d81",
			Namespace: "samples.col1",
			Operation: "query",
			Query: map[string]interface{}{
				"find":         "col1",
				"shardVersion": []interface{}{float64(0), "000000000000000000000000"},
			},
			Fingerprint:    "find",
			FirstSeen:      firstSeen2,
			LastSeen:       lastSeen2,
			TableScan:      false,
			Count:          1,
			BlockedTime:    Times(nil),
			LockTime:       Times(nil),
			NReturned:      []float64{75},
			NScanned:       []float64{75},
			QueryTime:      []float64{1},
			ResponseLength: []float64{1.06123e+06},
		},
	}

	prof.Start()

	// Since the mocked iterator has a Sleep(1500 ms) between Next methods calls,
	// two batches are expected, each one carrying a single document.
	for index := 0; index < len(want); index++ {
		expected := []QueryInfoAndCounters{want[index]}
		select {
		case queries := <-prof.QueriesChan():
			if !reflect.DeepEqual(queries, expected) {
				t.Errorf("invalid queries. \nGot: %#v,\nWant: %#v\n", queries, expected)
			}
		case <-time.After(5 * time.Second):
			// Bail out instead of hanging the whole test run if a batch never arrives.
			t.Fatalf("timeout waiting for queries batch #%d", index)
		}
	}
}