Skip to content

Commit

Permalink
DEX-307 merge 0.16.x to 0.17.x (#2378)
Browse files Browse the repository at this point in the history
* NODE-1550: Added test for fixed error message (#2191)

https://wavesplatform.atlassian.net/browse/NODE-1550

* DEX-299 HTTP 500 error during base58 parsing (#2213)

Also fixed MatcherActorSpecification - create an order book when place order - new order book

* NODE-1572: Block appender metrics (#2181)

* DEX-296 Promise already completed issue (#2229)

* DEX-295 Don't schedule expiration during recovery process (#2182)

* DEX-295 Don't schedule expiration during recovery process

* AddressActor is notified about its orders after last event processed;
* Implemented a general pattern for waiting responses from multiple workers: WatchDistributedCompletionActor;
* Unit tests;

* DEX-295 Don't schedule expiration during recovery process

* OrderBookActor - notifyAddresses as a constructor's parameter;

* DEX-304 Order books' snapshots are created too often (#2242)

* Fixed a bug with next snapshot's offset calculation;
* Removing old snapshots of Order book when the new one is created;

* DEX-305 Starting issues (#2247)

* DEX-305 Starting issues

* DEX-253 For reversed pairs URLs with suffix returns incorrect redirect response (#2260)

DEX-253 For reversed pairs URLs with suffix returns incorrect redirect response

*  DEX-308 Storage for MatcherActor's known pairs (#2257)

DEX-308 Storage for MatcherActor's known pairs

* DEX-312 Notify about orders, but put off expiration schedules (#2279)

* DEX-313 Ability to blacklist an asset, but allow a pair with this asset (#2298)

* whitelist functionality is decoupled from "allowed-asset-pairs";
* "white-list-only" setting (see the application.conf for further information);
* updated unit tests;

* DEX-318 REST API pair validation doesn't respect allowed-asset-pairs and white-list-only (#2305)

Also removed blacklisting checks during order book deletion.

* DEX-313 OrderBook can't be initialized (#2312)

* DEX-313 OrderBook can't be initialized - an additional fix (#2314)

* DEX-315 Can't create an order book after deletion (#2330)

* DEX-314 Hide blacklisted assets in market list (#2321)

Also removed pair validation from OrderValidator. Now it's in AssetPairBuilder

* DEX-311 Can't get an old order's status (#2351)

* DEX-314 OrderBook snapshot creation improvements (#2309)

New:
* OrderBookActor detects that only an offset was changed and won't waste IO for snapshot update;
* OrderBookActor: saving the last trade information in snapshot to recover it after restart;
* MigrationTool: ob-migrate to migrate order books' snapshots;
* OrderBookSnapshotDB writes OrderBook's offset and snapshot to the database. Has an "in-memory" implementation;
* OrderBookSnapshotStoreActor is an actor to handle all work with OrderBookSnapshotDB;
* OrderBook.Snapshot serialization and deserialization in a binary format;

Updated:
* WorkingStash - simpler interface without "sender" in "stash";
* MatcherActor - removed childrenNames, because a terminated actor has the name, thus an asset pair to;

Tests:
* The right implementation for RestartableActor, that supports context.become;
* Updated unit tests;

* DEX-319 MatcherActor back-pressure (#2361)

* KafkaMatcherQueue, LocalMatcherQueue support backpressure;
* LocalMatcherQueue correctly reports the number of processed events;

MatcherQueue:
* startConsume.process must return Future[Unit] to know that all previous requests were processed to continue processing;
* Added lastProcessedOffset;
  • Loading branch information
vsuharnikov authored and koloale committed Jun 21, 2019
1 parent b3c218d commit 8b568b7
Show file tree
Hide file tree
Showing 65 changed files with 1,748 additions and 766 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,24 +101,33 @@ object AsyncMatcherHttpApi extends Assertions {
.as[MatcherStatusResponse]
}

def orderStatusExpectInvalidAssetId(orderId: String, assetPair: AssetPair, assetId: String): Future[Boolean] = {
def orderStatusExpectInvalidAssetId(orderId: String, assetPair: AssetPair, assetId: String): Future[Boolean] =
orderStatusExpectInvalidAssetId(orderId, assetPair, assetId, _.message == s"Invalid Asset ID: $assetId")

def orderStatusExpectInvalidAssetId(orderId: String,
assetPair: AssetPair,
assetId: String,
pred: MessageMatcherResponse => Boolean): Future[Boolean] = {
matcherGet(s"/matcher/orderbook/${assetPair.toUri}/$orderId") transform {
case Failure(UnexpectedStatusCodeException(_, _, 404, responseBody)) =>
Try(parse(responseBody).as[MessageMatcherResponse]) match {
case Success(mr) if mr.message == s"Invalid Asset ID: $assetId" => Success(true)
case Failure(f) => Failure(new RuntimeException(s"Failed to parse response: $f"))
case Success(mr) if pred(mr) => Success(true)
case Failure(f) => Failure(new RuntimeException(s"Failed to parse response: $f"))
}
case Success(r) => Failure(new RuntimeException(s"Unexpected matcher response: (${r.getStatusCode}) ${r.getResponseBody}"))
case _ => Failure(new RuntimeException(s"Unexpected failure from matcher"))
}
}

def orderBookExpectInvalidAssetId(assetPair: AssetPair, assetId: String): Future[Boolean] =
orderBookExpectInvalidAssetId(assetPair, assetId, _.message == s"Invalid Asset ID: $assetId")

def orderBookExpectInvalidAssetId(assetPair: AssetPair, assetId: String, pred: MessageMatcherResponse => Boolean): Future[Boolean] =
matcherGet(s"/matcher/orderbook/${assetPair.toUri}") transform {
case Failure(UnexpectedStatusCodeException(_, _, 404, responseBody)) =>
Try(parse(responseBody).as[MessageMatcherResponse]) match {
case Success(mr) if mr.message == s"Invalid Asset ID: $assetId" => Success(true)
case Failure(f) => Failure(new RuntimeException(s"Failed to parse response: $f"))
case Success(mr) if pred(mr) => Success(true)
case Failure(f) => Failure(new RuntimeException(s"Failed to parse response: $f"))
}
case Success(r) => Failure(new RuntimeException(s"Unexpected matcher response: (${r.getStatusCode}) ${r.getResponseBody}"))
case _ => Failure(new RuntimeException(s"Unexpected failure from matcher"))
Expand Down Expand Up @@ -301,10 +310,9 @@ object AsyncMatcherHttpApi extends Assertions {
for {
offset <- matcherNode.getCurrentOffset
snapshots <- matcherNode.getAllSnapshotOffsets
orderBooks <- Future.traverse(assetPairs)(x => matcherNode.orderBook(x).map(r => x -> r))
orderBooks <- Future.traverse(assetPairs)(x => matcherNode.orderBook(x).zip(matcherNode.marketStatus(x)).map(r => x -> r))
orderStatuses <- Future.traverse(orders)(x => matcherNode.orderStatus(x.idStr(), x.assetPair).map(r => x.idStr() -> r))
reservedBalances <- Future.traverse(accounts)(x => matcherNode.reservedBalance(x).map(r => x -> r))

accountsOrderHistory = accounts.flatMap(a => assetPairs.map(p => a -> p))
orderHistory <- Future.traverse(accountsOrderHistory) {
case (account, pair) => matcherNode.orderHistoryByPair(account, pair).map(r => (account, pair, r))
Expand Down Expand Up @@ -332,7 +340,7 @@ object AsyncMatcherHttpApi extends Assertions {
}

private def clean(x: MatcherState): MatcherState = x.copy(
orderBooks = x.orderBooks.map { case (k, v) => k -> v.copy(timestamp = 0L) }
orderBooks = x.orderBooks.map { case (k, v) => k -> v.copy(_1 = v._1.copy(timestamp = 0L)) }
)

def upsertRate(asset: Asset, rate: Double, expectedStatusCode: Int): Future[RatesResponse] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.wavesplatform.transaction.assets.exchange.AssetPair

case class MatcherState(offset: QueueEventWithMeta.Offset,
snapshots: Map[String, QueueEventWithMeta.Offset],
orderBooks: Map[AssetPair, OrderBookResponse],
orderBooks: Map[AssetPair, (OrderBookResponse, MarketStatusResponse)],
orderStatuses: Map[String, MatcherStatusResponse],
reservedBalances: Map[KeyPair, Map[String, Long]],
orderHistory: Map[KeyPair, Map[AssetPair, Seq[OrderbookHistory]]])
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,15 @@ object SyncMatcherHttpApi extends Assertions {
def orderBookExpectInvalidAssetId(assetPair: AssetPair, assetId: String): Boolean =
Await.result(async(m).orderBookExpectInvalidAssetId(assetPair, assetId), OrderRequestAwaitTime)

def orderBookExpectInvalidAssetId(assetPair: AssetPair, assetId: String, pred: MessageMatcherResponse => Boolean): Boolean =
Await.result(async(m).orderBookExpectInvalidAssetId(assetPair, assetId, pred), OrderRequestAwaitTime)

def orderStatusExpectInvalidAssetId(orderId: String, assetPair: AssetPair, assetId: String): Boolean =
Await.result(async(m).orderStatusExpectInvalidAssetId(orderId, assetPair, assetId), OrderRequestAwaitTime)

def orderStatusExpectInvalidAssetId(orderId: String, assetPair: AssetPair, assetId: String, pred: MessageMatcherResponse => Boolean): Boolean =
Await.result(async(m).orderStatusExpectInvalidAssetId(orderId, assetPair, assetId, pred), OrderRequestAwaitTime)

def marketStatus(assetPair: AssetPair): MarketStatusResponse =
sync(async(m).marketStatus(assetPair), RequestAwaitTime)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ class BlacklistedTradingTestSuite extends MatcherSuiteBase with GivenWhenThen {

override protected def beforeAll(): Unit = {
super.beforeAll()
val xs = Seq(IssueUsdTx, IssueWctTx, IssueEthTx, IssueBtcTx).map(createSignedIssueRequest).map(node.signedIssue)
xs.foreach(tx => node.waitForTransaction(tx.id))
node.waitForHeight(node.height + 1)
val xs = Seq(IssueUsdTx, IssueWctTx, IssueEthTx, IssueBtcTx).map(_.json()).map(node.broadcastRequest(_))
val height = xs.map(tx => node.waitForTransaction(tx.id).height).max
node.waitForHeight(height + 1)
}

"When blacklists are empty" in {
val (dec2, dec8) = (1000L, 1000000000L)

Then("Place some orders")
val usdOrder = node.placeOrder(alice, wavesUsdPair, BUY, dec8, dec2, matcherFee).message.id
val wctOrder = node.placeOrder(alice, wctWavesPair, BUY, dec2, dec8, matcherFee).message.id
val ethOrder = node.placeOrder(alice, ethWavesPair, SELL, dec8, dec8, matcherFee).message.id
val btcOrder = node.placeOrder(bob, wavesBtcPair, SELL, dec8, dec8, matcherFee).message.id
node.waitOrderStatus(wctWavesPair, btcOrder, "Accepted")
val usdOrder = node.placeOrder(alice, wavesUsdPair, BUY, dec8, dec2, matcherFee).message.id
val wctOrder = node.placeOrder(alice, wctWavesPair, BUY, dec2, dec8, matcherFee).message.id
val ethOrder = node.placeOrder(alice, ethWavesPair, SELL, dec8, dec8, matcherFee).message.id
val btcOrder1 = node.placeOrder(bob, wavesBtcPair, SELL, dec8, dec8, matcherFee).message.id
node.waitOrderStatus(wctWavesPair, btcOrder1, "Accepted")

Then("We blacklist some assets and addresses and restart the node")
docker.restartNode(
Expand All @@ -43,21 +43,21 @@ class BlacklistedTradingTestSuite extends MatcherSuiteBase with GivenWhenThen {
)

Then("orders for blacklisted assets are not available and new orders can't be placed")
node.orderStatusExpectInvalidAssetId(wctOrder, wctWavesPair, WctId.toString) //
node.orderStatusExpectInvalidAssetId(ethOrder, ethWavesPair, EthId.toString)
node.orderStatusExpectInvalidAssetId(wctOrder, wctWavesPair, WctId.toString, _.message.contains("is blacklisted"))
node.orderStatusExpectInvalidAssetId(ethOrder, ethWavesPair, EthId.toString, _.message.contains("is blacklisted"))
node.expectRejectedOrderPlacement(alice, wctWavesPair, BUY, dec2, dec8)
node.expectRejectedOrderPlacement(alice, ethWavesPair, SELL, dec8, dec8)
node.expectRejectedOrderPlacement(bob, wavesBtcPair, SELL, dec8, dec8)

And("orders of blacklisted address are still available")
node.orderStatus(btcOrder, wavesBtcPair).status shouldBe "Accepted"
node.orderStatus(btcOrder1, wavesBtcPair).status shouldBe "Accepted"

And("orders for other assets are still available")
node.orderStatus(usdOrder, wavesUsdPair).status shouldBe "Accepted"

And("OrderBook for blacklisted assets is not available")
node.orderBookExpectInvalidAssetId(wctWavesPair, WctId.toString)
node.orderBookExpectInvalidAssetId(ethWavesPair, EthId.toString)
node.orderBookExpectInvalidAssetId(wctWavesPair, WctId.toString, _.message.contains("is blacklisted"))
node.orderBookExpectInvalidAssetId(ethWavesPair, EthId.toString, _.message.contains("is blacklisted"))
node.orderBook(wavesBtcPair).asks.size shouldBe 1

And("OrderHistory returns info about all orders")
Expand All @@ -76,6 +76,10 @@ class BlacklistedTradingTestSuite extends MatcherSuiteBase with GivenWhenThen {
And("orders for other assets are still available")
node.orderStatus(usdOrder, wavesUsdPair).status shouldBe "Accepted"

And("order can be placed on allowed pair with blacklisted asset")
val btcOrder2 = node.placeOrder(alice, wavesBtcPair, SELL, dec8, dec8, matcherFee).message.id
node.waitOrderStatus(wavesBtcPair, btcOrder2, "Accepted")

And("now if all blacklists are cleared")
docker.restartNode(node, configWithBlacklisted())

Expand All @@ -90,8 +94,8 @@ class BlacklistedTradingTestSuite extends MatcherSuiteBase with GivenWhenThen {
And("new orders can be placed")
val newWctOrder = node.placeOrder(alice, wctWavesPair, BUY, dec2, dec8, matcherFee).message.id
val newEthOrder = node.placeOrder(alice, ethWavesPair, SELL, dec8, dec8, matcherFee).message.id
val newBtcOrder = node.placeOrder(bob, wavesBtcPair, SELL, dec8, dec8, matcherFee).message.id
node.waitOrderStatus(wctWavesPair, newBtcOrder, "Accepted")
val btcOrder3 = node.placeOrder(bob, wavesBtcPair, SELL, dec8, dec8, matcherFee).message.id
node.waitOrderStatus(wctWavesPair, btcOrder3, "Accepted")
node.orderStatus(newWctOrder, wctWavesPair).status shouldBe "Accepted"
node.orderStatus(newEthOrder, ethWavesPair).status shouldBe "Accepted"
}
Expand All @@ -100,13 +104,18 @@ class BlacklistedTradingTestSuite extends MatcherSuiteBase with GivenWhenThen {

object BlacklistedTradingTestSuite {

def configWithBlacklisted(assets: Array[String] = Array(), names: Array[String] = Array(), addresses: Array[String] = Array()): Config = {
def configWithBlacklisted(assets: Array[String] = Array.empty,
names: Array[String] = Array.empty,
addresses: Array[String] = Array.empty,
allowedAssetPairs: Array[String] = Array.empty): Config = {
def toStr(array: Array[String]): String = if (array.length == 0) "" else array.mkString("\"", "\", \"", "\"")
parseString(s"""
|waves.matcher {
| blacklisted-assets = [${toStr(assets)}]
| blacklisted-names = [${toStr(names)}]
| blacklisted-addresses = [${toStr(addresses)}]
| allowed-asset-pairs = [${toStr(allowedAssetPairs)}]
| white-list-only = no
|}
""".stripMargin)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.wavesplatform.it.sync

import com.typesafe.config.{Config, ConfigFactory}
import com.wavesplatform.it.api.MatcherState

class MarketStatusRecoveryTestSuite extends MatcherRecoveryTestSuite {
// To create a snapshot for each event at least for one order book
protected override def configOverrides: Config =
ConfigFactory.parseString("waves.matcher.snapshots-interval = 2").withFallback(super.configOverrides)

override protected def cleanState(state: MatcherState): MatcherState = state.copy(snapshots = Map.empty)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@ import com.wavesplatform.it.api.SyncMatcherHttpApi._
import com.wavesplatform.it.api.{MatcherCommand, MatcherState}
import com.wavesplatform.it.sync.config.MatcherPriceAssetConfig._
import com.wavesplatform.matcher.queue.QueueEventWithMeta
import com.wavesplatform.transaction.Asset.Waves
import com.wavesplatform.transaction.assets.exchange.{AssetPair, Order}
import com.wavesplatform.transaction.assets.exchange.Order
import org.scalacheck.Gen

import scala.concurrent.duration.DurationInt
import scala.util.Random

class MatcherRecoveryTestSuite extends MatcherSuiteBase {
private def configOverrides = ConfigFactory.parseString("""waves.matcher {
| price-assets = ["WAVES"]
protected def configOverrides: Config = ConfigFactory.parseString("""waves.matcher {
| snapshots-interval = 51
|}""".stripMargin)

Expand All @@ -25,18 +23,9 @@ class MatcherRecoveryTestSuite extends MatcherSuiteBase {
private val placesNumber = 200
private val cancelsNumber = placesNumber / 10

private val (issue1, issue2, assetPair1) = issueAssetPair(alice, 8, 8)
private val assetPair2 = AssetPair(assetPair1.amountAsset, Waves)
private val assetPair3 = AssetPair(assetPair1.priceAsset, Waves)
private val assetPairs = Seq(assetPair1, assetPair2, assetPair3)

{
val xs = Seq(issue1, issue2).map(_.json()).map(node.broadcastRequest(_))
xs.foreach(x => node.waitForTransaction(x.id))
}

private val orders = Gen.containerOfN[Vector, Order](placesNumber, orderGen(matcher, alice, assetPairs)).sample.get
private val lastOrder = orderGen(matcher, alice, assetPairs).sample.get
private val assetPairs = Seq(ethUsdPair, wavesUsdPair, ethWavesPair)
private val orders = Gen.containerOfN[Vector, Order](placesNumber, orderGen(matcher, alice, assetPairs)).sample.get
private val lastOrder = orderGen(matcher, alice, assetPairs).sample.get

"Place, fill and cancel a lot of orders" in {
val cancels = (1 to cancelsNumber).map(_ => choose(orders))
Expand All @@ -45,7 +34,9 @@ class MatcherRecoveryTestSuite extends MatcherSuiteBase {
executeCommands(List(MatcherCommand.Place(node, lastOrder)))
}

"Wait until all requests are processed - 1" in node.waitForStableOffset(10, 100, 200.millis)
"Wait until all requests are processed - 1" in {
node.waitForStableOffset(10, 100, 200.millis)
}

private var stateBefore: MatcherState = _

Expand All @@ -60,6 +51,7 @@ class MatcherRecoveryTestSuite extends MatcherSuiteBase {
snapshotOffset should be > 0L
}
}
node.waitForHeight(node.height + 1)
}

"Restart the matcher" in docker.restartContainer(node)
Expand All @@ -76,5 +68,14 @@ class MatcherRecoveryTestSuite extends MatcherSuiteBase {
stateBefore shouldBe stateAfter
}

private def state = node.matcherState(assetPairs, orders, Seq(alice))
private def state = cleanState(node.matcherState(assetPairs, orders, Seq(alice)))

protected def cleanState(state: MatcherState): MatcherState = state

override protected def beforeAll(): Unit = {
super.beforeAll()

val xs = Seq(IssueEthTx, IssueUsdTx).map(_.json()).map(node.broadcastRequest(_))
xs.foreach(x => node.waitForTransaction(x.id))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.wavesplatform.it.api.SyncMatcherHttpApi._
import com.wavesplatform.it.api.{AssetDecimalsInfo, LevelResponse}
import com.wavesplatform.it.sync.config.MatcherPriceAssetConfig._
import com.wavesplatform.it.util._
import com.wavesplatform.matcher.db.OrderDB
import com.wavesplatform.transaction.Asset.{IssuedAsset, Waves}
import com.wavesplatform.transaction.assets.exchange.OrderType._
import com.wavesplatform.transaction.assets.exchange._
Expand Down Expand Up @@ -244,7 +245,7 @@ class MatcherTestSuite extends MatcherSuiteBase with TableDrivenPropertyChecks {

"request order book for blacklisted pair" in {
val f = node.matcherGetStatusCode(s"/matcher/orderbook/$ForbiddenAssetId/WAVES", 404)
f.message shouldBe s"Invalid Asset ID: $ForbiddenAssetId"
f.message shouldBe s"The asset $ForbiddenAssetId not found"
}

"should consider UTX pool when checking the balance" in {
Expand Down Expand Up @@ -369,6 +370,27 @@ class MatcherTestSuite extends MatcherSuiteBase with TableDrivenPropertyChecks {
}
}

"Order statuses for old orders" in {
val (amountAssetTx, priceAssetTx, pair) = issueAssetPair(alice, 2, 8)

def placeOrder(i: Int, tpe: OrderType) = node.placeOrder(alice, pair, tpe, 100L + i, Order.PriceConstant, matcherFee)

val txIds = List(amountAssetTx, priceAssetTx).map(_.json()).map(node.broadcastRequest(_)).map(_.id)
txIds.foreach(node.waitForTransaction(_))

val ids = (1 to (OrderDB.OldestOrderIndexOffset + 5)).flatMap { i =>
List(
placeOrder(i, OrderType.BUY).message.id,
placeOrder(i, OrderType.SELL).message.id
)
}

ids.foreach { id =>
val status = node.orderStatus(id, pair, waitForStatus = false).status
withClue(id)(status should not be "NotFound")
}
}

"Debug information was updated" in {
val currentOffset = node.getCurrentOffset
currentOffset should be > 0L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class MatcherTickerTestSuite extends MatcherSuiteBase {

"matcher ticker validation" - {
"get tickers for unavailable asset should produce error" in {
SyncMatcherHttpApi.assertNotFoundAndMessage(node.marketStatus(wctWavesPair), s"Invalid Asset ID: ${IssueEightDigitAssetTx.id()}")
SyncMatcherHttpApi.assertNotFoundAndMessage(node.marketStatus(wctWavesPair), s"The asset ${IssueEightDigitAssetTx.id()} not found")
}

"status of empty orderbook" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,21 @@ class OrderBookTestSuite extends MatcherSuiteBase {
node.waitOrderStatus(wctUsdPair, sellOrder, "Cancelled")
}

"orderbook was really deleted" in {
val orderBook = node.orderBook(wctUsdPair)
orderBook.bids shouldBe empty
orderBook.asks shouldBe empty
"orderbook was deleted" in {
withClue("orderBook") {
val orderBook = node.orderBook(wctUsdPair)
orderBook.bids shouldBe empty
orderBook.asks shouldBe empty
}

withClue("tradingMarkets") {
val tradingPairs = node.tradingMarkets().markets.map(x => s"${x.amountAsset}-${x.priceAsset}")
tradingPairs shouldNot contain(wctUsdPair.key)
}

withClue("getAllSnapshotOffsets") {
node.getAllSnapshotOffsets.keySet shouldNot contain(wctUsdPair.key)
}
}

"reserved balances should be released for the pair" in {
Expand Down
Loading

0 comments on commit 8b568b7

Please sign in to comment.