From 01d9cc200cdd355e64d77596a133f77124f12bb6 Mon Sep 17 00:00:00 2001
From: Teddy Ding <teddy@dydx.exchange>
Date: Wed, 2 Oct 2024 14:34:47 -0400
Subject: [PATCH] Move event staging logic into a generic EventStager

---
 protocol/finalizeblock/event_stager.go        | 81 +++++++++++++++
 .../streaming/full_node_streaming_manager.go  | 99 +++++--------------
 protocol/streaming/noop_streaming_manager.go  |  7 --
 protocol/streaming/types/interface.go         |  5 -
 protocol/x/clob/types/expected_keepers.go     |  4 -
 protocol/x/subaccounts/keeper/subaccount.go   | 16 ---
 protocol/x/subaccounts/types/types.go         |  4 -
 7 files changed, 108 insertions(+), 108 deletions(-)
 create mode 100644 protocol/finalizeblock/event_stager.go

diff --git a/protocol/finalizeblock/event_stager.go b/protocol/finalizeblock/event_stager.go
new file mode 100644
index 0000000000..88e0552052
--- /dev/null
+++ b/protocol/finalizeblock/event_stager.go
@@ -0,0 +1,81 @@
+package finalizeblock
+
+import (
+	"encoding/binary"
+
+	"cosmossdk.io/store/prefix"
+	storetypes "cosmossdk.io/store/types"
+	"github.com/cosmos/cosmos-sdk/codec"
+	sdk "github.com/cosmos/cosmos-sdk/types"
+	"github.com/cosmos/gogoproto/proto"
+	ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
+	"github.com/dydxprotocol/v4-chain/protocol/lib"
+)
+
+// EventStager supports staging and retrieval of events (of type T) from FinalizeBlock.
+type EventStager[T proto.Message] struct {
+	transientStoreKey    storetypes.StoreKey
+	cdc                  codec.BinaryCodec
+	stagedEventCountKey  string
+	stagedEventKeyPrefix string
+}
+
+// NewEventStager creates a new EventStager.
+func NewEventStager[T proto.Message](
+	transientStoreKey storetypes.StoreKey,
+	cdc codec.BinaryCodec,
+	stagedEventCountKey string,
+	stagedEventKeyPrefix string,
+) EventStager[T] {
+	return EventStager[T]{
+		transientStoreKey:    transientStoreKey,
+		cdc:                  cdc,
+		stagedEventCountKey:  stagedEventCountKey,
+		stagedEventKeyPrefix: stagedEventKeyPrefix,
+	}
+}
+
+// GetStagedFinalizeBlockEventsFromStore retrieves all staged events from the store.
+func (s EventStager[T]) GetStagedFinalizeBlockEventsFromStore(
+	store storetypes.KVStore,
+	newStagedEvent func() T,
+) []T {
+	count := s.getStagedEventsCount(store)
+	events := make([]T, count)
+	store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
+	for i := uint32(0); i < count; i++ {
+		event := newStagedEvent()
+		bytes := store.Get(lib.Uint32ToKey(i))
+		s.cdc.MustUnmarshal(bytes, event)
+		events[i] = event
+	}
+	return events
+}
+
+func (s EventStager[T]) getStagedEventsCount(
+	store storetypes.KVStore,
+) uint32 {
+	countsBytes := store.Get([]byte(s.stagedEventCountKey))
+	if countsBytes == nil {
+		return 0
+	}
+	return binary.BigEndian.Uint32(countsBytes)
+}
+
+// StageFinalizeBlockEvent stages an event in the transient store.
+func (s EventStager[T]) StageFinalizeBlockEvent(
+	ctx sdk.Context,
+	eventBytes []byte,
+) {
+	noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
+
+	store := noGasCtx.TransientStore(s.transientStoreKey)
+
+	// Increment events count.
+	count := s.getStagedEventsCount(store)
+	store.Set([]byte(s.stagedEventCountKey), lib.Uint32ToKey(count+1))
+
+	// Store events keyed by index.
+	store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
+	store.Set(lib.Uint32ToKey(count), eventBytes)
+}
diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go
index 85c265f12e..f8f4bb0fce 100644
--- a/protocol/streaming/full_node_streaming_manager.go
+++ b/protocol/streaming/full_node_streaming_manager.go
@@ -1,7 +1,6 @@
 package streaming
 
 import (
-	"encoding/binary"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -11,7 +10,6 @@ import (
 	satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
 
 	"cosmossdk.io/log"
-	"cosmossdk.io/store/prefix"
 	storetypes "cosmossdk.io/store/types"
 	"github.com/cosmos/cosmos-sdk/codec"
 	sdk "github.com/cosmos/cosmos-sdk/types"
@@ -22,6 +20,8 @@ import (
 	clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
 
 	ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
+
+	"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
 )
 
 var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
@@ -62,6 +62,8 @@ type FullNodeStreamingManagerImpl struct {
 
 	// stores the staged FinalizeBlock events for full node streaming.
 	streamingManagerTransientStoreKey storetypes.StoreKey
+
+	finalizeBlockStager finalizeblock.EventStager[*clobtypes.StagedFinalizeBlockEvent]
 }
 
 // OrderbookSubscription represents a active subscription to the orderbook updates stream.
@@ -119,6 +121,12 @@ func NewFullNodeStreamingManager(
 
 		streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
 		cdc:                               cdc,
+		finalizeBlockStager: finalizeblock.NewEventStager[*clobtypes.StagedFinalizeBlockEvent](
+			streamingManagerTransientStoreKey,
+			cdc,
+			StagedEventsCountKey,
+			StagedEventsKeyPrefix,
+		),
 	}
 
 	// Start the goroutine for pushing order updates through.
@@ -381,14 +389,6 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
 	}
 }
 
