Skip to content

Commit

Permalink
test reusing subscription ids
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Oct 18, 2024
1 parent b38d911 commit 31ca480
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type FullNodeStreamingManagerImpl struct {

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32
activeSubscriptionIds map[uint32]bool

// stream will batch and flush out messages every 10 ms.
ticker *time.Ticker
Expand Down Expand Up @@ -104,7 +104,7 @@ func NewFullNodeStreamingManager(
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,
activeSubscriptionIds: make(map[uint32]bool),

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
Expand Down Expand Up @@ -162,6 +162,18 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
}
}

// getNextAvailableSubscriptionId returns next available subscription id. Assumes the
// lock has been acquired.
func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 {
sm.logger.Info(fmt.Sprintf("getting next sub id, actives: %+v", sm.activeSubscriptionIds))
id := uint32(0)
for inUse := sm.activeSubscriptionIds[id]; inUse; {
id = id + 1
}
sm.logger.Info(fmt.Sprintf("done getting next sub id, id: %+v", id))
return id
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
Expand All @@ -180,8 +192,11 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
for i, subaccountId := range subaccountIds {
sIds[i] = *subaccountId
}

subscriptionId := sm.getNextAvailableSubscriptionId()

subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
subscriptionId: subscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
Expand All @@ -196,7 +211,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
sm.nextSubscriptionId,
subscription.subscriptionId,
)
}
for _, subaccountId := range sIds {
Expand All @@ -207,7 +222,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.subaccountIdToSubscriptionIdMapping[subaccountId] = append(
sm.subaccountIdToSubscriptionIdMapping[subaccountId],
sm.nextSubscriptionId,
subscription.subscriptionId,
)
}

Expand All @@ -220,7 +235,8 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
),
)
sm.orderbookSubscriptions[subscription.subscriptionId] = subscription
sm.nextSubscriptionId++
sm.activeSubscriptionIds[subscription.subscriptionId] = true
sm.logger.Info(fmt.Sprintf("updated active map: %+v", sm.activeSubscriptionIds))
sm.EmitMetrics()
sm.Unlock()

Expand Down Expand Up @@ -272,6 +288,10 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription(
}
close(subscription.updatesChannel)
delete(sm.orderbookSubscriptions, subscriptionIdToRemove)
delete(sm.activeSubscriptionIds, subscriptionIdToRemove)
sm.logger.Info(
fmt.Sprintf("Removed subscription id %+v updated map %+v", subscriptionIdToRemove, sm.activeSubscriptionIds),
)

// Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove
for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping {
Expand Down

0 comments on commit 31ca480

Please sign in to comment.