Commit: Only maintain one cid's worth of max scores in the host collection (#801)

* Only maintain one cid's worth of rollup max scores in the host collection

* fix comment

* remove unnecessary field from aggregation

* fix field in sni summarizer

---------

Co-authored-by: Logan L <[email protected]>
Zalgo2462 and Logan L authored May 3, 2023
1 parent 833dca8 commit dd1acf9
Showing 4 changed files with 17 additions and 37 deletions.
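
All four summarizers below follow the same before/after pattern, sketched here for reference. The sketch is illustrative only: the package name, helper function, and import paths are assumptions (globalsign/mgo plus the repo's data and database packages); the field names (dat, mbdst, max_beacon_score, cid) are taken from the diff itself. The key idea is that the host-collection selector now only checks that a rollup entry exists, and the $set overwrites that entry's value, score, and cid in place, so the host collection keeps a single cid's worth of max scores instead of accumulating one entry per chunk.

// Illustrative sketch only, not code from this commit. Import paths and the
// function name are assumptions; field names come from the diff below.
package sketch

import (
    "github.com/activecm/rita/pkg/data"
    "github.com/activecm/rita/pkg/database"
    "github.com/globalsign/mgo"
    "github.com/globalsign/mgo/bson"
)

// upsertMaxBeaconRollup keeps exactly one rollup entry per host: it looks for
// an existing "mbdst" entry (regardless of which chunk wrote it), overwrites
// it in place if found, and otherwise pushes a fresh entry.
func upsertMaxBeaconRollup(hostColl *mgo.Collection, host, dst data.UniqueIP, score float64, chunk int) error {
    hostSelector := host.BSONKey()

    // New-style selector: match any dat entry that has a rollup value at all,
    // rather than matching on the specific destination IP and chunk.
    existingSelector := database.MergeBSONMaps(
        hostSelector,
        bson.M{"dat": bson.M{"$elemMatch": bson.M{"mbdst.ip": bson.M{"$exists": true}}}},
    )

    n, err := hostColl.Find(existingSelector).Count()
    if err != nil {
        return err
    }

    if n > 0 {
        // Overwrite the existing entry: value, score, and cid all move forward,
        // so only one cid's worth of rollup data is ever stored per host.
        return hostColl.Update(existingSelector, bson.M{"$set": bson.M{
            "dat.$.mbdst":            dst,
            "dat.$.max_beacon_score": score,
            "dat.$.cid":              chunk,
        }})
    }

    // No rollup entry yet: push one onto the dat array, upserting the host
    // document if it does not exist (the real summarizers build bulk changes).
    _, err = hostColl.Upsert(hostSelector, bson.M{"$push": bson.M{
        "dat": bson.M{"mbdst": dst, "max_beacon_score": score, "cid": chunk},
    }})
    return err
}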
pkg/beacon/summarizer.go: 10 changes (4 additions & 6 deletions)

@@ -95,7 +95,7 @@ func maxBeaconUpdate(datum data.UniqueIP, beaconColl, hostColl *mgo.Collection,
         Score float64 `bson:"score"`
     }

-    mbdstQuery := maxBeaconPipeline(datum, chunk)
+    mbdstQuery := maxBeaconPipeline(datum)
     err := beaconColl.Pipe(mbdstQuery).One(&maxBeaconIP)
     if err != nil {
         return nil, nil, err
@@ -104,7 +104,7 @@ func maxBeaconUpdate(datum data.UniqueIP, beaconColl, hostColl *mgo.Collection,
     hostSelector := datum.BSONKey()
     hostWithDatEntrySelector := database.MergeBSONMaps(
         hostSelector,
-        bson.M{"dat": bson.M{"$elemMatch": maxBeaconIP.Dst.PrefixedBSONKey("mbdst")}},
+        bson.M{"dat": bson.M{"$elemMatch": bson.M{"mbdst.ip": bson.M{"$exists": true}}}},
     )

     nExistingEntries, err := hostColl.Find(hostWithDatEntrySelector).Count()
@@ -113,10 +113,9 @@ func maxBeaconUpdate(datum data.UniqueIP, beaconColl, hostColl *mgo.Collection,
     }

     if nExistingEntries > 0 {
-        // just need to update the cid and score if there is an
-        // an existing record
         updateQuery := bson.M{
             "$set": bson.M{
+                "dat.$.mbdst": maxBeaconIP.Dst,
                 "dat.$.max_beacon_score": maxBeaconIP.Score,
                 "dat.$.cid": chunk,
             },
@@ -139,12 +138,11 @@ func maxBeaconUpdate(datum data.UniqueIP, beaconColl, hostColl *mgo.Collection,
     return hostSelector, insertQuery, nil
 }

-func maxBeaconPipeline(host data.UniqueIP, chunk int) []bson.M {
+func maxBeaconPipeline(host data.UniqueIP) []bson.M {
     return []bson.M{
         {"$match": bson.M{
             "src": host.IP,
             "src_network_uuid": host.NetworkUUID,
-            "cid": chunk,
         }},
         // drop unnecessary data
         {"$project": bson.M{
pkg/beaconproxy/summarizer.go: 10 changes (4 additions & 6 deletions)

@@ -97,7 +97,7 @@ func maxProxyBeaconUpdate(datum data.UniqueIP, beaconProxyColl, hostColl *mgo.Co
         Score float64 `bson:"score"`
     }

-    mbdstQuery := maxProxyBeaconPipeline(datum, chunk)
+    mbdstQuery := maxProxyBeaconPipeline(datum)
     err := beaconProxyColl.Pipe(mbdstQuery).One(&maxBeaconProxy)
     if err != nil {
         return nil, nil, err
@@ -106,7 +106,7 @@ func maxProxyBeaconUpdate(datum data.UniqueIP, beaconProxyColl, hostColl *mgo.Co
     hostSelector := datum.BSONKey()
     hostWithDatEntrySelector := database.MergeBSONMaps(
         hostSelector,
-        bson.M{"dat.mbproxy": maxBeaconProxy.Fqdn},
+        bson.M{"dat": bson.M{"$elemMatch": bson.M{"mbproxy": bson.M{"$exists": true}}}},
     )

     nExistingEntries, err := hostColl.Find(hostWithDatEntrySelector).Count()
@@ -115,10 +115,9 @@ func maxProxyBeaconUpdate(datum data.UniqueIP, beaconProxyColl, hostColl *mgo.Co
     }

     if nExistingEntries > 0 {
-        // just need to update the cid and score if there is an
-        // an existing record
         updateQuery := bson.M{
             "$set": bson.M{
+                "dat.$.mbproxy": maxBeaconProxy.Fqdn,
                 "dat.$.max_beacon_proxy_score": maxBeaconProxy.Score,
                 "dat.$.cid": chunk,
             },
@@ -141,12 +140,11 @@ func maxProxyBeaconUpdate(datum data.UniqueIP, beaconProxyColl, hostColl *mgo.Co
     return hostSelector, insertQuery, nil
 }

-func maxProxyBeaconPipeline(host data.UniqueIP, chunk int) []bson.M {
+func maxProxyBeaconPipeline(host data.UniqueIP) []bson.M {
     return []bson.M{
         {"$match": bson.M{
             "src": host.IP,
             "src_network_uuid": host.NetworkUUID,
-            "cid": chunk,
         }},
         // drop unnecessary data
         {"$project": bson.M{
pkg/beaconsni/summarizer.go: 10 changes (4 additions & 6 deletions)

@@ -97,7 +97,7 @@ func maxSNIBeaconUpdate(datum data.UniqueIP, beaconSNIColl, hostColl *mgo.Collec
         Score float64 `bson:"score"`
     }

-    mbdstQuery := maxSNIBeaconPipeline(datum, chunk)
+    mbdstQuery := maxSNIBeaconPipeline(datum)
     err := beaconSNIColl.Pipe(mbdstQuery).One(&maxBeaconSNI)
     if err != nil {
         return nil, nil, err
@@ -106,7 +106,7 @@ func maxSNIBeaconUpdate(datum data.UniqueIP, beaconSNIColl, hostColl *mgo.Collec
     hostSelector := datum.BSONKey()
     hostWithDatEntrySelector := database.MergeBSONMaps(
         hostSelector,
-        bson.M{"dat.mbsni": maxBeaconSNI.Fqdn},
+        bson.M{"dat": bson.M{"$elemMatch": bson.M{"mbsni": bson.M{"$exists": true}}}},
     )

     nExistingEntries, err := hostColl.Find(hostWithDatEntrySelector).Count()
@@ -115,10 +115,9 @@ func maxSNIBeaconUpdate(datum data.UniqueIP, beaconSNIColl, hostColl *mgo.Collec
     }

     if nExistingEntries > 0 {
-        // just need to update the cid and score if there is an
-        // an existing record
         updateQuery := bson.M{
             "$set": bson.M{
+                "dat.$.mbsni": maxBeaconSNI.Fqdn,
                 "dat.$.max_beacon_sni_score": maxBeaconSNI.Score,
                 "dat.$.cid": chunk,
             },
@@ -140,12 +139,11 @@ func maxSNIBeaconUpdate(datum data.UniqueIP, beaconSNIColl, hostColl *mgo.Collec
     return hostSelector, insertQuery, nil
 }

-func maxSNIBeaconPipeline(host data.UniqueIP, chunk int) []bson.M {
+func maxSNIBeaconPipeline(host data.UniqueIP) []bson.M {
     return []bson.M{
         {"$match": bson.M{
             "src": host.IP,
             "src_network_uuid": host.NetworkUUID,
-            "cid": chunk,
         }},
         // drop unnecessary data
         {"$project": bson.M{
pkg/uconn/summarizer.go: 24 changes (5 additions & 19 deletions)

@@ -104,7 +104,7 @@ func maxTotalDurationUpdate(datum data.UniqueIP, uconnColl, hostColl *mgo.Collec
         MaxTotalDur float64 `bson:"tdur"`
     }

-    mdipQuery := maxTotalDurationPipeline(datum, chunk)
+    mdipQuery := maxTotalDurationPipeline(datum)

     err := uconnColl.Pipe(mdipQuery).One(&maxDurIP)
     if err != nil {
@@ -114,7 +114,7 @@ func maxTotalDurationUpdate(datum data.UniqueIP, uconnColl, hostColl *mgo.Collec
     hostSelector := datum.BSONKey()
     hostWithDatEntrySelector := database.MergeBSONMaps(
         hostSelector,
-        bson.M{"dat": bson.M{"$elemMatch": maxDurIP.Peer.PrefixedBSONKey("mdip")}},
+        bson.M{"dat": bson.M{"$elemMatch": bson.M{"mdip": bson.M{"$exists": true}}}},
     )

     nExistingEntries, err := hostColl.Find(hostWithDatEntrySelector).Count()
@@ -123,10 +123,9 @@ func maxTotalDurationUpdate(datum data.UniqueIP, uconnColl, hostColl *mgo.Collec
     }

     if nExistingEntries > 0 {
-        // just need to update the cid and score if there is an
-        // an existing record
         updateQuery := bson.M{
             "$set": bson.M{
+                "dat.$.mdip": maxDurIP.Peer,
                 "dat.$.max_duration": maxDurIP.MaxTotalDur,
                 "dat.$.cid": chunk,
             },
@@ -152,7 +151,7 @@ func maxTotalDurationUpdate(datum data.UniqueIP, uconnColl, hostColl *mgo.Collec
     return database.BulkChange{Selector: hostSelector, Update: insertQuery, Upsert: true}, nil
 }

-func maxTotalDurationPipeline(host data.UniqueIP, chunk int) []bson.M {
+func maxTotalDurationPipeline(host data.UniqueIP) []bson.M {
     return []bson.M{
         {"$match": bson.M{
             // match the host IP/ network
@@ -168,8 +167,6 @@ func maxTotalDurationPipeline(host data.UniqueIP, chunk int) []bson.M {
             }},
             },
         },
-            // match uconn records which have been updated this chunk
-            "dat.cid": chunk,
         }},
         // drop unnecessary data
         {"$project": bson.M{
@@ -196,20 +193,9 @@ func maxTotalDurationPipeline(host data.UniqueIP, chunk int) []bson.M {
                 },
             },
             },
-            "dat.cid": 1,
             "dat.tdur": 1,
         }},
-        // drop dat records that are not from this chunk
-        {"$project": bson.M{
-            "peer": 1,
-            "dat": bson.M{"$filter": bson.M{
-                "input": "$dat",
-                "cond": bson.M{
-                    "$eq": []interface{}{"$$this.cid", chunk},
-                },
-            }},
-        }},
-        // for each peer, combine the records that match the current chunk
+        // for each peer, combine the records
        {"$project": bson.M{
             "peer": 1,
             "tdur": bson.M{"$sum": "$dat.tdur"},
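
The uconn change goes one step beyond the three beacon summarizers: besides dropping the chunk parameter from the pipeline's $match, it removes the $project stage that used $filter to keep only the current chunk's dat entries, so the trailing $sum over "$dat.tdur" now totals every stored entry for a peer. The fragment below is purely illustrative (hypothetical helper name and document shape, same bson import as the sketch above); only the $filter and $sum expressions mirror the diff.

// Illustrative only: contrasts the removed per-chunk $filter with the
// retained $sum. For a document shaped like
//   {"dat": [{"cid": 0, "tdur": 10.0}, {"cid": 1, "tdur": 5.0}]}
// the old stages yield tdur == 5.0 when chunk == 1; the new stage yields 15.0.
func sketchTotalDurationStages(chunk int) (oldStages, newStages []bson.M) {
    oldStages = []bson.M{
        // drop dat records that are not from this chunk (removed by this commit)
        {"$project": bson.M{
            "dat": bson.M{"$filter": bson.M{
                "input": "$dat",
                "cond":  bson.M{"$eq": []interface{}{"$$this.cid", chunk}},
            }},
        }},
        // then sum only the surviving entries
        {"$project": bson.M{"tdur": bson.M{"$sum": "$dat.tdur"}}},
    }
    newStages = []bson.M{
        // sum across all dat entries, regardless of cid
        {"$project": bson.M{"tdur": bson.M{"$sum": "$dat.tdur"}}},
    }
    return oldStages, newStages
}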