-func getStagedEventsCount(store storetypes.KVStore) uint32 {
-	countsBytes := store.Get([]byte(StagedEventsCountKey))
-	if countsBytes == nil {
-		return 0
-	}
-	return binary.BigEndian.Uint32(countsBytes)
-}
-
 // Send a subaccount update event.
 func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
 	ctx sdk.Context,
@@ -405,51 +405,32 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
 			SubaccountUpdate: &subaccountUpdate,
 		},
 	}
-	sm.stageFinalizeBlockEvent(
+	sm.finalizeBlockStager.StageFinalizeBlockEvent(
 		ctx,
 		sm.cdc.MustMarshal(&stagedEvent),
 	)
 }
 
-func getStagedFinalizeBlockEventsFromStore(
-	store storetypes.KVStore,
-	cdc codec.BinaryCodec,
-) []clobtypes.StagedFinalizeBlockEvent {
-	count := getStagedEventsCount(store)
-	events := make([]clobtypes.StagedFinalizeBlockEvent, count)
-	store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
-	for i := uint32(0); i < count; i++ {
-		var event clobtypes.StagedFinalizeBlockEvent
-		bytes := store.Get(lib.Uint32ToKey(i))
-		cdc.MustUnmarshal(bytes, &event)
-		events[i] = event
-	}
-	return events
-}
-
 // Retrieve all events staged during `FinalizeBlock`.
 func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
 	ctx sdk.Context,
 ) []clobtypes.StagedFinalizeBlockEvent {
 	noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
 	store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
-	return getStagedFinalizeBlockEventsFromStore(store, sm.cdc)
-}
-
-func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
-	ctx sdk.Context,
-	eventBytes []byte,
-) {
-	noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
-	store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
-
-	// Increment events count.
-	count := getStagedEventsCount(store)
-	store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1))
-
-	// Store events keyed by index.
-	store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
-	store.Set(lib.Uint32ToKey(count), eventBytes)
+	events := sm.finalizeBlockStager.GetStagedFinalizeBlockEventsFromStore(
+		store,
+		func() *clobtypes.StagedFinalizeBlockEvent {
+			return &clobtypes.StagedFinalizeBlockEvent{}
+		},
+	)
+	results := make([]clobtypes.StagedFinalizeBlockEvent, len(events))
+	for i, event := range events {
+		if event == nil {
+			panic("Got nil event from finalizeBlockStager")
+		}
+		results[i] = *event
+	}
+	return results
 }
 
 // SendCombinedSnapshot sends messages to a particular subscriber without buffering.
@@ -574,7 +555,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
 			},
 		},
 	}
-	sm.stageFinalizeBlockEvent(
+	sm.finalizeBlockStager.StageFinalizeBlockEvent(
 		ctx,
 		sm.cdc.MustMarshal(&stagedEvent),
 	)
@@ -649,7 +630,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
 		},
 	}
 
