Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NODE-2348 Consistent API liquid state #3748

Open
wants to merge 36 commits into
base: version-1.4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
9eea9e6
Remove unused method
xrtm000 Aug 16, 2022
fee694f
Take into account discarded diffs for CommonAccountApi
xrtm000 Aug 17, 2022
6e040a0
Set private readLock() in BlockchainUpdaterImpl
xrtm000 Aug 18, 2022
2727d4e
Merge remote-tracking branch 'origin/version-1.4.x' into node-2348-ap…
xrtm000 Aug 19, 2022
086942f
Use UTX priority pool lock to avoid inconsistent state and test
xrtm000 Aug 22, 2022
684e638
Generify test for assets
xrtm000 Aug 22, 2022
7fb4120
Test account data
xrtm000 Aug 22, 2022
7eee3e5
Pass blockchainWithDiscardedDiffs to all places where needed and opti…
xrtm000 Aug 23, 2022
723644d
Merge remote-tracking branch 'origin/version-1.4.x' into node-2348-ap…
xrtm000 Aug 23, 2022
e2903d2
Add test for account script
xrtm000 Aug 23, 2022
1a603ec
Remove unspecified test from EvaluatorV2Test
xrtm000 Aug 23, 2022
8b1a39a
Correct use of lambdas
xrtm000 Aug 23, 2022
edef565
Better naming
xrtm000 Aug 23, 2022
f2e217b
Add test for active leases
xrtm000 Aug 23, 2022
6ea11b1
Unify tests
xrtm000 Aug 23, 2022
4ea47a1
Combine discarded mircoblocks diff with liquid diff
xrtm000 Aug 23, 2022
d2c2d8b
Remove unused import
xrtm000 Aug 24, 2022
a23fde7
Reduce code
xrtm000 Aug 24, 2022
5eb7db6
Use UTX priority pool lock for liquid diff
xrtm000 Aug 24, 2022
509dfe9
Test diff transaction
xrtm000 Aug 24, 2022
ec88935
Test asset description
xrtm000 Aug 24, 2022
048ca9a
Remove unused LeaseActionInfo
xrtm000 Aug 24, 2022
239fa0b
More tests
xrtm000 Aug 24, 2022
6b2f796
Correct blocking liquid diff
xrtm000 Aug 24, 2022
d1af705
Merge remote-tracking branch 'origin/version-1.4.x' into node-2348-ap…
xrtm000 Aug 24, 2022
14ce46b
Optimize imports
xrtm000 Aug 24, 2022
1b35fc7
Merge remote-tracking branch 'origin/version-1.4.x' into node-2348-ap…
xrtm000 Sep 9, 2022
c135f74
Merge remote-tracking branch 'origin/version-1.4.x' into node-2348-ap…
xrtm000 Oct 20, 2022
dd7914c
Fixes after merge
xrtm000 Oct 20, 2022
fbc6481
Add tests
xrtm000 Oct 20, 2022
9f55303
Unify
xrtm000 Oct 20, 2022
cc1ccd8
Adapt tests
xrtm000 Oct 21, 2022
fdd10d9
Merge remote-tracking branch 'origin/version-1.4.x' into node-2348-ap…
xrtm000 Oct 21, 2022
557636f
Fixes after merge
xrtm000 Oct 21, 2022
32a6967
Merge branch 'version-1.4.x' into node-2348-api-liquid-balances
xrtm000 Nov 21, 2022
25f72f9
Merge remote-tracking branch 'origin/version-1.4.x' into node-2348-ap…
xrtm000 Nov 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions node/src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.wavesplatform.api.http.eth.EthRpcRoute
import com.wavesplatform.api.http.leasing.LeaseApiRoute
import com.wavesplatform.api.http.utils.UtilsApiRoute
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.common.utils.EitherExt2
import com.wavesplatform.consensus.PoSSelector
import com.wavesplatform.database.{DBExt, Keys, openDB}
import com.wavesplatform.events.{BlockchainUpdateTriggers, UtxEvent}
Expand All @@ -31,6 +32,7 @@ import com.wavesplatform.mining.{Miner, MinerDebugInfo, MinerImpl}
import com.wavesplatform.network.*
import com.wavesplatform.settings.WavesSettings
import com.wavesplatform.state.appender.{BlockAppender, ExtensionAppender, MicroblockAppender}
import com.wavesplatform.state.reader.CompositeBlockchain
import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height, TxMeta}
import com.wavesplatform.transaction.TxValidationError.GenericError
import com.wavesplatform.transaction.smart.script.trace.TracedResult
Expand Down Expand Up @@ -123,10 +125,20 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con

