mirror of
https://github.com/percona/percona-toolkit.git
synced 2025-09-16 16:23:30 +00:00
PMM-1057: let's try to bypass limitations of go/bson
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/util"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -68,7 +69,7 @@ func (f *Fingerprint) Fingerprint(doc proto.SystemProfile) (string, error) {
|
||||
// if there is a sort clause in the query, we have to add all fields in the sort
|
||||
// fields list that are not in the query keys list (retKeys)
|
||||
if sortKeys, ok := query.Map()["sort"]; ok {
|
||||
if sortKeysMap, ok := sortKeys.(map[string]interface{}); ok {
|
||||
if sortKeysMap, ok := sortKeys.(bson.M); ok {
|
||||
sortKeys := keys(sortKeysMap, f.keyFilters)
|
||||
retKeys = append(retKeys, sortKeys...)
|
||||
}
|
||||
@@ -102,20 +103,20 @@ func (f *Fingerprint) Fingerprint(doc proto.SystemProfile) (string, error) {
|
||||
break
|
||||
}
|
||||
// first key is operation type
|
||||
op = query.D[0].Name
|
||||
collection, _ = query.D[0].Value.(string)
|
||||
op = query[0].Name
|
||||
collection, _ = query[0].Value.(string)
|
||||
switch op {
|
||||
case "group":
|
||||
retKeys = []string{}
|
||||
if g, ok := query.Map()["group"]; ok {
|
||||
if m, ok := g.(map[string]interface{}); ok {
|
||||
if m, ok := g.(bson.M); ok {
|
||||
if f, ok := m["key"]; ok {
|
||||
if keysMap, ok := f.(map[string]interface{}); ok {
|
||||
if keysMap, ok := f.(bson.M); ok {
|
||||
retKeys = append(retKeys, keys(keysMap, []string{})...)
|
||||
}
|
||||
}
|
||||
if f, ok := m["cond"]; ok {
|
||||
if keysMap, ok := f.(map[string]interface{}); ok {
|
||||
if keysMap, ok := f.(bson.M); ok {
|
||||
retKeys = append(retKeys, keys(keysMap, []string{})...)
|
||||
}
|
||||
}
|
||||
@@ -167,18 +168,19 @@ func keys(query interface{}, keyFilters []string) []string {
|
||||
|
||||
func getKeys(query interface{}, keyFilters []string, level int) []string {
|
||||
ks := []string{}
|
||||
var q []interface{}
|
||||
var q []bson.M
|
||||
switch v := query.(type) {
|
||||
case map[string]interface{}:
|
||||
case bson.M:
|
||||
q = append(q, v)
|
||||
case []interface{}:
|
||||
case []bson.M:
|
||||
q = v
|
||||
default:
|
||||
return ks
|
||||
}
|
||||
|
||||
if level <= MAX_DEPTH_LEVEL {
|
||||
for i := range q {
|
||||
if query, ok := q[i].(map[string]interface{}); ok {
|
||||
for key, value := range query {
|
||||
for key, value := range q[i] {
|
||||
if shouldSkipKey(key, keyFilters) {
|
||||
continue
|
||||
}
|
||||
@@ -188,9 +190,6 @@ func getKeys(query interface{}, keyFilters []string, level int) []string {
|
||||
|
||||
ks = append(ks, getKeys(value, keyFilters, level)...)
|
||||
}
|
||||
} else {
|
||||
ks = append(ks, getKeys(q[i], keyFilters, level)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
return ks
|
||||
|
@@ -50,15 +50,15 @@ func ExampleFingerprint() {
|
||||
|
||||
func TestFingerprint(t *testing.T) {
|
||||
doc := proto.SystemProfile{}
|
||||
doc.Query = proto.BsonD{bson.D{
|
||||
doc.Query = proto.BsonD{
|
||||
{"find", "feedback"},
|
||||
{"filter", map[string]interface{}{
|
||||
{"filter", bson.M{
|
||||
"tool": "Atlas",
|
||||
"potId": "2c9180865ae33e85015af1cc29243dc5",
|
||||
}},
|
||||
{"limit", 1},
|
||||
{"singleBatch", true},
|
||||
}}
|
||||
}
|
||||
want := "FIND feedback potId,tool"
|
||||
|
||||
fp := NewFingerprinter(nil)
|
||||
|
@@ -73,7 +73,7 @@ func TestRegularIterator(t *testing.T) {
|
||||
ID: "16196765fb4c14edb91efdbe4f5c5973",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
Query: "{\"find\":\"col1\",\"shardVersion\":[0,\"000000000000000000000000\"]}\n",
|
||||
Query: "{\n \"find\": \"col1\",\n \"shardVersion\": [\n 0,\n \"000000000000000000000000\"\n ]\n}",
|
||||
Fingerprint: "FIND col1 find",
|
||||
FirstSeen: firstSeen,
|
||||
LastSeen: lastSeen,
|
||||
@@ -130,7 +130,7 @@ func TestIteratorTimeout(t *testing.T) {
|
||||
ID: "16196765fb4c14edb91efdbe4f5c5973",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
Query: "{\"find\":\"col1\",\"shardVersion\":[0,\"000000000000000000000000\"]}\n",
|
||||
Query: "{\n \"find\": \"col1\",\n \"shardVersion\": [\n 0,\n \"000000000000000000000000\"\n ]\n}",
|
||||
Fingerprint: "FIND col1 find",
|
||||
FirstSeen: firstSeen,
|
||||
LastSeen: lastSeen,
|
||||
@@ -211,7 +211,7 @@ func TestTailIterator(t *testing.T) {
|
||||
ID: "16196765fb4c14edb91efdbe4f5c5973",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
Query: "{\"find\":\"col1\",\"shardVersion\":[0,\"000000000000000000000000\"]}\n",
|
||||
Query: "{\n \"find\": \"col1\",\n \"shardVersion\": [\n 0,\n \"000000000000000000000000\"\n ]\n}",
|
||||
Fingerprint: "FIND col1 find",
|
||||
FirstSeen: parseDate("2017-04-01T23:01:20.214+00:00"),
|
||||
LastSeen: parseDate("2017-04-01T23:01:20.214+00:00"),
|
||||
@@ -226,7 +226,7 @@ func TestTailIterator(t *testing.T) {
|
||||
ID: "16196765fb4c14edb91efdbe4f5c5973",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
Query: "{\"find\":\"col1\",\"shardVersion\":[0,\"000000000000000000000000\"]}\n",
|
||||
Query: "{\n \"find\": \"col1\",\n \"shardVersion\": [\n 0,\n \"000000000000000000000000\"\n ]\n}",
|
||||
Fingerprint: "FIND col1 find",
|
||||
FirstSeen: parseDate("2017-04-01T23:01:19.914+00:00"),
|
||||
LastSeen: parseDate("2017-04-01T23:01:19.914+00:00"),
|
||||
@@ -258,13 +258,13 @@ func TestCalcStats(t *testing.T) {
|
||||
defer ctrl.Finish()
|
||||
|
||||
docs := []proto.SystemProfile{}
|
||||
err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.json", &docs)
|
||||
err := tutil.LoadBson(vars.RootPath+samples+"profiler_docs_stats.json", &docs)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load samples: %s", err.Error())
|
||||
}
|
||||
|
||||
want := []stats.QueryStats{}
|
||||
err = tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.want.json", &want)
|
||||
err = tutil.LoadBson(vars.RootPath+samples+"profiler_docs_stats.want.json", &want)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load expected results: %s", err.Error())
|
||||
}
|
||||
@@ -307,13 +307,13 @@ func TestCalcTotalStats(t *testing.T) {
|
||||
defer ctrl.Finish()
|
||||
|
||||
docs := []proto.SystemProfile{}
|
||||
err := tutil.LoadJson(vars.RootPath+samples+"profiler_docs_stats.json", &docs)
|
||||
err := tutil.LoadBson(vars.RootPath+samples+"profiler_docs_stats.json", &docs)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load samples: %s", err.Error())
|
||||
}
|
||||
|
||||
want := stats.QueryStats{}
|
||||
err = tutil.LoadJson(vars.RootPath+samples+"profiler_docs_total_stats.want.json", &want)
|
||||
err = tutil.LoadBson(vars.RootPath+samples+"profiler_docs_total_stats.want.json", &want)
|
||||
if err != nil && !tutil.ShouldUpdateSamples() {
|
||||
t.Fatalf("cannot load expected results: %s", err.Error())
|
||||
}
|
||||
|
@@ -8,9 +8,7 @@ import (
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
type BsonD struct {
|
||||
bson.D
|
||||
}
|
||||
type BsonD bson.D
|
||||
|
||||
func (d *BsonD) UnmarshalJSON(data []byte) error {
|
||||
dec := json.NewDecoder(bytes.NewReader(data))
|
||||
@@ -45,19 +43,33 @@ func (d *BsonD) UnmarshalJSON(data []byte) error {
|
||||
return fmt.Errorf("missing value for key %s", key)
|
||||
}
|
||||
|
||||
v := BsonD{}
|
||||
r := dec.Buffered()
|
||||
ndec := json.NewDecoder(r)
|
||||
err = ndec.Decode(&v)
|
||||
var raw json.RawMessage
|
||||
err = dec.Decode(&raw)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var v BsonD
|
||||
err = bson.UnmarshalJSON(raw, &v)
|
||||
if err != nil {
|
||||
var v []BsonD
|
||||
err = bson.UnmarshalJSON(raw, &v)
|
||||
if err != nil {
|
||||
var v interface{}
|
||||
dec.Decode(&v)
|
||||
err = bson.UnmarshalJSON(raw, &v)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
de.Value = v
|
||||
}
|
||||
} else {
|
||||
de.Value = v
|
||||
}
|
||||
} else {
|
||||
de.Value = v
|
||||
}
|
||||
|
||||
d.D = append(d.D, de)
|
||||
*d = append(*d, de)
|
||||
if !dec.More() {
|
||||
break
|
||||
}
|
||||
@@ -74,12 +86,12 @@ func (d *BsonD) UnmarshalJSON(data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *BsonD) MarshalJSON() ([]byte, error) {
|
||||
func (d BsonD) MarshalJSON() ([]byte, error) {
|
||||
var b bytes.Buffer
|
||||
|
||||
b.WriteByte('{')
|
||||
|
||||
for i, v := range d.D {
|
||||
for i, v := range d {
|
||||
if i > 0 {
|
||||
b.WriteByte(',')
|
||||
}
|
||||
@@ -106,5 +118,35 @@ func (d *BsonD) MarshalJSON() ([]byte, error) {
|
||||
}
|
||||
|
||||
func (d BsonD) Len() int {
|
||||
return len(d.D)
|
||||
return len(d)
|
||||
}
|
||||
|
||||
// Map returns a map out of the ordered element name/value pairs in d.
|
||||
func (d BsonD) Map() (m bson.M) {
|
||||
m = make(bson.M, len(d))
|
||||
for _, item := range d {
|
||||
switch v := item.Value.(type) {
|
||||
case BsonD:
|
||||
m[item.Name] = v.Map()
|
||||
case []BsonD:
|
||||
el := []bson.M{}
|
||||
for i := range v {
|
||||
el = append(el, v[i].Map())
|
||||
}
|
||||
m[item.Name] = el
|
||||
case []interface{}:
|
||||
// mgo/bson doesn't expose UnmarshalBSON interface
|
||||
// so we can't create custom bson.Unmarshal()
|
||||
el := []bson.M{}
|
||||
for i := range v {
|
||||
if b, ok := v[i].(BsonD); ok {
|
||||
el = append(el, b.Map())
|
||||
}
|
||||
}
|
||||
m[item.Name] = el
|
||||
default:
|
||||
m[item.Name] = item.Value
|
||||
}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
@@ -6,11 +6,11 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/montanaflynn/stats"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
|
||||
"github.com/percona/percona-toolkit/src/go/mongolib/proto"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
type StatsError struct {
|
||||
@@ -82,7 +82,7 @@ func (s *Stats) Add(doc proto.SystemProfile) error {
|
||||
if err != nil {
|
||||
return &StatsGetQueryFieldError{err}
|
||||
}
|
||||
queryBson, err := bson.MarshalJSON(&query)
|
||||
queryBson, err := json.MarshalIndent(query, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -142,7 +142,7 @@ func TestStats(t *testing.T) {
|
||||
ID: "4e4774ad26f934a193757002a991ebb8",
|
||||
Namespace: "samples.col1",
|
||||
Operation: "query",
|
||||
Query: "{\"find\":\"col1\",\"filter\":{\"s2\":{\"$gte\":\"41991\",\"$lt\":\"33754\"}},\"shardVersion\":[0,\"000000000000000000000000\"]}\n",
|
||||
Query: "{\n \"find\": \"col1\",\n \"filter\": {\n \"s2\": {\n \"$gte\": \"41991\",\n \"$lt\": \"33754\"\n }\n },\n \"shardVersion\": [\n 0,\n \"000000000000000000000000\"\n ]\n}",
|
||||
Fingerprint: "FIND col1 s2",
|
||||
FirstSeen: parseDate("2017-04-10T13:15:53.532-03:00"),
|
||||
LastSeen: parseDate("2017-04-10T13:15:53.532-03:00"),
|
||||
|
@@ -237,7 +237,7 @@ func GetServerStatus(dialer pmgo.Dialer, di *pmgo.DialInfo, hostname string) (pr
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
func GetQueryField(doc proto.SystemProfile) (map[string]interface{}, error) {
|
||||
func GetQueryField(doc proto.SystemProfile) (bson.M, error) {
|
||||
// Proper way to detect if protocol used is "op_msg" or "op_command"
|
||||
// would be to look at "doc.Protocol" field,
|
||||
// however MongoDB 3.0 doesn't have that field
|
||||
@@ -248,7 +248,7 @@ func GetQueryField(doc proto.SystemProfile) (map[string]interface{}, error) {
|
||||
if doc.Op == "update" || doc.Op == "remove" {
|
||||
if squery, ok := query.Map()["q"]; ok {
|
||||
// just an extra check to ensure this type assertion won't fail
|
||||
if ssquery, ok := squery.(map[string]interface{}); ok {
|
||||
if ssquery, ok := squery.(bson.M); ok {
|
||||
return ssquery, nil
|
||||
}
|
||||
return nil, CANNOT_GET_QUERY_ERROR
|
||||
@@ -318,7 +318,7 @@ func GetQueryField(doc proto.SystemProfile) (map[string]interface{}, error) {
|
||||
//
|
||||
if squery, ok := query.Map()["query"]; ok {
|
||||
// just an extra check to ensure this type assertion won't fail
|
||||
if ssquery, ok := squery.(map[string]interface{}); ok {
|
||||
if ssquery, ok := squery.(bson.M); ok {
|
||||
return ssquery, nil
|
||||
}
|
||||
return nil, CANNOT_GET_QUERY_ERROR
|
||||
@@ -326,7 +326,7 @@ func GetQueryField(doc proto.SystemProfile) (map[string]interface{}, error) {
|
||||
|
||||
// "query" in MongoDB 3.2+ is better structured and always has a "filter" subkey:
|
||||
if squery, ok := query.Map()["filter"]; ok {
|
||||
if ssquery, ok := squery.(map[string]interface{}); ok {
|
||||
if ssquery, ok := squery.(bson.M); ok {
|
||||
return ssquery, nil
|
||||
}
|
||||
return nil, CANNOT_GET_QUERY_ERROR
|
||||
|
@@ -1,4 +1,10 @@
|
||||
var coll = db.coll
|
||||
|
||||
coll.createIndex({a: 1})
|
||||
coll.find({a: 1})
|
||||
|
||||
var i;
|
||||
for (i = 0; i < 10; ++i) {
|
||||
coll.insert({a: i % 5});
|
||||
}
|
||||
|
||||
coll.find({a: 1}).pretty()
|
||||
|
@@ -3,7 +3,7 @@
|
||||
"ID": "16196765fb4c14edb91efdbe4f5c5973",
|
||||
"Namespace": "samples.col1",
|
||||
"Operation": "query",
|
||||
"Query": "{\"find\":\"col1\",\"shardVersion\":[0,\"000000000000000000000000\"]}\n",
|
||||
"Query": "{\n \"find\": \"col1\",\n \"shardVersion\": [\n 0,\n \"000000000000000000000000\"\n ]\n}",
|
||||
"Fingerprint": "FIND col1 find",
|
||||
"FirstSeen": "2017-04-10T13:16:23.29-03:00",
|
||||
"LastSeen": "2017-04-10T13:16:23.29-03:00",
|
||||
@@ -56,7 +56,7 @@
|
||||
"ID": "4e4774ad26f934a193757002a991ebb8",
|
||||
"Namespace": "samples.col1",
|
||||
"Operation": "query",
|
||||
"Query": "{\"find\":\"col1\",\"filter\":{\"s2\":{\"$gte\":\"41991\",\"$lt\":\"33754\"}},\"shardVersion\":[0,\"000000000000000000000000\"]}\n",
|
||||
"Query": "{\n \"find\": \"col1\",\n \"filter\": {\n \"s2\": {\n \"$gte\": \"41991\",\n \"$lt\": \"33754\"\n }\n },\n \"shardVersion\": [\n 0,\n \"000000000000000000000000\"\n ]\n}",
|
||||
"Fingerprint": "FIND col1 s2",
|
||||
"FirstSeen": "2017-04-10T13:15:53.532-03:00",
|
||||
"LastSeen": "2017-04-10T13:15:53.532-03:00",
|
||||
@@ -109,7 +109,7 @@
|
||||
"ID": "8cb8666ace7e54767b4d3c4f61860cf9",
|
||||
"Namespace": "samples.col1",
|
||||
"Operation": "query",
|
||||
"Query": "{\"find\":\"col1\",\"filter\":{\"user_id\":{\"$gte\":3384024924,\"$lt\":195092007}},\"projection\":{\"$sortKey\":{\"$meta\":\"sortKey\"}},\"shardVersion\":[0,\"000000000000000000000000\"],\"sort\":{\"user_id\":1}}\n",
|
||||
"Query": "{\n \"find\": \"col1\",\n \"filter\": {\n \"user_id\": {\n \"$gte\": 3384024924,\n \"$lt\": 195092007\n }\n },\n \"projection\": {\n \"$sortKey\": {\n \"$meta\": \"sortKey\"\n }\n },\n \"shardVersion\": [\n 0,\n \"000000000000000000000000\"\n ],\n \"sort\": {\n \"user_id\": 1\n }\n}",
|
||||
"Fingerprint": "FIND col1 user_id",
|
||||
"FirstSeen": "2017-04-10T13:15:53.524-03:00",
|
||||
"LastSeen": "2017-04-10T13:15:53.524-03:00",
|
||||
|
Reference in New Issue
Block a user