From 25efaef471cdc610fbec95e455b30409039869a1 Mon Sep 17 00:00:00 2001 From: Carlos Salguero Date: Tue, 4 Apr 2017 15:13:31 -0300 Subject: [PATCH] WIP --- src/go/mongolib/profiler/profiler.go | 117 +++++++++++++++------------ 1 file changed, 67 insertions(+), 50 deletions(-) diff --git a/src/go/mongolib/profiler/profiler.go b/src/go/mongolib/profiler/profiler.go index 2f90aa27..02789f90 100644 --- a/src/go/mongolib/profiler/profiler.go +++ b/src/go/mongolib/profiler/profiler.go @@ -15,6 +15,7 @@ import ( var ( MAX_DEPTH_LEVEL = 10 + DOCS_BUFFER_SIZE = 100 CANNOT_GET_QUERY_ERROR = errors.New("cannot get query field from the profile document (it is not a map)") ) @@ -58,6 +59,7 @@ type Iter interface { } type Profiler interface { + GetLastError() error StatsChan() chan []Stat Start() Stop() @@ -69,10 +71,12 @@ type Profile struct { ticker chan time.Time statsChan chan []Stat stopChan chan bool - stats []Stat + docsChan chan proto.SystemProfile + rawStats map[StatsGroupKey]*Stat keyFilters []string fingerprinter fingerprinter.Fingerprinter running bool + lastError error } func NewProfiler(iterator Iter, filters []filter.Filter, ticker chan time.Time, fp fingerprinter.Fingerprinter) Profiler { @@ -82,11 +86,16 @@ func NewProfiler(iterator Iter, filters []filter.Filter, ticker chan time.Time, iterator: iterator, ticker: ticker, statsChan: make(chan []Stat), - stats: make([]Stat, 100), + docsChan: make(chan proto.SystemProfile, DOCS_BUFFER_SIZE), + rawStats: make(map[StatsGroupKey]*Stat), keyFilters: []string{"^shardVersion$", "^\\$"}, } } +func (p *Profile) GetLastError() error { + return p.lastError +} + func (p *Profile) StatsChan() chan []Stat { return p.statsChan } @@ -100,16 +109,33 @@ func (p *Profile) Start() { func (p *Profile) Stop() { if p.running { - p.stopChan <- true + p.iterator.Close() + close(p.stopChan) } } func (p *Profile) getData() { - var doc proto.SystemProfile - stop := false - stats := make(map[StatsGroupKey]*Stat) + go p.getDocs() +MAIN_GETDATA_LOOP: + for { + select { + case <-p.ticker: + p.statsChan <- statsToArray(p.rawStats) + p.rawStats = make(map[StatsGroupKey]*Stat) // Reset stats + case <-p.stopChan: + p.iterator.Close() + break MAIN_GETDATA_LOOP + } + } +} - for !stop && p.iterator.Next(&doc) && p.iterator.Err() == nil { +func (p *Profile) getDocs() { + var doc proto.SystemProfile + + for p.iterator.Next(&doc) || p.iterator.Timeout() { + if p.iterator.Timeout() { + continue + } valid := true for _, filter := range p.filters { if filter(doc) == false { @@ -120,58 +146,49 @@ func (p *Profile) getData() { if !valid { continue } + if len(doc.Query) > 0 { - select { - case <-p.ticker: - p.statsChan <- statsToArray(stats) - case <-p.stopChan: - stop = true - continue - default: - if len(doc.Query) > 0 { - - fp, err := p.fingerprinter.Fingerprint(doc.Query) - if err != nil { - log.Errorf("cannot get fingerprint: %s", err.Error()) - continue - } - var s *Stat - var ok bool - key := StatsGroupKey{ + fp, err := p.fingerprinter.Fingerprint(doc.Query) + if err != nil { + log.Errorf("cannot get fingerprint: %s", err.Error()) + continue + } + var s *Stat + var ok bool + key := StatsGroupKey{ + Operation: doc.Op, + Fingerprint: fp, + Namespace: doc.Ns, + } + if s, ok = p.rawStats[key]; !ok { + realQuery, _ := util.GetQueryField(doc.Query) + s = &Stat{ + ID: fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s", key)))), Operation: doc.Op, Fingerprint: fp, Namespace: doc.Ns, + TableScan: false, + Query: realQuery, } - if s, ok = stats[key]; !ok { - realQuery, _ := util.GetQueryField(doc.Query) - s = &Stat{ - ID: fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s", key)))), - Operation: doc.Op, - Fingerprint: fp, - Namespace: doc.Ns, - TableScan: false, - Query: realQuery, - } - stats[key] = s - } - s.Count++ - s.NScanned = append(s.NScanned, float64(doc.DocsExamined)) - s.NReturned = append(s.NReturned, float64(doc.Nreturned)) - s.QueryTime = append(s.QueryTime, float64(doc.Millis)) - s.ResponseLength = append(s.ResponseLength, float64(doc.ResponseLength)) - var zeroTime time.Time - if s.FirstSeen == zeroTime || s.FirstSeen.After(doc.Ts) { - s.FirstSeen = doc.Ts - } - if s.LastSeen == zeroTime || s.LastSeen.Before(doc.Ts) { - s.LastSeen = doc.Ts - } + p.rawStats[key] = s + } + s.Count++ + s.NScanned = append(s.NScanned, float64(doc.DocsExamined)) + s.NReturned = append(s.NReturned, float64(doc.Nreturned)) + s.QueryTime = append(s.QueryTime, float64(doc.Millis)) + s.ResponseLength = append(s.ResponseLength, float64(doc.ResponseLength)) + var zeroTime time.Time + if s.FirstSeen == zeroTime || s.FirstSeen.After(doc.Ts) { + s.FirstSeen = doc.Ts + } + if s.LastSeen == zeroTime || s.LastSeen.Before(doc.Ts) { + s.LastSeen = doc.Ts } } } - - p.statsChan <- statsToArray(stats) + p.statsChan <- statsToArray(p.rawStats) p.running = false + p.stopChan <- true } func statsToArray(stats map[StatsGroupKey]*Stat) []Stat {