Skip to content

Commit

Permalink
De-dupe fills and trades from WebSocket. (#413)
Browse files Browse the repository at this point in the history
Co-authored-by: mobile-build-bot-git <[email protected]>
  • Loading branch information
prashanDYDX and mobile-build-bot authored Jun 5, 2024
1 parent 896b7d2 commit ef33a43
Show file tree
Hide file tree
Showing 7 changed files with 395 additions and 87 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ allprojects {
}

group = "exchange.dydx.abacus"
version = "1.7.51"
version = "1.7.52"

repositories {
google()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package exchange.dydx.abacus.processor.base

/**
* Merge two lists of payloads, dropping older items if a new item with the same ID exists.
*/
fun mergeWithIds(
new: List<Any>,
existing: List<Any>,
id: (Any) -> String?,
): List<Any> {
val ids = mutableSetOf<String>()
val merged = mutableListOf<Any>()
new.forEach { item ->
id(item)?.let { itemId ->
ids.add(itemId)
merged.add(item)
}
}
existing.forEach { item ->
id(item)?.let { itemId ->
if (!ids.contains(itemId)) {
ids.add(itemId)
merged.add(item)
}
}
}

return merged
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package exchange.dydx.abacus.processor.markets

import exchange.dydx.abacus.processor.base.BaseProcessor
import exchange.dydx.abacus.processor.base.mergeWithIds
import exchange.dydx.abacus.protocols.ParserProtocol

@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -40,18 +41,15 @@ internal class TradesProcessor(parser: ParserProtocol) : BaseProcessor(parser) {
payload: List<Any>?,
): List<Any>? {
if (payload != null) {
val merged = mutableListOf<Any>()
for (value in payload) {
parser.asNativeMap(value)?.let {
val trade = tradeProcessor.received(null, it)
merged.add(trade)
}
}
if (existing?.isNotEmpty() == true) {
merged.addAll(existing)
val new = payload.mapNotNull { eachPayload ->
parser.asNativeMap(eachPayload)?.let { eachPayloadData -> tradeProcessor.received(null, eachPayloadData) }
}
val merged = existing?.let {
mergeWithIds(new, existing) { data -> parser.asNativeMap(data)?.let { parser.asString(it["id"]) } }
} ?: new

return if (merged.size > LIMIT) {
merged.subList(0, LIMIT).toList()
merged.subList(0, LIMIT)
} else {
merged
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
package exchange.dydx.abacus.processor.wallet.account

import exchange.dydx.abacus.processor.base.BaseProcessor
import exchange.dydx.abacus.processor.base.mergeWithIds
import exchange.dydx.abacus.protocols.ParserProtocol

internal class FillsProcessor(parser: ParserProtocol) : BaseProcessor(parser) {
private val itemProcessor = FillProcessor(parser = parser)

override fun received(existing: List<Any>?, payload: List<Any>): List<Any>? {
val output = mutableListOf<Any>()
val newItems = payload.mapNotNull {
parser.asNativeMap(it)?.let { map ->
itemProcessor.received(null, map)
}
val new = payload.mapNotNull { eachPayload ->
parser.asNativeMap(eachPayload)?.let { eachPayloadData -> itemProcessor.received(null, eachPayloadData) }
}
if (newItems != null) {
output.addAll(newItems)
existing?.let {
return mergeWithIds(new, existing) { data -> parser.asNativeMap(data)?.let { parser.asString(it["id"]) } }
}
if (existing != null) {
output.addAll(existing)
}
return output
return new
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import exchange.dydx.abacus.tests.extensions.loadFillsReceived
import exchange.dydx.abacus.tests.extensions.loadMarketsChanged
import exchange.dydx.abacus.tests.extensions.loadOrderbook
import exchange.dydx.abacus.tests.extensions.loadOrderbookChanged
import exchange.dydx.abacus.tests.extensions.loadTrades
import exchange.dydx.abacus.tests.extensions.loadTradesChanged
import exchange.dydx.abacus.tests.extensions.log
import exchange.dydx.abacus.utils.ServerTime
import kotlin.test.Test
Expand All @@ -21,7 +19,6 @@ class V3PerpTests : V3BaseTests() {

testMarketsOnce()
testAccountsOnce()
testTradesOnce()
testOrderbookOnce()
testHistoricalFundingsOnce()
}
Expand Down Expand Up @@ -488,68 +485,6 @@ class V3PerpTests : V3BaseTests() {
)
}

private fun testTradesOnce() {
var time = ServerTime.now()
testTradesSubscribed()
time = perp.log("Trades Subscribed", time)

testTradesBatchChanged()
perp.log("Trades Changed", time)
}

private fun testTradesSubscribed() {
test(
{
perp.loadTrades(mock)
},
"""
{
"markets": {
"markets": {
"ETH-USD": {
"trades": [
{
"side": "BUY",
"price": 1656.2,
"size": 0.01,
"type": "LIQUIDATED",
"createdAt": "2022-08-01T16:58:12.989Z"
}
]
}
}
}
}
""".trimIndent(),
)
}

private fun testTradesBatchChanged() {
test(
{
perp.loadTradesChanged(mock)
},
"""
{
"markets": {
"markets": {
"ETH-USD": {
"trades": [
{
"side": "SELL",
"price": 1645.7,
"size": 24.243,
"createdAt": "2022-08-01T17:05:28.592Z"
}
]
}
}
}
}
""".trimIndent(),
)
}

private fun testHistoricalFundingsOnce() {
var time = ServerTime.now()
testHistoricalFundingsReceived()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package exchange.dydx.abacus.payload.v4

import exchange.dydx.abacus.tests.extensions.loadv4TradesChanged
import kotlin.test.Test

class V4DuplicateWebsocketMessageTests : V4BaseTests() {

@Test
fun testDuplicateFills() {
setup()

repeat(2) {
test(
{
perp.socket(
testWsUrl,
mock.batchedSubaccountsChannel.channel_batch_data_order_filled_1,
0,
null,
)
},
"""
{
"wallet": {
"account": {
"tradingRewards": {
"total": 2800.8
},
"subaccounts": {
"0": {
"equity": {
},
"freeCollateral": {
},
"quoteBalance": {
"current": 1599696.37
},
"orders": {
},
"fills":[
{
"id":"a74830f8-d506-54b3-bf3b-1de791b8fe4e",
"fee":"-0.067364",
"side":"BUY",
"size":"82",
"type":"LIMIT",
"price":"9.128",
"orderId":"f7c9cd24-57cd-5240-a98d-3c9c3c11767d",
"createdAt":"2024-05-06T18:41:20.606Z",
"liquidity":"MAKER",
"clientMetadata":"0",
"marketId":"APT-USD"
},
{
"id":"0d473eec-93b0-5c49-94ca-b8017454d769",
"fee":"-0.001643",
"side":"BUY",
"size":"2",
"type":"LIMIT",
"price":"9.128",
"orderId":"f7c9cd24-57cd-5240-a98d-3c9c3c11767d",
"createdAt":"2024-05-06T18:41:20.606Z",
"liquidity":"MAKER",
"clientMetadata":"0",
"marketId":"APT-USD"
}
]
}
}
}
}
}
""".trimIndent(),
)
}
}

@Test
fun testDuplicateTrades() {
setup()

repeat(2) {
test(
{
perp.loadv4TradesChanged(mock, testWsUrl)
},
"""
{
"markets":{
"markets":{
"ETH-USD":{
"trades": [
{
"id": "8ee6d90d-272d-5edd-bf0f-2e4d6ae3d3b7",
"side": "BUY",
"size": 1.593707,
"price": 1255.949,
"createdAt": "2022-12-12T02:28:14.859Z",
"resources": {
}
}
]
}
}
}
}
""".trimIndent(),
)
}
}
}
Loading

0 comments on commit ef33a43

Please sign in to comment.