From dd1acf9ad8c43e6b3b5d980443c63bcf1938fd05 Mon Sep 17 00:00:00 2001 From: Logan Lembke Date: Wed, 3 May 2023 13:15:22 -0600 Subject: [PATCH] Only maintain one cid's worth of max scores in the host collection (#801) * Only maintain one cid's worth of rollup max scores in the host collection * fix comment * remove unnecessary field from aggregation * fix field in sni summarizer --------- Co-authored-by: Logan L --- pkg/beacon/summarizer.go | 10 ++++------ pkg/beaconproxy/summarizer.go | 10 ++++------ pkg/beaconsni/summarizer.go | 10 ++++------ pkg/uconn/summarizer.go | 24 +++++------------------- 4 files changed, 17 insertions(+), 37 deletions(-) diff --git a/pkg/beacon/summarizer.go b/pkg/beacon/summarizer.go index c9385ddc..cac40d6f 100644 --- a/pkg/beacon/summarizer.go +++ b/pkg/beacon/summarizer.go @@ -95,7 +95,7 @@ func maxBeaconUpdate(datum data.UniqueIP, beaconColl, hostColl *mgo.Collection, Score float64 `bson:"score"` } - mbdstQuery := maxBeaconPipeline(datum, chunk) + mbdstQuery := maxBeaconPipeline(datum) err := beaconColl.Pipe(mbdstQuery).One(&maxBeaconIP) if err != nil { return nil, nil, err @@ -104,7 +104,7 @@ func maxBeaconUpdate(datum data.UniqueIP, beaconColl, hostColl *mgo.Collection, hostSelector := datum.BSONKey() hostWithDatEntrySelector := database.MergeBSONMaps( hostSelector, - bson.M{"dat": bson.M{"$elemMatch": maxBeaconIP.Dst.PrefixedBSONKey("mbdst")}}, + bson.M{"dat": bson.M{"$elemMatch": bson.M{"mbdst.ip": bson.M{"$exists": true}}}}, ) nExistingEntries, err := hostColl.Find(hostWithDatEntrySelector).Count() @@ -113,10 +113,9 @@ func maxBeaconUpdate(datum data.UniqueIP, beaconColl, hostColl *mgo.Collection, } if nExistingEntries > 0 { - // just need to update the cid and score if there is an - // an existing record updateQuery := bson.M{ "$set": bson.M{ + "dat.$.mbdst": maxBeaconIP.Dst, "dat.$.max_beacon_score": maxBeaconIP.Score, "dat.$.cid": chunk, }, @@ -139,12 +138,11 @@ func maxBeaconUpdate(datum data.UniqueIP, beaconColl, hostColl *mgo.Collection, return hostSelector, insertQuery, nil } -func maxBeaconPipeline(host data.UniqueIP, chunk int) []bson.M { +func maxBeaconPipeline(host data.UniqueIP) []bson.M { return []bson.M{ {"$match": bson.M{ "src": host.IP, "src_network_uuid": host.NetworkUUID, - "cid": chunk, }}, // drop unnecessary data {"$project": bson.M{ diff --git a/pkg/beaconproxy/summarizer.go b/pkg/beaconproxy/summarizer.go index d4ea906d..678ed74d 100644 --- a/pkg/beaconproxy/summarizer.go +++ b/pkg/beaconproxy/summarizer.go @@ -97,7 +97,7 @@ func maxProxyBeaconUpdate(datum data.UniqueIP, beaconProxyColl, hostColl *mgo.Co Score float64 `bson:"score"` } - mbdstQuery := maxProxyBeaconPipeline(datum, chunk) + mbdstQuery := maxProxyBeaconPipeline(datum) err := beaconProxyColl.Pipe(mbdstQuery).One(&maxBeaconProxy) if err != nil { return nil, nil, err @@ -106,7 +106,7 @@ func maxProxyBeaconUpdate(datum data.UniqueIP, beaconProxyColl, hostColl *mgo.Co hostSelector := datum.BSONKey() hostWithDatEntrySelector := database.MergeBSONMaps( hostSelector, - bson.M{"dat.mbproxy": maxBeaconProxy.Fqdn}, + bson.M{"dat": bson.M{"$elemMatch": bson.M{"mbproxy": bson.M{"$exists": true}}}}, ) nExistingEntries, err := hostColl.Find(hostWithDatEntrySelector).Count() @@ -115,10 +115,9 @@ func maxProxyBeaconUpdate(datum data.UniqueIP, beaconProxyColl, hostColl *mgo.Co } if nExistingEntries > 0 { - // just need to update the cid and score if there is an - // an existing record updateQuery := bson.M{ "$set": bson.M{ + "dat.$.mbproxy": maxBeaconProxy.Fqdn, "dat.$.max_beacon_proxy_score": maxBeaconProxy.Score, "dat.$.cid": chunk, }, @@ -141,12 +140,11 @@ func maxProxyBeaconUpdate(datum data.UniqueIP, beaconProxyColl, hostColl *mgo.Co return hostSelector, insertQuery, nil } -func maxProxyBeaconPipeline(host data.UniqueIP, chunk int) []bson.M { +func maxProxyBeaconPipeline(host data.UniqueIP) []bson.M { return []bson.M{ {"$match": bson.M{ "src": host.IP, "src_network_uuid": host.NetworkUUID, - "cid": chunk, }}, // drop unnecessary data {"$project": bson.M{ diff --git a/pkg/beaconsni/summarizer.go b/pkg/beaconsni/summarizer.go index 778e4c9b..314389d9 100644 --- a/pkg/beaconsni/summarizer.go +++ b/pkg/beaconsni/summarizer.go @@ -97,7 +97,7 @@ func maxSNIBeaconUpdate(datum data.UniqueIP, beaconSNIColl, hostColl *mgo.Collec Score float64 `bson:"score"` } - mbdstQuery := maxSNIBeaconPipeline(datum, chunk) + mbdstQuery := maxSNIBeaconPipeline(datum) err := beaconSNIColl.Pipe(mbdstQuery).One(&maxBeaconSNI) if err != nil { return nil, nil, err @@ -106,7 +106,7 @@ func maxSNIBeaconUpdate(datum data.UniqueIP, beaconSNIColl, hostColl *mgo.Collec hostSelector := datum.BSONKey() hostWithDatEntrySelector := database.MergeBSONMaps( hostSelector, - bson.M{"dat.mbsni": maxBeaconSNI.Fqdn}, + bson.M{"dat": bson.M{"$elemMatch": bson.M{"mbsni": bson.M{"$exists": true}}}}, ) nExistingEntries, err := hostColl.Find(hostWithDatEntrySelector).Count() @@ -115,10 +115,9 @@ func maxSNIBeaconUpdate(datum data.UniqueIP, beaconSNIColl, hostColl *mgo.Collec } if nExistingEntries > 0 { - // just need to update the cid and score if there is an - // an existing record updateQuery := bson.M{ "$set": bson.M{ + "dat.$.mbsni": maxBeaconSNI.Fqdn, "dat.$.max_beacon_sni_score": maxBeaconSNI.Score, "dat.$.cid": chunk, }, @@ -140,12 +139,11 @@ func maxSNIBeaconUpdate(datum data.UniqueIP, beaconSNIColl, hostColl *mgo.Collec return hostSelector, insertQuery, nil } -func maxSNIBeaconPipeline(host data.UniqueIP, chunk int) []bson.M { +func maxSNIBeaconPipeline(host data.UniqueIP) []bson.M { return []bson.M{ {"$match": bson.M{ "src": host.IP, "src_network_uuid": host.NetworkUUID, - "cid": chunk, }}, // drop unnecessary data {"$project": bson.M{ diff --git a/pkg/uconn/summarizer.go b/pkg/uconn/summarizer.go index 6a3656cc..1f0960af 100644 --- a/pkg/uconn/summarizer.go +++ b/pkg/uconn/summarizer.go @@ -104,7 +104,7 @@ func maxTotalDurationUpdate(datum data.UniqueIP, uconnColl, hostColl *mgo.Collec MaxTotalDur float64 `bson:"tdur"` } - mdipQuery := maxTotalDurationPipeline(datum, chunk) + mdipQuery := maxTotalDurationPipeline(datum) err := uconnColl.Pipe(mdipQuery).One(&maxDurIP) if err != nil { @@ -114,7 +114,7 @@ func maxTotalDurationUpdate(datum data.UniqueIP, uconnColl, hostColl *mgo.Collec hostSelector := datum.BSONKey() hostWithDatEntrySelector := database.MergeBSONMaps( hostSelector, - bson.M{"dat": bson.M{"$elemMatch": maxDurIP.Peer.PrefixedBSONKey("mdip")}}, + bson.M{"dat": bson.M{"$elemMatch": bson.M{"mdip": bson.M{"$exists": true}}}}, ) nExistingEntries, err := hostColl.Find(hostWithDatEntrySelector).Count() @@ -123,10 +123,9 @@ func maxTotalDurationUpdate(datum data.UniqueIP, uconnColl, hostColl *mgo.Collec } if nExistingEntries > 0 { - // just need to update the cid and score if there is an - // an existing record updateQuery := bson.M{ "$set": bson.M{ + "dat.$.mdip": maxDurIP.Peer, "dat.$.max_duration": maxDurIP.MaxTotalDur, "dat.$.cid": chunk, }, @@ -152,7 +151,7 @@ func maxTotalDurationUpdate(datum data.UniqueIP, uconnColl, hostColl *mgo.Collec return database.BulkChange{Selector: hostSelector, Update: insertQuery, Upsert: true}, nil } -func maxTotalDurationPipeline(host data.UniqueIP, chunk int) []bson.M { +func maxTotalDurationPipeline(host data.UniqueIP) []bson.M { return []bson.M{ {"$match": bson.M{ // match the host IP/ network @@ -168,8 +167,6 @@ func maxTotalDurationPipeline(host data.UniqueIP, chunk int) []bson.M { }}, }, }, - // match uconn records which have been updated this chunk - "dat.cid": chunk, }}, // drop unnecessary data {"$project": bson.M{ @@ -196,20 +193,9 @@ func maxTotalDurationPipeline(host data.UniqueIP, chunk int) []bson.M { }, }, }, - "dat.cid": 1, "dat.tdur": 1, }}, - // drop dat records that are not from this chunk - {"$project": bson.M{ - "peer": 1, - "dat": bson.M{"$filter": bson.M{ - "input": "$dat", - "cond": bson.M{ - "$eq": []interface{}{"$$this.cid", chunk}, - }, - }}, - }}, - // for each peer, combine the records that match the current chunk + // for each peer, combine the records {"$project": bson.M{ "peer": 1, "tdur": bson.M{"$sum": "$dat.tdur"},