val establishedConnections = new ConcurrentHashMap[Channel, PeerInfo]
val allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
val utxStorage =
new UtxPoolImpl(time, blockchainUpdater, settings.utxSettings, settings.maxTxErrorLogSize, settings.minerSettings.enable, utxEvents.onNext)
val utxStorage = new UtxPoolImpl(time, blockchainUpdater, settings.utxSettings, settings.maxTxErrorLogSize, settings.minerSettings.enable, utxEvents.onNext)
maybeUtx = Some(utxStorage)

def blockchainWithDiscardedDiffs(): CompositeBlockchain = {
def blockchain = CompositeBlockchain(blockchainUpdater, utxStorage.discardedMicrosDiff())
utxStorage.priorityPool.optimisticRead(blockchain)(_ => true)
}

def totalLiquidDiff(): Diff = {
def liquidDiff = blockchainUpdater.bestLiquidDiff.getOrElse(Diff())
def totalDiff = liquidDiff.combineE(utxStorage.discardedMicrosDiff()).explicitGet()
utxStorage.priorityPool.optimisticRead(totalDiff)(_ => true)
}

val timer = new HashedWheelTimer()
val utxSynchronizerLogger = LoggerFacade(LoggerFactory.getLogger(classOf[TransactionPublisher]))
val timedTxValidator =
Expand Down Expand Up @@ -223,18 +235,17 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
override def utxEvents: Observable[UtxEvent] = app.utxEvents

override val transactionsApi: CommonTransactionsApi = CommonTransactionsApi(
blockchainUpdater.bestLiquidDiff.map(diff => Height(blockchainUpdater.height) -> diff),
Some(Height(blockchainUpdater.height) -> totalLiquidDiff()),
db,
blockchainUpdater,
() => blockchainWithDiscardedDiffs(),
utxStorage,
tx => transactionPublisher.validateAndBroadcast(tx, None),
loadBlockAt(db, blockchainUpdater)
)
override val blocksApi: CommonBlocksApi =
CommonBlocksApi(blockchainUpdater, loadBlockMetaAt(db, blockchainUpdater), loadBlockInfoAt(db, blockchainUpdater))
override val accountsApi: CommonAccountsApi =
CommonAccountsApi(() => blockchainUpdater.bestLiquidDiff.getOrElse(Diff.empty), db, blockchainUpdater)
override val assetsApi: CommonAssetsApi = CommonAssetsApi(() => blockchainUpdater.bestLiquidDiff.getOrElse(Diff.empty), db, blockchainUpdater)
override val accountsApi: CommonAccountsApi = CommonAccountsApi(() => totalLiquidDiff(), db, () => blockchainWithDiscardedDiffs())
override val assetsApi: CommonAssetsApi = CommonAssetsApi(() => totalLiquidDiff(), db, () => blockchainWithDiscardedDiffs())
}

extensions = settings.extensions.map { extensionClassName =>
Expand Down Expand Up @@ -361,14 +372,14 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
)(heavyRequestScheduler)

