Skip to content

Commit

Permalink
Replaced priority pool with the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Nov 20, 2024
1 parent 0cdc2c7 commit 70e4d38
Show file tree
Hide file tree
Showing 13 changed files with 42 additions and 317 deletions.
1 change: 0 additions & 1 deletion node/src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
scoreStatsReporter,
configRoot,
rocksDB,
() => utxStorage.getPriorityPool.map(_.compositeBlockchain),
routeTimeout,
heavyRequestScheduler
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ case class DebugApiRoute(
scoreReporter: Coeval[RxScoreObserver.Stats],
configRoot: ConfigObject,
db: RocksDBWriter,
priorityPoolBlockchain: () => Option[Blockchain],
routeTimeout: RouteTimeout,
heavyRequestScheduler: Scheduler
) extends ApiRoute
Expand Down Expand Up @@ -197,7 +196,7 @@ case class DebugApiRoute(

def validate: Route =
path("validate")(jsonPost[JsObject] { jsv =>
val resBlockchain = priorityPoolBlockchain().getOrElse(blockchain)
val resBlockchain = blockchain
val startTime = System.nanoTime()

val parsedTransaction = TransactionFactory.fromSignedRequest(jsv)
Expand Down
3 changes: 1 addition & 2 deletions node/src/main/scala/com/wavesplatform/mining/Miner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ class MinerImpl(
settings.minerSettings,
minerScheduler,
appenderScheduler,
transactionAdded,
utx.getPriorityPool.map(p => p.nextMicroBlockSize(_)).getOrElse(identity)
transactionAdded
)

def getNextBlockGenerationOffset(account: KeyPair): Either[String, FiniteDuration] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ object MicroBlockMiner {
settings: MinerSettings,
minerScheduler: SchedulerService,
appenderScheduler: SchedulerService,
transactionAdded: Observable[Unit],
nextMicroBlockSize: Int => Int = identity
transactionAdded: Observable[Unit]
): MicroBlockMiner =
new MicroBlockMinerImpl(
setDebugState,
Expand All @@ -41,7 +40,6 @@ object MicroBlockMiner {
settings,
minerScheduler,
appenderScheduler,
transactionAdded,
nextMicroBlockSize
transactionAdded
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class MicroBlockMinerImpl(
settings: MinerSettings,
minerScheduler: SchedulerService,
appenderScheduler: SchedulerService,
transactionAdded: Observable[Unit],
nextMicroBlockSize: Int => Int
transactionAdded: Observable[Unit]
) extends MicroBlockMiner
with ScorexLogging {

Expand Down Expand Up @@ -73,7 +72,7 @@ class MicroBlockMinerImpl(
val mdConstraint = MultiDimensionalMiningConstraint(
restTotalConstraint,
OneDimensionalMiningConstraint(
nextMicroBlockSize(settings.maxTransactionsInMicroBlock),
settings.maxTransactionsInMicroBlock,
TxEstimators.one,
"MaxTxsInMicroBlock"
)
Expand Down
47 changes: 18 additions & 29 deletions node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ case class UtxPoolImpl(
private[this] val inUTXPoolOrdering = TransactionsOrdering.InUTXPool(utxSettings.fastLaneAddresses)

// State
val priorityPool = new UtxPriorityPool(blockchain)
val priorityPool = new UtxPriorityPool
private[this] val transactions = new ConcurrentHashMap[ByteStr, Transaction]()

override def getPriorityPool: Option[UtxPriorityPool] = Some(priorityPool)

override def putIfNew(tx: Transaction, forceValidate: Boolean): TracedResult[ValidationError, Boolean] = {
if (transactions.containsKey(tx.id()) || priorityPool.contains(tx.id())) TracedResult.wrapValue(false)
if (transactions.containsKey(tx.id())) TracedResult.wrapValue(false)
else putNewTx(tx, forceValidate)
}

Expand Down Expand Up @@ -171,10 +171,8 @@ case class UtxPoolImpl(
removeIds(ids)
}

def setPrioritySnapshots(discSnapshots: Seq[StateSnapshot]): Unit = {
val txs = priorityPool.setPriorityDiffs(discSnapshots)
txs.foreach(addTransaction(_, verify = false, canLock = false))
}
def setPrioritySnapshots(discSnapshots: Seq[StateSnapshot]): Unit =
priorityPool.setPriorityDiffs(discSnapshots).foreach(addTransaction(_, verify = false))

def resetPriorityPool(): Unit =
priorityPool.setPriorityDiffs(Seq.empty)
Expand All @@ -186,23 +184,19 @@ case class UtxPoolImpl(
}
}

private[this] def removeIds(removed: Set[ByteStr]): Unit = {
val priorityRemoved = priorityPool.removeIds(removed)
val factRemoved = priorityRemoved ++ removed.flatMap(id => removeFromOrdPool(id))
factRemoved.foreach(TxStateActions.removeMined(_))
}
private[this] def removeIds(removed: Set[ByteStr]): Unit =
removed.flatMap(id => removeFromOrdPool(id)).foreach(TxStateActions.removeMined(_))

private[utx] def addTransaction(
tx: Transaction,
verify: Boolean,
forceValidate: Boolean = false,
canLock: Boolean = true
forceValidate: Boolean = false
): TracedResult[ValidationError, Boolean] = {
val diffEi = {
def calculateSnapshot(): TracedResult[ValidationError, StateSnapshot] = {
if (forceValidate)
TransactionDiffer.forceValidate(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)(
priorityPool.compositeBlockchain,
blockchain,
tx
)
else
Expand All @@ -213,13 +207,12 @@ case class UtxPoolImpl(
verify,
enableExecutionLog = true
)(
priorityPool.compositeBlockchain,
blockchain,
tx
)
}

if (canLock) priorityPool.optimisticRead(calculateSnapshot())(_.resultE.isLeft)
else calculateSnapshot()
calculateSnapshot()
}

if (!verify || diffEi.resultE.isRight) {
Expand All @@ -235,13 +228,12 @@ case class UtxPoolImpl(
}

override def all: Seq[Transaction] =
(priorityPool.priorityTransactions ++ nonPriorityTransactions).distinct
(priorityPool.priorityTransactionIds.flatMap(id => Option(transactions.get(id))) ++ nonPriorityTransactions).distinct

override def size: Int = transactions.size

override def transactionById(transactionId: ByteStr): Option[Transaction] =
Option(transactions.get(transactionId))
.orElse(priorityPool.transactionById(transactionId))

private def scriptedAddresses(tx: Transaction): Set[Address] = tx match {
case t if inUTXPoolOrdering.isWhitelisted(t) => Set.empty
Expand All @@ -256,23 +248,21 @@ case class UtxPoolImpl(
private[this] case class TxEntry(tx: Transaction, priority: Boolean)

private[this] def createTxEntrySeq(): Seq[TxEntry] =
priorityPool.priorityTransactions.map(TxEntry(_, priority = true)) ++ nonPriorityTransactions.map(
TxEntry(_, priority = false)
)
priorityPool.priorityTransactionIds.flatMap(id => Option(transactions.get(id)).map(TxEntry(_, priority = true))) ++
nonPriorityTransactions.map(TxEntry(_, priority = false))

override def packUnconfirmed(
initialConstraint: MultiDimensionalMiningConstraint,
prevStateHash: Option[ByteStr],
strategy: PackStrategy,
cancelled: () => Boolean
): (Option[Seq[Transaction]], MultiDimensionalMiningConstraint, Option[ByteStr]) = {
): (Option[Seq[Transaction]], MultiDimensionalMiningConstraint, Option[ByteStr]) =
pack(TransactionDiffer(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true))(
initialConstraint,
strategy,
prevStateHash,
cancelled
)
}

def cleanUnconfirmed(): Unit = {
log.trace(s"Starting UTX cleanup at height ${blockchain.height}")
Expand All @@ -286,7 +276,7 @@ case class UtxPoolImpl(
} else {
val differ = if (!isMiningEnabled && utxSettings.forceValidateInCleanup) {
TransactionDiffer.forceValidate(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)(
priorityPool.compositeBlockchain,
blockchain,
_
)
} else {
Expand All @@ -296,7 +286,7 @@ case class UtxPoolImpl(
utxSettings.alwaysUnlimitedExecution,
enableExecutionLog = true
)(
priorityPool.compositeBlockchain,
blockchain,
_
)
}
Expand Down Expand Up @@ -480,11 +470,10 @@ case class UtxPoolImpl(

log.trace(
s"Validated ${packResult.validatedTransactions.size} transactions, " +
s"of which ${packResult.transactions.fold(0)(_.size)} were packed, ${transactions.size() + priorityPool.priorityTransactions.size} transactions remaining"
s"of which ${packResult.transactions.fold(0)(_.size)} were packed, ${transactions.size()} transactions remaining"
)

if (packResult.removedTransactions.nonEmpty) log.trace(s"Removing invalid transactions: ${packResult.removedTransactions.mkString(", ")}")
priorityPool.invalidateTxs(packResult.removedTransactions)
(packResult.transactions.map(_.reverse), packResult.constraint, packResult.stateHash)
}

Expand Down Expand Up @@ -563,7 +552,7 @@ case class UtxPoolImpl(

private def cleanupLoop(): Unit = cleanupScheduler.execute { () =>
while (scheduled.compareAndSet(true, false)) {
if (!transactions.isEmpty || priorityPool.priorityTransactions.nonEmpty) {
if (!transactions.isEmpty) {
cleanUnconfirmed()
}
}
Expand Down
Loading

0 comments on commit 70e4d38

Please sign in to comment.