From 301956262472b230d3bb749b069afe54a6d9fab1 Mon Sep 17 00:00:00 2001 From: Teddy Ding Date: Thu, 26 Sep 2024 11:43:27 -0400 Subject: [PATCH] [OTE-823] Fix FNS onchain events staging + retrieval logic (#2318) --- protocol/app/app.go | 1 + protocol/app/flags/flags.go | 4 ---- protocol/app/flags/flags_test.go | 15 +++++++----- .../streaming/full_node_streaming_manager.go | 23 +++++++++++++++---- protocol/x/clob/module.go | 10 ++++++++ 5 files changed, 38 insertions(+), 15 deletions(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index 0528e2b17c..aad83efdcd 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -2100,6 +2100,7 @@ func getFullNodeStreamingManagerFromOptions( appFlags.GrpcStreamingMaxChannelBufferSize, appFlags.FullNodeStreamingSnapshotInterval, streamingManagerTransientStoreKey, + cdc, ) // Start websocket server. diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index cb1715ba62..9a49d4c15c 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -163,10 +163,6 @@ func (f *Flags) Validate() error { // Grpc streaming if f.GrpcStreamingEnabled { - if f.OptimisticExecutionEnabled { - // TODO(OTE-456): Finish gRPC streaming x OE integration. - return fmt.Errorf("grpc streaming cannot be enabled together with optimistic execution") - } if !f.GrpcEnable { return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server") } diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index 50e3a32c72..0ad03872fa 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -113,14 +113,17 @@ func TestValidate(t *testing.T) { OptimisticExecutionEnabled: true, }, }, - "failure - optimistic execution cannot be enabled with gRPC streaming": { + "success - optimistic execution canbe enabled with gRPC streaming": { flags: flags.Flags{ - NonValidatingFullNode: false, - GrpcEnable: true, - GrpcStreamingEnabled: true, - OptimisticExecutionEnabled: true, + NonValidatingFullNode: false, + GrpcEnable: true, + GrpcStreamingEnabled: true, + OptimisticExecutionEnabled: true, + GrpcStreamingMaxBatchSize: 2000, + GrpcStreamingFlushIntervalMs: 100, + GrpcStreamingMaxChannelBufferSize: 2000, + WebsocketStreamingPort: 8989, }, - expectedErr: fmt.Errorf("grpc streaming cannot be enabled together with optimistic execution"), }, "failure - gRPC disabled": { flags: flags.Flags{ diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 3b3cab68ad..c43a254e78 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -13,6 +13,7 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/prefix" storetypes "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" @@ -27,6 +28,7 @@ var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil) type FullNodeStreamingManagerImpl struct { sync.Mutex + cdc codec.BinaryCodec logger log.Logger // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. @@ -95,6 +97,7 @@ func NewFullNodeStreamingManager( maxSubscriptionChannelSize uint32, snapshotBlockInterval uint32, streamingManagerTransientStoreKey storetypes.StoreKey, + cdc codec.BinaryCodec, ) *FullNodeStreamingManagerImpl { fullNodeStreamingManager := &FullNodeStreamingManagerImpl{ logger: logger, @@ -113,6 +116,7 @@ func NewFullNodeStreamingManager( snapshotBlockInterval: snapshotBlockInterval, streamingManagerTransientStoreKey: streamingManagerTransientStoreKey, + cdc: cdc, } // Start the goroutine for pushing order updates through. @@ -391,6 +395,7 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) { + lib.AssertDeliverTxMode(ctx) stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ SubaccountUpdate: &subaccountUpdate, @@ -398,7 +403,7 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate( } sm.stageFinalizeBlockEvent( ctx, - clobtypes.Amino.MustMarshal(stagedEvent), + sm.cdc.MustMarshal(&stagedEvent), ) } @@ -411,25 +416,30 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill( ctx sdk.Context, fill clobtypes.StreamOrderbookFill, ) { + lib.AssertDeliverTxMode(ctx) stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ OrderFill: &fill, }, } + sm.stageFinalizeBlockEvent( ctx, - clobtypes.Amino.MustMarshal(stagedEvent), + sm.cdc.MustMarshal(&stagedEvent), ) } -func getStagedFinalizeBlockEvents(store storetypes.KVStore) []clobtypes.StagedFinalizeBlockEvent { +func getStagedFinalizeBlockEventsFromStore( + store storetypes.KVStore, + cdc codec.BinaryCodec, +) []clobtypes.StagedFinalizeBlockEvent { count := getStagedEventsCount(store) events := make([]clobtypes.StagedFinalizeBlockEvent, count) store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix)) for i := uint32(0); i < count; i++ { var event clobtypes.StagedFinalizeBlockEvent bytes := store.Get(lib.Uint32ToKey(i)) - clobtypes.Amino.MustUnmarshal(bytes, &event) + cdc.MustUnmarshal(bytes, &event) events[i] = event } return events @@ -441,7 +451,7 @@ func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents( ) []clobtypes.StagedFinalizeBlockEvent { noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey) - return getStagedFinalizeBlockEvents(store) + return getStagedFinalizeBlockEventsFromStore(store, sm.cdc) } func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent( @@ -889,6 +899,9 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { + // Prevent gas metering from state read. + ctx = ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) + finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates( diff --git a/protocol/x/clob/module.go b/protocol/x/clob/module.go index 2b61ed37c7..0e06d7e035 100644 --- a/protocol/x/clob/module.go +++ b/protocol/x/clob/module.go @@ -177,6 +177,16 @@ func (am AppModule) PreBlock(ctx context.Context) (appmodule.ResponsePreBlock, e }, nil } +// BeginBlock executes all ABCI BeginBlock logic respective to the clob module. +func (am AppModule) Precommit(ctx context.Context) error { + defer telemetry.ModuleMeasureSince(am.Name(), time.Now(), telemetry.MetricKeyPrecommiter) + Precommit( + lib.UnwrapSDKContext(ctx, types.ModuleName), + *am.keeper, + ) + return nil +} + // BeginBlock executes all ABCI BeginBlock logic respective to the clob module. func (am AppModule) BeginBlock(ctx context.Context) error { defer telemetry.ModuleMeasureSince(am.Name(), time.Now(), telemetry.MetricKeyBeginBlocker)