val apiRoutes = Seq(
new EthRpcRoute(blockchainUpdater, extensionContext.transactionsApi, time),
EthRpcRoute(() => blockchainWithDiscardedDiffs(), extensionContext.transactionsApi, time),
NodeApiRoute(settings.restAPISettings, blockchainUpdater, () => shutdown()),
BlocksApiRoute(settings.restAPISettings, extensionContext.blocksApi, time, routeTimeout),
TransactionsApiRoute(
settings.restAPISettings,
extensionContext.transactionsApi,
wallet,
blockchainUpdater,
() => blockchainWithDiscardedDiffs(),
() => utxStorage.size,
transactionPublisher,
time,
Expand All @@ -387,7 +398,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
AddressApiRoute(
settings.restAPISettings,
wallet,
blockchainUpdater,
() => blockchainWithDiscardedDiffs(),
transactionPublisher,
time,
limitedScheduler,
Expand Down Expand Up @@ -423,7 +434,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
settings.restAPISettings,
wallet,
transactionPublisher,
blockchainUpdater,
() => blockchainWithDiscardedDiffs(),
time,
extensionContext.accountsApi,
extensionContext.assetsApi,
Expand All @@ -434,7 +445,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
LeaseApiRoute(
settings.restAPISettings,
wallet,
blockchainUpdater,
transactionPublisher,
time,
extensionContext.accountsApi,
Expand All @@ -446,7 +456,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
wallet,
transactionPublisher,
time,
blockchainUpdater,
() => blockchainWithDiscardedDiffs(),
routeTimeout
),
RewardApiRoute(blockchainUpdater)
Expand Down
6 changes: 3 additions & 3 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,17 @@ object Importer extends ScorexLogging {
CommonTransactionsApi(
blockchainUpdater.bestLiquidDiff.map(diff => Height(blockchainUpdater.height) -> diff),
db,
blockchainUpdater,
() => blockchainUpdater,
utxPool,
_ => Future.successful(TracedResult.wrapE(Left(GenericError("Not implemented during import")))),
Application.loadBlockAt(db, blockchainUpdater)
)
override def blocksApi: CommonBlocksApi =
CommonBlocksApi(blockchainUpdater, Application.loadBlockMetaAt(db, blockchainUpdater), Application.loadBlockInfoAt(db, blockchainUpdater))
override def accountsApi: CommonAccountsApi =
CommonAccountsApi(() => blockchainUpdater.bestLiquidDiff.getOrElse(Diff.empty), db, blockchainUpdater)
CommonAccountsApi(() => blockchainUpdater.bestLiquidDiff.getOrElse(Diff.empty), db, () => blockchainUpdater)
override def assetsApi: CommonAssetsApi =
CommonAssetsApi(() => blockchainUpdater.bestLiquidDiff.getOrElse(Diff.empty), db, blockchainUpdater)
CommonAssetsApi(() => blockchainUpdater.bestLiquidDiff.getOrElse(Diff.empty), db, () => blockchainUpdater)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,24 @@ object CommonAccountsApi {

final case class BalanceDetails(regular: Long, generating: Long, available: Long, effective: Long, leaseIn: Long, leaseOut: Long)

def apply(diff: () => Diff, db: DB, blockchain: Blockchain): CommonAccountsApi = new CommonAccountsApi {
def apply(diff: () => Diff, db: DB, blockchain: () => Blockchain): CommonAccountsApi = new CommonAccountsApi {

override def balance(address: Address, confirmations: Int = 0): Long =
blockchain.balance(address, blockchain.height, confirmations)
override def balance(address: Address, confirmations: Int = 0): Long = {
val bc = blockchain()
bc.balance(address, bc.height, confirmations)
}

override def effectiveBalance(address: Address, confirmations: Int = 0): Long = {
blockchain.effectiveBalance(address, confirmations)
blockchain().effectiveBalance(address, confirmations)
}

override def balanceDetails(address: Address): Either[String, BalanceDetails] = {
val portfolio = blockchain.wavesPortfolio(address)
val bc = blockchain()
val portfolio = bc.wavesPortfolio(address)
portfolio.effectiveBalance.map(effectiveBalance =>
BalanceDetails(
portfolio.balance,
blockchain.generatingBalance(address),
bc.generatingBalance(address),
portfolio.balance - portfolio.lease.out,
effectiveBalance,
portfolio.lease.in,
Expand All @@ -78,26 +81,29 @@ object CommonAccountsApi {
)
}

override def assetBalance(address: Address, asset: IssuedAsset): Long = blockchain.balance(address, asset)
override def assetBalance(address: Address, asset: IssuedAsset): Long = blockchain().balance(address, asset)

override def portfolio(address: Address): Observable[(IssuedAsset, Long)] = {
val currentDiff = diff()
val bc = blockchain()
db.resourceObservable.flatMap { resource =>
Observable.fromIterator(Task(assetBalanceIterator(resource, address, currentDiff, includeNft(blockchain))))
Observable.fromIterator(Task(assetBalanceIterator(resource, address, currentDiff, includeNft(bc))))
}
}

override def nftList(address: Address, after: Option[IssuedAsset]): Observable[(IssuedAsset, AssetDescription)] = {
val currentDiff = diff()
val bc = blockchain()
db.resourceObservable.flatMap { resource =>
Observable.fromIterator(Task(nftIterator(resource, address, currentDiff, after, blockchain.assetDescription)))
Observable.fromIterator(Task(nftIterator(resource, address, currentDiff, after, bc.assetDescription)))
}
}

override def script(address: Address): Option[AccountScriptInfo] = blockchain.accountScript(address)
override def script(address: Address): Option[AccountScriptInfo] =
blockchain().accountScript(address)

override def data(address: Address, key: String): Option[DataEntry[?]] =
blockchain.accountData(address, key)
blockchain().accountData(address, key)

override def dataStream(address: Address, regex: Option[String]): Observable[DataEntry[?]] = Observable.defer {
val pattern = regex.map(_.r.pattern)
Expand All @@ -124,12 +130,14 @@ object CommonAccountsApi {
Observable.fromIterable((entriesFromDiff.values ++ entries).filterNot(_.isEmpty))
}

override def resolveAlias(alias: Alias): Either[ValidationError, Address] = blockchain.resolveAlias(alias)
override def resolveAlias(alias: Alias): Either[ValidationError, Address] =
blockchain().resolveAlias(alias)

override def activeLeases(address: Address): Observable[LeaseInfo] =
override def activeLeases(address: Address): Observable[LeaseInfo] = {
val bc = blockchain()
addressTransactions(
db,
Some(Height(blockchain.height) -> diff()),
Some(Height(bc.height) -> diff()),
address,
None,
Set(TransactionType.Lease, TransactionType.InvokeScript, TransactionType.InvokeExpression, TransactionType.Ethereum),
Expand All @@ -141,7 +149,7 @@ object CommonAccountsApi {
lt.id(),
lt.id(),
lt.sender.toAddress,
blockchain.resolveAlias(lt.recipient).explicitGet(),
bc.resolveAlias(lt.recipient).explicitGet(),
lt.amount.value,
leaseHeight,
LeaseInfo.Status.Active
Expand All @@ -153,13 +161,15 @@ object CommonAccountsApi {
extractLeases(address, scriptResult, tx.id(), height)
case _ => Seq()
}
}

private def extractLeases(subject: Address, result: InvokeScriptResult, txId: ByteStr, height: Height): Seq[LeaseInfo] = {
val bc = blockchain()
(for {
lease <- result.leases
details <- blockchain.leaseDetails(lease.id) if details.isActive
details <- bc.leaseDetails(lease.id) if details.isActive
sender = details.sender.toAddress
recipient <- blockchain.resolveAlias(lease.recipient).toOption if subject == sender || subject == recipient
recipient <- bc.resolveAlias(lease.recipient).toOption if subject == sender || subject == recipient
} yield LeaseInfo(
lease.id,
txId,
Expand All @@ -180,26 +190,29 @@ object CommonAccountsApi {
Right(recipientAddress)
}

def leaseInfo(leaseId: ByteStr): Option[LeaseInfo] = blockchain.leaseDetails(leaseId) map { ld =>
LeaseInfo(
leaseId,
ld.sourceId,
ld.sender.toAddress,
blockchain.resolveAlias(ld.recipient).orElse(resolveDisabledAlias(leaseId)).explicitGet(),
ld.amount,
ld.height,
ld.status match {
case Status.Active => LeaseInfo.Status.Active
case Status.Cancelled(_, _) => LeaseInfo.Status.Canceled
case Status.Expired(_) => LeaseInfo.Status.Expired
},
ld.status.cancelHeight,
ld.status.cancelTransactionId
)
def leaseInfo(leaseId: ByteStr): Option[LeaseInfo] = {
val bc = blockchain()
bc.leaseDetails(leaseId) map { ld =>
LeaseInfo(
leaseId,
ld.sourceId,
ld.sender.toAddress,
bc.resolveAlias(ld.recipient).orElse(resolveDisabledAlias(leaseId)).explicitGet(),
ld.amount,
ld.height,
ld.status match {
case Status.Active => LeaseInfo.Status.Active
case Status.Cancelled(_, _) => LeaseInfo.Status.Canceled
case Status.Expired(_) => LeaseInfo.Status.Expired
},
ld.status.cancelHeight,
ld.status.cancelTransactionId
)
}
}

private[this] def leaseIsActive(id: ByteStr): Boolean =
blockchain.leaseDetails(id).exists(_.isActive)
blockchain().leaseDetails(id).exists(_.isActive)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,28 @@ trait CommonAssetsApi {
object CommonAssetsApi {
final case class AssetInfo(description: AssetDescription, issueTransaction: Option[IssueTransaction], sponsorBalance: Option[Long])

def apply(diff: () => Diff, db: DB, blockchain: Blockchain): CommonAssetsApi = new CommonAssetsApi {
def apply(diff: () => Diff, db: DB, blockchain: () => Blockchain): CommonAssetsApi = new CommonAssetsApi {
def description(assetId: IssuedAsset): Option[AssetDescription] =
blockchain.assetDescription(assetId)
blockchain().assetDescription(assetId)

def fullInfo(assetId: IssuedAsset): Option[AssetInfo] =
def fullInfo(assetId: IssuedAsset): Option[AssetInfo] = {
val bc = blockchain()
for {
assetInfo <- blockchain.assetDescription(assetId)
sponsorBalance = if (assetInfo.sponsorship != 0) Some(blockchain.wavesPortfolio(assetInfo.issuer.toAddress).spendableBalance) else None
assetInfo <- bc.assetDescription(assetId)
sponsorBalance = if (assetInfo.sponsorship != 0) Some(bc.wavesPortfolio(assetInfo.issuer.toAddress).spendableBalance) else None
} yield AssetInfo(
assetInfo,
blockchain.transactionInfo(assetId.id).collect { case (tm, it: IssueTransaction) if tm.succeeded => it },
bc.transactionInfo(assetId.id).collect { case (tm, it: IssueTransaction) if tm.succeeded => it },
sponsorBalance
)
}

override def wavesDistribution(height: Int, after: Option[Address]): Observable[(Address, Long)] =
balanceDistribution(
db,
height,
after,
if (height == blockchain.height) diff().portfolios else Map.empty[Address, Portfolio],
if (height == blockchain().height) diff().portfolios else Map.empty[Address, Portfolio],
KeyTags.WavesBalance.prefixBytes,
bs => AddressId.fromByteArray(bs.slice(2, bs.length - 4)),
_.balance
Expand All @@ -53,7 +55,7 @@ object CommonAssetsApi {
db,
height,
after,
if (height == blockchain.height) diff().portfolios else Map.empty[Address, Portfolio],
if (height == blockchain().height) diff().portfolios else Map.empty[Address, Portfolio],
KeyTags.AssetBalance.prefixBytes ++ asset.id.arr,
bs => AddressId.fromByteArray(bs.slice(2 + crypto.DigestLength, bs.length - 4)),
_.assets.getOrElse(asset, 0L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object CommonTransactionsApi {
def apply(
maybeDiff: => Option[(Height, Diff)],
db: DB,
blockchain: Blockchain,
blockchain: () => Blockchain,
utx: UtxPool,
publishTransaction: Transaction => Future[TracedResult[ValidationError, Boolean]],
blockAt: Int => Option[(BlockMeta, Seq[(TxMeta, Transaction)])]
Expand All @@ -63,7 +63,7 @@ object CommonTransactionsApi {
common.addressTransactions(db, maybeDiff, subject, sender, transactionTypes, fromId)

override def transactionById(transactionId: ByteStr): Option[TransactionMeta] =
blockchain.transactionInfo(transactionId).map(common.loadTransactionMeta(db, maybeDiff))
blockchain().transactionInfo(transactionId).map(common.loadTransactionMeta(db, maybeDiff))

override def unconfirmedTransactions: Seq[Transaction] = utx.all

Expand All @@ -72,7 +72,7 @@ object CommonTransactionsApi {

override def calculateFee(tx: Transaction): Either[ValidationError, (Asset, Long, Long)] =
FeeValidation
.getMinFee(blockchain, tx)
.getMinFee(blockchain(), tx)
.map {
case FeeDetails(asset, _, feeInAsset, feeInWaves) =>
(asset, feeInAsset, feeInWaves)
Expand All @@ -83,7 +83,7 @@ object CommonTransactionsApi {
override def transactionProofs(transactionIds: List[ByteStr]): List[TransactionProof] =
for {
transactionId <- transactionIds
(txm, tx) <- blockchain.transactionInfo(transactionId)
(txm, tx) <- blockchain().transactionInfo(transactionId)
(meta, allTransactions) <- blockAt(txm.height) if meta.header.version >= Block.ProtoBlockVersion
transactionProof <- block.transactionProof(tx, allTransactions.map(_._2))
} yield transactionProof
Expand Down
Loading