From 43134ea42501242701dca54b0d773f32b9f9f039 Mon Sep 17 00:00:00 2001 From: Prashan Dharmasena Date: Tue, 4 Jun 2024 10:40:24 -0400 Subject: [PATCH] De-dupe fills and trades from WebSocket. --- .../processor/base/MergeWithIds.kt | 53 +++++++++ .../processor/markets/TradesProcessor.kt | 18 ++- .../wallet/account/FillsProcessor.kt | 17 +-- .../v4/V4DuplicateWebsocketMessageTests.kt | 111 ++++++++++++++++++ .../tests/payloads/TradesChannelMock.kt | 1 + 5 files changed, 179 insertions(+), 21 deletions(-) create mode 100644 src/commonMain/kotlin/exchange.dydx.abacus/processor/base/MergeWithIds.kt create mode 100644 src/commonTest/kotlin/exchange.dydx.abacus/payload/v4/V4DuplicateWebsocketMessageTests.kt diff --git a/src/commonMain/kotlin/exchange.dydx.abacus/processor/base/MergeWithIds.kt b/src/commonMain/kotlin/exchange.dydx.abacus/processor/base/MergeWithIds.kt new file mode 100644 index 000000000..6b4451af9 --- /dev/null +++ b/src/commonMain/kotlin/exchange.dydx.abacus/processor/base/MergeWithIds.kt @@ -0,0 +1,53 @@ +package exchange.dydx.abacus.processor.base + +/** + * Merge two lists of payloads, dropping older items if a new item with the same ID exists. + */ +fun mergeWithIds( + new: List, + existing: List, + id: (Any) -> String?, +): List { + val merged = mutableSetOf() + new.forEach { item -> + id(item)?.let { itemId -> + merged.add( + ItemWithId( + id = itemId, + item = item, + ), + ) + } + } + existing.forEach { item -> + id(item)?.let { itemId -> + merged.add( + ItemWithId( + id = itemId, + item = item, + ), + ) + } + } + + return merged.map { it.item } +} + +// Wrapper for de-duping in set +private data class ItemWithId( + val id: String, + val item: Any, +) { + override fun hashCode(): Int { + return id.hashCode() + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other == null || this::class != other::class) return false + + other as ItemWithId + + return id == other.id + } +} diff --git a/src/commonMain/kotlin/exchange.dydx.abacus/processor/markets/TradesProcessor.kt b/src/commonMain/kotlin/exchange.dydx.abacus/processor/markets/TradesProcessor.kt index 3a67205ba..8546f0a02 100644 --- a/src/commonMain/kotlin/exchange.dydx.abacus/processor/markets/TradesProcessor.kt +++ b/src/commonMain/kotlin/exchange.dydx.abacus/processor/markets/TradesProcessor.kt @@ -1,6 +1,7 @@ package exchange.dydx.abacus.processor.markets import exchange.dydx.abacus.processor.base.BaseProcessor +import exchange.dydx.abacus.processor.base.mergeWithIds import exchange.dydx.abacus.protocols.ParserProtocol @Suppress("UNCHECKED_CAST") @@ -40,18 +41,15 @@ internal class TradesProcessor(parser: ParserProtocol) : BaseProcessor(parser) { payload: List?, ): List? { if (payload != null) { - val merged = mutableListOf() - for (value in payload) { - parser.asNativeMap(value)?.let { - val trade = tradeProcessor.received(null, it) - merged.add(trade) - } - } - if (existing?.isNotEmpty() == true) { - merged.addAll(existing) + val new = payload.mapNotNull { eachPayload -> + parser.asNativeMap(eachPayload)?.let { eachPayloadData -> tradeProcessor.received(null, eachPayloadData) } } + val merged = existing?.let { + mergeWithIds(new, existing) { data -> parser.asNativeMap(data)?.let { parser.asString(it["id"]) } } + } ?: new + return if (merged.size > LIMIT) { - merged.subList(0, LIMIT).toList() + merged.subList(0, LIMIT) } else { merged } diff --git a/src/commonMain/kotlin/exchange.dydx.abacus/processor/wallet/account/FillsProcessor.kt b/src/commonMain/kotlin/exchange.dydx.abacus/processor/wallet/account/FillsProcessor.kt index 945ef9bd6..8a53e7416 100644 --- a/src/commonMain/kotlin/exchange.dydx.abacus/processor/wallet/account/FillsProcessor.kt +++ b/src/commonMain/kotlin/exchange.dydx.abacus/processor/wallet/account/FillsProcessor.kt @@ -1,24 +1,19 @@ package exchange.dydx.abacus.processor.wallet.account import exchange.dydx.abacus.processor.base.BaseProcessor +import exchange.dydx.abacus.processor.base.mergeWithIds import exchange.dydx.abacus.protocols.ParserProtocol internal class FillsProcessor(parser: ParserProtocol) : BaseProcessor(parser) { private val itemProcessor = FillProcessor(parser = parser) override fun received(existing: List?, payload: List): List? { - val output = mutableListOf() - val newItems = payload.mapNotNull { - parser.asNativeMap(it)?.let { map -> - itemProcessor.received(null, map) - } + val new = payload.mapNotNull { eachPayload -> + parser.asNativeMap(eachPayload)?.let { eachPayloadData -> itemProcessor.received(null, eachPayloadData) } } - if (newItems != null) { - output.addAll(newItems) + existing?.let { + return mergeWithIds(new, existing) { data -> parser.asNativeMap(data)?.let { parser.asString(it["id"]) } } } - if (existing != null) { - output.addAll(existing) - } - return output + return new } } diff --git a/src/commonTest/kotlin/exchange.dydx.abacus/payload/v4/V4DuplicateWebsocketMessageTests.kt b/src/commonTest/kotlin/exchange.dydx.abacus/payload/v4/V4DuplicateWebsocketMessageTests.kt new file mode 100644 index 000000000..fdc563038 --- /dev/null +++ b/src/commonTest/kotlin/exchange.dydx.abacus/payload/v4/V4DuplicateWebsocketMessageTests.kt @@ -0,0 +1,111 @@ +package exchange.dydx.abacus.payload.v4 + +import exchange.dydx.abacus.tests.extensions.loadv4TradesChanged +import kotlin.test.Test + +class V4DuplicateWebsocketMessageTests : V4BaseTests() { + + @Test + fun testDuplicateFills() { + setup() + + repeat(2) { + test( + { + perp.socket( + testWsUrl, + mock.batchedSubaccountsChannel.channel_batch_data_order_filled_1, + 0, + null, + ) + }, + """ + { + "wallet": { + "account": { + "tradingRewards": { + "total": 2800.8 + }, + "subaccounts": { + "0": { + "equity": { + }, + "freeCollateral": { + }, + "quoteBalance": { + "current": 1599696.37 + }, + "orders": { + }, + "fills":[ + { + "id":"a74830f8-d506-54b3-bf3b-1de791b8fe4e", + "fee":"-0.067364", + "side":"BUY", + "size":"82", + "type":"LIMIT", + "price":"9.128", + "orderId":"f7c9cd24-57cd-5240-a98d-3c9c3c11767d", + "createdAt":"2024-05-06T18:41:20.606Z", + "liquidity":"MAKER", + "clientMetadata":"0", + "marketId":"APT-USD" + }, + { + "id":"0d473eec-93b0-5c49-94ca-b8017454d769", + "fee":"-0.001643", + "side":"BUY", + "size":"2", + "type":"LIMIT", + "price":"9.128", + "orderId":"f7c9cd24-57cd-5240-a98d-3c9c3c11767d", + "createdAt":"2024-05-06T18:41:20.606Z", + "liquidity":"MAKER", + "clientMetadata":"0", + "marketId":"APT-USD" + } + ] + } + } + } + } + } + """.trimIndent(), + ) + } + } + + @Test + fun testDuplicateTrades() { + setup() + + repeat(2) { + test( + { + perp.loadv4TradesChanged(mock, testWsUrl) + }, + """ + { + "markets":{ + "markets":{ + "ETH-USD":{ + "trades": [ + { + "id": "8ee6d90d-272d-5edd-bf0f-2e4d6ae3d3b7", + "side": "BUY", + "size": 1.593707, + "price": 1255.949, + "createdAt": "2022-12-12T02:28:14.859Z", + "resources": { + } + } + ] + } + } + } + } + """.trimIndent(), + ) + } + } +} diff --git a/src/commonTest/kotlin/exchange.dydx.abacus/tests/payloads/TradesChannelMock.kt b/src/commonTest/kotlin/exchange.dydx.abacus/tests/payloads/TradesChannelMock.kt index ef36b5cb9..fce16ade2 100644 --- a/src/commonTest/kotlin/exchange.dydx.abacus/tests/payloads/TradesChannelMock.kt +++ b/src/commonTest/kotlin/exchange.dydx.abacus/tests/payloads/TradesChannelMock.kt @@ -1468,6 +1468,7 @@ internal class TradesChannelMock { "contents":{ "trades":[ { + "id": "8ee6d90d-272d-5edd-bf0f-2e4d6ae3d3b7", "size":"1.593707", "price":"1255.949", "side":"BUY",