Skip to content

Commit

Permalink
[CT-647] construct the initial orderbook snapshot (#1147)
Browse files Browse the repository at this point in the history
* [CT-647] construct the initial orderbook snapshot

* [CT-647] initialize new streams and send orderbook snapshot (#1152)

* [CT-647] initialize new streams and send orderbook snapshot

* use sync once

* comments
  • Loading branch information
jayy04 authored Mar 7, 2024
1 parent a6429da commit db3b189
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 8 deletions.
13 changes: 9 additions & 4 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 29 additions & 2 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand All @@ -22,8 +23,14 @@ type GrpcStreamingManagerImpl struct {

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
type OrderbookSubscription struct {
// Initialize the subscription with orderbook snapshots.
initialize sync.Once

// Clob pair ids to subscribe to.
clobPairIds []uint32
srv clobtypes.Query_StreamOrderbookUpdatesServer

// Stream
srv clobtypes.Query_StreamOrderbookUpdatesServer
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
Expand Down Expand Up @@ -68,6 +75,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
Expand Down Expand Up @@ -100,7 +108,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: false,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
Expand All @@ -117,6 +125,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
}
}

// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized.
func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 {
sm.Lock()
defer sm.Unlock()

clobPairIds := make(map[uint32]bool)
for _, subscription := range sm.orderbookSubscriptions {
subscription.initialize.Do(
func() {
for _, clobPairId := range subscription.clobPairIds {
clobPairIds[clobPairId] = true
}
},
)
}

return lib.GetSortedKeys[lib.Sortable[uint32]](clobPairIds)
}

// GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1.
func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) {
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
Expand Down
5 changes: 5 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,10 @@ func (sm *NoopGrpcStreamingManager) Subscribe(

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
snapshot bool,
) {
}

func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {
return []uint32{}
}
6 changes: 5 additions & 1 deletion protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,9 @@ type GrpcStreamingManager interface {
) (
err error,
)
SendOrderbookUpdates(*clobtypes.OffchainUpdates)
GetUninitializedClobPairIds() []uint32
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
)
}
3 changes: 3 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ func PrepareCheckState(
types.GetInternalOperationsQueueTextString(newLocalValidatorOperationsQueue),
)

// Initialize new GRPC streams with orderbook snapshots, if any.
keeper.InitializeNewGrpcStreams(ctx)

// Set per-orderbook gauges.
keeper.MemClob.SetMemclobGauges(ctx)
}
19 changes: 19 additions & 0 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,22 @@ func (k Keeper) InitMemStore(ctx sdk.Context) {
func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) {
k.antehandler = anteHandler
}

// InitializeNewGrpcStreams initializes new gRPC streams for all uninitialized clob pairs
// by sending the corresponding orderbook snapshots.
func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
streamingManager := k.GetGrpcStreamingManager()
allUpdates := types.NewOffchainUpdates()

uninitializedClobPairIds := streamingManager.GetUninitializedClobPairIds()
for _, clobPairId := range uninitializedClobPairIds {
update := k.MemClob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
types.ClobPairId(clobPairId),
)

allUpdates.Append(update)
}

streamingManager.SendOrderbookUpdates(allUpdates, true)
}
2 changes: 1 addition & 1 deletion protocol/x/clob/keeper/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ func (k Keeper) SendOffchainMessages(
k.GetIndexerEventManager().SendOffchainData(update)
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates)
k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
}

// getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks.
Expand Down
88 changes: 88 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package memclob

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

// GetOffchainUpdatesForOrderbookSnapshot returns the offchain updates for the orderbook snapshot.
// This is used by the gRPC streaming server to send the orderbook snapshot to the client.
func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
ctx sdk.Context,
clobPairId types.ClobPairId,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()

if orderbook, exists := m.openOrders.orderbooksMap[clobPairId]; exists {
// Generate the offchain updates for buy orders.
// Updates are sorted in descending order of price.
buyPriceLevels := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Bids)
for i := len(buyPriceLevels) - 1; i >= 0; i-- {
subticks := buyPriceLevels[i]
level := orderbook.Bids[subticks]

// For each price level, generate offchain updates for each order in the level.
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
)
},
)
}

