Skip to content

Commit

Permalink
Move event staging logic into a generic EventStager
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Oct 2, 2024
1 parent 672169b commit 01d9cc2
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 108 deletions.
81 changes: 81 additions & 0 deletions protocol/finalizeblock/event_stager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package finalizeblock

import (
"encoding/binary"

"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
)

// EventStager supports staging and retrieval of events (of type T) from FinalizeBlock.
type EventStager[T proto.Message] struct {
transientStoreKey storetypes.StoreKey
cdc codec.BinaryCodec
stagedEventCountKey string
stagedEventKeyPrefix string
}

// NewEventStager creates a new EventStager.
func NewEventStager[T proto.Message](
transientStoreKey storetypes.StoreKey,
cdc codec.BinaryCodec,
stagedEventCountKey string,
stagedEventKeyPrefix string,
) EventStager[T] {
return EventStager[T]{
transientStoreKey: transientStoreKey,
cdc: cdc,
stagedEventCountKey: stagedEventCountKey,
stagedEventKeyPrefix: stagedEventKeyPrefix,
}
}

// GetStagedFinalizeBlockEventsFromStore retrieves all staged events from the store.
func (s EventStager[T]) GetStagedFinalizeBlockEventsFromStore(
store storetypes.KVStore,
newStagedEvent func() T,
) []T {
count := s.getStagedEventsCount(store)
events := make([]T, count)
store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
for i := uint32(0); i < count; i++ {
event := newStagedEvent()
bytes := store.Get(lib.Uint32ToKey(i))
s.cdc.MustUnmarshal(bytes, event)
events[i] = event
}
return events
}

func (s EventStager[T]) getStagedEventsCount(
store storetypes.KVStore,
) uint32 {
countsBytes := store.Get([]byte(s.stagedEventCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
}

// StageFinalizeBlockEvent stages an event in the transient store.
func (s EventStager[T]) StageFinalizeBlockEvent(
ctx sdk.Context,
eventBytes []byte,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())

store := noGasCtx.TransientStore(s.transientStoreKey)

// Increment events count.
count := s.getStagedEventsCount(store)
store.Set([]byte(s.stagedEventCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
store.Set(lib.Uint32ToKey(count), eventBytes)
}
99 changes: 27 additions & 72 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package streaming

import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -11,7 +10,6 @@ import (
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -22,6 +20,8 @@ import (
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"

ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"

"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
)

var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
Expand Down Expand Up @@ -62,6 +62,8 @@ type FullNodeStreamingManagerImpl struct {

// stores the staged FinalizeBlock events for full node streaming.
streamingManagerTransientStoreKey storetypes.StoreKey

finalizeBlockStager finalizeblock.EventStager[*clobtypes.StagedFinalizeBlockEvent]
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
Expand Down Expand Up @@ -119,6 +121,12 @@ func NewFullNodeStreamingManager(

streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
cdc: cdc,
finalizeBlockStager: finalizeblock.NewEventStager[*clobtypes.StagedFinalizeBlockEvent](
streamingManagerTransientStoreKey,
cdc,
StagedEventsCountKey,
StagedEventsKeyPrefix,
),
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -381,14 +389,6 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
}
}

func getStagedEventsCount(store storetypes.KVStore) uint32 {
countsBytes := store.Get([]byte(StagedEventsCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
}

// Send a subaccount update event.
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
ctx sdk.Context,
Expand All @@ -405,51 +405,32 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
SubaccountUpdate: &subaccountUpdate,
},
}
sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
}

func getStagedFinalizeBlockEventsFromStore(
store storetypes.KVStore,
cdc codec.BinaryCodec,
) []clobtypes.StagedFinalizeBlockEvent {
count := getStagedEventsCount(store)
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
for i := uint32(0); i < count; i++ {
var event clobtypes.StagedFinalizeBlockEvent
bytes := store.Get(lib.Uint32ToKey(i))
cdc.MustUnmarshal(bytes, &event)
events[i] = event
}
return events
}

// Retrieve all events staged during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
return getStagedFinalizeBlockEventsFromStore(store, sm.cdc)
}

func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
ctx sdk.Context,
eventBytes []byte,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)

// Increment events count.
count := getStagedEventsCount(store)
store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
store.Set(lib.Uint32ToKey(count), eventBytes)
events := sm.finalizeBlockStager.GetStagedFinalizeBlockEventsFromStore(
store,
func() *clobtypes.StagedFinalizeBlockEvent {
return &clobtypes.StagedFinalizeBlockEvent{}
},
)
results := make([]clobtypes.StagedFinalizeBlockEvent, len(events))
for i, event := range events {
if event == nil {
panic("Got nil event from finalizeBlockStager")
}
results[i] = *event
}
return results
}

// SendCombinedSnapshot sends messages to a particular subscriber without buffering.
Expand Down Expand Up @@ -574,7 +555,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
},
},
}
sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
Expand Down Expand Up @@ -649,7 +630,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
},
}

sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
Expand Down Expand Up @@ -710,32 +691,6 @@ func getStreamUpdatesForSubaccountUpdates(
return streamUpdates, subaccountIds
}

// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendFinalizedSubaccountUpdatesLatency,
time.Now(),
)

if execMode != sdk.ExecModeFinalize {
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
subaccountUpdates,
blockHeight,
execMode,
)

sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
}

// AddOrderUpdatesToCache adds a series of updates to the full node streaming cache.
// Clob pair ids are the clob pair id each update is relevant to.
func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(
Expand Down
7 changes: 0 additions & 7 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
) {
}

func (sm *NoopGrpcStreamingManager) SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool {
return false
}
Expand Down
5 changes: 0 additions & 5 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ type FullNodeStreamingManager interface {
takerOrder clobtypes.StreamTakerOrder,
ctx sdk.Context,
)
SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
)
SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
Expand Down
4 changes: 0 additions & 4 deletions protocol/x/clob/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ type SubaccountsKeeper interface {
revSharesForFill revsharetypes.RevSharesForFill,
fillForProcess FillForProcess,
) error
SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []satypes.StreamSubaccountUpdate,
)
}

type AssetsKeeper interface {
Expand Down
16 changes: 0 additions & 16 deletions protocol/x/subaccounts/keeper/subaccount.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,19 +825,3 @@ func (k Keeper) GetAllRelevantPerpetuals(
func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager {
return k.streamingManager
}

// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager.
func (k Keeper) SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []types.StreamSubaccountUpdate,
) {
lib.AssertDeliverTxMode(ctx)
if len(subaccountUpdates) == 0 {
return
}
k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates(
subaccountUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
4 changes: 0 additions & 4 deletions protocol/x/subaccounts/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,4 @@ type SubaccountsKeeper interface {
perpetualId uint32,
blockHeight uint32,
) error
SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []StreamSubaccountUpdate,
)
}

0 comments on commit 01d9cc2

Please sign in to comment.