-	sm.stageFinalizeBlockEvent(
+	sm.finalizeBlockStager.StageFinalizeBlockEvent(
 		ctx,
 		sm.cdc.MustMarshal(&stagedEvent),
 	)
@@ -710,32 +691,6 @@ func getStreamUpdatesForSubaccountUpdates(
 	return streamUpdates, subaccountIds
 }
 
-// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
-// sends messages to the subscribers.
-func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
-	subaccountUpdates []satypes.StreamSubaccountUpdate,
-	blockHeight uint32,
-	execMode sdk.ExecMode,
-) {
-	defer metrics.ModuleMeasureSince(
-		metrics.FullNodeGrpc,
-		metrics.GrpcSendFinalizedSubaccountUpdatesLatency,
-		time.Now(),
-	)
-
-	if execMode != sdk.ExecModeFinalize {
-		panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
-	}
-
-	streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
-		subaccountUpdates,
-		blockHeight,
-		execMode,
-	)
-
-	sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
-}
-
 // AddOrderUpdatesToCache adds a series of updates to the full node streaming cache.
 // Clob pair ids are the clob pair id each update is relevant to.
 func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(
diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go
index 9dc7bf6de9..89250854c4 100644
--- a/protocol/streaming/noop_streaming_manager.go
+++ b/protocol/streaming/noop_streaming_manager.go
@@ -48,13 +48,6 @@ func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
 ) {
 }
 
-func (sm *NoopGrpcStreamingManager) SendFinalizedSubaccountUpdates(
-	subaccountUpdates []satypes.StreamSubaccountUpdate,
-	blockHeight uint32,
-	execMode sdk.ExecMode,
-) {
-}
-
 func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool {
 	return false
 }
diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go
index 5b42864016..33907fc1ec 100644
--- a/protocol/streaming/types/interface.go
+++ b/protocol/streaming/types/interface.go
@@ -42,11 +42,6 @@ type FullNodeStreamingManager interface {
 		takerOrder clobtypes.StreamTakerOrder,
 		ctx sdk.Context,
 	)
-	SendFinalizedSubaccountUpdates(
-		subaccountUpdates []satypes.StreamSubaccountUpdate,
-		blockHeight uint32,
-		execMode sdk.ExecMode,
-	)
 	SendSubaccountUpdate(
 		ctx sdk.Context,
 		subaccountUpdate satypes.StreamSubaccountUpdate,
diff --git a/protocol/x/clob/types/expected_keepers.go b/protocol/x/clob/types/expected_keepers.go
index 872daf2905..edf2af9c2b 100644
--- a/protocol/x/clob/types/expected_keepers.go
+++ b/protocol/x/clob/types/expected_keepers.go
@@ -87,10 +87,6 @@ type SubaccountsKeeper interface {
 		revSharesForFill revsharetypes.RevSharesForFill,
 		fillForProcess FillForProcess,
 	) error
-	SendFinalizedSubaccountUpdates(
-		ctx sdk.Context,
-		subaccountUpdates []satypes.StreamSubaccountUpdate,
-	)
 }
 
 type AssetsKeeper interface {
diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go
index e72ccd60e7..52d4c05bb3 100644
--- a/protocol/x/subaccounts/keeper/subaccount.go
+++ b/protocol/x/subaccounts/keeper/subaccount.go
@@ -825,19 +825,3 @@ func (k Keeper) GetAllRelevantPerpetuals(
 func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager {
 	return k.streamingManager
 }
-
-// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager.
-func (k Keeper) SendFinalizedSubaccountUpdates(
-	ctx sdk.Context,
-	subaccountUpdates []types.StreamSubaccountUpdate,
-) {
-	lib.AssertDeliverTxMode(ctx)
-	if len(subaccountUpdates) == 0 {
-		return
-	}
-	k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates(
-		subaccountUpdates,
-		lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
-		ctx.ExecMode(),
-	)
-}
diff --git a/protocol/x/subaccounts/types/types.go b/protocol/x/subaccounts/types/types.go
index cbccc9d2b9..3e180d05c3 100644
--- a/protocol/x/subaccounts/types/types.go
+++ b/protocol/x/subaccounts/types/types.go
@@ -77,8 +77,4 @@ type SubaccountsKeeper interface {
 		perpetualId uint32,
 		blockHeight uint32,
 	) error
-	SendFinalizedSubaccountUpdates(
-		ctx sdk.Context,
-		subaccountUpdates []StreamSubaccountUpdate,
-	)
 }