Skip to content

Commit

Permalink
Revert "[CT-1190] Emit FinalizeBlock updates in single batch. (#2260)"
Browse files Browse the repository at this point in the history
This reverts commit 72873fb.
  • Loading branch information
jonfung-dydx committed Sep 17, 2024
1 parent 21d08d2 commit 22525f9
Showing 1 changed file with 44 additions and 141 deletions.
185 changes: 44 additions & 141 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,19 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.
return exists
}

func getStreamUpdatesFromOffchainUpdates(
// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
Expand All @@ -506,8 +514,8 @@ func getStreamUpdatesFromOffchainUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
Expand All @@ -527,39 +535,26 @@ func getStreamUpdatesFromOffchainUpdates(
clobPairIds = append(clobPairIds, clobPairId)
}

return streamUpdates, clobPairIds
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// SendOrderbookFillUpdates groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode)

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
}

func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) (
streamUpdates []clobtypes.StreamUpdate,
clobPairIds []uint32,
) {
// Group fills by clob pair id.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for _, orderbookFill := range orderbookFills {
// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
// perpetual id.
Expand All @@ -582,29 +577,6 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}
return streamUpdates, clobPairIds
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
orderbookFills,
blockHeight,
execMode,
perpetualIdToClobPairId,
)

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
}
Expand Down Expand Up @@ -637,31 +609,6 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
)
}

func getStreamUpdatesForSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) (
streamUpdates []clobtypes.StreamUpdate,
subaccountIds []*satypes.SubaccountId,
) {
// Group subaccount updates by subaccount id.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
subaccountIds = make([]*satypes.SubaccountId, 0)
for _, subaccountUpdate := range subaccountUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId)
}
return streamUpdates, subaccountIds
}

// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
Expand All @@ -679,11 +626,20 @@ func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
subaccountUpdates,
blockHeight,
execMode,
)
// Group subaccount updates by subaccount id.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
subaccountIds := make([]*satypes.SubaccountId, 0)
for _, subaccountUpdate := range subaccountUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId)
}

sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
}
Expand Down Expand Up @@ -840,47 +796,6 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
return ret
}

// addBatchUpdatesToCacheWithLock adds batched updates to the cache.
// Used by `StreamBatchUpdatesAfterFinalizeBlock` to batch orderbook, fill
// and subaccount updates in a single stream.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) addBatchUpdatesToCacheWithLock(
orderbookStreamUpdates []clobtypes.StreamUpdate,
orderbookClobPairIds []uint32,
fillStreamUpdates []clobtypes.StreamUpdate,
fillClobPairIds []uint32,
subaccountStreamUpdates []clobtypes.StreamUpdate,
subaccountIds []*satypes.SubaccountId,
) {
// Add orderbook updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, orderbookStreamUpdates...)
for _, clobPairId := range orderbookClobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}

// Add fill updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, fillStreamUpdates...)
for _, clobPairId := range fillClobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}

// Add subaccount updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, subaccountStreamUpdates...)
for _, subaccountId := range subaccountIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.subaccountIdToSubscriptionIdMapping[*subaccountId],
)
}
}

// Grpc Streaming logic after consensus agrees on a block.
// - Stream all events staged during `FinalizeBlock`.
// - Stream orderbook updates to sync fills in local ops queue.
Expand All @@ -889,45 +804,33 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdates()

finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates(
// TODO(CT-1190): Stream below in a single batch.
// Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock.
sm.SendOrderbookUpdates(
orderBookUpdatesToSyncLocalOpsQueue,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
// Send finalized fills from FinalizeBlock.
sm.SendOrderbookFillUpdates(
finalizedFills,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
perpetualIdToClobPairId,
)

subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
// Send finalized subaccount updates from FinalizeBlock.
sm.SendFinalizedSubaccountUpdates(
finalizedSubaccountUpdates,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

sm.Lock()
defer sm.Unlock()

// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdatesWithLock()

sm.addBatchUpdatesToCacheWithLock(
orderbookStreamUpdates,
orderbookClobPairIds,
fillStreamUpdates,
fillClobPairIds,
subaccountStreamUpdates,
subaccountIds,
)

// Emit all stream updates in a single batch.
// Note we still have the lock, which is released right before function returns.
sm.FlushStreamUpdatesWithLock()
}

// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.
Expand Down

0 comments on commit 22525f9

Please sign in to comment.