From dd0e1d7245983c178bb5bf6649864985a9941a9d Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Sun, 6 Nov 2022 12:48:21 +0400 Subject: [PATCH] Async cleanup after microblock (#3785) --- .../main/scala/com/wavesplatform/Application.scala | 2 +- .../state/appender/ExtensionAppender.scala | 2 +- .../state/appender/MicroblockAppender.scala | 11 +++++++---- .../com/wavesplatform/state/appender/package.scala | 2 +- .../scala/com/wavesplatform/utx/UtxPoolImpl.scala | 9 ++++----- .../com/wavesplatform/utx/UtxFailedTxsSpec.scala | 14 +++++++------- 6 files changed, 21 insertions(+), 19 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/Application.scala b/node/src/main/scala/com/wavesplatform/Application.scala index 073e0d9858e..c30df6c270f 100644 --- a/node/src/main/scala/com/wavesplatform/Application.scala +++ b/node/src/main/scala/com/wavesplatform/Application.scala @@ -203,7 +203,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con .map { case Right(discardedBlocks) => allChannels.broadcast(LocalScoreChanged(blockchainUpdater.score)) - if (returnTxsToUtx) utxStorage.addAndCleanup(discardedBlocks.view.flatMap(_._1.transactionData)) + if (returnTxsToUtx) utxStorage.addAndScheduleCleanup(discardedBlocks.view.flatMap(_._1.transactionData)) Right(discardedBlocks) case Left(error) => Left(error) } diff --git a/node/src/main/scala/com/wavesplatform/state/appender/ExtensionAppender.scala b/node/src/main/scala/com/wavesplatform/state/appender/ExtensionAppender.scala index 36a7e265c27..fa1d2c2abc5 100644 --- a/node/src/main/scala/com/wavesplatform/state/appender/ExtensionAppender.scala +++ b/node/src/main/scala/com/wavesplatform/state/appender/ExtensionAppender.scala @@ -102,7 +102,7 @@ object ExtensionAppender extends ScorexLogging { val newTransactions = newBlocks.view.flatMap(_.transactionData).toSet utxStorage.removeAll(newTransactions) - utxStorage.addAndCleanup(droppedBlocks.flatMap(_._1.transactionData).filterNot(newTransactions)) + utxStorage.addAndScheduleCleanup(droppedBlocks.flatMap(_._1.transactionData).filterNot(newTransactions)) Right(Some(blockchainUpdater.score)) } } diff --git a/node/src/main/scala/com/wavesplatform/state/appender/MicroblockAppender.scala b/node/src/main/scala/com/wavesplatform/state/appender/MicroblockAppender.scala index abcbb8caefb..ebc99c7c4ee 100644 --- a/node/src/main/scala/com/wavesplatform/state/appender/MicroblockAppender.scala +++ b/node/src/main/scala/com/wavesplatform/state/appender/MicroblockAppender.scala @@ -30,11 +30,14 @@ object MicroblockAppender extends ScorexLogging { blockchainUpdater .processMicroBlock(microBlock, verify) .map { totalBlockId => - if (microBlock.transactionData.nonEmpty) log.trace { - s"Removing mined txs from ${microBlock.stringRepr(totalBlockId)}: ${microBlock.transactionData.map(_.id()).mkString(", ")}" + if (microBlock.transactionData.nonEmpty) { + utxStorage.removeAll(microBlock.transactionData) + log.trace( + s"Removing txs of ${microBlock.stringRepr(totalBlockId)} ${microBlock.transactionData.map(_.id()).mkString("(", ", ", ")")} from UTX pool" + ) } - utxStorage.removeAll(microBlock.transactionData) - utxStorage.cleanUnconfirmed() + + utxStorage.scheduleCleanup() totalBlockId } }).executeOn(scheduler) diff --git a/node/src/main/scala/com/wavesplatform/state/appender/package.scala b/node/src/main/scala/com/wavesplatform/state/appender/package.scala index 7b84d1db08b..bfdffe201ce 100644 --- a/node/src/main/scala/com/wavesplatform/state/appender/package.scala +++ b/node/src/main/scala/com/wavesplatform/state/appender/package.scala @@ -39,7 +39,7 @@ package object appender { .map { discardedDiffs => utx.removeAll(block.transactionData) utx.setPriorityDiffs(discardedDiffs) - utx.runCleanup() + utx.scheduleCleanup() Some(blockchainUpdater.height) } } diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala index 10b7a722759..954c92c8783 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala @@ -155,11 +155,10 @@ class UtxPoolImpl( log.trace(s"putIfNew(${tx.id()}) succeeded, isNew = $isNew") case Left(err) => log.debug(s"putIfNew(${tx.id()}) failed with ${extractErrorMessage(err)}") - val errMsg = err match { + traceLogger.trace(err match { case w: WithLog => w.toStringWithLog(maxTxErrorLogSize) case err => err.toString - } - traceLogger.trace(errMsg) + }) } tracedIsNew } @@ -524,12 +523,12 @@ class UtxPoolImpl( } /** DOES NOT verify transactions */ - def addAndCleanup(transactions: Iterable[Transaction]): Unit = { + def addAndScheduleCleanup(transactions: Iterable[Transaction]): Unit = { transactions.foreach(addTransaction(_, verify = false)) TxCleanup.runCleanupAsync() } - def runCleanup(): Unit = { + def scheduleCleanup(): Unit = { TxCleanup.runCleanupAsync() } diff --git a/node/src/test/scala/com/wavesplatform/utx/UtxFailedTxsSpec.scala b/node/src/test/scala/com/wavesplatform/utx/UtxFailedTxsSpec.scala index 7ecdf94a829..fa4b4813c26 100644 --- a/node/src/test/scala/com/wavesplatform/utx/UtxFailedTxsSpec.scala +++ b/node/src/test/scala/com/wavesplatform/utx/UtxFailedTxsSpec.scala @@ -36,7 +36,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually { utx.putIfNew(tx, forceValidate = false).resultE should produce("reached err") utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err") - utx.addAndCleanup(Seq(tx)) + utx.addAndScheduleCleanup(Seq(tx)) eventually { utx.size shouldBe 0 } @@ -50,7 +50,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually { utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err") utx.putIfNew(tx, forceValidate = false).resultE shouldBe Right(true) - utx.addAndCleanup(Nil) + utx.addAndScheduleCleanup(Nil) Thread.sleep(5000) utx.size shouldBe 1 @@ -107,7 +107,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually { utx.putIfNew(tx, forceValidate = true).resultE should produce(s"Transfer error: asset '${TestValues.asset}' is not found on the blockchain") utx.putIfNew(tx, forceValidate = false).resultE shouldBe Right(true) - utx.addAndCleanup(Nil) + utx.addAndScheduleCleanup(Nil) Thread.sleep(5000) utx.size shouldBe 1 @@ -147,7 +147,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually { utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err") utx.putIfNew(tx, forceValidate = false).resultE shouldBe Right(true) - utx.addAndCleanup(Nil) + utx.addAndScheduleCleanup(Nil) Thread.sleep(5000) utx.size shouldBe 1 @@ -165,7 +165,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually { utx.putIfNew(tx, forceValidate = false).resultE should produce("reached err") utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err") - utx.addAndCleanup(Seq(tx)) + utx.addAndScheduleCleanup(Seq(tx)) eventually { utx.size shouldBe 0 } @@ -185,7 +185,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually { utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err") utx.putIfNew(tx, forceValidate = false).resultE shouldBe Right(true) - utx.addAndCleanup(Nil) + utx.addAndScheduleCleanup(Nil) Thread.sleep(5000) utx.size shouldBe 1 @@ -227,7 +227,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually { utx.size shouldBe 100 d.appendBlock() // Height is odd - utx.addAndCleanup(Nil) + utx.addAndScheduleCleanup(Nil) eventually(timeout(10 seconds), interval(500 millis)) { utx.size shouldBe 0 utx.all shouldBe Nil