This commit is contained in:
Carlos Salguero
2017-04-04 15:13:31 -03:00
parent 1a8e3a0b76
commit 25efaef471

View File

@@ -15,6 +15,7 @@ import (
var (
MAX_DEPTH_LEVEL = 10
DOCS_BUFFER_SIZE = 100
CANNOT_GET_QUERY_ERROR = errors.New("cannot get query field from the profile document (it is not a map)")
)
@@ -58,6 +59,7 @@ type Iter interface {
}
type Profiler interface {
GetLastError() error
StatsChan() chan []Stat
Start()
Stop()
@@ -69,10 +71,12 @@ type Profile struct {
ticker chan time.Time
statsChan chan []Stat
stopChan chan bool
stats []Stat
docsChan chan proto.SystemProfile
rawStats map[StatsGroupKey]*Stat
keyFilters []string
fingerprinter fingerprinter.Fingerprinter
running bool
lastError error
}
func NewProfiler(iterator Iter, filters []filter.Filter, ticker chan time.Time, fp fingerprinter.Fingerprinter) Profiler {
@@ -82,11 +86,16 @@ func NewProfiler(iterator Iter, filters []filter.Filter, ticker chan time.Time,
iterator: iterator,
ticker: ticker,
statsChan: make(chan []Stat),
stats: make([]Stat, 100),
docsChan: make(chan proto.SystemProfile, DOCS_BUFFER_SIZE),
rawStats: make(map[StatsGroupKey]*Stat),
keyFilters: []string{"^shardVersion$", "^\\$"},
}
}
func (p *Profile) GetLastError() error {
return p.lastError
}
func (p *Profile) StatsChan() chan []Stat {
return p.statsChan
}
@@ -100,16 +109,33 @@ func (p *Profile) Start() {
func (p *Profile) Stop() {
if p.running {
p.stopChan <- true
p.iterator.Close()
close(p.stopChan)
}
}
func (p *Profile) getData() {
var doc proto.SystemProfile
stop := false
stats := make(map[StatsGroupKey]*Stat)
go p.getDocs()
MAIN_GETDATA_LOOP:
for {
select {
case <-p.ticker:
p.statsChan <- statsToArray(p.rawStats)
p.rawStats = make(map[StatsGroupKey]*Stat) // Reset stats
case <-p.stopChan:
p.iterator.Close()
break MAIN_GETDATA_LOOP
}
}
}
for !stop && p.iterator.Next(&doc) && p.iterator.Err() == nil {
func (p *Profile) getDocs() {
var doc proto.SystemProfile
for p.iterator.Next(&doc) || p.iterator.Timeout() {
if p.iterator.Timeout() {
continue
}
valid := true
for _, filter := range p.filters {
if filter(doc) == false {
@@ -120,14 +146,6 @@ func (p *Profile) getData() {
if !valid {
continue
}
select {
case <-p.ticker:
p.statsChan <- statsToArray(stats)
case <-p.stopChan:
stop = true
continue
default:
if len(doc.Query) > 0 {
fp, err := p.fingerprinter.Fingerprint(doc.Query)
@@ -142,7 +160,7 @@ func (p *Profile) getData() {
Fingerprint: fp,
Namespace: doc.Ns,
}
if s, ok = stats[key]; !ok {
if s, ok = p.rawStats[key]; !ok {
realQuery, _ := util.GetQueryField(doc.Query)
s = &Stat{
ID: fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s", key)))),
@@ -152,7 +170,7 @@ func (p *Profile) getData() {
TableScan: false,
Query: realQuery,
}
stats[key] = s
p.rawStats[key] = s
}
s.Count++
s.NScanned = append(s.NScanned, float64(doc.DocsExamined))
@@ -168,10 +186,9 @@ func (p *Profile) getData() {
}
}
}
}
p.statsChan <- statsToArray(stats)
p.statsChan <- statsToArray(p.rawStats)
p.running = false
p.stopChan <- true
}
func statsToArray(stats map[StatsGroupKey]*Stat) []Stat {