Skip to content

Commit

Permalink
Draft of integration tests for metrics..
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandre Bourget authored and colindickson committed Oct 14, 2024
1 parent ced4b37 commit e1e7647
Showing 1 changed file with 51 additions and 18 deletions.
69 changes: 51 additions & 18 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"os"
"path"
"path/filepath"
"slices"
"strings"
"testing"
"time"

"github.com/streamingfast/bstream"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/manifest"
"github.com/streamingfast/substreams/orchestrator/stage"
Expand Down Expand Up @@ -198,6 +200,7 @@ func TestOneStoreOneMap(t *testing.T) {
spkg string
expectFiles []string
expectError string
expectedMetering []string
expectTier1Events bool
expectTier2Events bool
}{
Expand Down Expand Up @@ -348,8 +351,35 @@ func TestOneStoreOneMap(t *testing.T) {
assertTestStoreAddI64Hash + "/outputs/0000000010-0000000020.output",
assertTestStoreAddI64Hash + "/outputs/0000000020-0000000030.output",
},
expectTier1Events: true,
expectTier2Events: true,
expectedMetering: []string{
"tier1:egress_bytes:5349.00",
"tier1:file_compressed_read_bytes:0.00",
"tier1:file_compressed_read_forked_bytes:0.00",
"tier1:file_compressed_write_bytes:0.00",
"tier1:file_uncompressed_read_bytes:0.00",
"tier1:file_uncompressed_read_forked_bytes:0.00",
"tier1:file_uncompressed_write_bytes:0.00",
"tier1:live_uncompressed_read_bytes:0.00",
"tier1:live_uncompressed_read_forked_bytes:0.00",
"tier1:message_count:31.00",
"tier1:read_bytes:0.00",
"tier1:wasm_input_bytes:0.00",
"tier1:written_bytes:0.00",
"tier2:egress_bytes:937.00",
"tier2:file_compressed_read_bytes:125.00",
"tier2:file_compressed_read_forked_bytes:0.00",
"tier2:file_compressed_write_bytes:75.00",
"tier2:file_uncompressed_read_bytes:60.00",
"tier2:file_uncompressed_read_forked_bytes:0.00",
"tier2:file_uncompressed_write_bytes:36.00",
"tier2:live_uncompressed_read_bytes:0.00",
"tier2:live_uncompressed_read_forked_bytes:0.00",
"tier2:message_count:24.00",
"tier2:read_bytes:125.00",
"tier2:wasm_input_bytes:4697.00",
"tier2:written_bytes:36.00",
},
expectTier1Events: true, expectTier2Events: true,
},
}

Expand All @@ -374,23 +404,10 @@ func TestOneStoreOneMap(t *testing.T) {
mapOutput := run.MapOutputString("assert_test_store_add_i64")
assert.Contains(t, mapOutput, `assert_test_store_add_i64: 0801`)

var tier1EventsFound bool
for _, ev := range run.Events {
if ev.Endpoint == "tier1" {
tier1EventsFound = true
break
}
}
assert.Equal(t, test.expectTier1Events, tier1EventsFound)

var tier2EventsFound bool
for _, ev := range run.Events {
if ev.Endpoint == "tier2" {
tier2EventsFound = true
break
}
if test.expectedMetering != nil {
meteringEvents := computeMeteringEvents(run.Events)
assert.Contains(t, meteringEvents, test.expectedMetering)
}
assert.Equal(t, test.expectTier2Events, tier2EventsFound)

assert.Equal(t, test.expectedResponseCount, strings.Count(mapOutput, "\n"))

Expand All @@ -407,6 +424,22 @@ func TestOneStoreOneMap(t *testing.T) {
}
}

func computeMeteringEvents(events []dmetering.Event) (out []string) {
meteringSums := make(map[string]float64)
for _, ev := range events {
for metricName, value := range ev.Metrics {
key := fmt.Sprintf("%s:%s", ev.Endpoint, metricName)
meteringSums[key] += value
}
}

for k, v := range meteringSums {
out = append(out, fmt.Sprintf("%s:%0.2f", k, v))
}
slices.Sort(out)
return
}

func files(t *testing.T, tempDir string) []string {
var storedFiles []string
require.NoError(t, filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
Expand Down

0 comments on commit e1e7647

Please sign in to comment.