Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

De-dupe fills and trades from WebSocket. #413

Merged
merged 2 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ allprojects {
}

group = "exchange.dydx.abacus"
version = "1.7.51"
version = "1.7.52"

repositories {
google()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package exchange.dydx.abacus.processor.base

/**
* Merge two lists of payloads, dropping older items if a new item with the same ID exists.
*/
fun mergeWithIds(
new: List<Any>,
existing: List<Any>,
id: (Any) -> String?,
): List<Any> {
val ids = mutableSetOf<String>()
val merged = mutableListOf<Any>()
new.forEach { item ->
id(item)?.let { itemId ->
ids.add(itemId)
merged.add(item)
}
}
existing.forEach { item ->
id(item)?.let { itemId ->
if (!ids.contains(itemId)) {
ids.add(itemId)
merged.add(item)
}
}
}

return merged
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package exchange.dydx.abacus.processor.markets

import exchange.dydx.abacus.processor.base.BaseProcessor
import exchange.dydx.abacus.processor.base.mergeWithIds
import exchange.dydx.abacus.protocols.ParserProtocol

@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -40,18 +41,15 @@ internal class TradesProcessor(parser: ParserProtocol) : BaseProcessor(parser) {
payload: List<Any>?,
): List<Any>? {
if (payload != null) {
val merged = mutableListOf<Any>()
for (value in payload) {
parser.asNativeMap(value)?.let {
val trade = tradeProcessor.received(null, it)
merged.add(trade)
}
}
if (existing?.isNotEmpty() == true) {
merged.addAll(existing)
val new = payload.mapNotNull { eachPayload ->
parser.asNativeMap(eachPayload)?.let { eachPayloadData -> tradeProcessor.received(null, eachPayloadData) }
}
val merged = existing?.let {
mergeWithIds(new, existing) { data -> parser.asNativeMap(data)?.let { parser.asString(it["id"]) } }
} ?: new

return if (merged.size > LIMIT) {
merged.subList(0, LIMIT).toList()
merged.subList(0, LIMIT)
} else {
merged
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
package exchange.dydx.abacus.processor.wallet.account

import exchange.dydx.abacus.processor.base.BaseProcessor
import exchange.dydx.abacus.processor.base.mergeWithIds
import exchange.dydx.abacus.protocols.ParserProtocol

internal class FillsProcessor(parser: ParserProtocol) : BaseProcessor(parser) {
private val itemProcessor = FillProcessor(parser = parser)

override fun received(existing: List<Any>?, payload: List<Any>): List<Any>? {
val output = mutableListOf<Any>()
val newItems = payload.mapNotNull {
parser.asNativeMap(it)?.let { map ->
itemProcessor.received(null, map)
}
val new = payload.mapNotNull { eachPayload ->
parser.asNativeMap(eachPayload)?.let { eachPayloadData -> itemProcessor.received(null, eachPayloadData) }
}
if (newItems != null) {
output.addAll(newItems)
existing?.let {
return mergeWithIds(new, existing) { data -> parser.asNativeMap(data)?.let { parser.asString(it["id"]) } }
}
if (existing != null) {
output.addAll(existing)
}
return output
return new
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import exchange.dydx.abacus.tests.extensions.loadFillsReceived
import exchange.dydx.abacus.tests.extensions.loadMarketsChanged
import exchange.dydx.abacus.tests.extensions.loadOrderbook
import exchange.dydx.abacus.tests.extensions.loadOrderbookChanged
import exchange.dydx.abacus.tests.extensions.loadTrades
import exchange.dydx.abacus.tests.extensions.loadTradesChanged
import exchange.dydx.abacus.tests.extensions.log
import exchange.dydx.abacus.utils.ServerTime
import kotlin.test.Test
Expand All @@ -21,7 +19,6 @@ class V3PerpTests : V3BaseTests() {

testMarketsOnce()
testAccountsOnce()
testTradesOnce()
testOrderbookOnce()
testHistoricalFundingsOnce()
}
Expand Down Expand Up @@ -488,68 +485,6 @@ class V3PerpTests : V3BaseTests() {
)
}

private fun testTradesOnce() {
var time = ServerTime.now()
testTradesSubscribed()
time = perp.log("Trades Subscribed", time)

testTradesBatchChanged()
perp.log("Trades Changed", time)
}

private fun testTradesSubscribed() {
test(
{
perp.loadTrades(mock)
},
"""
{
"markets": {
"markets": {
"ETH-USD": {
"trades": [
{
"side": "BUY",
"price": 1656.2,
"size": 0.01,
"type": "LIQUIDATED",
"createdAt": "2022-08-01T16:58:12.989Z"
}
]
}
}
}
}
""".trimIndent(),
)
}

private fun testTradesBatchChanged() {
test(
{
perp.loadTradesChanged(mock)
},
"""
{
"markets": {
"markets": {
"ETH-USD": {
"trades": [
{
"side": "SELL",
"price": 1645.7,
"size": 24.243,
"createdAt": "2022-08-01T17:05:28.592Z"
}
]
}
}
}
}
""".trimIndent(),
)
}

private fun testHistoricalFundingsOnce() {
var time = ServerTime.now()
testHistoricalFundingsReceived()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package exchange.dydx.abacus.payload.v4

import exchange.dydx.abacus.tests.extensions.loadv4TradesChanged
import kotlin.test.Test

class V4DuplicateWebsocketMessageTests : V4BaseTests() {

@Test
fun testDuplicateFills() {
setup()

repeat(2) {
test(
{
perp.socket(
testWsUrl,
mock.batchedSubaccountsChannel.channel_batch_data_order_filled_1,
0,
null,
)
},
"""
{
"wallet": {
"account": {
"tradingRewards": {
"total": 2800.8
},
"subaccounts": {
"0": {
"equity": {
},
"freeCollateral": {
},
"quoteBalance": {
"current": 1599696.37
},
"orders": {
},
"fills":[
{
"id":"a74830f8-d506-54b3-bf3b-1de791b8fe4e",
"fee":"-0.067364",
"side":"BUY",
"size":"82",
"type":"LIMIT",
"price":"9.128",
"orderId":"f7c9cd24-57cd-5240-a98d-3c9c3c11767d",
"createdAt":"2024-05-06T18:41:20.606Z",
"liquidity":"MAKER",
"clientMetadata":"0",
"marketId":"APT-USD"
},
{
"id":"0d473eec-93b0-5c49-94ca-b8017454d769",
"fee":"-0.001643",
"side":"BUY",
"size":"2",
"type":"LIMIT",
"price":"9.128",
"orderId":"f7c9cd24-57cd-5240-a98d-3c9c3c11767d",
"createdAt":"2024-05-06T18:41:20.606Z",
"liquidity":"MAKER",
"clientMetadata":"0",
"marketId":"APT-USD"
}
]
}
}
}
}
}
""".trimIndent(),
)
}
}

@Test
fun testDuplicateTrades() {
setup()

repeat(2) {
test(
{
perp.loadv4TradesChanged(mock, testWsUrl)
},
"""
{
"markets":{
"markets":{
"ETH-USD":{
"trades": [
{
"id": "8ee6d90d-272d-5edd-bf0f-2e4d6ae3d3b7",
"side": "BUY",
"size": 1.593707,
"price": 1255.949,
"createdAt": "2022-12-12T02:28:14.859Z",
"resources": {
}
}
]
}
}
}
}
""".trimIndent(),
)
}
}
}
Loading
Loading