Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full Node Streaming Order Filtering by Subaccount impl and tests #2704

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

UnbornAztecKing
Copy link

@UnbornAztecKing UnbornAztecKing commented Jan 25, 2025

Changelist

Full Node Streaming provides an initial state and streaming updates for positions, orders, prices and fills.

The subscription API admits an optional sub account ID filter, which is only applied to the initial positions and position changes.

The user would like to apply the sub account ID filter to the order messages, in addition to position messages.

The change will add boolean flags to the Websocket and GRPC streaming API's:

  • filterOrdersBySubaccountId boolean field for WS request (if not provided, default to False)
  • filter_orders_by_subaccount_id boolean field for StreamOrderbookUpdatesRequest protobuf (if not provided, default to False)

For all endpoints, the previous behavior of not filtering orders for subaccounts is preserved by default.

If filtering orders is not specified, the code path remains the same for looping over stream updates.
If filtering orders is specified, each slice of updates received from the subscription updatesChannel will be filtered like:

  • if the message is not a StreamUpdate_OrderbookUpdate, forward it
  • if the message is a StreamUpdate_OrderbookUpdate, forward iff one of the OffChainUpdateV1 messages inside are for a target subaccount
  • if an error occurs while checking ids, drop it

Test Plan

Unit test

load-tester / Python grpc-stream-client integration test

Author/Reviewer Checklist

  • If this PR has changes that result in a different app state given the same prior state and transaction list, manually add the state-breaking label.
  • If the PR has breaking postgres changes to the indexer add the indexer-postgres-breaking label.
  • If this PR isn't state-breaking but has changes that modify behavior in PrepareProposal or ProcessProposal, manually add the label proposal-breaking.
  • If this PR is one of many that implement a specific feature, manually label them all feature:[feature-name].
  • If you wish to for mergify-bot to automatically create a PR to backport your change to a release branch, manually add the label backport/[branch-name].
  • Manually add any of the following labels: refactor, chore, bug.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added ability to filter orderbook updates by specific subaccount IDs
    • Enhanced streaming functionality to support more granular order update control
  • Improvements

    • Introduced new query parameter filterOrdersBySubaccountId for WebSocket and gRPC streaming
    • Improved streaming manager to handle subaccount-specific order filtering
  • Technical Updates

    • Updated protocol buffers and interfaces to support new filtering mechanism
    • Added utility functions for extracting and processing subaccount information

Copy link
Contributor

coderabbitai bot commented Jan 25, 2025

Walkthrough

This pull request introduces a new boolean parameter filterOrdersBySubaccountId across multiple components of the dYdX protocol's streaming and orderbook systems. The change enables more granular filtering of order updates by allowing clients to receive only updates related to specific subaccount IDs. The modification spans several files, including protobuf definitions, TypeScript generated code, Go streaming managers, and WebSocket server implementations, ensuring consistent support for this new filtering capability.

Changes

File Change Summary
proto/dydxprotocol/clob/query.proto Added filter_orders_by_subaccount_id field to StreamOrderbookUpdatesRequest
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts Added filterOrdersBySubaccountId property to TypeScript interfaces
protocol/streaming/... Updated multiple streaming-related files to support new filtering parameter

Sequence Diagram

sequenceDiagram
    participant Client
    participant WebSocketServer
    participant StreamingManager
    participant OrderbookService

    Client->>WebSocketServer: Connect with filterOrdersBySubaccountId
    WebSocketServer->>StreamingManager: Subscribe(subaccountIds, filterFlag)
    StreamingManager->>OrderbookService: Request Updates
    OrderbookService-->>StreamingManager: Filter Updates by Subaccount
    StreamingManager-->>WebSocketServer: Send Filtered Updates
    WebSocketServer-->>Client: Stream Filtered Updates
Loading

Possibly related PRs

Suggested reviewers

  • roy-dydx
  • teddyding

Poem

🐰 Orderbook streams, now refined and neat,
Subaccount filters make updates a treat!
With boolean magic, we filter with glee,
Precision unleashed, data flowing free!
A rabbit's delight in streaming's new art! 🎉

✨ Finishing Touches
  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@UnbornAztecKing
Copy link
Author

@coderabbitai review

Copy link
Contributor

coderabbitai bot commented Jan 25, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (4)
protocol/streaming/util/util_test.go (1)

21-123: Add test cases for error scenarios and edge cases.

The test suite thoroughly covers the happy path for different order types but lacks test cases for:

  • Invalid order IDs
  • Malformed orders
  • Empty/nil orders
  • Other error conditions that could occur in production

