-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Full Node Streaming Order Filtering by Subaccount impl and tests #2704
base: main
Are you sure you want to change the base?
Full Node Streaming Order Filtering by Subaccount impl and tests #2704
Conversation
WalkthroughThis pull request introduces a new boolean parameter Changes
Sequence DiagramsequenceDiagram
participant Client
participant WebSocketServer
participant StreamingManager
participant OrderbookService
Client->>WebSocketServer: Connect with filterOrdersBySubaccountId
WebSocketServer->>StreamingManager: Subscribe(subaccountIds, filterFlag)
StreamingManager->>OrderbookService: Request Updates
OrderbookService-->>StreamingManager: Filter Updates by Subaccount
StreamingManager-->>WebSocketServer: Send Filtered Updates
WebSocketServer-->>Client: Stream Filtered Updates
Possibly related PRs
Suggested reviewers
Poem
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (4)
protocol/streaming/util/util_test.go (1)
21-123
: Add test cases for error scenarios and edge cases.The test suite thoroughly covers the happy path for different order types but lacks test cases for:
- Invalid order IDs
- Malformed orders
- Empty/nil orders
- Other error conditions that could occur in production
Add test cases for error scenarios:
tests := map[string]struct { update ocutypes.OffChainUpdateV1 id satypes.SubaccountId err error }{ + "InvalidOrder": { + update: ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{ + OrderPlace: &ocutypes.OrderPlaceV1{ + Order: nil, + PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED, + TimeStamp: _ToPtr(orderPlaceTime), + }, + }, + }, + id: satypes.SubaccountId{}, + err: errors.New("order is nil"), + }, + "MalformedOrderId": { + update: ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{ + OrderPlace: &ocutypes.OrderPlaceV1{ + Order: &v1types.IndexerOrder{ + OrderId: v1types.IndexerOrderId{}, + }, + PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED, + TimeStamp: _ToPtr(orderPlaceTime), + }, + }, + }, + id: satypes.SubaccountId{}, + err: errors.New("invalid order id"), + },protocol/streaming/ws/websocket_server.go (1)
100-108
: Consider adding logging for subscription initialization.The error handling for subscription initialization could be improved by adding debug logs to help with troubleshooting.
Add logging:
if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil { ws.logger.Error("Error sending close message", "err", err) + ws.logger.Debug( + "Failed to initialize subscription", + "filterOrdersBySubaccountId", filterOrdersBySubaccountId, + "subaccountIds", subaccountIds, + ) }protocol/streaming/full_node_streaming_manager_test.go (1)
300-456
: Improve test case descriptions and organization.While the test cases are comprehensive, their names and organization could be improved for better readability and maintainability.
Consider reorganizing the test cases into logical groups with more descriptive names:
tests := map[string]TestCase{ - "snapshotNotInScope": { + "Snapshots/NotInScope": { updates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate}, subaccountIds: []satypes.SubaccountId{neverInScopeSubaccountId}, filteredUpdates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate}, }, - "snapshotNoScope": { + "Snapshots/NoScope": { updates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate}, subaccountIds: []satypes.SubaccountId{}, filteredUpdates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate}, }, // ... more test casesAlso, consider adding test cases for concurrent access and race conditions:
"ConcurrentAccess/MultipleUpdates": { updates: []clobtypes.StreamUpdate{ baseStreamUpdate, otherStreamUpdate, }, subaccountIds: []satypes.SubaccountId{ subaccountId, otherSubaccountId, }, filteredUpdates: []clobtypes.StreamUpdate{ baseStreamUpdate, otherStreamUpdate, }, },protocol/streaming/full_node_streaming_manager.go (1)
220-248
: Add metrics for filtered updates and error tracking.The filtering logic would benefit from metrics to track:
- Number of updates filtered
- Number of errors encountered
- Filtering latency
Add metrics:
func FilterStreamUpdateBySubaccount( updates []clobtypes.StreamUpdate, subaccountIds []satypes.SubaccountId, logger log.Logger, ) []clobtypes.StreamUpdate { + start := time.Now() + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcFilterUpdatesBySubaccountLatency, + start, + ) + filteredUpdates := []clobtypes.StreamUpdate{} + errorCount := 0 for _, update := range updates { switch updateMessage := update.UpdateMessage.(type) { case *clobtypes.StreamUpdate_OrderbookUpdate: if updateMessage.OrderbookUpdate.Snapshot { break } doFilter, err := doFilterStreamUpdateBySubaccount(updateMessage, subaccountIds) if err != nil { logger.Error(err.Error()) + errorCount++ + metrics.IncrCounter(metrics.GrpcFilterUpdatesBySubaccountErrorCount, 1) } if !doFilter { continue } } filteredUpdates = append(filteredUpdates, update) } + metrics.SetGauge(metrics.GrpcFilteredUpdatesCount, float32(len(filteredUpdates))) + metrics.SetGauge(metrics.GrpcFilterErrorsCount, float32(errorCount)) return filteredUpdates }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
protocol/x/clob/types/query.pb.go
is excluded by!**/*.pb.go
📒 Files selected for processing (10)
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
(6 hunks)proto/dydxprotocol/clob/query.proto
(1 hunks)protocol/streaming/full_node_streaming_manager.go
(7 hunks)protocol/streaming/full_node_streaming_manager_test.go
(1 hunks)protocol/streaming/noop_streaming_manager.go
(1 hunks)protocol/streaming/types/interface.go
(1 hunks)protocol/streaming/util/util.go
(2 hunks)protocol/streaming/util/util_test.go
(1 hunks)protocol/streaming/ws/websocket_server.go
(3 hunks)protocol/x/clob/keeper/grpc_stream_orderbook.go
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (12)
- GitHub Check: test-sim-after-import
- GitHub Check: test-sim-nondeterminism
- GitHub Check: test-sim-import-export
- GitHub Check: test-sim-multi-seed-short
- GitHub Check: test / run_command
- GitHub Check: unit-end-to-end-and-integration
- GitHub Check: test-coverage-upload
- GitHub Check: container-tests
- GitHub Check: benchmark
- GitHub Check: test-race
- GitHub Check: liveness-test
- GitHub Check: Summary
🔇 Additional comments (9)
protocol/x/clob/keeper/grpc_stream_orderbook.go (1)
15-15
: LGTM! Parameter addition aligns with filtering requirements.The new parameter
req.GetFilterOrdersBySubaccountId()
is correctly passed to the Subscribe method, enabling order filtering by subaccount ID.protocol/streaming/util/util.go (2)
8-10
: LGTM! Required imports added.The new imports support the subaccount ID extraction functionality.
27-46
: LGTM! Well-structured subaccount ID extraction.The function handles all order-related event types comprehensively with proper error handling. The comment on line 27 provides good context about potential future extensions.
Let's verify the test coverage for this new function:
✅ Verification successful
Test coverage is solid but could be more complete
The function has comprehensive test coverage for all order-related event types (Place, Remove, Update, Replace). Consider adding a test case for the default error scenario to achieve 100% coverage, although this is a minor enhancement since the error case is straightforward.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Check if there are corresponding test cases for the new function rg -A 5 "TestGetOffChainUpdateV1SubaccountId" protocol/streaming/util/Length of output: 526
Script:
#!/bin/bash # Fetch the complete test implementation rg -A 50 "TestGetOffChainUpdateV1SubaccountId" protocol/streaming/util/util_test.goLength of output: 1616
Script:
#!/bin/bash # Fetch the entire test file content cat protocol/streaming/util/util_test.goLength of output: 3578
protocol/streaming/types/interface.go (1)
19-19
: LGTM! Interface updated consistently.The new boolean parameter is properly added to the Subscribe method signature.
Let's verify that all implementations of the interface have been updated:
✅ Verification successful
Interface implementations updated correctly
Both implementations of
FullNodeStreamingManager
have been properly updated with the newfilterOrdersBySubAccountId
parameter:
FullNodeStreamingManagerImpl
: Main implementation with proper parameter usageNoopGrpcStreamingManager
: No-op implementation maintaining expected behavior🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Find all implementations of FullNodeStreamingManager interface ast-grep --pattern 'type $_ struct { $$$ } func ($_ $_) Subscribe( $$$ ) ( err error, ) { $$$ }'Length of output: 107
Script:
#!/bin/bash # First, find potential implementation files echo "=== Files containing FullNodeStreamingManager ===" rg "FullNodeStreamingManager" -l echo -e "\n=== Struct implementations ===" rg "type \w+StreamingManager struct" echo -e "\n=== Subscribe method implementations ===" rg "func.*Subscribe.*filterOrdersBySubaccountId" -A 2Length of output: 1122
Script:
#!/bin/bash echo "=== Full Node Implementation ===" cat protocol/streaming/full_node_streaming_manager.go echo -e "\n=== Noop Implementation ===" cat protocol/streaming/noop_streaming_manager.goLength of output: 46436
protocol/streaming/noop_streaming_manager.go (1)
27-27
: LGTM! Noop implementation updated correctly.The Subscribe method signature is updated to match the interface while maintaining its noop behavior.
proto/dydxprotocol/clob/query.proto (1)
189-193
: Well documented boolean flag addition!The new field
filter_orders_by_subaccount_id
is well documented and follows proto3 conventions. The documentation clearly explains that when true, only orders from provided subaccount IDs will be included in the updates.indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (3)
281-287
: LGTM - Interface field properly matches proto definition!The TypeScript interface correctly reflects the proto definition with proper naming conventions and documentation.
303-309
: LGTM - SDK type properly matches proto definition!The SDK type interface correctly maintains the proto naming convention for external compatibility.
1315-1316
: LGTM - Proper protobuf encoding/decoding implementation!The implementation correctly:
- Sets appropriate default value (false)
- Handles encoding/decoding of the boolean field
- Properly manages undefined/null cases in fromPartial
Also applies to: 1341-1345, 1388-1391, 1406-1406
Changelist
Full Node Streaming provides an initial state and streaming updates for positions, orders, prices and fills.
The subscription API admits an optional sub account ID filter, which is only applied to the initial positions and position changes.
The user would like to apply the sub account ID filter to the order messages, in addition to position messages.
The change will add boolean flags to the Websocket and GRPC streaming API's:
filterOrdersBySubaccountId
boolean field for WS request (if not provided, default to False)filter_orders_by_subaccount_id
boolean field forStreamOrderbookUpdatesRequest
protobuf (if not provided, default to False)For all endpoints, the previous behavior of not filtering orders for subaccounts is preserved by default.
If filtering orders is not specified, the code path remains the same for looping over stream updates.
If filtering orders is specified, each slice of updates received from the subscription
updatesChannel
will be filtered like:StreamUpdate_OrderbookUpdate
, forward itStreamUpdate_OrderbookUpdate
, forward iff one of theOffChainUpdateV1
messages inside are for a target subaccountTest Plan
Unit test
load-tester / Python grpc-stream-client integration test
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit
Release Notes
New Features
Improvements
filterOrdersBySubaccountId
for WebSocket and gRPC streamingTechnical Updates