From ace7194de2d1763eb81a1c3c4a33f980df4ed3cf Mon Sep 17 00:00:00 2001 From: Paul Prechtel Date: Sun, 10 Nov 2024 13:05:18 +0100 Subject: [PATCH] Use the packet trace timestamps in netflow records Instead of the current time. Equally, copy the incoming stream (on the in channel) timestamps. The aggregate segment will take the first timestamp as flow start and the last timestamp as flow end from all the connected sub-flows for the final flow (at least when the packet trace/flow data is sorted by time). --- segments/filter/aggregate/flowcache.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/segments/filter/aggregate/flowcache.go b/segments/filter/aggregate/flowcache.go index 2b88aa5..4ca2720 100644 --- a/segments/filter/aggregate/flowcache.go +++ b/segments/filter/aggregate/flowcache.go @@ -265,10 +265,10 @@ func (f *FlowExporter) Insert(pkt gopacket.Packet) { f.mutex.Lock() if record, exists = f.cache[key]; !exists { f.cache[key] = new(FlowRecord) - f.cache[key].TimeReceived = time.Now() + f.cache[key].TimeReceived = pkt.Metadata().Timestamp record = f.cache[key] } - record.LastUpdated = time.Now() + record.LastUpdated = pkt.Metadata().Timestamp record.SamplerAddress = f.samplerAddress record.Packets = append(record.Packets, pkt) @@ -291,10 +291,10 @@ func (f *FlowExporter) InsertFlow(flow *pb.EnrichedFlow) { f.mutex.Lock() if record, exists = f.cache[key]; !exists { f.cache[key] = new(FlowRecord) - f.cache[key].TimeReceived = time.Now() + f.cache[key].TimeReceived = time.Unix(int64(flow.TimeFlowStart), 0) record = f.cache[key] } - record.LastUpdated = time.Now() + record.LastUpdated = time.Unix(int64(flow.TimeFlowEnd), 0) record.SamplerAddress = f.samplerAddress record.Flows = append(record.Flows, flow) }