Add test cases for error scenarios:

 tests := map[string]struct {
     update ocutypes.OffChainUpdateV1
     id     satypes.SubaccountId
     err    error
 }{
+    "InvalidOrder": {
+        update: ocutypes.OffChainUpdateV1{
+            UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{
+                OrderPlace: &ocutypes.OrderPlaceV1{
+                    Order:           nil,
+                    PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED,
+                    TimeStamp:       _ToPtr(orderPlaceTime),
+                },
+            },
+        },
+        id:  satypes.SubaccountId{},
+        err: errors.New("order is nil"),
+    },
+    "MalformedOrderId": {
+        update: ocutypes.OffChainUpdateV1{
+            UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{
+                OrderPlace: &ocutypes.OrderPlaceV1{
+                    Order: &v1types.IndexerOrder{
+                        OrderId: v1types.IndexerOrderId{},
+                    },
+                    PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED,
+                    TimeStamp:       _ToPtr(orderPlaceTime),
+                },
+            },
+        },
+        id:  satypes.SubaccountId{},
+        err: errors.New("invalid order id"),
+    },
protocol/streaming/ws/websocket_server.go (1)

100-108: Consider adding logging for subscription initialization.

The error handling for subscription initialization could be improved by adding debug logs to help with troubleshooting.

Add logging:

 if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
     ws.logger.Error("Error sending close message", "err", err)
+    ws.logger.Debug(
+        "Failed to initialize subscription",
+        "filterOrdersBySubaccountId", filterOrdersBySubaccountId,
+        "subaccountIds", subaccountIds,
+    )
 }
protocol/streaming/full_node_streaming_manager_test.go (1)

300-456: Improve test case descriptions and organization.

While the test cases are comprehensive, their names and organization could be improved for better readability and maintainability.

