Skip to content

Commit

Permalink
decoders: replace binary.Read with a version without reflection and a…
Browse files Browse the repository at this point in the history
…llocations (#141)

Instead of allocating small slices, we rely on the fact that most call
sites are providing a `bytes.Buffer` and use the `Next()` method. For
sFlow decoding, in my case, we get a 33% speedup.

A `bytes.Reader` would even be more efficient, but unfortunately, they
don't have a `Next()` method.

While Go should be smart enough to make the allocation of `bs` on the
stack, it does not, even when `io.ReadFull()` is inlines.

```
decoders/utils/utils.go:23:13: make([]byte, n) escapes to heap
```
  • Loading branch information
vincentbernat authored Aug 19, 2023
1 parent 7c52d81 commit 66b47ee
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 32 deletions.
4 changes: 2 additions & 2 deletions decoders/netflow/netflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func DecodeMessageContext(ctx context.Context, payload *bytes.Buffer, templateKe

var version uint16
var obsDomainId uint32
if err := binary.Read(payload, binary.BigEndian, &version); err != nil {
if err := utils.BinaryRead(payload, binary.BigEndian, &version); err != nil {
return nil, fmt.Errorf("Error decoding version: %v", err)
}

Expand Down Expand Up @@ -377,7 +377,7 @@ func DecodeMessageContext(ctx context.Context, payload *bytes.Buffer, templateKe

for i := 0; ((i < int(size) && version == 9) || version == 10) && payload.Len() > 0; i++ {
fsheader := FlowSetHeader{}
if err := utils.BinaryDecoder(payload, &fsheader); err != nil {
if err := utils.BinaryDecoder(payload, &fsheader.Id, &fsheader.Length); err != nil {
return returnItem, fmt.Errorf("Error decoding FlowSet header: %v", err)
}

Expand Down
23 changes: 22 additions & 1 deletion decoders/netflowlegacy/netflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,28 @@ func DecodeMessage(payload *bytes.Buffer) (interface{}, error) {
packet.Records = make([]RecordsNetFlowV5, int(packet.Count))
for i := 0; i < int(packet.Count) && payload.Len() >= 48; i++ {
record := RecordsNetFlowV5{}
err := utils.BinaryDecoder(payload, &record)
err := utils.BinaryDecoder(payload,
&record.SrcAddr,
&record.DstAddr,
&record.NextHop,
&record.Input,
&record.Output,
&record.DPkts,
&record.DOctets,
&record.First,
&record.Last,
&record.SrcPort,
&record.DstPort,
&record.Pad1,
&record.TCPFlags,
&record.Proto,
&record.Tos,
&record.SrcAS,
&record.DstAS,
&record.SrcMask,
&record.DstMask,
&record.Pad2,
)
if err != nil {
return packet, err
}
Expand Down
82 changes: 58 additions & 24 deletions decoders/sflow/sflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,48 @@ func DecodeCounterRecord(header *RecordHeader, payload *bytes.Buffer) (CounterRe
switch (*header).DataFormat {
case 1:
ifCounters := IfCounters{}
err := utils.BinaryDecoder(payload, &ifCounters)
err := utils.BinaryDecoder(payload,
&ifCounters.IfIndex,
&ifCounters.IfType,
&ifCounters.IfSpeed,
&ifCounters.IfDirection,
&ifCounters.IfStatus,
&ifCounters.IfInOctets,
&ifCounters.IfInUcastPkts,
&ifCounters.IfInMulticastPkts,
&ifCounters.IfInBroadcastPkts,
&ifCounters.IfInDiscards,
&ifCounters.IfInErrors,
&ifCounters.IfInUnknownProtos,
&ifCounters.IfOutOctets,
&ifCounters.IfOutUcastPkts,
&ifCounters.IfOutMulticastPkts,
&ifCounters.IfOutBroadcastPkts,
&ifCounters.IfOutDiscards,
&ifCounters.IfOutErrors,
&ifCounters.IfPromiscuousMode,
)
if err != nil {
return counterRecord, err
}
counterRecord.Data = ifCounters
case 2:
ethernetCounters := EthernetCounters{}
err := utils.BinaryDecoder(payload, &ethernetCounters)
err := utils.BinaryDecoder(payload,
&ethernetCounters.Dot3StatsAlignmentErrors,
&ethernetCounters.Dot3StatsFCSErrors,
&ethernetCounters.Dot3StatsSingleCollisionFrames,
&ethernetCounters.Dot3StatsMultipleCollisionFrames,
&ethernetCounters.Dot3StatsSQETestErrors,
&ethernetCounters.Dot3StatsDeferredTransmissions,
&ethernetCounters.Dot3StatsLateCollisions,
&ethernetCounters.Dot3StatsExcessiveCollisions,
&ethernetCounters.Dot3StatsInternalMacTransmitErrors,
&ethernetCounters.Dot3StatsCarrierSenseErrors,
&ethernetCounters.Dot3StatsFrameTooLongs,
&ethernetCounters.Dot3StatsInternalMacReceiveErrors,
&ethernetCounters.Dot3StatsSymbolErrors,
)
if err != nil {
return counterRecord, err
}
Expand Down Expand Up @@ -117,7 +151,7 @@ func DecodeIP(payload *bytes.Buffer) (uint32, []byte, error) {
return ipVersion, ip, NewErrorIPVersion(ipVersion)
}
if payload.Len() >= len(ip) {
err := utils.BinaryDecoder(payload, &ip)
err := utils.BinaryDecoder(payload, ip)
if err != nil {
return 0, nil, err
}
Expand All @@ -134,14 +168,14 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
switch (*header).DataFormat {
case FORMAT_EXT_SWITCH:
extendedSwitch := ExtendedSwitch{}
err := utils.BinaryDecoder(payload, &extendedSwitch)
err := utils.BinaryDecoder(payload, &extendedSwitch.SrcVlan, &extendedSwitch.SrcPriority, &extendedSwitch.DstVlan, &extendedSwitch.DstPriority)
if err != nil {
return flowRecord, err
}
flowRecord.Data = extendedSwitch
case FORMAT_RAW_PKT:
sampledHeader := SampledHeader{}
err := utils.BinaryDecoder(payload, &(sampledHeader.Protocol), &(sampledHeader.FrameLength), &(sampledHeader.Stripped), &(sampledHeader.OriginalLength))
err := utils.BinaryDecoder(payload, &sampledHeader.Protocol, &sampledHeader.FrameLength, &sampledHeader.Stripped, &sampledHeader.OriginalLength)
if err != nil {
return flowRecord, err
}
Expand All @@ -152,7 +186,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
SrcIP: make([]byte, 4),
DstIP: make([]byte, 4),
}
err := utils.BinaryDecoder(payload, &sampledIPBase)
err := utils.BinaryDecoder(payload, &sampledIPBase.Length, &sampledIPBase.Protocol, sampledIPBase.SrcIP, sampledIPBase.DstIP, &sampledIPBase.SrcPort, &sampledIPBase.DstPort, &sampledIPBase.TcpFlags)
if err != nil {
return flowRecord, err
}
Expand All @@ -169,14 +203,14 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
SrcIP: make([]byte, 16),
DstIP: make([]byte, 16),
}
err := utils.BinaryDecoder(payload, &sampledIPBase)
err := utils.BinaryDecoder(payload, &sampledIPBase.Length, &sampledIPBase.Protocol, sampledIPBase.SrcIP, sampledIPBase.DstIP, &sampledIPBase.SrcPort, &sampledIPBase.DstPort, &sampledIPBase.TcpFlags)
if err != nil {
return flowRecord, err
}
sampledIPv6 := SampledIPv6{
Base: sampledIPBase,
}
err = utils.BinaryDecoder(payload, &(sampledIPv6.Priority))
err = utils.BinaryDecoder(payload, &sampledIPv6.Priority)
if err != nil {
return flowRecord, err
}
Expand All @@ -190,7 +224,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
}
extendedRouter.NextHopIPVersion = ipVersion
extendedRouter.NextHop = ip
err = utils.BinaryDecoder(payload, &(extendedRouter.SrcMaskLen), &(extendedRouter.DstMaskLen))
err = utils.BinaryDecoder(payload, &extendedRouter.SrcMaskLen, &extendedRouter.DstMaskLen)
if err != nil {
return flowRecord, err
}
Expand All @@ -203,14 +237,14 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
}
extendedGateway.NextHopIPVersion = ipVersion
extendedGateway.NextHop = ip
err = utils.BinaryDecoder(payload, &(extendedGateway.AS), &(extendedGateway.SrcAS), &(extendedGateway.SrcPeerAS),
&(extendedGateway.ASDestinations))
err = utils.BinaryDecoder(payload, &extendedGateway.AS, &extendedGateway.SrcAS, &extendedGateway.SrcPeerAS,
&extendedGateway.ASDestinations)
if err != nil {
return flowRecord, err
}
var asPath []uint32
if extendedGateway.ASDestinations != 0 {
err := utils.BinaryDecoder(payload, &(extendedGateway.ASPathType), &(extendedGateway.ASPathLength))
err := utils.BinaryDecoder(payload, &extendedGateway.ASPathType, &extendedGateway.ASPathLength)
if err != nil {
return flowRecord, err
}
Expand All @@ -227,7 +261,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
}
extendedGateway.ASPath = asPath

err = utils.BinaryDecoder(payload, &(extendedGateway.CommunitiesLength))
err = utils.BinaryDecoder(payload, &extendedGateway.CommunitiesLength)
if err != nil {
return flowRecord, err
}
Expand All @@ -241,7 +275,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
return flowRecord, err
}
}
err = utils.BinaryDecoder(payload, &(extendedGateway.LocalPref))
err = utils.BinaryDecoder(payload, &extendedGateway.LocalPref)
if err != nil {
return flowRecord, err
}
Expand All @@ -258,10 +292,10 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
}

func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, error) {
format := (*header).Format
format := header.Format
var sample interface{}

err := utils.BinaryDecoder(payload, &((*header).SampleSequenceNumber))
err := utils.BinaryDecoder(payload, &header.SampleSequenceNumber)
if err != nil {
return sample, err
}
Expand All @@ -275,7 +309,7 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
(*header).SourceIdType = sourceId >> 24
(*header).SourceIdValue = sourceId & 0x00ffffff
} else if format == FORMAT_IPV4 || format == FORMAT_IPV6 {
err = utils.BinaryDecoder(payload, &((*header).SourceIdType), &((*header).SourceIdValue))
err = utils.BinaryDecoder(payload, &header.SourceIdType, &header.SourceIdValue)
if err != nil {
return sample, err
}
Expand All @@ -291,8 +325,8 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
flowSample = FlowSample{
Header: *header,
}
err = utils.BinaryDecoder(payload, &(flowSample.SamplingRate), &(flowSample.SamplePool),
&(flowSample.Drops), &(flowSample.Input), &(flowSample.Output), &(flowSample.FlowRecordsCount))
err = utils.BinaryDecoder(payload, &flowSample.SamplingRate, &flowSample.SamplePool,
&flowSample.Drops, &flowSample.Input, &flowSample.Output, &flowSample.FlowRecordsCount)
if err != nil {
return sample, err
}
Expand All @@ -314,9 +348,9 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
expandedFlowSample = ExpandedFlowSample{
Header: *header,
}
err = utils.BinaryDecoder(payload, &(expandedFlowSample.SamplingRate), &(expandedFlowSample.SamplePool),
&(expandedFlowSample.Drops), &(expandedFlowSample.InputIfFormat), &(expandedFlowSample.InputIfValue),
&(expandedFlowSample.OutputIfFormat), &(expandedFlowSample.OutputIfValue), &(expandedFlowSample.FlowRecordsCount))
err = utils.BinaryDecoder(payload, &expandedFlowSample.SamplingRate, &expandedFlowSample.SamplePool,
&expandedFlowSample.Drops, &expandedFlowSample.InputIfFormat, &expandedFlowSample.InputIfValue,
&expandedFlowSample.OutputIfFormat, &expandedFlowSample.OutputIfValue, &expandedFlowSample.FlowRecordsCount)
if err != nil {
return sample, err
}
Expand All @@ -326,7 +360,7 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
}
for i := 0; i < int(recordsCount) && payload.Len() >= 8; i++ {
recordHeader := RecordHeader{}
err = utils.BinaryDecoder(payload, &(recordHeader.DataFormat), &(recordHeader.Length))
err = utils.BinaryDecoder(payload, &recordHeader.DataFormat, &recordHeader.Length)
if err != nil {
return sample, err
}
Expand Down Expand Up @@ -386,7 +420,7 @@ func DecodeMessage(payload *bytes.Buffer) (interface{}, error) {
}

packetV5.AgentIP = ip
err = utils.BinaryDecoder(payload, &(packetV5.SubAgentId), &(packetV5.SequenceNumber), &(packetV5.Uptime), &(packetV5.SamplesCount))
err = utils.BinaryDecoder(payload, &packetV5.SubAgentId, &packetV5.SequenceNumber, &packetV5.Uptime, &packetV5.SamplesCount)
if err != nil {
return packetV5, err
}
Expand Down
116 changes: 114 additions & 2 deletions decoders/utils/utils.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,128 @@
package utils

import (
"bytes"
"encoding/binary"
"errors"
"io"
"reflect"
)

func BinaryDecoder(payload io.Reader, dests ...interface{}) error {
type BytesBuffer interface {
io.Reader
Next(int) []byte
}

func BinaryDecoder(payload *bytes.Buffer, dests ...interface{}) error {
for _, dest := range dests {
err := binary.Read(payload, binary.BigEndian, dest)
err := BinaryRead(payload, binary.BigEndian, dest)
if err != nil {
return err
}
}
return nil
}
func BinaryRead(payload BytesBuffer, order binary.ByteOrder, data any) error {
// Fast path for basic types and slices.
if n := intDataSize(data); n != 0 {
bs := payload.Next(n)
if len(bs) < n {
return io.ErrUnexpectedEOF
}
switch data := data.(type) {
case *bool:
*data = bs[0] != 0
case *int8:
*data = int8(bs[0])
case *uint8:
*data = bs[0]
case *int16:
*data = int16(order.Uint16(bs))
case *uint16:
*data = order.Uint16(bs)
case *int32:
*data = int32(order.Uint32(bs))
case *uint32:
*data = order.Uint32(bs)
case *int64:
*data = int64(order.Uint64(bs))
case *uint64:
*data = order.Uint64(bs)
case []bool:
for i, x := range bs { // Easier to loop over the input for 8-bit values.
data[i] = x != 0
}
case []int8:
for i, x := range bs {
data[i] = int8(x)
}
case []uint8:
copy(data, bs)
case []int16:
for i := range data {
data[i] = int16(order.Uint16(bs[2*i:]))
}
case []uint16:
for i := range data {
data[i] = order.Uint16(bs[2*i:])
}
case []int32:
for i := range data {
data[i] = int32(order.Uint32(bs[4*i:]))
}
case []uint32:
for i := range data {
data[i] = order.Uint32(bs[4*i:])
}
case []int64:
for i := range data {
data[i] = int64(order.Uint64(bs[8*i:]))
}
case []uint64:
for i := range data {
data[i] = order.Uint64(bs[8*i:])
}
default:
n = 0 // fast path doesn't apply
}
if n != 0 {
return nil
}
}

return errors.New("binary.Read: invalid type " + reflect.TypeOf(data).String())
}

// intDataSize returns the size of the data required to represent the data when encoded.
// It returns zero if the type cannot be implemented by the fast path in Read or Write.
func intDataSize(data any) int {
switch data := data.(type) {
case bool, int8, uint8, *bool, *int8, *uint8:
return 1
case []bool:
return len(data)
case []int8:
return len(data)
case []uint8:
return len(data)
case int16, uint16, *int16, *uint16:
return 2
case []int16:
return 2 * len(data)
case []uint16:
return 2 * len(data)
case int32, uint32, *int32, *uint32:
return 4
case []int32:
return 4 * len(data)
case []uint32:
return 4 * len(data)
case int64, uint64, *int64, *uint64:
return 8
case []int64:
return 8 * len(data)
case []uint64:
return 8 * len(data)
}
return 0
}
Loading

0 comments on commit 66b47ee

Please sign in to comment.