Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move FinalizeBlock event staging logic into a generic EventStager (backport #2435) #2436

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions protocol/finalizeblock/event_stager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package finalizeblock

import (
"encoding/binary"

"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
)

// EventStager supports staging and retrieval of events (of type T) from FinalizeBlock.
type EventStager[T proto.Message] struct {
transientStoreKey storetypes.StoreKey
cdc codec.BinaryCodec
stagedEventCountKey string
stagedEventKeyPrefix string
}

// NewEventStager creates a new EventStager.
func NewEventStager[T proto.Message](
transientStoreKey storetypes.StoreKey,
cdc codec.BinaryCodec,
stagedEventCountKey string,
stagedEventKeyPrefix string,
) EventStager[T] {
return EventStager[T]{
transientStoreKey: transientStoreKey,
cdc: cdc,
stagedEventCountKey: stagedEventCountKey,
stagedEventKeyPrefix: stagedEventKeyPrefix,
}
}

// GetStagedFinalizeBlockEvents retrieves all staged events from the store.
func (s EventStager[T]) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
newStagedEvent func() T,
) []T {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(s.transientStoreKey)

count := s.getStagedEventsCount(store)
events := make([]T, count)
store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
for i := uint32(0); i < count; i++ {
event := newStagedEvent()
bytes := store.Get(lib.Uint32ToKey(i))
s.cdc.MustUnmarshal(bytes, event)
events[i] = event
}
return events
}

func (s EventStager[T]) getStagedEventsCount(
store storetypes.KVStore,
) uint32 {
countsBytes := store.Get([]byte(s.stagedEventCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
}

// StageFinalizeBlockEvent stages an event in the transient store.
func (s EventStager[T]) StageFinalizeBlockEvent(
ctx sdk.Context,
stagedEvent T,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(s.transientStoreKey)

// Increment events count.
count := s.getStagedEventsCount(store)
store.Set([]byte(s.stagedEventCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
store.Set(lib.Uint32ToKey(count), s.cdc.MustMarshal(stagedEvent))
}
107 changes: 30 additions & 77 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package streaming

import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -11,7 +10,6 @@ import (
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -22,6 +20,8 @@ import (
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"

ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"

"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
)

var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
Expand Down Expand Up @@ -62,6 +62,8 @@ type FullNodeStreamingManagerImpl struct {

// stores the staged FinalizeBlock events for full node streaming.
streamingManagerTransientStoreKey storetypes.StoreKey

finalizeBlockStager finalizeblock.EventStager[*clobtypes.StagedFinalizeBlockEvent]
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
Expand Down Expand Up @@ -119,6 +121,12 @@ func NewFullNodeStreamingManager(

streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
cdc: cdc,
finalizeBlockStager: finalizeblock.NewEventStager[*clobtypes.StagedFinalizeBlockEvent](
streamingManagerTransientStoreKey,
cdc,
StagedEventsCountKey,
StagedEventsKeyPrefix,
),
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -381,14 +389,6 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
}
}

func getStagedEventsCount(store storetypes.KVStore) uint32 {
countsBytes := store.Get([]byte(StagedEventsCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
}

// Send a subaccount update event.
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
ctx sdk.Context,
Expand All @@ -405,51 +405,30 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
SubaccountUpdate: &subaccountUpdate,
},
}
sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
&stagedEvent,
)
}

func getStagedFinalizeBlockEventsFromStore(
store storetypes.KVStore,
cdc codec.BinaryCodec,
) []clobtypes.StagedFinalizeBlockEvent {
count := getStagedEventsCount(store)
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
for i := uint32(0); i < count; i++ {
var event clobtypes.StagedFinalizeBlockEvent
bytes := store.Get(lib.Uint32ToKey(i))
cdc.MustUnmarshal(bytes, &event)
events[i] = event
}
return events
}

// Retrieve all events staged during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
return getStagedFinalizeBlockEventsFromStore(store, sm.cdc)
}

func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
ctx sdk.Context,
eventBytes []byte,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)

// Increment events count.
count := getStagedEventsCount(store)
store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
store.Set(lib.Uint32ToKey(count), eventBytes)
events := sm.finalizeBlockStager.GetStagedFinalizeBlockEvents(
ctx,
func() *clobtypes.StagedFinalizeBlockEvent {
return &clobtypes.StagedFinalizeBlockEvent{}
},
)
results := make([]clobtypes.StagedFinalizeBlockEvent, len(events))
for i, event := range events {
if event == nil {
panic("Got nil event from finalizeBlockStager")
}
results[i] = *event
}
return results
}

// SendCombinedSnapshot sends messages to a particular subscriber without buffering.
Expand Down Expand Up @@ -574,9 +553,9 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
},
},
}
sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
&stagedEvent,
)
}

Expand Down Expand Up @@ -649,9 +628,9 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
},
}

sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
&stagedEvent,
)
}

Expand Down Expand Up @@ -710,32 +689,6 @@ func getStreamUpdatesForSubaccountUpdates(
return streamUpdates, subaccountIds
}

// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendFinalizedSubaccountUpdatesLatency,
time.Now(),
)

if execMode != sdk.ExecModeFinalize {
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
subaccountUpdates,
blockHeight,
execMode,
)

sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
}

// AddOrderUpdatesToCache adds a series of updates to the full node streaming cache.
// Clob pair ids are the clob pair id each update is relevant to.
func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(
Expand Down
7 changes: 0 additions & 7 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
) {
}

func (sm *NoopGrpcStreamingManager) SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool {
return false
}
Expand Down
5 changes: 0 additions & 5 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ type FullNodeStreamingManager interface {
takerOrder clobtypes.StreamTakerOrder,
ctx sdk.Context,
)
SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
)
SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
Expand Down
4 changes: 0 additions & 4 deletions protocol/x/clob/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ type SubaccountsKeeper interface {
revSharesForFill revsharetypes.RevSharesForFill,
fillForProcess FillForProcess,
) error
SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []satypes.StreamSubaccountUpdate,
)
}

type AssetsKeeper interface {
Expand Down
16 changes: 0 additions & 16 deletions protocol/x/subaccounts/keeper/subaccount.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,19 +825,3 @@ func (k Keeper) GetAllRelevantPerpetuals(
func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager {
return k.streamingManager
}

// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager.
func (k Keeper) SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []types.StreamSubaccountUpdate,
) {
lib.AssertDeliverTxMode(ctx)
if len(subaccountUpdates) == 0 {
return
}
k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates(
subaccountUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
4 changes: 0 additions & 4 deletions protocol/x/subaccounts/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,4 @@ type SubaccountsKeeper interface {
perpetualId uint32,
blockHeight uint32,
) error
SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []StreamSubaccountUpdate,
)
}
Loading