conntrack: Add operational metrics on errors for hash computation and aggregators (#416)

* Add operational metrics to hash computation and aggregators

* Update docs

* Fix rebase

* Add conntrack_end_connections operational metric
ronensc authored Apr 17, 2023
1 parent 381890d commit 4aee357
Showing 10 changed files with 158 additions and 47 deletions.
32 changes: 28 additions & 4 deletions docs/operational-metrics.md
@@ -7,34 +7,58 @@ Each table below provides documentation for an exported flowlogs-pipeline operat



+### conntrack_aggregator_errors
+| **Name** | conntrack_aggregator_errors |
+|:---|:---|
+| **Description** | The total number of errors during aggregation |
+| **Type** | counter |
+| **Labels** | error, field |
+
+
+### conntrack_end_connections
+| **Name** | conntrack_end_connections |
+|:---|:---|
+| **Description** | The total number of connections ended per group and reason |
+| **Type** | counter |
+| **Labels** | group, reason |
+
+
+### conntrack_hash_errors
+| **Name** | conntrack_hash_errors |
+|:---|:---|
+| **Description** | The total number of errors during hash computation |
+| **Type** | counter |
+| **Labels** | error, field |


### conntrack_input_records
| **Name** | conntrack_input_records |
|:---|:---|
-| **Description** | The total number of input records per classification. |
+| **Description** | The total number of input records per classification |
| **Type** | counter |
| **Labels** | classification |


### conntrack_memory_connections
| **Name** | conntrack_memory_connections |
|:---|:---|
-| **Description** | The total number of tracked connections in memory per group and phase. |
+| **Description** | The total number of tracked connections in memory per group and phase |
| **Type** | gauge |
| **Labels** | group, phase |


### conntrack_output_records
| **Name** | conntrack_output_records |
|:---|:---|
-| **Description** | The total number of output records. |
+| **Description** | The total number of output records |
| **Type** | counter |
| **Labels** | type |


### conntrack_tcp_flags
| **Name** | conntrack_tcp_flags |
|:---|:---|
-| **Description** | The total number of actions taken based on TCP flags. |
+| **Description** | The total number of actions taken based on TCP flags |
| **Type** | counter |
| **Labels** | action |

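The tables above pin down only metric names, types, and label sets. As a minimal sketch of what one of these looks like in code (using prometheus/client_golang directly, not flowlogs-pipeline's `operational` wrapper, whose API is not shown in this diff), the aggregator-error counter could be declared and incremented like so:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Sketch only: name, help text, and labels mirror the
	// conntrack_aggregator_errors table above.
	aggregatorErrors := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "conntrack_aggregator_errors",
		Help: "The total number of errors during aggregation",
	}, []string{"error", "field"})
	prometheus.MustRegister(aggregatorErrors)

	// Incremented the same way the aggregator code below does when an
	// input field is absent from a flow log.
	aggregatorErrors.WithLabelValues("MissingFieldError", "Bytes").Inc()
	fmt.Println("conntrack_aggregator_errors registered and incremented")
}
```

Note that the `error` label carries a coarse error class (`MissingFieldError`, `Float64ConversionError`) rather than a raw error string, which keeps label cardinality bounded.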
13 changes: 10 additions & 3 deletions pkg/pipeline/extract/conntrack/aggregator.go
@@ -41,6 +41,7 @@ type aggregateBase struct {
outputField string
splitAB bool
initVal interface{}
+metrics *metricsType
}

type aSum struct{ aggregateBase }
@@ -53,7 +54,7 @@ type aLast struct{ aggregateBase }
// TODO: think of adding a more complex operation such as Average Packet Size which involves 2 input fields: Bytes/Packets

// newAggregator returns a new aggregator depending on the output field operation
-func newAggregator(of api.OutputField) (aggregator, error) {
+func newAggregator(of api.OutputField, metrics *metricsType) (aggregator, error) {
if of.Name == "" {
return nil, fmt.Errorf("empty name %v", of)
}
@@ -63,7 +64,7 @@ func newAggregator(of api.OutputField) (aggregator, error) {
} else {
inputField = of.Name
}
-aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB}
+aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB, metrics: metrics}
var agg aggregator
switch of.Operation {
case api.ConnTrackOperationName("Sum"):
@@ -108,11 +109,17 @@ func (agg *aggregateBase) getOutputField(d direction) string {
func (agg *aggregateBase) getInputFieldValue(flowLog config.GenericMap) (float64, error) {
rawValue, ok := flowLog[agg.inputField]
if !ok {
+if agg.metrics != nil {
+agg.metrics.aggregatorErrors.WithLabelValues("MissingFieldError", agg.inputField).Inc()
+}
return 0, fmt.Errorf("missing field %v", agg.inputField)
}
floatValue, err := utils.ConvertToFloat64(rawValue)
if err != nil {
return 0, fmt.Errorf("cannot convert %v to float64: %w", rawValue, err)
if agg.metrics != nil {
agg.metrics.aggregatorErrors.WithLabelValues("Float64ConversionError", agg.inputField).Inc()
}
return 0, fmt.Errorf("cannot convert %q to float64: %w", rawValue, err)
}
return floatValue, nil
}
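The `metricsType` definition itself is not part of this diff, but the call sites constrain its shape. A hypothetical reconstruction, limited to the fields exercised by this commit (the real definition in the conntrack package may differ):

```go
package conntrack

import "github.com/prometheus/client_golang/prometheus"

// metricsType as inferred from call sites in this commit; the metric
// names in the comments follow docs/operational-metrics.md.
type metricsType struct {
	inputRecords     *prometheus.CounterVec // conntrack_input_records{classification}
	aggregatorErrors *prometheus.CounterVec // conntrack_aggregator_errors{error, field}
	hashErrors       *prometheus.CounterVec // conntrack_hash_errors{error, field}
	endConnections   *prometheus.CounterVec // conntrack_end_connections{group, reason}
}
```

Every call site nil-guards the pointer, so aggregators built with a nil `*metricsType` (as most unit tests below do) simply skip instrumentation.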
63 changes: 48 additions & 15 deletions pkg/pipeline/extract/conntrack/aggregator_test.go
@@ -23,6 +23,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

@@ -34,7 +35,7 @@ func TestNewAggregator_Invalid(t *testing.T) {
Operation: "sum",
SplitAB: true,
Input: "Input",
-})
+}, nil)
require.NotNil(t, err)

// unknown OperationType
@@ -43,15 +44,15 @@
Operation: "unknown",
SplitAB: true,
Input: "Input",
-})
+}, nil)
require.NotNil(t, err)

// invalid first agg
_, err = newAggregator(api.OutputField{
Operation: "first",
SplitAB: true,
Input: "Input",
-})
+}, nil)
require.NotNil(t, err)
}

@@ -64,58 +65,58 @@ func TestNewAggregator_Valid(t *testing.T) {
{
name: "Default SplitAB",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
},
{
name: "Default input",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", SplitAB: true},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0)}},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0), nil}},
},
{
name: "Custom input",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", Input: "MyInput"},
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0)}},
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0), nil}},
},
{
name: "OperationType sum",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
},
{
name: "OperationType count",
outputField: api.OutputField{Name: "MyAgg", Operation: "count"},
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
},
{
name: "OperationType max",
outputField: api.OutputField{Name: "MyAgg", Operation: "max"},
expected: &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64}},
expected: &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64, nil}},
},
{
name: "OperationType min",
outputField: api.OutputField{Name: "MyAgg", Operation: "min"},
expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64}},
expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64, nil}},
},
{
name: "Default first",
outputField: api.OutputField{Name: "MyCp", Operation: "first"},
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil}},
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil, nil}},
},
{
name: "Custom input first",
outputField: api.OutputField{Name: "MyCp", Operation: "first", Input: "MyInput"},
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil}},
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil, nil}},
},
{
name: "Default last",
outputField: api.OutputField{Name: "MyCp", Operation: "last"},
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil}},
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil, nil}},
},
}

for _, test := range table {
t.Run(test.name, func(t *testing.T) {
-agg, err := newAggregator(test.outputField)
+agg, err := newAggregator(test.outputField, nil)
require.NoError(t, err)
require.Equal(t, test.expected, agg)
})
Expand All @@ -134,7 +135,7 @@ func TestAddField_and_Update(t *testing.T) {
}
var aggs []aggregator
for _, of := range ofs {
-agg, err := newAggregator(of)
+agg, err := newAggregator(of, nil)
require.NoError(t, err)
aggs = append(aggs, agg)
}
@@ -183,3 +184,35 @@
})
}
}

+func TestMissingFieldError(t *testing.T) {
+test.ResetPromRegistry()
+metrics := newMetrics(opMetrics)
+agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}, metrics)
+require.NoError(t, err)
+
+conn := NewConnBuilder(metrics).Build()
+agg.addField(conn)
+
+flowLog := config.GenericMap{}
+agg.update(conn, flowLog, dirAB, true)
+
+exposed := test.ReadExposedMetrics(t)
+require.Contains(t, exposed, `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"} 1`)
+}
+
+func TestFloat64ConversionError(t *testing.T) {
+test.ResetPromRegistry()
+metrics := newMetrics(opMetrics)
+agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}, metrics)
+require.NoError(t, err)
+
+conn := NewConnBuilder(metrics).Build()
+agg.addField(conn)
+
+flowLog := config.GenericMap{"Bytes": "float64 inconvertible value"}
+agg.update(conn, flowLog, dirAB, true)
+
+exposed := test.ReadExposedMetrics(t)
+require.Contains(t, exposed, `conntrack_aggregator_errors{error="Float64ConversionError",field="Bytes"} 1`)
+}
7 changes: 4 additions & 3 deletions pkg/pipeline/extract/conntrack/conntrack.go
@@ -61,7 +61,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM

var outputRecords []config.GenericMap
for _, fl := range flowLogs {
-computedHash, err := ComputeHash(fl, ct.config.KeyDefinition, ct.hashProvider())
+computedHash, err := ComputeHash(fl, ct.config.KeyDefinition, ct.hashProvider(), ct.metrics)
if err != nil {
log.Warningf("skipping flow log %v: %v", fl, err)
ct.metrics.inputRecords.WithLabelValues("rejected").Inc()
@@ -220,9 +220,11 @@ func NewConnectionTrack(opMetrics *operational.Metrics, params config.StageParam
return nil, fmt.Errorf("ConnectionTrack config is invalid: %w", err)
}

+metrics := newMetrics(opMetrics)
+
var aggregators []aggregator
for _, of := range cfg.OutputFields {
-agg, err := newAggregator(of)
+agg, err := newAggregator(of, metrics)
if err != nil {
return nil, fmt.Errorf("error creating aggregator: %w", err)
}
@@ -248,7 +250,6 @@
}

endpointAFields, endpointBFields := cfg.GetABFields()
-metrics := newMetrics(opMetrics)
conntrack := &conntrackImpl{
clock: clock,
connStore: newConnectionStore(cfg.Scheduling, metrics, clock.Now),
4 changes: 4 additions & 0 deletions pkg/pipeline/extract/conntrack/conntrack_test.go
@@ -898,6 +898,8 @@ func TestScheduling(t *testing.T) {
assertStoreConsistency(t, ct)
})
}
+exposed := test.ReadExposedMetrics(t)
+require.Contains(t, exposed, `conntrack_end_connections{group="0: Proto=1, ",reason="timeout"} 1`)
}

func assertStoreConsistency(t *testing.T, extractor extract.Extractor) {
Expand Down Expand Up @@ -1265,4 +1267,6 @@ func TestExpiringConnection(t *testing.T) {
assertStoreConsistency(t, ct)
})
}
+exposed := test.ReadExposedMetrics(t)
+require.Contains(t, exposed, `conntrack_end_connections{group="0: DEFAULT",reason="FIN_flag"} 1`)
}
9 changes: 6 additions & 3 deletions pkg/pipeline/extract/conntrack/hash.go
@@ -38,12 +38,12 @@ type totalHashType struct {

// ComputeHash computes the hash of a flow log according to keyDefinition.
// Two flow logs will have the same hash if they belong to the same connection.
-func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, hasher hash.Hash64) (totalHashType, error) {
+func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, hasher hash.Hash64, metrics *metricsType) (totalHashType, error) {
fieldGroup2hash := make(map[string]uint64)

// Compute the hash of each field group
for _, fg := range keyDefinition.FieldGroups {
-h, err := computeHashFields(flowLog, fg.Fields, hasher)
+h, err := computeHashFields(flowLog, fg.Fields, hasher, metrics)
if err != nil {
return totalHashType{}, fmt.Errorf("compute hash: %w", err)
}
@@ -72,12 +72,15 @@ func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, has
return th, nil
}

-func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash64) (uint64, error) {
+func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash64, metrics *metricsType) (uint64, error) {
hasher.Reset()
for _, fn := range fieldNames {
f, ok := flowLog[fn]
if !ok {
log.Warningf("Missing field %v", fn)
+if metrics != nil {
+metrics.hashErrors.WithLabelValues("MissingFieldError", fn).Inc()
+}
continue
}
bytes, err := toBytes(f)
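With the new parameter, a caller inside the conntrack package might look like the sketch below. The key-definition field names are illustrative rather than taken from this diff, and passing nil metrics preserves the pre-commit behavior, where a missing field is only logged:

```go
package conntrack

import (
	"fmt"
	"hash/fnv"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

// exampleComputeHash is a hypothetical usage sketch, not part of this commit.
func exampleComputeHash() {
	keyDef := api.KeyDefinition{
		FieldGroups: []api.FieldGroup{
			// Field names here are illustrative.
			{Name: "src", Fields: []string{"SrcAddr", "SrcPort"}},
		},
	}
	fl := config.GenericMap{"SrcAddr": "10.0.0.1", "SrcPort": 8080}

	// nil metrics: missing key fields are logged but not counted.
	h, err := ComputeHash(fl, keyDef, fnv.New64a(), nil)
	if err != nil {
		fmt.Println("hash error:", err)
		return
	}
	fmt.Printf("%+v\n", h)
}
```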
15 changes: 10 additions & 5 deletions pkg/pipeline/extract/conntrack/hash_test.go
@@ -23,6 +23,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

@@ -102,8 +103,8 @@ func TestComputeHash_Unidirectional(t *testing.T) {
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
-h1, err1 := ComputeHash(test.flowLog1, keyDefinition, testHasher)
-h2, err2 := ComputeHash(test.flowLog2, keyDefinition, testHasher)
+h1, err1 := ComputeHash(test.flowLog1, keyDefinition, testHasher, nil)
+h2, err2 := ComputeHash(test.flowLog2, keyDefinition, testHasher, nil)
require.NoError(t, err1)
require.NoError(t, err2)
if test.sameHash {
@@ -191,8 +192,8 @@ func TestComputeHash_Bidirectional(t *testing.T) {
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
-h1, err1 := ComputeHash(test.flowLog1, keyDefinition, testHasher)
-h2, err2 := ComputeHash(test.flowLog2, keyDefinition, testHasher)
+h1, err1 := ComputeHash(test.flowLog1, keyDefinition, testHasher, nil)
+h2, err2 := ComputeHash(test.flowLog2, keyDefinition, testHasher, nil)
require.NoError(t, err1)
require.NoError(t, err2)
if test.sameHash {
@@ -205,6 +206,7 @@
}

func TestComputeHash_MissingField(t *testing.T) {
+test.ResetPromRegistry()
keyDefinition := api.KeyDefinition{
FieldGroups: []api.FieldGroup{
{
@@ -229,7 +231,10 @@

fl := newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false)

-h, err := ComputeHash(fl, keyDefinition, testHasher)
+metrics := newMetrics(opMetrics)
+h, err := ComputeHash(fl, keyDefinition, testHasher, metrics)
require.NoError(t, err)
require.NotNil(t, h)
+exposed := test.ReadExposedMetrics(t)
+require.Contains(t, exposed, `conntrack_hash_errors{error="MissingFieldError",field="Missing"} 1`)
}
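`test.ResetPromRegistry` and `test.ReadExposedMetrics` live in pkg/test, outside this diff. A plausible reconstruction of what they do (swap in a fresh default registry, then scrape it the way a Prometheus server would) might read:

```go
package test

import (
	"io"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// ResetPromRegistry (hypothetical reconstruction): replace the default
// registry so counters from earlier tests don't leak into assertions.
func ResetPromRegistry() {
	reg := prometheus.NewRegistry()
	prometheus.DefaultRegisterer = reg
	prometheus.DefaultGatherer = reg
}

// ReadExposedMetrics (hypothetical reconstruction): serve the default
// gatherer over HTTP and return the text exposition format.
func ReadExposedMetrics(t *testing.T) string {
	srv := httptest.NewServer(promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))
	defer srv.Close()
	resp, err := http.Get(srv.URL)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		t.Fatal(err)
	}
	return string(body)
}
```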