From a3ea114579299fd1386a980062bdc6d11f9faffe Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 20 Jun 2024 20:00:53 -0400 Subject: [PATCH] [CT-946] only send snapshots to uninitialized streams (backport #1738) (#1745) Co-authored-by: jayy04 <103467857+jayy04@users.noreply.github.com> --- .../streaming/grpc/grpc_streaming_manager.go | 142 +++++++++--------- .../streaming/grpc/noop_streaming_manager.go | 8 +- protocol/streaming/grpc/types/manager.go | 7 +- protocol/x/clob/keeper/keeper.go | 20 +-- 4 files changed, 89 insertions(+), 88 deletions(-) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 005f6d57ab..54037813f9 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -9,7 +9,6 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" - "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" @@ -206,11 +205,12 @@ func (sm *GrpcStreamingManagerImpl) Stop() { sm.done <- true } -// SendSnapshot groups updates by their clob pair ids and -// sends messages to the subscribers. It groups out updates differently -// and bypasses the buffer. +// SendSnapshot sends messages to a particular subscriber without buffering. +// Note this method requires the lock and assumes that the lock has already been +// acquired by the caller. func (sm *GrpcStreamingManagerImpl) SendSnapshot( offchainUpdates *clobtypes.OffchainUpdates, + subscriptionId uint32, blockHeight uint32, execMode sdk.ExecMode, ) { @@ -220,74 +220,56 @@ func (sm *GrpcStreamingManagerImpl) SendSnapshot( time.Now(), ) - // Group updates by clob pair id. - updates := make(map[uint32]*clobtypes.OffchainUpdates) - for _, message := range offchainUpdates.Messages { - clobPairId := message.OrderId.ClobPairId - if _, ok := updates[clobPairId]; !ok { - updates[clobPairId] = clobtypes.NewOffchainUpdates() - } - updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) + v1updates, err := GetOffchainUpdatesV1(offchainUpdates) + if err != nil { + panic(err) } - // Unmarshal each per-clob pair message to v1 updates. - updatesByClobPairId := make(map[uint32][]ocutypes.OffChainUpdateV1) - for clobPairId, update := range updates { - v1updates, err := GetOffchainUpdatesV1(update) - if err != nil { - panic(err) - } - updatesByClobPairId[clobPairId] = v1updates - } - - sm.Lock() - defer sm.Unlock() - - idsToRemove := make([]uint32, 0) - for id, subscription := range sm.orderbookSubscriptions { - // Consolidate orderbook updates into a single `StreamUpdate`. - v1updates := make([]ocutypes.OffChainUpdateV1, 0) - for _, clobPairId := range subscription.clobPairIds { - if update, ok := updatesByClobPairId[clobPairId]; ok { - v1updates = append(v1updates, update...) - } + removeSubscription := false + if len(v1updates) > 0 { + subscription, ok := sm.orderbookSubscriptions[subscriptionId] + if !ok { + sm.logger.Error( + fmt.Sprintf( + "GRPC Streaming subscription id %+v not found. This should not happen.", + subscriptionId, + ), + ) + return } - - if len(v1updates) > 0 { - streamUpdates := []clobtypes.StreamUpdate{ - { - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: v1updates, - Snapshot: true, - }, + streamUpdates := []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: true, }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), }, - } - metrics.IncrCounter( - metrics.GrpcAddToSubscriptionChannelCount, - 1, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), + }, + } + metrics.IncrCounter( + metrics.GrpcAddToSubscriptionChannelCount, + 1, + ) + select { + case subscription.updatesChannel <- streamUpdates: + default: + sm.logger.Error( + fmt.Sprintf( + "GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.", + subscriptionId, + ), ) - select { - case subscription.updatesChannel <- streamUpdates: - default: - sm.logger.Error( - fmt.Sprintf( - "GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.", - id, - ), - ) - idsToRemove = append(idsToRemove, subscription.subscriptionId) - } + removeSubscription = true } } // Clean up subscriptions that have been closed. // If a Send update has failed for any clob pair id, the whole subscription will be removed. - for _, id := range idsToRemove { - sm.removeSubscription(id) + if removeSubscription { + sm.removeSubscription(subscriptionId) } } @@ -405,17 +387,22 @@ func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache( sm.EmitMetrics() } -// FlushStreamUpdates takes in a map of clob pair id to stream updates and emits them to subscribers. func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() { + sm.Lock() + defer sm.Unlock() + sm.FlushStreamUpdatesWithLock() +} + +// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers. +// Note this method requires the lock and assumes that the lock has already been +// acquired by the caller. +func (sm *GrpcStreamingManagerImpl) FlushStreamUpdatesWithLock() { defer metrics.ModuleMeasureSince( metrics.FullNodeGrpc, metrics.GrpcFlushUpdatesLatency, time.Now(), ) - sm.Lock() - defer sm.Unlock() - // Non-blocking send updates through subscriber's buffered channel. // If the buffer is full, drop the subscription. idsToRemove := make([]uint32, 0) @@ -456,23 +443,34 @@ func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() { sm.EmitMetrics() } -// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized. -func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 { +func (sm *GrpcStreamingManagerImpl) InitializeNewGrpcStreams( + getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, + blockHeight uint32, + execMode sdk.ExecMode, +) { sm.Lock() defer sm.Unlock() - clobPairIds := make(map[uint32]bool) - for _, subscription := range sm.orderbookSubscriptions { + // Flush any pending updates before sending the snapshot to avoid + // race conditions with the snapshot. + sm.FlushStreamUpdatesWithLock() + + updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates) + for subscriptionId, subscription := range sm.orderbookSubscriptions { subscription.initialize.Do( func() { + allUpdates := clobtypes.NewOffchainUpdates() for _, clobPairId := range subscription.clobPairIds { - clobPairIds[clobPairId] = true + if _, ok := updatesByClobPairId[clobPairId]; !ok { + updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId)) + } + allUpdates.Append(updatesByClobPairId[clobPairId]) } + + sm.SendSnapshot(allUpdates, subscriptionId, blockHeight, execMode) }, ) } - - return lib.GetSortedKeys[lib.Sortable[uint32]](clobPairIds) } // GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1. diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 0875e89faa..f5c61f0713 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -29,6 +29,7 @@ func (sm *NoopGrpcStreamingManager) Subscribe( func (sm *NoopGrpcStreamingManager) SendSnapshot( updates *clobtypes.OffchainUpdates, + subscriptionId uint32, blockHeight uint32, execMode sdk.ExecMode, ) { @@ -49,8 +50,11 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( ) { } -func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { - return []uint32{} +func (sm *NoopGrpcStreamingManager) InitializeNewGrpcStreams( + getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, + blockHeight uint32, + execMode sdk.ExecMode, +) { } func (sm *NoopGrpcStreamingManager) Stop() { diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index ec43821093..74b145985c 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -15,9 +15,14 @@ type GrpcStreamingManager interface { ) ( err error, ) - GetUninitializedClobPairIds() []uint32 + InitializeNewGrpcStreams( + getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, + blockHeight uint32, + execMode sdk.ExecMode, + ) SendSnapshot( offchainUpdates *clobtypes.OffchainUpdates, + subscriptionId uint32, blockHeight uint32, execMode sdk.ExecMode, ) diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 043c2f3d7f..1bbd7872ee 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -260,20 +260,14 @@ func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) { // by sending the corresponding orderbook snapshots. func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) { streamingManager := k.GetGrpcStreamingManager() - allUpdates := types.NewOffchainUpdates() - uninitializedClobPairIds := streamingManager.GetUninitializedClobPairIds() - for _, clobPairId := range uninitializedClobPairIds { - update := k.MemClob.GetOffchainUpdatesForOrderbookSnapshot( - ctx, - types.ClobPairId(clobPairId), - ) - - allUpdates.Append(update) - } - - streamingManager.SendSnapshot( - allUpdates, + streamingManager.InitializeNewGrpcStreams( + func(clobPairId types.ClobPairId) *types.OffchainUpdates { + return k.MemClob.GetOffchainUpdatesForOrderbookSnapshot( + ctx, + clobPairId, + ) + }, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), )