// Generate the offchain updates for sell orders.
// Updates are sorted in ascending order of price.
sellPriceLevels := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Asks)
for i := 0; i < len(sellPriceLevels); i++ {
subticks := sellPriceLevels[i]
level := orderbook.Asks[subticks]

// For each price level, generate offchain updates for each order in the level.
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
)
},
)
}
}

return offchainUpdates
}

// GetOffchainUpdatesForOrder returns a place order offchain message and
// a update order offchain message used to construct an order for
// the orderbook snapshot grpc stream.
func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder(
ctx sdk.Context,
order types.Order,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()
orderId := order.OrderId

// Generate a order place message.
if message, success := off_chain_updates.CreateOrderPlaceMessage(
ctx,
order,
); success {
offchainUpdates.AddPlaceMessage(orderId, message)
}

// Get the current fill amount of the order.
fillAmount := m.GetOrderFilledAmount(ctx, orderId)

// Generate an update message updating the total filled amount of order.
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
fillAmount,
); success {
offchainUpdates.AddUpdateMessage(orderId, message)
}

return offchainUpdates
}
91 changes: 91 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package memclob

import (
"testing"

"github.com/dydxprotocol/v4-chain/protocol/mocks"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"
sdktest "github.com/dydxprotocol/v4-chain/protocol/testutil/sdk"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) {
ctx, _, _ := sdktest.NewSdkContextWithMultistore()

clobKeeper := &mocks.MemClobKeeper{}
clobKeeper.On(
"GetOrderFillAmount",
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)

memclob.CreateOrderbook(ctx, constants.ClobPair_Btc)

orders := []types.Order{
constants.Order_Alice_Num0_Id1_Clob0_Buy15_Price10_GTB18_PO,
constants.Order_Alice_Num0_Id0_Clob0_Buy10_Price10_GTB16,
constants.Order_Bob_Num0_Id12_Clob0_Buy5_Price40_GTB20,
}

for _, order := range orders {
memclob.mustAddOrderToOrderbook(ctx, order, false)
}

offchainUpdates := memclob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
constants.ClobPair_Btc.GetClobPairId(),
)

expected := types.NewOffchainUpdates()
// Buy orders are in descending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))

require.Equal(t, expected, offchainUpdates)
}

func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) {
ctx, _, _ := sdktest.NewSdkContextWithMultistore()

clobKeeper := &mocks.MemClobKeeper{}
clobKeeper.On(
"GetOrderFillAmount",
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)

memclob.CreateOrderbook(ctx, constants.ClobPair_Btc)

orders := []types.Order{
constants.Order_Bob_Num0_Id12_Clob0_Sell20_Price35_GTB32,
constants.Order_Alice_Num0_Id0_Clob0_Sell5_Price10_GTB20,
constants.Order_Alice_Num0_Id1_Clob0_Sell15_Price10_GTB18_PO,
}

for _, order := range orders {
memclob.mustAddOrderToOrderbook(ctx, order, false)
}

offchainUpdates := memclob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
constants.ClobPair_Btc.GetClobPairId(),
)

expected := types.NewOffchainUpdates()
// Sell orders are in ascending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))

require.Equal(t, expected, offchainUpdates)
}
1 change: 1 addition & 0 deletions protocol/x/clob/types/clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,5 @@ type ClobKeeper interface {
clobPair ClobPair,
) error
UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error
InitializeNewGrpcStreams(ctx sdk.Context)
}
4 changes: 4 additions & 0 deletions protocol/x/clob/types/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,8 @@ type MemClob interface {
subaccountId satypes.SubaccountId,
perpetualId uint32,
)
GetOffchainUpdatesForOrderbookSnapshot(
ctx sdk.Context,
clobPairId ClobPairId,
) (offchainUpdates *OffchainUpdates)
}

0 comments on commit db3b189

Please sign in to comment.