Skip to content

Commit

Permalink
De-dupe fills and trades from WebSocket.
Browse files Browse the repository at this point in the history
  • Loading branch information
prashanDYDX committed Jun 4, 2024
1 parent 896b7d2 commit 43134ea
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package exchange.dydx.abacus.processor.base

/**
* Merge two lists of payloads, dropping older items if a new item with the same ID exists.
*/
fun mergeWithIds(
new: List<Any>,
existing: List<Any>,
id: (Any) -> String?,
): List<Any> {
val merged = mutableSetOf<ItemWithId>()
new.forEach { item ->
id(item)?.let { itemId ->
merged.add(
ItemWithId(
id = itemId,
item = item,
),
)
}
}
existing.forEach { item ->
id(item)?.let { itemId ->
merged.add(
ItemWithId(
id = itemId,
item = item,
),
)
}
}

return merged.map { it.item }
}

// Wrapper for de-duping in set
private data class ItemWithId(
val id: String,
val item: Any,
) {
override fun hashCode(): Int {
return id.hashCode()
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other == null || this::class != other::class) return false

other as ItemWithId

return id == other.id
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package exchange.dydx.abacus.processor.markets

import exchange.dydx.abacus.processor.base.BaseProcessor
import exchange.dydx.abacus.processor.base.mergeWithIds
import exchange.dydx.abacus.protocols.ParserProtocol

@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -40,18 +41,15 @@ internal class TradesProcessor(parser: ParserProtocol) : BaseProcessor(parser) {
payload: List<Any>?,
): List<Any>? {
if (payload != null) {
val merged = mutableListOf<Any>()
for (value in payload) {
parser.asNativeMap(value)?.let {
val trade = tradeProcessor.received(null, it)
merged.add(trade)
}
}
if (existing?.isNotEmpty() == true) {
merged.addAll(existing)
val new = payload.mapNotNull { eachPayload ->
parser.asNativeMap(eachPayload)?.let { eachPayloadData -> tradeProcessor.received(null, eachPayloadData) }
}
val merged = existing?.let {
mergeWithIds(new, existing) { data -> parser.asNativeMap(data)?.let { parser.asString(it["id"]) } }
} ?: new

return if (merged.size > LIMIT) {
merged.subList(0, LIMIT).toList()
merged.subList(0, LIMIT)
} else {
merged
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
package exchange.dydx.abacus.processor.wallet.account

import exchange.dydx.abacus.processor.base.BaseProcessor
import exchange.dydx.abacus.processor.base.mergeWithIds
import exchange.dydx.abacus.protocols.ParserProtocol

internal class FillsProcessor(parser: ParserProtocol) : BaseProcessor(parser) {
private val itemProcessor = FillProcessor(parser = parser)

override fun received(existing: List<Any>?, payload: List<Any>): List<Any>? {
val output = mutableListOf<Any>()
val newItems = payload.mapNotNull {
parser.asNativeMap(it)?.let { map ->
itemProcessor.received(null, map)
}
val new = payload.mapNotNull { eachPayload ->
parser.asNativeMap(eachPayload)?.let { eachPayloadData -> itemProcessor.received(null, eachPayloadData) }
}
if (newItems != null) {
output.addAll(newItems)
existing?.let {
return mergeWithIds(new, existing) { data -> parser.asNativeMap(data)?.let { parser.asString(it["id"]) } }
}
if (existing != null) {
output.addAll(existing)
}
return output
return new
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package exchange.dydx.abacus.payload.v4

import exchange.dydx.abacus.tests.extensions.loadv4TradesChanged
import kotlin.test.Test

class V4DuplicateWebsocketMessageTests : V4BaseTests() {

@Test
fun testDuplicateFills() {
setup()

repeat(2) {
test(
{
perp.socket(
testWsUrl,
mock.batchedSubaccountsChannel.channel_batch_data_order_filled_1,
0,
null,
)
},
"""
{
"wallet": {
"account": {
"tradingRewards": {
"total": 2800.8
},
"subaccounts": {
"0": {
"equity": {
},
"freeCollateral": {
},
"quoteBalance": {
"current": 1599696.37
},
"orders": {
},
"fills":[
{
"id":"a74830f8-d506-54b3-bf3b-1de791b8fe4e",
"fee":"-0.067364",
"side":"BUY",
"size":"82",
"type":"LIMIT",
"price":"9.128",
"orderId":"f7c9cd24-57cd-5240-a98d-3c9c3c11767d",
"createdAt":"2024-05-06T18:41:20.606Z",
"liquidity":"MAKER",
"clientMetadata":"0",
"marketId":"APT-USD"
},
{
"id":"0d473eec-93b0-5c49-94ca-b8017454d769",
"fee":"-0.001643",
"side":"BUY",
"size":"2",
"type":"LIMIT",
"price":"9.128",
"orderId":"f7c9cd24-57cd-5240-a98d-3c9c3c11767d",
"createdAt":"2024-05-06T18:41:20.606Z",
"liquidity":"MAKER",
"clientMetadata":"0",
"marketId":"APT-USD"
}
]
}
}
}
}
}
""".trimIndent(),
)
}
}

@Test
fun testDuplicateTrades() {
setup()

repeat(2) {
test(
{
perp.loadv4TradesChanged(mock, testWsUrl)
},
"""
{
"markets":{
"markets":{
"ETH-USD":{
"trades": [
{
"id": "8ee6d90d-272d-5edd-bf0f-2e4d6ae3d3b7",
"side": "BUY",
"size": 1.593707,
"price": 1255.949,
"createdAt": "2022-12-12T02:28:14.859Z",
"resources": {
}
}
]
}
}
}
}
""".trimIndent(),
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1468,6 +1468,7 @@ internal class TradesChannelMock {
"contents":{
"trades":[
{
"id": "8ee6d90d-272d-5edd-bf0f-2e4d6ae3d3b7",
"size":"1.593707",
"price":"1255.949",
"side":"BUY",
Expand Down

0 comments on commit 43134ea

Please sign in to comment.