Skip to content

Commit

Permalink
Added Units registry (#3977)
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot authored Dec 2, 2024
1 parent 4e408d5 commit 663db56
Show file tree
Hide file tree
Showing 17 changed files with 66 additions and 315 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG baseImage=eclipse-temurin:11-jre-noble
ARG baseImage=eclipse-temurin:21-jre-noble
FROM $baseImage

ENV WAVES_LOG_LEVEL=INFO
Expand Down
1 change: 1 addition & 0 deletions docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
JAVA_OPTS="-XX:+ExitOnOutOfMemoryError
-Xmx${WAVES_HEAP_SIZE}
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
-Dlogback.stdout.level=${WAVES_LOG_LEVEL}
-Dlogback.file.directory=${WVLOG}
-Dwaves.config.directory=/etc/waves
Expand Down
1 change: 0 additions & 1 deletion node/src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
scoreStatsReporter,
configRoot,
rocksDB,
() => utxStorage.getPriorityPool.map(_.compositeBlockchain),
routeTimeout,
heavyRequestScheduler
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ case class DebugApiRoute(
scoreReporter: Coeval[RxScoreObserver.Stats],
configRoot: ConfigObject,
db: RocksDBWriter,
priorityPoolBlockchain: () => Option[Blockchain],
routeTimeout: RouteTimeout,
heavyRequestScheduler: Scheduler
) extends ApiRoute
Expand Down Expand Up @@ -197,14 +196,13 @@ case class DebugApiRoute(

def validate: Route =
path("validate")(jsonPost[JsObject] { jsv =>
val resBlockchain = priorityPoolBlockchain().getOrElse(blockchain)
val startTime = System.nanoTime()

val parsedTransaction = TransactionFactory.fromSignedRequest(jsv)

val tracedSnapshot = for {
tx <- TracedResult(parsedTransaction)
diff <- TransactionDiffer.forceValidate(resBlockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)(resBlockchain, tx)
diff <- TransactionDiffer.forceValidate(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)(blockchain, tx)
} yield (tx, diff)

val error = tracedSnapshot.resultE match {
Expand All @@ -218,7 +216,7 @@ case class DebugApiRoute(
.fold(
_ => this.serializer,
{ case (_, snapshot) =>
val snapshotBlockchain = SnapshotBlockchain(resBlockchain, snapshot)
val snapshotBlockchain = SnapshotBlockchain(blockchain, snapshot)
this.serializer.copy(blockchain = snapshotBlockchain)
}
)
Expand All @@ -230,8 +228,8 @@ case class DebugApiRoute(
val meta = tx match {
case ist: InvokeScriptTransaction =>
val result = diff.scriptResults.get(ist.id())
TransactionMeta.Invoke(Height(resBlockchain.height), ist, TxMeta.Status.Succeeded, diff.scriptsComplexity, result)
case tx => TransactionMeta.Default(Height(resBlockchain.height), tx, TxMeta.Status.Succeeded, diff.scriptsComplexity)
TransactionMeta.Invoke(Height(blockchain.height), ist, TxMeta.Status.Succeeded, diff.scriptsComplexity, result)
case tx => TransactionMeta.Default(Height(blockchain.height), tx, TxMeta.Status.Succeeded, diff.scriptsComplexity)
}
serializer.transactionWithMetaJson(meta)
}
Expand All @@ -244,7 +242,7 @@ case class DebugApiRoute(
case ist: InvokeScriptTrace => ist.maybeLoggedJson(logged = true)(serializer.invokeScriptResultWrites)
case trace => trace.loggedJson
},
"height" -> resBlockchain.height
"height" -> blockchain.height
)

error.fold(response ++ extendedJson)(err =>
Expand Down
3 changes: 1 addition & 2 deletions node/src/main/scala/com/wavesplatform/mining/Miner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ class MinerImpl(
settings.minerSettings,
minerScheduler,
appenderScheduler,
transactionAdded,
utx.getPriorityPool.map(p => p.nextMicroBlockSize(_)).getOrElse(identity)
transactionAdded
)

def getNextBlockGenerationOffset(account: KeyPair): Either[String, FiniteDuration] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ object MicroBlockMiner {
settings: MinerSettings,
minerScheduler: SchedulerService,
appenderScheduler: SchedulerService,
transactionAdded: Observable[Unit],
nextMicroBlockSize: Int => Int = identity
transactionAdded: Observable[Unit]
): MicroBlockMiner =
new MicroBlockMinerImpl(
setDebugState,
Expand All @@ -41,7 +40,6 @@ object MicroBlockMiner {
settings,
minerScheduler,
appenderScheduler,
transactionAdded,
nextMicroBlockSize
transactionAdded
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class MicroBlockMinerImpl(
settings: MinerSettings,
minerScheduler: SchedulerService,
appenderScheduler: SchedulerService,
transactionAdded: Observable[Unit],
nextMicroBlockSize: Int => Int
transactionAdded: Observable[Unit]
) extends MicroBlockMiner
with ScorexLogging {

Expand Down Expand Up @@ -73,7 +72,7 @@ class MicroBlockMinerImpl(
val mdConstraint = MultiDimensionalMiningConstraint(
restTotalConstraint,
OneDimensionalMiningConstraint(
nextMicroBlockSize(settings.maxTransactionsInMicroBlock),
settings.maxTransactionsInMicroBlock,
TxEstimators.one,
"MaxTxsInMicroBlock"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ case class FunctionalitySettings(
xtnBuybackRewardPeriod: Int = Int.MaxValue,
lightNodeBlockFieldsAbsenceInterval: Int = 1000,
blockRewardBoostPeriod: Int = 1000,
paymentsCheckHeight: Int = 0
paymentsCheckHeight: Int = 0,
unitsRegistryAddress: Option[String] = None
) {
val allowLeasedBalanceTransferUntilHeight: Int = blockVersion3AfterHeight
val allowTemporaryNegativeUntil: Long = lastTimeBasedForkParameter
Expand All @@ -92,6 +93,8 @@ case class FunctionalitySettings(
daoAddress.traverse(Address.fromString(_)).leftMap(_ => "Incorrect dao-address")
lazy val xtnBuybackAddressParsed: Either[String, Option[Address]] =
xtnBuybackAddress.traverse(Address.fromString(_)).leftMap(_ => "Incorrect xtn-buyback-address")
lazy val unitsRegistryAddressParsed: Either[String, Option[Address]] =
unitsRegistryAddress.traverse(Address.fromString(_)).leftMap(_ => "Incorrect units-registry-address")

require(featureCheckBlocksPeriod > 0, "featureCheckBlocksPeriod must be greater than 0")
require(
Expand Down Expand Up @@ -133,7 +136,8 @@ object FunctionalitySettings {
xtnBuybackAddress = Some("3PFjHWuH6WXNJbwnfLHqNFBpwBS5dkYjTfv"),
xtnBuybackRewardPeriod = 100000,
blockRewardBoostPeriod = 300_000,
paymentsCheckHeight = 4303300
paymentsCheckHeight = 4303300,
unitsRegistryAddress = Some("3P8LfPXcveST7WKkV3UACQNdr6J3shPYong")
)

val TESTNET: FunctionalitySettings = apply(
Expand All @@ -149,7 +153,8 @@ object FunctionalitySettings {
daoAddress = Some("3Myb6G8DkdBb8YcZzhrky65HrmiNuac3kvS"),
xtnBuybackAddress = Some("3N13KQpdY3UU7JkWUBD9kN7t7xuUgeyYMTT"),
xtnBuybackRewardPeriod = 2000,
blockRewardBoostPeriod = 2_000
blockRewardBoostPeriod = 2_000,
unitsRegistryAddress = Some("3N9fwNGJcUcAbhh7YPr6mrpuGJD4tApZFsT")
)

val STAGENET: FunctionalitySettings = apply(
Expand Down
14 changes: 9 additions & 5 deletions node/src/main/scala/com/wavesplatform/state/Blockchain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ trait Blockchain {

def balanceAtHeight(address: Address, height: Int, assetId: Asset = Waves): Option[(Int, Long)]

/**
* Retrieves Waves balance snapshot in the [from, to] range (inclusive)
* @return Balance snapshots from most recent to oldest.
/** Retrieves Waves balance snapshot in the [from, to] range (inclusive)
* @return
* Balance snapshots from most recent to oldest.
*/
def balanceSnapshots(address: Address, from: Int, to: Option[BlockId]): Seq[BalanceSnapshot]

Expand Down Expand Up @@ -225,13 +225,17 @@ object Blockchain {
blockchain.effectiveBalanceBanHeights(address).contains(height)

def supportsLightNodeBlockFields(height: Int = blockchain.height): Boolean =
blockchain.featureActivationHeight(LightNode.id).exists(height >= _ + blockchain.settings.functionalitySettings.lightNodeBlockFieldsAbsenceInterval)
blockchain
.featureActivationHeight(LightNode.id)
.exists(height >= _ + blockchain.settings.functionalitySettings.lightNodeBlockFieldsAbsenceInterval)

def blockRewardBoost(height: Int): Int =
blockchain
.featureActivationHeight(BlockchainFeatures.BoostBlockReward.id)
.filter { boostHeight =>
boostHeight <= height && height < boostHeight + blockchain.settings.functionalitySettings.blockRewardBoostPeriod
}.fold(1)(_ => BlockRewardCalculator.RewardBoost)
}
.fold(1)(_ => BlockRewardCalculator.RewardBoost)

}
}
47 changes: 18 additions & 29 deletions node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ case class UtxPoolImpl(
private[this] val inUTXPoolOrdering = TransactionsOrdering.InUTXPool(utxSettings.fastLaneAddresses)

// State
val priorityPool = new UtxPriorityPool(blockchain)
val priorityPool = new UtxPriorityPool
private[this] val transactions = new ConcurrentHashMap[ByteStr, Transaction]()

override def getPriorityPool: Option[UtxPriorityPool] = Some(priorityPool)

override def putIfNew(tx: Transaction, forceValidate: Boolean): TracedResult[ValidationError, Boolean] = {
if (transactions.containsKey(tx.id()) || priorityPool.contains(tx.id())) TracedResult.wrapValue(false)
if (transactions.containsKey(tx.id())) TracedResult.wrapValue(false)
else putNewTx(tx, forceValidate)
}

Expand Down Expand Up @@ -171,10 +171,8 @@ case class UtxPoolImpl(
removeIds(ids)
}

def setPrioritySnapshots(discSnapshots: Seq[StateSnapshot]): Unit = {
val txs = priorityPool.setPriorityDiffs(discSnapshots)
txs.foreach(addTransaction(_, verify = false, canLock = false))
}
def setPrioritySnapshots(discSnapshots: Seq[StateSnapshot]): Unit =
priorityPool.setPriorityDiffs(discSnapshots).foreach(addTransaction(_, verify = false))

def resetPriorityPool(): Unit =
priorityPool.setPriorityDiffs(Seq.empty)
Expand All @@ -186,23 +184,19 @@ case class UtxPoolImpl(
}
}

private[this] def removeIds(removed: Set[ByteStr]): Unit = {
val priorityRemoved = priorityPool.removeIds(removed)
val factRemoved = priorityRemoved ++ removed.flatMap(id => removeFromOrdPool(id))
factRemoved.foreach(TxStateActions.removeMined(_))
}
private[this] def removeIds(removed: Set[ByteStr]): Unit =
removed.flatMap(id => removeFromOrdPool(id)).foreach(TxStateActions.removeMined(_))

private[utx] def addTransaction(
tx: Transaction,
verify: Boolean,
forceValidate: Boolean = false,
canLock: Boolean = true
forceValidate: Boolean = false
): TracedResult[ValidationError, Boolean] = {
val diffEi = {
def calculateSnapshot(): TracedResult[ValidationError, StateSnapshot] = {
if (forceValidate)
TransactionDiffer.forceValidate(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)(
priorityPool.compositeBlockchain,
blockchain,
tx
)
else
Expand All @@ -213,13 +207,12 @@ case class UtxPoolImpl(
verify,
enableExecutionLog = true
)(
priorityPool.compositeBlockchain,
blockchain,
tx
)
}

if (canLock) priorityPool.optimisticRead(calculateSnapshot())(_.resultE.isLeft)
else calculateSnapshot()
calculateSnapshot()
}

if (!verify || diffEi.resultE.isRight) {
Expand All @@ -235,13 +228,12 @@ case class UtxPoolImpl(
}

override def all: Seq[Transaction] =
(priorityPool.priorityTransactions ++ nonPriorityTransactions).distinct
(priorityPool.priorityTransactionIds.flatMap(id => Option(transactions.get(id))) ++ nonPriorityTransactions).distinct

override def size: Int = transactions.size

override def transactionById(transactionId: ByteStr): Option[Transaction] =
Option(transactions.get(transactionId))
.orElse(priorityPool.transactionById(transactionId))

private def scriptedAddresses(tx: Transaction): Set[Address] = tx match {
case t if inUTXPoolOrdering.isWhitelisted(t) => Set.empty
Expand All @@ -256,23 +248,21 @@ case class UtxPoolImpl(
private[this] case class TxEntry(tx: Transaction, priority: Boolean)

private[this] def createTxEntrySeq(): Seq[TxEntry] =
priorityPool.priorityTransactions.map(TxEntry(_, priority = true)) ++ nonPriorityTransactions.map(
TxEntry(_, priority = false)
)
priorityPool.priorityTransactionIds.flatMap(id => Option(transactions.get(id)).map(TxEntry(_, priority = true))) ++
nonPriorityTransactions.map(TxEntry(_, priority = false))

override def packUnconfirmed(
initialConstraint: MultiDimensionalMiningConstraint,
prevStateHash: Option[ByteStr],
strategy: PackStrategy,
cancelled: () => Boolean
): (Option[Seq[Transaction]], MultiDimensionalMiningConstraint, Option[ByteStr]) = {
): (Option[Seq[Transaction]], MultiDimensionalMiningConstraint, Option[ByteStr]) =
pack(TransactionDiffer(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true))(
initialConstraint,
strategy,
prevStateHash,
cancelled
)
}

def cleanUnconfirmed(): Unit = {
log.trace(s"Starting UTX cleanup at height ${blockchain.height}")
Expand All @@ -286,7 +276,7 @@ case class UtxPoolImpl(
} else {
val differ = if (!isMiningEnabled && utxSettings.forceValidateInCleanup) {
TransactionDiffer.forceValidate(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)(
priorityPool.compositeBlockchain,
blockchain,
_
)
} else {
Expand All @@ -296,7 +286,7 @@ case class UtxPoolImpl(
utxSettings.alwaysUnlimitedExecution,
enableExecutionLog = true
)(
priorityPool.compositeBlockchain,
blockchain,
_
)
}
Expand Down Expand Up @@ -480,11 +470,10 @@ case class UtxPoolImpl(

log.trace(
s"Validated ${packResult.validatedTransactions.size} transactions, " +
s"of which ${packResult.transactions.fold(0)(_.size)} were packed, ${transactions.size() + priorityPool.priorityTransactions.size} transactions remaining"
s"of which ${packResult.transactions.fold(0)(_.size)} were packed, ${transactions.size()} transactions remaining"
)

if (packResult.removedTransactions.nonEmpty) log.trace(s"Removing invalid transactions: ${packResult.removedTransactions.mkString(", ")}")
priorityPool.invalidateTxs(packResult.removedTransactions)
(packResult.transactions.map(_.reverse), packResult.constraint, packResult.stateHash)
}

Expand Down Expand Up @@ -563,7 +552,7 @@ case class UtxPoolImpl(

private def cleanupLoop(): Unit = cleanupScheduler.execute { () =>
while (scheduled.compareAndSet(true, false)) {
if (!transactions.isEmpty || priorityPool.priorityTransactions.nonEmpty) {
if (!transactions.isEmpty) {
cleanUnconfirmed()
}
}
Expand Down
Loading

0 comments on commit 663db56

Please sign in to comment.