Skip to content

Commit

Permalink
[CT-723] add block number + stage to grpc updates (#1252)
Browse files Browse the repository at this point in the history
* [CT-723] add block number + stage to grpc updates

* add indexer changes
  • Loading branch information
jayy04 authored Mar 27, 2024
1 parent 9c25211 commit 9d1076f
Show file tree
Hide file tree
Showing 17 changed files with 238 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ export interface StreamOrderbookUpdatesResponse {
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

blockHeight: number;
/** Exec mode of the updates. */

execMode: number;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
Expand All @@ -250,6 +259,15 @@ export interface StreamOrderbookUpdatesResponseSDKType {
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

block_height: number;
/** Exec mode of the updates. */

exec_mode: number;
}

function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest {
Expand Down Expand Up @@ -904,7 +922,9 @@ export const StreamOrderbookUpdatesRequest = {
function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse {
return {
updates: [],
snapshot: false
snapshot: false,
blockHeight: 0,
execMode: 0
};
}

Expand All @@ -918,6 +938,14 @@ export const StreamOrderbookUpdatesResponse = {
writer.uint32(16).bool(message.snapshot);
}

if (message.blockHeight !== 0) {
writer.uint32(24).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(32).uint32(message.execMode);
}

return writer;
},

Expand All @@ -938,6 +966,14 @@ export const StreamOrderbookUpdatesResponse = {
message.snapshot = reader.bool();
break;

case 3:
message.blockHeight = reader.uint32();
break;

case 4:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -951,6 +987,8 @@ export const StreamOrderbookUpdatesResponse = {
const message = createBaseStreamOrderbookUpdatesResponse();
message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || [];
message.snapshot = object.snapshot ?? false;
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

Expand Down
7 changes: 7 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,11 @@ message StreamOrderbookUpdatesResponse {
// Note that if the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;

// ---Additional fields used to debug issues---
// Block height of the updates.
uint32 block_height = 3;

// Exec mode of the updates.
uint32 exec_mode = 4;
}
6 changes: 6 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,8 @@ func (app *App) PreBlocker(ctx sdk.Context, _ *abci.RequestFinalizeBlock) (*sdk.

// BeginBlocker application updates every begin block
func (app *App) BeginBlocker(ctx sdk.Context) (sdk.BeginBlock, error) {
ctx = ctx.WithExecMode(lib.ExecModeBeginBlock)

// Update the proposer address in the logger for the panic logging middleware.
proposerAddr := sdk.ConsAddress(ctx.BlockHeader().ProposerAddress)
middleware.Logger = ctx.Logger().With("proposer_cons_addr", proposerAddr.String())
Expand All @@ -1693,6 +1695,8 @@ func (app *App) BeginBlocker(ctx sdk.Context) (sdk.BeginBlock, error) {

// EndBlocker application updates every end block
func (app *App) EndBlocker(ctx sdk.Context) (sdk.EndBlock, error) {
ctx = ctx.WithExecMode(lib.ExecModeEndBlock)

// Reset the logger for middleware.
// Note that the middleware is only used by `CheckTx` and `DeliverTx`, and not `EndBlocker`.
// Panics from `EndBlocker` will not be logged by the middleware and will lead to consensus failures.
Expand All @@ -1716,6 +1720,8 @@ func (app *App) Precommitter(ctx sdk.Context) {

// PrepareCheckStater application updates after commit and before any check state is invoked.
func (app *App) PrepareCheckStater(ctx sdk.Context) {
ctx = ctx.WithExecMode(lib.ExecModePrepareCheckState)

if err := app.ModuleManager.PrepareCheckState(ctx); err != nil {
panic(err)
}
Expand Down
7 changes: 7 additions & 0 deletions protocol/lib/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/lib/log"
)

// Custom exec modes
const (
ExecModeBeginBlock = 100
ExecModeEndBlock = 101
ExecModePrepareCheckState = 102
)

type TxHash string

func GetTxHash(tx []byte) TxHash {
Expand Down
6 changes: 3 additions & 3 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"sync"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
Expand Down Expand Up @@ -76,6 +77,8 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode sdk.ExecMode,
) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
Expand Down Expand Up @@ -113,8 +116,10 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
if len(updatesToSend) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updatesToSend,
Snapshot: snapshot,
Updates: updatesToSend,
Snapshot: snapshot,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
); err != nil {
idsToRemove = append(idsToRemove, id)
Expand Down
3 changes: 3 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package grpc

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand Down Expand Up @@ -29,6 +30,8 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

Expand Down
3 changes: 3 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
sdk "github.com/cosmos/cosmos-sdk/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

Expand All @@ -18,5 +19,7 @@ type GrpcStreamingManager interface {
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode sdk.ExecMode,
)
}
1 change: 1 addition & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger {
}

func (f *FakeMemClobKeeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
Expand Down
2 changes: 1 addition & 1 deletion protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func PrepareCheckState(
allUpdates.Append(orderbookUpdate)
}
}
keeper.SendOrderbookUpdates(allUpdates, false)
keeper.SendOrderbookUpdates(ctx, allUpdates, false)
}

// 3. Place all stateful order placements included in the last block on the memclob.
Expand Down
10 changes: 8 additions & 2 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,23 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
allUpdates.Append(update)
}

k.SendOrderbookUpdates(allUpdates, true)
k.SendOrderbookUpdates(ctx, allUpdates, true)
}

// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager.
func (k Keeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
if len(offchainUpdates.Messages) == 0 {
return
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot)
k.GetGrpcStreamingManager().SendOrderbookUpdates(
offchainUpdates,
snapshot,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
2 changes: 1 addition & 1 deletion protocol/x/clob/keeper/order_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,6 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders(
allUpdates.Append(orderbookUpdate)
}
}
k.SendOrderbookUpdates(allUpdates, false)
k.SendOrderbookUpdates(ctx, allUpdates, false)
}
}
6 changes: 3 additions & 3 deletions protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook(
if m.generateOrderbookUpdates {
// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false)
}
}

Expand Down Expand Up @@ -1963,7 +1963,7 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder(
if m.generateOrderbookUpdates {
// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false)
}
}

Expand All @@ -1985,7 +1985,7 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder
// Send an orderbook update for the order's new total filled amount.
if m.generateOrderbookUpdates {
orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false)
}

// If the order is fully filled, remove it from the orderbook.
Expand Down
1 change: 1 addition & 0 deletions protocol/x/clob/types/clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type ClobKeeper interface {
// Gprc streaming
InitializeNewGrpcStreams(ctx sdk.Context)
SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *OffchainUpdates,
snapshot bool,
)
Expand Down
1 change: 1 addition & 0 deletions protocol/x/clob/types/mem_clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type MemClobKeeper interface {
ctx sdk.Context,
) log.Logger
SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *OffchainUpdates,
snapshot bool,
)
Expand Down
Loading

0 comments on commit 9d1076f

Please sign in to comment.