diff --git a/relayer/app/app.go b/relayer/app/app.go index 6cc439d29..2e61484fb 100644 --- a/relayer/app/app.go +++ b/relayer/app/app.go @@ -67,7 +67,7 @@ func Run(ctx context.Context, cfg Config) error { pricer := newTokenPricer(ctx) pnl := newPnlLogger(network.ID, pricer) - attestStreamer := newLeaderStreamer(cprov, network) + attestStreamer := newLeaderStreamer(cprov.StreamAttestations, network.ID) for _, destChain := range network.EVMChains() { // Setup send provider diff --git a/relayer/app/cursors_internal_test.go b/relayer/app/cursors_internal_test.go index 77c00a377..461155891 100644 --- a/relayer/app/cursors_internal_test.go +++ b/relayer/app/cursors_internal_test.go @@ -142,13 +142,13 @@ const mockValSetID = 99 type mockProvider struct { cchain.Provider - SubscribeFn func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, callback cchain.ProviderCallback) + StreamFunc func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, callback cchain.ProviderCallback) error } -func (m *mockProvider) StreamAsync(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, +func (m *mockProvider) StreamAttestations(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, _ string, callback cchain.ProviderCallback, -) { - m.SubscribeFn(ctx, chainVer, attestOffset, callback) +) error { + return m.StreamFunc(ctx, chainVer, attestOffset, callback) } func (m *mockProvider) PortalValidatorSet(ctx context.Context, valSetID uint64) ([]cchain.PortalValidator, bool, error) { diff --git a/relayer/app/streambuf.go b/relayer/app/leadstream.go similarity index 87% rename from relayer/app/streambuf.go rename to relayer/app/leadstream.go index 67e4792ae..c43cef685 100644 --- a/relayer/app/streambuf.go +++ b/relayer/app/leadstream.go @@ -2,14 +2,17 @@ package relayer import ( "context" + "sync" + "time" + "github.com/omni-network/omni/lib/cchain" "github.com/omni-network/omni/lib/errors" 
"github.com/omni-network/omni/lib/log" "github.com/omni-network/omni/lib/netconf" + "github.com/omni-network/omni/lib/umath" "github.com/omni-network/omni/lib/xchain" + "github.com/tidwall/btree" - "sync" - "time" ) // errStop is returned when the leader should stop streaming. @@ -24,12 +27,21 @@ func getLimit(network netconf.ID) int { return 10_000 // 10k atts & 1KB per attestation & 10 chain versions ~= 100MB } +// getBackoff returns the duration to backoff before querying the cache again. +func getBackoff(network netconf.ID) time.Duration { + if network == netconf.Simnet { + return time.Millisecond // No backoff in tests + } + + return time.Second // Default 1 second blocks otherwise +} + // leadChaosTimeout returns a function that returns true if the leader chaos timeout has been reached. // This ensures we rotate leaders after a certain time (and test leader rotation). func leadChaosTimeout(network netconf.ID) func() bool { t0 := time.Now() return func() bool { - duration := time.Hour + duration := time.Hour // Default 1 hour timeout if network == netconf.Devnet { duration = time.Second * 10 } else if network == netconf.Staging { @@ -38,7 +50,6 @@ func leadChaosTimeout(network netconf.ID) func() bool { return time.Since(t0) > duration } - } // leader tracks a worker actively streaming attestations and adding them to the cache. @@ -95,6 +106,8 @@ type attestBuffer struct { // Add adds an attestation to the cache if the cache is not full or // if the attestation is not too old. // It returns true if it was added and an existing key was replaced. +// +//nolint:nonamedreturns // Name for clarity of API. func (b *attestBuffer) Add(att xchain.Attestation) (replaced bool) { b.mu.Lock() defer b.mu.Unlock() @@ -175,17 +188,20 @@ type attestStreamer func(ctx context.Context, chainVer xchain.ChainVersion, atte // newLeaderStreamer returns a new attestStreamer that avoids multiple overlapping streaming queries // by selecting a leader to query each range of offsets.
-func newLeaderStreamer(cprov cchain.Provider, network netconf.Network) attestStreamer { +func newLeaderStreamer(upstream attestStreamer, network netconf.ID) attestStreamer { var buffers sync.Map // map[xchain.ChainVer]*attestBuffer return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64, workerName string, callback cchain.ProviderCallback) error { - anyBuffer, _ := buffers.LoadOrStore(chainVer, newAttestBuffer(getLimit(network.ID))) - buffer := anyBuffer.(*attestBuffer) + anyBuffer, _ := buffers.LoadOrStore(chainVer, newAttestBuffer(getLimit(network))) + buffer := anyBuffer.(*attestBuffer) //nolint:revive,forcetypeassert // Type is known - name := network.ChainVersionName(chainVer) + name := netconf.ChainVersionNamer(network)(chainVer) // Track the offset of the last attestation we "processed" - prevOffset := fromOffset - 1 + prevOffset, ok := umath.Subtract(fromOffset, 1) + if !ok { + return errors.New("attest from offset zero [BUG]", "from", fromOffset) + } // lead blocks and streams attestations from the provided height using the provided leader. // It populates the cache with fetched attestations. 
@@ -193,9 +209,9 @@ func newLeaderStreamer(cprov cchain.Provider, network netconf.Network) attestStr lead := func(l *leader, from uint64) error { defer l.Delete() log.Debug(ctx, "Starting attest stream", "chain", name, "offset", from, "worker", workerName) - timeout := leadChaosTimeout(network.ID) + timeout := leadChaosTimeout(network) - err := cprov.StreamAttestations(ctx, chainVer, from, workerName, func(ctx context.Context, att xchain.Attestation) error { + err := upstream(ctx, chainVer, from, workerName, func(ctx context.Context, att xchain.Attestation) error { l.IncRange(att.AttestOffset) replaced := buffer.Add(att) @@ -258,7 +274,7 @@ func newLeaderStreamer(cprov cchain.Provider, network netconf.Network) attestStr } // Otherwise, wait a bit, and try the same offset again - timer.Reset(time.Second) + timer.Reset(getBackoff(network)) } } } diff --git a/relayer/app/leadstream_internal_test.go b/relayer/app/leadstream_internal_test.go new file mode 100644 index 000000000..024b9ec20 --- /dev/null +++ b/relayer/app/leadstream_internal_test.go @@ -0,0 +1,147 @@ +package relayer + +import ( + "context" + "maps" + "sync" + "testing" + + "github.com/omni-network/omni/lib/cchain" + "github.com/omni-network/omni/lib/errors" + "github.com/omni-network/omni/lib/netconf" + "github.com/omni-network/omni/lib/xchain" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +// TestLeaderStreamer tests the leader streamer. 
+func TestLeaderStreamer(t *testing.T) { + t.Parallel() + + // upstream tracks upstream requests and returns test responses + upstream := newUpstream() + upstreamFunc := func( + ctx context.Context, + chainVer xchain.ChainVersion, + attestOffset uint64, + workerName string, + callback cchain.ProviderCallback, + ) error { + resps := upstream.Req(workerName, attestOffset) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case resp := <-resps: + err := callback(ctx, xchain.Attestation{AttestHeader: xchain.AttestHeader{ChainVersion: chainVer, AttestOffset: resp}}) + if err != nil { + return err + } + } + } + } + + streamer := newLeaderStreamer(upstreamFunc, netconf.Simnet) + + ctx := context.Background() + var eg errgroup.Group + errDone := errors.New("done") + + // startStream starts a worker stream from `from` to `to`, ensuring strictly sequential attestations. + startStream := func(worker string, from uint64, to uint64) error { + next := from + return streamer(ctx, xchain.ChainVersion{}, from, worker, func(ctx context.Context, att xchain.Attestation) error { + if att.AttestOffset != next { + return errors.New("unexpected offset") + } + next++ + + if att.AttestOffset == to { + return errDone + } + + return nil + }) + } + + w1 := "worker1" + w2 := "worker2" + w3 := "worker3" + + eg.Go(func() error { + // worker 1 streams from 3 to 5 as leader + return startStream(w1, 3, 5) + }) + upstream.Respond(w1, 3) // w1 starts leader streaming at 3 + eg.Go(func() error { + // worker 2 streams from 1 to 7, 1-3 as leader, 4-5 from cache, 6-7 as leader + return startStream(w2, 1, 7) + }) + upstream.Respond(w2, 1) // w2 starts leader streaming at 1 + upstream.Respond(w1, 4) // w1 continues leader streaming + upstream.Respond(w2, 2) // w2 continues leader streaming + upstream.Respond(w2, 3) // w2 overlaps w1, switch to cache + upstream.Respond(w1, 5) // w1 is done + upstream.Respond(w2, 6) // w2 switches back to lead streaming + upstream.Respond(w2, 7) // w2 is done + +
eg.Go(func() error { + // worker 3 streams from 1 to 8, 1-7 from cache, 8 as leader + return startStream(w3, 1, 8) + }) + upstream.Respond(w3, 8) // w3 starts leader streaming at 8 and is done + + require.ErrorIs(t, eg.Wait(), errDone) + require.EqualValues(t, map[string][]uint64{ + w1: {3}, + w2: {1, 6}, + w3: {8}, + }, upstream.Reqs()) +} + +func newUpstream() *upstream { + return &upstream{ + reqs: make(map[string][]uint64), + resps: make(map[string]chan uint64), + } +} + +type upstream struct { + mu sync.Mutex + reqs map[string][]uint64 + resps map[string]chan uint64 +} + +func (u *upstream) Respond(worker string, offset uint64) { + u.mu.Lock() + resp, ok := u.resps[worker] + if !ok { + resp = make(chan uint64) + u.resps[worker] = resp + } + u.mu.Unlock() + + resp <- offset +} + +func (u *upstream) Reqs() map[string][]uint64 { + u.mu.Lock() + defer u.mu.Unlock() + + return maps.Clone(u.reqs) +} + +func (u *upstream) Req(worker string, from uint64) chan uint64 { + u.mu.Lock() + u.reqs[worker] = append(u.reqs[worker], from) + + resp, ok := u.resps[worker] + if !ok { + resp = make(chan uint64) + u.resps[worker] = resp + } + u.mu.Unlock() + + return resp +} diff --git a/relayer/app/types_internal_test.go b/relayer/app/types_internal_test.go deleted file mode 100644 index dfd65c3eb..000000000 --- a/relayer/app/types_internal_test.go +++ /dev/null @@ -1 +0,0 @@ -package relayer diff --git a/relayer/app/worker.go b/relayer/app/worker.go index 1188c2cbb..5b65c4f62 100644 --- a/relayer/app/worker.go +++ b/relayer/app/worker.go @@ -2,7 +2,6 @@ package relayer import ( "context" - "golang.org/x/sync/errgroup" "sync/atomic" "time" @@ -15,6 +14,8 @@ import ( "github.com/omni-network/omni/lib/xchain" "github.com/ethereum/go-ethereum/accounts/abi/bind" + + "golang.org/x/sync/errgroup" ) const ( @@ -210,6 +211,7 @@ func (w *Worker) newCallback( var cachedValSet []cchain.PortalValidator var prevOffset uint64 + //nolint:nonamedreturns // Required for defer.
return func(ctx context.Context, att xchain.Attestation) (err error) { // Sanity check strictly sequential offsets. { diff --git a/relayer/app/worker_internal_test.go b/relayer/app/worker_internal_test.go index 522d7f0d9..24fc8b4ac 100644 --- a/relayer/app/worker_internal_test.go +++ b/relayer/app/worker_internal_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/omni-network/omni/lib/cchain" + "github.com/omni-network/omni/lib/errors" "github.com/omni-network/omni/lib/netconf" "github.com/omni-network/omni/lib/xchain" @@ -96,12 +97,18 @@ func TestWorker_Run(t *testing.T) { // Provider mock attestations as requested until context canceled. mockProvider := &mockProvider{ - SubscribeFn: func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, callback cchain.ProviderCallback) { + StreamFunc: func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, callback cchain.ProviderCallback) error { if chainVer.ID != srcChain { - return // Only subscribe to source chain. + return errors.New("not source chain") } + if attestOffset == 1 { + // Block other streams + <-ctx.Done() + return ctx.Err() + } + if attestOffset != destChainACursor && attestOffset != destChainBCursor { - return + return errors.New("unexpected offset", "offset", attestOffset, "chain_a", destChainACursor, "chain_b", destChainBCursor) } offset := attestOffset @@ -132,10 +139,10 @@ func TestWorker_Run(t *testing.T) { } } - for ctx.Err() == nil { + for { err := callback(ctx, nextAtt()) if ctx.Err() != nil { - return + return nil //nolint:nilerr // Return nil as per contract } require.NoError(t, err) } @@ -158,7 +165,8 @@ func TestWorker_Run(t *testing.T) { mockXClient, mockCreateFunc, func() (SendAsync, error) { return mockSender.SendTransaction, nil }, - noAwait) + noAwait, + mockProvider.StreamAttestations) go w.Run(ctx) }