diff --git a/protocol/finalizeblock/event_stager.go b/protocol/finalizeblock/event_stager.go new file mode 100644 index 0000000000..88e0552052 --- /dev/null +++ b/protocol/finalizeblock/event_stager.go @@ -0,0 +1,81 @@ +package finalizeblock + +import ( + "encoding/binary" + + "cosmossdk.io/store/prefix" + storetypes "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/codec" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/gogoproto/proto" + ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types" + "github.com/dydxprotocol/v4-chain/protocol/lib" +) + +// EventStager supports staging and retrieval of events (of type T) from FinalizeBlock. +type EventStager[T proto.Message] struct { + transientStoreKey storetypes.StoreKey + cdc codec.BinaryCodec + stagedEventCountKey string + stagedEventKeyPrefix string +} + +// NewEventStager creates a new EventStager. +func NewEventStager[T proto.Message]( + transientStoreKey storetypes.StoreKey, + cdc codec.BinaryCodec, + stagedEventCountKey string, + stagedEventKeyPrefix string, +) EventStager[T] { + return EventStager[T]{ + transientStoreKey: transientStoreKey, + cdc: cdc, + stagedEventCountKey: stagedEventCountKey, + stagedEventKeyPrefix: stagedEventKeyPrefix, + } +} + +// GetStagedFinalizeBlockEventsFromStore retrieves all staged events from the store. +func (s EventStager[T]) GetStagedFinalizeBlockEventsFromStore( + store storetypes.KVStore, + newStagedEvent func() T, +) []T { + count := s.getStagedEventsCount(store) + events := make([]T, count) + store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix)) + for i := uint32(0); i < count; i++ { + event := newStagedEvent() + bytes := store.Get(lib.Uint32ToKey(i)) + s.cdc.MustUnmarshal(bytes, event) + events[i] = event + } + return events +} + +func (s EventStager[T]) getStagedEventsCount( + store storetypes.KVStore, +) uint32 { + countsBytes := store.Get([]byte(s.stagedEventCountKey)) + if countsBytes == nil { + return 0 + } + return binary.BigEndian.Uint32(countsBytes) +} + +// StageFinalizeBlockEvent stages an event in the transient store. +func (s EventStager[T]) StageFinalizeBlockEvent( + ctx sdk.Context, + eventBytes []byte, +) { + noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) + + store := noGasCtx.TransientStore(s.transientStoreKey) + + // Increment events count. + count := s.getStagedEventsCount(store) + store.Set([]byte(s.stagedEventCountKey), lib.Uint32ToKey(count+1)) + + // Store events keyed by index. + store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix)) + store.Set(lib.Uint32ToKey(count), eventBytes) +} diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 85c265f12e..f8f4bb0fce 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -1,7 +1,6 @@ package streaming import ( - "encoding/binary" "fmt" "sync" "sync/atomic" @@ -11,7 +10,6 @@ import ( satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" "cosmossdk.io/log" - "cosmossdk.io/store/prefix" storetypes "cosmossdk.io/store/types" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" @@ -22,6 +20,8 @@ import ( clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + + "github.com/dydxprotocol/v4-chain/protocol/finalizeblock" ) var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil) @@ -62,6 +62,8 @@ type FullNodeStreamingManagerImpl struct { // stores the staged FinalizeBlock events for full node streaming. streamingManagerTransientStoreKey storetypes.StoreKey + + finalizeBlockStager finalizeblock.EventStager[*clobtypes.StagedFinalizeBlockEvent] } // OrderbookSubscription represents a active subscription to the orderbook updates stream. @@ -119,6 +121,12 @@ func NewFullNodeStreamingManager( streamingManagerTransientStoreKey: streamingManagerTransientStoreKey, cdc: cdc, + finalizeBlockStager: finalizeblock.NewEventStager[*clobtypes.StagedFinalizeBlockEvent]( + streamingManagerTransientStoreKey, + cdc, + StagedEventsCountKey, + StagedEventsKeyPrefix, + ), } // Start the goroutine for pushing order updates through. @@ -381,14 +389,6 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates( } } -func getStagedEventsCount(store storetypes.KVStore) uint32 { - countsBytes := store.Get([]byte(StagedEventsCountKey)) - if countsBytes == nil { - return 0 - } - return binary.BigEndian.Uint32(countsBytes) -} - // Send a subaccount update event. func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( ctx sdk.Context, @@ -405,51 +405,32 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( SubaccountUpdate: &subaccountUpdate, }, } - sm.stageFinalizeBlockEvent( + sm.finalizeBlockStager.StageFinalizeBlockEvent( ctx, sm.cdc.MustMarshal(&stagedEvent), ) } -func getStagedFinalizeBlockEventsFromStore( - store storetypes.KVStore, - cdc codec.BinaryCodec, -) []clobtypes.StagedFinalizeBlockEvent { - count := getStagedEventsCount(store) - events := make([]clobtypes.StagedFinalizeBlockEvent, count) - store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix)) - for i := uint32(0); i < count; i++ { - var event clobtypes.StagedFinalizeBlockEvent - bytes := store.Get(lib.Uint32ToKey(i)) - cdc.MustUnmarshal(bytes, &event) - events[i] = event - } - return events -} - // Retrieve all events staged during `FinalizeBlock`. func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents( ctx sdk.Context, ) []clobtypes.StagedFinalizeBlockEvent { noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey) - return getStagedFinalizeBlockEventsFromStore(store, sm.cdc) -} - -func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent( - ctx sdk.Context, - eventBytes []byte, -) { - noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) - store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey) - - // Increment events count. - count := getStagedEventsCount(store) - store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1)) - - // Store events keyed by index. - store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix)) - store.Set(lib.Uint32ToKey(count), eventBytes) + events := sm.finalizeBlockStager.GetStagedFinalizeBlockEventsFromStore( + store, + func() *clobtypes.StagedFinalizeBlockEvent { + return &clobtypes.StagedFinalizeBlockEvent{} + }, + ) + results := make([]clobtypes.StagedFinalizeBlockEvent, len(events)) + for i, event := range events { + if event == nil { + panic("Got nil event from finalizeBlockStager") + } + results[i] = *event + } + return results } // SendCombinedSnapshot sends messages to a particular subscriber without buffering. @@ -574,7 +555,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( }, }, } - sm.stageFinalizeBlockEvent( + sm.finalizeBlockStager.StageFinalizeBlockEvent( ctx, sm.cdc.MustMarshal(&stagedEvent), ) @@ -649,7 +630,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( }, } - sm.stageFinalizeBlockEvent( + sm.finalizeBlockStager.StageFinalizeBlockEvent( ctx, sm.cdc.MustMarshal(&stagedEvent), ) @@ -710,32 +691,6 @@ func getStreamUpdatesForSubaccountUpdates( return streamUpdates, subaccountIds } -// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and -// sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates( - subaccountUpdates []satypes.StreamSubaccountUpdate, - blockHeight uint32, - execMode sdk.ExecMode, -) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendFinalizedSubaccountUpdatesLatency, - time.Now(), - ) - - if execMode != sdk.ExecModeFinalize { - panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize") - } - - streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( - subaccountUpdates, - blockHeight, - execMode, - ) - - sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds) -} - // AddOrderUpdatesToCache adds a series of updates to the full node streaming cache. // Clob pair ids are the clob pair id each update is relevant to. func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 9dc7bf6de9..89250854c4 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -48,13 +48,6 @@ func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus( ) { } -func (sm *NoopGrpcStreamingManager) SendFinalizedSubaccountUpdates( - subaccountUpdates []satypes.StreamSubaccountUpdate, - blockHeight uint32, - execMode sdk.ExecMode, -) { -} - func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool { return false } diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 5b42864016..33907fc1ec 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -42,11 +42,6 @@ type FullNodeStreamingManager interface { takerOrder clobtypes.StreamTakerOrder, ctx sdk.Context, ) - SendFinalizedSubaccountUpdates( - subaccountUpdates []satypes.StreamSubaccountUpdate, - blockHeight uint32, - execMode sdk.ExecMode, - ) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, diff --git a/protocol/x/clob/types/expected_keepers.go b/protocol/x/clob/types/expected_keepers.go index 872daf2905..edf2af9c2b 100644 --- a/protocol/x/clob/types/expected_keepers.go +++ b/protocol/x/clob/types/expected_keepers.go @@ -87,10 +87,6 @@ type SubaccountsKeeper interface { revSharesForFill revsharetypes.RevSharesForFill, fillForProcess FillForProcess, ) error - SendFinalizedSubaccountUpdates( - ctx sdk.Context, - subaccountUpdates []satypes.StreamSubaccountUpdate, - ) } type AssetsKeeper interface { diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go index e72ccd60e7..52d4c05bb3 100644 --- a/protocol/x/subaccounts/keeper/subaccount.go +++ b/protocol/x/subaccounts/keeper/subaccount.go @@ -825,19 +825,3 @@ func (k Keeper) GetAllRelevantPerpetuals( func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager { return k.streamingManager } - -// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager. -func (k Keeper) SendFinalizedSubaccountUpdates( - ctx sdk.Context, - subaccountUpdates []types.StreamSubaccountUpdate, -) { - lib.AssertDeliverTxMode(ctx) - if len(subaccountUpdates) == 0 { - return - } - k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates( - subaccountUpdates, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), - ) -} diff --git a/protocol/x/subaccounts/types/types.go b/protocol/x/subaccounts/types/types.go index cbccc9d2b9..3e180d05c3 100644 --- a/protocol/x/subaccounts/types/types.go +++ b/protocol/x/subaccounts/types/types.go @@ -77,8 +77,4 @@ type SubaccountsKeeper interface { perpetualId uint32, blockHeight uint32, ) error - SendFinalizedSubaccountUpdates( - ctx sdk.Context, - subaccountUpdates []StreamSubaccountUpdate, - ) }