Skip to content

Commit

Permalink
Async cleanup after microblock (#3785)
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot authored Nov 6, 2022
1 parent 32e1a9b commit dd0e1d7
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 19 deletions.
2 changes: 1 addition & 1 deletion node/src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
.map {
case Right(discardedBlocks) =>
allChannels.broadcast(LocalScoreChanged(blockchainUpdater.score))
if (returnTxsToUtx) utxStorage.addAndCleanup(discardedBlocks.view.flatMap(_._1.transactionData))
if (returnTxsToUtx) utxStorage.addAndScheduleCleanup(discardedBlocks.view.flatMap(_._1.transactionData))
Right(discardedBlocks)
case Left(error) => Left(error)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ object ExtensionAppender extends ScorexLogging {

val newTransactions = newBlocks.view.flatMap(_.transactionData).toSet
utxStorage.removeAll(newTransactions)
utxStorage.addAndCleanup(droppedBlocks.flatMap(_._1.transactionData).filterNot(newTransactions))
utxStorage.addAndScheduleCleanup(droppedBlocks.flatMap(_._1.transactionData).filterNot(newTransactions))
Right(Some(blockchainUpdater.score))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ object MicroblockAppender extends ScorexLogging {
blockchainUpdater
.processMicroBlock(microBlock, verify)
.map { totalBlockId =>
if (microBlock.transactionData.nonEmpty) log.trace {
s"Removing mined txs from ${microBlock.stringRepr(totalBlockId)}: ${microBlock.transactionData.map(_.id()).mkString(", ")}"
if (microBlock.transactionData.nonEmpty) {
utxStorage.removeAll(microBlock.transactionData)
log.trace(
s"Removing txs of ${microBlock.stringRepr(totalBlockId)} ${microBlock.transactionData.map(_.id()).mkString("(", ", ", ")")} from UTX pool"
)
}
utxStorage.removeAll(microBlock.transactionData)
utxStorage.cleanUnconfirmed()

utxStorage.scheduleCleanup()
totalBlockId
}
}).executeOn(scheduler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ package object appender {
.map { discardedDiffs =>
utx.removeAll(block.transactionData)
utx.setPriorityDiffs(discardedDiffs)
utx.runCleanup()
utx.scheduleCleanup()
Some(blockchainUpdater.height)
}
}
Expand Down
9 changes: 4 additions & 5 deletions node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,10 @@ class UtxPoolImpl(
log.trace(s"putIfNew(${tx.id()}) succeeded, isNew = $isNew")
case Left(err) =>
log.debug(s"putIfNew(${tx.id()}) failed with ${extractErrorMessage(err)}")
val errMsg = err match {
traceLogger.trace(err match {
case w: WithLog => w.toStringWithLog(maxTxErrorLogSize)
case err => err.toString
}
traceLogger.trace(errMsg)
})
}
tracedIsNew
}
Expand Down Expand Up @@ -524,12 +523,12 @@ class UtxPoolImpl(
}

/** DOES NOT verify transactions */
def addAndCleanup(transactions: Iterable[Transaction]): Unit = {
def addAndScheduleCleanup(transactions: Iterable[Transaction]): Unit = {
transactions.foreach(addTransaction(_, verify = false))
TxCleanup.runCleanupAsync()
}

def runCleanup(): Unit = {
def scheduleCleanup(): Unit = {
TxCleanup.runCleanupAsync()
}

Expand Down
14 changes: 7 additions & 7 deletions node/src/test/scala/com/wavesplatform/utx/UtxFailedTxsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually {
utx.putIfNew(tx, forceValidate = false).resultE should produce("reached err")
utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err")

utx.addAndCleanup(Seq(tx))
utx.addAndScheduleCleanup(Seq(tx))
eventually {
utx.size shouldBe 0
}
Expand All @@ -50,7 +50,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually {
utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err")
utx.putIfNew(tx, forceValidate = false).resultE shouldBe Right(true)

utx.addAndCleanup(Nil)
utx.addAndScheduleCleanup(Nil)
Thread.sleep(5000)
utx.size shouldBe 1

Expand Down Expand Up @@ -107,7 +107,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually {
utx.putIfNew(tx, forceValidate = true).resultE should produce(s"Transfer error: asset '${TestValues.asset}' is not found on the blockchain")
utx.putIfNew(tx, forceValidate = false).resultE shouldBe Right(true)

utx.addAndCleanup(Nil)
utx.addAndScheduleCleanup(Nil)
Thread.sleep(5000)
utx.size shouldBe 1

Expand Down Expand Up @@ -147,7 +147,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually {
utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err")
utx.putIfNew(tx, forceValidate = false).resultE shouldBe Right(true)

utx.addAndCleanup(Nil)
utx.addAndScheduleCleanup(Nil)
Thread.sleep(5000)
utx.size shouldBe 1

Expand All @@ -165,7 +165,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually {
utx.putIfNew(tx, forceValidate = false).resultE should produce("reached err")
utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err")

utx.addAndCleanup(Seq(tx))
utx.addAndScheduleCleanup(Seq(tx))
eventually {
utx.size shouldBe 0
}
Expand All @@ -185,7 +185,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually {
utx.putIfNew(tx, forceValidate = true).resultE should produce("reached err")
utx.putIfNew(tx, forceValidate = false).resultE shouldBe Right(true)

utx.addAndCleanup(Nil)
utx.addAndScheduleCleanup(Nil)
Thread.sleep(5000)
utx.size shouldBe 1

Expand Down Expand Up @@ -227,7 +227,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually {

utx.size shouldBe 100
d.appendBlock() // Height is odd
utx.addAndCleanup(Nil)
utx.addAndScheduleCleanup(Nil)
eventually(timeout(10 seconds), interval(500 millis)) {
utx.size shouldBe 0
utx.all shouldBe Nil
Expand Down

0 comments on commit dd0e1d7

Please sign in to comment.