Consider reorganizing the test cases into logical groups with more descriptive names:

 tests := map[string]TestCase{
-    "snapshotNotInScope": {
+    "Snapshots/NotInScope": {
         updates:         []clobtypes.StreamUpdate{snapshotBaseStreamUpdate},
         subaccountIds:   []satypes.SubaccountId{neverInScopeSubaccountId},
         filteredUpdates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate},
     },
-    "snapshotNoScope": {
+    "Snapshots/NoScope": {
         updates:         []clobtypes.StreamUpdate{snapshotBaseStreamUpdate},
         subaccountIds:   []satypes.SubaccountId{},
         filteredUpdates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate},
     },
     // ... more test cases

Also, consider adding test cases for concurrent access and race conditions:

"ConcurrentAccess/MultipleUpdates": {
    updates: []clobtypes.StreamUpdate{
        baseStreamUpdate,
        otherStreamUpdate,
    },
    subaccountIds: []satypes.SubaccountId{
        subaccountId,
        otherSubaccountId,
    },
    filteredUpdates: []clobtypes.StreamUpdate{
        baseStreamUpdate,
        otherStreamUpdate,
    },
},
protocol/streaming/full_node_streaming_manager.go (1)

220-248: Add metrics for filtered updates and error tracking.

The filtering logic would benefit from metrics to track:

  1. Number of updates filtered
  2. Number of errors encountered
  3. Filtering latency

Add metrics:

 func FilterStreamUpdateBySubaccount(
     updates []clobtypes.StreamUpdate,
     subaccountIds []satypes.SubaccountId,
     logger log.Logger,
 ) []clobtypes.StreamUpdate {
+    start := time.Now()
+    defer metrics.ModuleMeasureSince(
+        metrics.FullNodeGrpc,
+        metrics.GrpcFilterUpdatesBySubaccountLatency,
+        start,
+    )
+
     filteredUpdates := []clobtypes.StreamUpdate{}
+    errorCount := 0
     for _, update := range updates {
         switch updateMessage := update.UpdateMessage.(type) {
         case *clobtypes.StreamUpdate_OrderbookUpdate:
             if updateMessage.OrderbookUpdate.Snapshot {
                 break
             }
             doFilter, err := doFilterStreamUpdateBySubaccount(updateMessage, subaccountIds)
             if err != nil {
                 logger.Error(err.Error())
+                errorCount++
+                metrics.IncrCounter(metrics.GrpcFilterUpdatesBySubaccountErrorCount, 1)
             }
             if !doFilter {
                 continue
             }
         }
         filteredUpdates = append(filteredUpdates, update)
     }
+    metrics.SetGauge(metrics.GrpcFilteredUpdatesCount, float32(len(filteredUpdates)))
+    metrics.SetGauge(metrics.GrpcFilterErrorsCount, float32(errorCount))
     return filteredUpdates
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between afa37ec and 3bb3a07.

⛔ Files ignored due to path filters (1)
  • protocol/x/clob/types/query.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (10)
  • indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (6 hunks)
  • proto/dydxprotocol/clob/query.proto (1 hunks)
  • protocol/streaming/full_node_streaming_manager.go (7 hunks)
  • protocol/streaming/full_node_streaming_manager_test.go (1 hunks)
  • protocol/streaming/noop_streaming_manager.go (1 hunks)
  • protocol/streaming/types/interface.go (1 hunks)
  • protocol/streaming/util/util.go (2 hunks)
  • protocol/streaming/util/util_test.go (1 hunks)
  • protocol/streaming/ws/websocket_server.go (3 hunks)
  • protocol/x/clob/keeper/grpc_stream_orderbook.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (12)
  • GitHub Check: test-sim-after-import
  • GitHub Check: test-sim-nondeterminism
  • GitHub Check: test-sim-import-export
  • GitHub Check: test-sim-multi-seed-short
  • GitHub Check: test / run_command
  • GitHub Check: unit-end-to-end-and-integration
  • GitHub Check: test-coverage-upload
  • GitHub Check: container-tests
  • GitHub Check: benchmark
  • GitHub Check: test-race
  • GitHub Check: liveness-test
  • GitHub Check: Summary
🔇 Additional comments (9)
protocol/x/clob/keeper/grpc_stream_orderbook.go (1)

15-15: LGTM! Parameter addition aligns with filtering requirements.

The new parameter req.GetFilterOrdersBySubaccountId() is correctly passed to the Subscribe method, enabling order filtering by subaccount ID.

protocol/streaming/util/util.go (2)

8-10: LGTM! Required imports added.

The new imports support the subaccount ID extraction functionality.


27-46: LGTM! Well-structured subaccount ID extraction.

The function handles all order-related event types comprehensively with proper error handling. The comment on line 27 provides good context about potential future extensions.

Let's verify the test coverage for this new function:

✅ Verification successful

Test coverage is solid but could be more complete

The function has comprehensive test coverage for all order-related event types (Place, Remove, Update, Replace). Consider adding a test case for the default error scenario to achieve 100% coverage, although this is a minor enhancement since the error case is straightforward.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there are corresponding test cases for the new function
rg -A 5 "TestGetOffChainUpdateV1SubaccountId" protocol/streaming/util/

Length of output: 526


Script:

#!/bin/bash
# Fetch the complete test implementation
rg -A 50 "TestGetOffChainUpdateV1SubaccountId" protocol/streaming/util/util_test.go

Length of output: 1616


Script:

#!/bin/bash
# Fetch the entire test file content
cat protocol/streaming/util/util_test.go

Length of output: 3578

protocol/streaming/types/interface.go (1)

19-19: LGTM! Interface updated consistently.

The new boolean parameter is properly added to the Subscribe method signature.

Let's verify that all implementations of the interface have been updated:

✅ Verification successful

Interface implementations updated correctly

Both implementations of FullNodeStreamingManager have been properly updated with the new filterOrdersBySubAccountId parameter:

  • FullNodeStreamingManagerImpl: Main implementation with proper parameter usage
  • NoopGrpcStreamingManager: No-op implementation maintaining expected behavior
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find all implementations of FullNodeStreamingManager interface
ast-grep --pattern 'type $_ struct {
  $$$
}

func ($_ $_) Subscribe(
  $$$
) (
  err error,
) {
  $$$
}'

Length of output: 107


Script:

#!/bin/bash
# First, find potential implementation files
echo "=== Files containing FullNodeStreamingManager ==="
rg "FullNodeStreamingManager" -l

echo -e "\n=== Struct implementations ==="
rg "type \w+StreamingManager struct"

echo -e "\n=== Subscribe method implementations ==="
rg "func.*Subscribe.*filterOrdersBySubaccountId" -A 2

Length of output: 1122


Script:

#!/bin/bash
echo "=== Full Node Implementation ==="
cat protocol/streaming/full_node_streaming_manager.go

echo -e "\n=== Noop Implementation ==="
cat protocol/streaming/noop_streaming_manager.go

Length of output: 46436

protocol/streaming/noop_streaming_manager.go (1)

27-27: LGTM! Noop implementation updated correctly.

The Subscribe method signature is updated to match the interface while maintaining its noop behavior.

proto/dydxprotocol/clob/query.proto (1)

189-193: Well documented boolean flag addition!

The new field filter_orders_by_subaccount_id is well documented and follows proto3 conventions. The documentation clearly explains that when true, only orders from provided subaccount IDs will be included in the updates.

indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (3)

281-287: LGTM - Interface field properly matches proto definition!

The TypeScript interface correctly reflects the proto definition with proper naming conventions and documentation.


303-309: LGTM - SDK type properly matches proto definition!

The SDK type interface correctly maintains the proto naming convention for external compatibility.


1315-1316: LGTM - Proper protobuf encoding/decoding implementation!

The implementation correctly:

  • Sets appropriate default value (false)
  • Handles encoding/decoding of the boolean field
  • Properly manages undefined/null cases in fromPartial

Also applies to: 1341-1345, 1388-1391, 1406-1406

@UnbornAztecKing UnbornAztecKing marked this pull request as ready for review January 27, 2025 15:13
@UnbornAztecKing UnbornAztecKing requested a review from a team as a code owner January 27, 2025 15:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

2 participants