-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(fullnode) Add filterOrders option to streaming subscription #2676
base: main
Are you sure you want to change the base?
Changes from all commits
05ff9f3
309f2ec
39213e7
97d08ce
457459b
e632ed4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,27 +2,25 @@ package streaming | |
|
||
import ( | ||
"fmt" | ||
"slices" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/dydxprotocol/v4-chain/protocol/lib" | ||
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" | ||
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" | ||
|
||
"cosmossdk.io/log" | ||
storetypes "cosmossdk.io/store/types" | ||
"github.com/cosmos/cosmos-sdk/codec" | ||
sdk "github.com/cosmos/cosmos-sdk/types" | ||
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types" | ||
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock" | ||
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" | ||
"github.com/dydxprotocol/v4-chain/protocol/lib" | ||
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics" | ||
"github.com/dydxprotocol/v4-chain/protocol/streaming/types" | ||
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util" | ||
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" | ||
|
||
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" | ||
|
||
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock" | ||
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" | ||
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" | ||
) | ||
|
||
var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil) | ||
|
@@ -96,6 +94,41 @@ type OrderbookSubscription struct { | |
nextSnapshotBlock uint32 | ||
} | ||
|
||
func NewOrderbookSubscription( | ||
subscriptionId uint32, | ||
clobPairIds []uint32, | ||
subaccountIds []satypes.SubaccountId, | ||
marketIds []uint32, | ||
messageSender types.OutgoingMessageSender, | ||
updatesChannel chan []clobtypes.StreamUpdate, | ||
) *OrderbookSubscription { | ||
return &OrderbookSubscription{ | ||
subscriptionId: subscriptionId, | ||
initialized: &atomic.Bool{}, // False by default. | ||
clobPairIds: clobPairIds, | ||
subaccountIds: subaccountIds, | ||
marketIds: marketIds, | ||
messageSender: messageSender, | ||
updatesChannel: updatesChannel, | ||
} | ||
} | ||
|
||
func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription( | ||
clobPairIds []uint32, | ||
subaccountIds []satypes.SubaccountId, | ||
marketIds []uint32, | ||
messageSender types.OutgoingMessageSender, | ||
) *OrderbookSubscription { | ||
return NewOrderbookSubscription( | ||
sm.getNextAvailableSubscriptionId(), | ||
clobPairIds, | ||
subaccountIds, | ||
marketIds, | ||
messageSender, | ||
make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), | ||
) | ||
} | ||
|
||
func (sub *OrderbookSubscription) IsInitialized() bool { | ||
return sub.initialized.Load() | ||
} | ||
|
@@ -187,11 +220,68 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 | |
return id | ||
} | ||
|
||
// Filter StreamUpdates for subaccountIdNumbers | ||
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message | ||
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new | ||
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts | ||
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates( | ||
output chan []clobtypes.StreamUpdate, | ||
logger log.Logger, | ||
) { | ||
subaccountIdNumbers := make([]uint32, len(sub.subaccountIds)) | ||
for i, subaccountId := range sub.subaccountIds { | ||
subaccountIdNumbers[i] = subaccountId.Number | ||
} | ||
|
||
// If reflection becomes too expensive, split updatesChannel by message type | ||
for updates := range sub.updatesChannel { | ||
filteredUpdates := []clobtypes.StreamUpdate{} | ||
for _, update := range updates { | ||
switch updateMessage := update.UpdateMessage.(type) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For modularity, move switch statements into a new function:
Also makes it easier to test |
||
case *clobtypes.StreamUpdate_OrderbookUpdate: | ||
orderBookUpdates := []ocutypes.OffChainUpdateV1{} | ||
for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates { | ||
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate) | ||
if err == nil { | ||
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason to use slice over a map (as a set)? Is it because the former is more efficient at smaller size? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's based on the assumption that the subaccount cardinality is low. |
||
orderBookUpdates = append(orderBookUpdates, orderBookUpdate) | ||
} | ||
} else { | ||
logger.Error(err.Error()) | ||
} | ||
} | ||
// Drop the StreamUpdate_OrderbookUpdate if all updates inside were dropped | ||
if len(orderBookUpdates) > 0 { | ||
if len(orderBookUpdates) < len(updateMessage.OrderbookUpdate.Updates) { | ||
update = clobtypes.StreamUpdate{ | ||
BlockHeight: update.BlockHeight, | ||
ExecMode: update.ExecMode, | ||
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ | ||
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ | ||
Snapshot: updateMessage.OrderbookUpdate.Snapshot, | ||
Updates: orderBookUpdates, | ||
}, | ||
}, | ||
} | ||
} | ||
filteredUpdates = append(filteredUpdates, update) | ||
} | ||
default: | ||
filteredUpdates = append(filteredUpdates, update) | ||
} | ||
} | ||
if len(filteredUpdates) > 0 { | ||
output <- filteredUpdates | ||
} | ||
} | ||
} | ||
|
||
// Subscribe subscribes to the orderbook updates stream. | ||
func (sm *FullNodeStreamingManagerImpl) Subscribe( | ||
clobPairIds []uint32, | ||
subaccountIds []*satypes.SubaccountId, | ||
marketIds []uint32, | ||
filterOrders bool, | ||
messageSender types.OutgoingMessageSender, | ||
) ( | ||
err error, | ||
|
@@ -207,17 +297,8 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( | |
sIds[i] = *subaccountId | ||
} | ||
|
||
subscriptionId := sm.getNextAvailableSubscriptionId() | ||
subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender) | ||
|
||
subscription := &OrderbookSubscription{ | ||
subscriptionId: subscriptionId, | ||
initialized: &atomic.Bool{}, // False by default. | ||
clobPairIds: clobPairIds, | ||
subaccountIds: sIds, | ||
marketIds: marketIds, | ||
messageSender: messageSender, | ||
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), | ||
} | ||
for _, clobPairId := range clobPairIds { | ||
// if clobPairId exists in the map, append the subscription id to the slice | ||
// otherwise, create a new slice with the subscription id | ||
|
@@ -265,9 +346,27 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( | |
sm.EmitMetrics() | ||
sm.Unlock() | ||
|
||
// If filterOrders, listen to filtered channel and start filter goroutine | ||
// Error if filterOrders but no subaccounts are subscribed | ||
filteredUpdateChannel := subscription.updatesChannel | ||
if filterOrders { | ||
if len(subaccountIds) == 0 { | ||
sm.logger.Error( | ||
fmt.Sprintf( | ||
"filterOrders requires subaccountIds for subscription id: %+v", | ||
subscription.subscriptionId, | ||
), | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not just return error to user and fail the |
||
} else { | ||
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize) | ||
defer close(filteredUpdateChannel) | ||
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Per offline discussion, let's start with the simpler implementation of doing filtering below in the loop on |
||
} | ||
} | ||
|
||
Comment on lines
+349
to
+366
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prevent execution when When Apply this diff to return an error when if filterOrders {
if len(subaccountIds) == 0 {
sm.logger.Error(
fmt.Sprintf(
"filterOrders requires subaccountIds for subscription id: %+v",
subscription.subscriptionId,
),
)
+ return types.ErrInvalidStreamingRequest
} else {
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
defer close(filteredUpdateChannel)
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
}
}
|
||
// Use current goroutine to consistently poll subscription channel for updates | ||
// to send through stream. | ||
for updates := range subscription.updatesChannel { | ||
for updates := range filteredUpdateChannel { | ||
metrics.IncrCounterWithLabels( | ||
metrics.GrpcSendResponseToSubscriberCount, | ||
1, | ||
|
@@ -1080,12 +1179,12 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( | |
sm.FlushStreamUpdatesWithLock() | ||
|
||
// Cache updates to sync local ops queue | ||
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( | ||
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( | ||
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), | ||
lib.MustConvertIntegerToUint32(ctx.BlockHeight()), | ||
ctx.ExecMode(), | ||
) | ||
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds) | ||
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds) | ||
|
||
// Cache updates for finalized fills. | ||
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer making it more explicit: