Skip to content

Commit

Permalink
Bloom filter in diff (#3765)
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot authored Oct 18, 2022
1 parent bfc827b commit 6b2b035
Show file tree
Hide file tree
Showing 31 changed files with 326 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ object StateUpdate {
)
}

diff.transactions.values.map { tx =>
diff.transactions.map { tx =>
TransactionMetadata(
tx.transaction match {
case a: Authorized => a.sender.toAddress.toByteString
Expand Down
20 changes: 0 additions & 20 deletions lang/shared/src/main/scala/com/wavesplatform/lang/Testing.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,12 @@ object CryptoContext {
("index", LONG)
) {
case xs @ ARR(proof) :: CONST_BYTESTR(value) :: CONST_LONG(index) :: Nil =>
if (value.size == 32 && proof.length <= 16 && proof.forall({
case CONST_BYTESTR(v) => v.size == 32
case _ => false
})) {
CONST_BYTESTR(ByteStr(createRoot(value.arr, Math.toIntExact(index), proof.reverse.map({
case CONST_BYTESTR(v) => v.arr
case _ => throw new Exception("Expect ByteStr")
}))))
val filteredProofs = proof.collect {
case bs@CONST_BYTESTR(v) if v.size == 32 => bs
}

if (value.size == 32 && proof.length <= 16 && filteredProofs.size == proof.size) {
CONST_BYTESTR(ByteStr(createRoot(value.arr, Math.toIntExact(index), filteredProofs.reverse.map(_.bs.arr))))
} else {
notImplemented[Id, EVALUATED](s"createMerkleRoot(merkleProof: ByteVector, valueBytes: ByteVector)", xs)
}
Expand Down
18 changes: 18 additions & 0 deletions lang/testkit/src/main/scala/com/wavesplatform/lang/Testing.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.wavesplatform.lang
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.common.utils.EitherExt2
import com.wavesplatform.lang.v1.compiler.Terms.*

object Testing {

def evaluated(i: Any): Either[ExecutionError, EVALUATED] = i match {
case s: String => CONST_STRING(s)
case s: Long => Right(CONST_LONG(s))
case s: Int => Right(CONST_LONG(s))
case s: ByteStr => CONST_BYTESTR(s)
case s: CaseObj => Right(s)
case s: Boolean => Right(CONST_BOOLEAN(s))
case a: Seq[?] => ARR(a.map(x => evaluated(x).explicitGet()).toIndexedSeq, false)
case _ => Left("Bad Assert: unexpected type")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object AddressTransactions {
): Seq[(TxMeta, Transaction)] =
(for {
(height, diff) <- maybeDiff.toSeq
nti <- diff.transactions.values.toSeq.reverse
nti <- diff.transactions.toSeq.reverse
if nti.affected(subject)
} yield (TxMeta(height, nti.applied, nti.spentComplexity), nti.transaction))
.dropWhile { case (_, tx) => fromId.isDefined && !fromId.contains(tx.id()) }
Expand Down
8 changes: 4 additions & 4 deletions node/src/main/scala/com/wavesplatform/database/Caches.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ abstract class Caches(spendableBalanceChanged: Observer[(Address, Asset)]) exten

val newAddresses = Set.newBuilder[Address]
newAddresses ++= diff.portfolios.keys.filter(addressIdCache.get(_).isEmpty)
for (NewTransactionInfo(_, addresses, _, _) <- diff.transactions.values; address <- addresses if addressIdCache.get(address).isEmpty) {
for (NewTransactionInfo(_, addresses, _, _) <- diff.transactions; address <- addresses if addressIdCache.get(address).isEmpty) {
newAddresses += address
}

Expand All @@ -210,11 +210,11 @@ abstract class Caches(spendableBalanceChanged: Observer[(Address, Asset)]) exten

val transactionMeta = Seq.newBuilder[(TxMeta, Transaction)]
val addressTransactions = ArrayListMultimap.create[AddressId, TransactionId]()
for (((id, nti), _) <- diff.transactions.zipWithIndex) {
transactionIds.put(id, newHeight)
for (nti <- diff.transactions) {
transactionIds.put(nti.transaction.id(), newHeight)
transactionMeta += (TxMeta(Height(newHeight), nti.applied, nti.spentComplexity) -> nti.transaction)
for (addr <- nti.affected) {
addressTransactions.put(addressIdWithFallback(addr, newAddressIds), TransactionId(id))
addressTransactions.put(addressIdWithFallback(addr, newAddressIds), TransactionId(nti.transaction.id()))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class BlockchainUpdaterImpl(
ngState
.flatMap(_.totalDiffOf(id))
.map { case (_, diff, _, _, _) =>
diff.transactions.values.toSeq.map(info => (TxMeta(Height(height), info.applied, info.spentComplexity), info.transaction))
diff.transactions.toSeq.map(info => (TxMeta(Height(height), info.applied, info.spentComplexity), info.transaction))
}
)

Expand Down
160 changes: 134 additions & 26 deletions node/src/main/scala/com/wavesplatform/state/Diff.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import cats.data.Ior
import cats.implicits.{catsSyntaxSemigroup, toFlatMapOps, toFunctorOps}
import cats.kernel.{Monoid, Semigroup}
import cats.syntax.either.*
import com.google.common.hash.{BloomFilter, Funnels}
import com.google.protobuf.ByteString
import com.wavesplatform.account.{Address, AddressOrAlias, Alias, PublicKey}
import com.wavesplatform.account.{Address, Alias, PublicKey}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database.protobuf.EthereumTransactionMeta
import com.wavesplatform.features.BlockchainFeatures
Expand All @@ -19,7 +20,7 @@ import com.wavesplatform.transaction.TxValidationError.GenericError
import com.wavesplatform.transaction.smart.InvokeTransaction
import com.wavesplatform.transaction.{Asset, EthereumTransaction, Transaction}

import scala.collection.immutable.VectorMap
import scala.util.chaining.*

case class LeaseBalance(in: Long, out: Long) {
def combineF[F[_]: Monad](that: LeaseBalance)(implicit s: Summarizer[F]): F[LeaseBalance] =
Expand Down Expand Up @@ -145,34 +146,54 @@ case class NewTransactionInfo(transaction: Transaction, affected: Set[Address],

case class NewAssetInfo(static: AssetStaticInfo, dynamic: AssetInfo, volume: AssetVolumeInfo)

case class LeaseActionInfo(invokeId: ByteStr, dAppPublicKey: PublicKey, recipient: AddressOrAlias, amount: Long)

case class Diff(
transactions: collection.Map[ByteStr, NewTransactionInfo] = VectorMap.empty,
portfolios: Map[Address, Portfolio] = Map.empty,
issuedAssets: Map[IssuedAsset, NewAssetInfo] = Map.empty,
updatedAssets: Map[IssuedAsset, Ior[AssetInfo, AssetVolumeInfo]] = Map.empty,
aliases: Map[Alias, Address] = Map.empty,
orderFills: Map[ByteStr, VolumeAndFee] = Map.empty,
leaseState: Map[ByteStr, LeaseDetails] = Map.empty,
scripts: Map[Address, Option[AccountScriptInfo]] = Map.empty,
assetScripts: Map[IssuedAsset, Option[AssetScriptInfo]] = Map.empty,
accountData: Map[Address, AccountDataInfo] = Map.empty,
sponsorship: Map[IssuedAsset, Sponsorship] = Map.empty,
scriptsRun: Int = 0,
scriptsComplexity: Long = 0,
scriptResults: Map[ByteStr, InvokeScriptResult] = Map.empty,
ethereumTransactionMeta: Map[ByteStr, EthereumTransactionMeta] = Map.empty
case class Diff private (
transactions: Vector[NewTransactionInfo],
portfolios: Map[Address, Portfolio],
issuedAssets: Map[IssuedAsset, NewAssetInfo],
updatedAssets: Map[IssuedAsset, Ior[AssetInfo, AssetVolumeInfo]],
aliases: Map[Alias, Address],
orderFills: Map[ByteStr, VolumeAndFee],
leaseState: Map[ByteStr, LeaseDetails],
scripts: Map[Address, Option[AccountScriptInfo]],
assetScripts: Map[IssuedAsset, Option[AssetScriptInfo]],
accountData: Map[Address, AccountDataInfo],
sponsorship: Map[IssuedAsset, Sponsorship],
scriptsRun: Int,
scriptsComplexity: Long,
scriptResults: Map[ByteStr, InvokeScriptResult],
ethereumTransactionMeta: Map[ByteStr, EthereumTransactionMeta],
transactionFilter: Option[BloomFilter[Array[Byte]]]
) {
@inline
final def combineE(newer: Diff): Either[ValidationError, Diff] = combineF(newer).leftMap(GenericError(_))

def transaction(txId: ByteStr): Option[NewTransactionInfo] =
if (transactions.nonEmpty && transactionFilter.exists(_.mightContain(txId.arr)))
transactions.find(_.transaction.id() == txId)
else None

def withScriptsComplexity(newScriptsComplexity: Long): Diff = copy(scriptsComplexity = newScriptsComplexity)

def withScriptResults(newScriptResults: Map[ByteStr, InvokeScriptResult]): Diff = copy(scriptResults = newScriptResults)

def withScriptRuns(newScriptRuns: Int): Diff = copy(scriptsRun = newScriptRuns)

def withPortfolios(newPortfolios: Map[Address, Portfolio]): Diff = copy(portfolios = newPortfolios)

def combineF(newer: Diff): Either[String, Diff] =
Diff
.combine(portfolios, newer.portfolios)
.map(portfolios =>
.map { portfolios =>
val newTransactions = if (transactions.isEmpty) newer.transactions else transactions ++ newer.transactions
val newFilter = transactionFilter match {
case Some(bf) =>
newer.transactions.foreach(nti => bf.put(nti.transaction.id().arr))
Some(bf)
case None => newer.transactionFilter
}

Diff(
transactions = transactions ++ newer.transactions,
transactions = newTransactions,
portfolios = portfolios,
issuedAssets = issuedAssets ++ newer.issuedAssets,
updatedAssets = updatedAssets |+| newer.updatedAssets,
Expand All @@ -186,12 +207,84 @@ case class Diff(
scriptsRun = scriptsRun + newer.scriptsRun,
scriptResults = scriptResults.combine(newer.scriptResults),
scriptsComplexity = scriptsComplexity + newer.scriptsComplexity,
ethereumTransactionMeta = ethereumTransactionMeta ++ newer.ethereumTransactionMeta
ethereumTransactionMeta = ethereumTransactionMeta ++ newer.ethereumTransactionMeta,
transactionFilter = newFilter
)
)
}
}

object Diff {
def apply(
portfolios: Map[Address, Portfolio] = Map.empty,
issuedAssets: Map[IssuedAsset, NewAssetInfo] = Map.empty,
updatedAssets: Map[IssuedAsset, Ior[AssetInfo, AssetVolumeInfo]] = Map.empty,
aliases: Map[Alias, Address] = Map.empty,
orderFills: Map[ByteStr, VolumeAndFee] = Map.empty,
leaseState: Map[ByteStr, LeaseDetails] = Map.empty,
scripts: Map[Address, Option[AccountScriptInfo]] = Map.empty,
assetScripts: Map[IssuedAsset, Option[AssetScriptInfo]] = Map.empty,
accountData: Map[Address, AccountDataInfo] = Map.empty,
sponsorship: Map[IssuedAsset, Sponsorship] = Map.empty,
scriptsRun: Int = 0,
scriptsComplexity: Long = 0,
scriptResults: Map[ByteStr, InvokeScriptResult] = Map.empty,
ethereumTransactionMeta: Map[ByteStr, EthereumTransactionMeta] = Map.empty
): Diff =
new Diff(
Vector.empty,
portfolios,
issuedAssets,
updatedAssets,
aliases,
orderFills,
leaseState,
scripts,
assetScripts,
accountData,
sponsorship,
scriptsRun,
scriptsComplexity,
scriptResults,
ethereumTransactionMeta,
None
)

def withTransactions(
nti: Vector[NewTransactionInfo],
portfolios: Map[Address, Portfolio] = Map.empty,
issuedAssets: Map[IssuedAsset, NewAssetInfo] = Map.empty,
updatedAssets: Map[IssuedAsset, Ior[AssetInfo, AssetVolumeInfo]] = Map.empty,
aliases: Map[Alias, Address] = Map.empty,
orderFills: Map[ByteStr, VolumeAndFee] = Map.empty,
leaseState: Map[ByteStr, LeaseDetails] = Map.empty,
scripts: Map[Address, Option[AccountScriptInfo]] = Map.empty,
assetScripts: Map[IssuedAsset, Option[AssetScriptInfo]] = Map.empty,
accountData: Map[Address, AccountDataInfo] = Map.empty,
sponsorship: Map[IssuedAsset, Sponsorship] = Map.empty,
scriptsRun: Int = 0,
scriptsComplexity: Long = 0,
scriptResults: Map[ByteStr, InvokeScriptResult] = Map.empty,
ethereumTransactionMeta: Map[ByteStr, EthereumTransactionMeta] = Map.empty
): Diff =
new Diff(
nti,
portfolios,
issuedAssets,
updatedAssets,
aliases,
orderFills,
leaseState,
scripts,
assetScripts,
accountData,
sponsorship,
scriptsRun,
scriptsComplexity,
scriptResults,
ethereumTransactionMeta,
mkFilterForTransactions(nti.map(_.transaction)*)
)

val empty: Diff = Diff()

def combine(portfolios1: Map[Address, Portfolio], portfolios2: Map[Address, Portfolio]): Either[String, Map[Address, Portfolio]] =
Expand All @@ -211,6 +304,17 @@ object Diff {
case (r, _) => r
}

private def mkFilter() =
BloomFilter.create[Array[Byte]](Funnels.byteArrayFunnel(), 10000, 0.01f)
private def mkFilterForTransactions(tx: Transaction*) =
Some(
mkFilter().tap(bf =>
tx.foreach { t =>
bf.put(t.id().arr)
}
)
)

implicit class DiffExt(private val d: Diff) extends AnyVal {
def errorMessage(txId: ByteStr): Option[InvokeScriptResult.ErrorMessage] =
d.scriptResults.get(txId).flatMap(_.error)
Expand All @@ -229,13 +333,17 @@ object Diff {
case et: EthereumTransaction =>
et.payload match {
case EthereumTransaction.Invocation(dApp, _) => Some(dApp)
case _ => None
case _ => None
}
case _ =>
None
}
val affectedAddresses = d.portfolios.keySet ++ d.accountData.keySet ++ calledScripts ++ maybeDApp
d.copy(transactions = VectorMap(tx.id() -> NewTransactionInfo(tx, affectedAddresses, applied, d.scriptsComplexity)))

d.copy(
transactions = Vector(NewTransactionInfo(tx, affectedAddresses, applied, d.scriptsComplexity)),
transactionFilter = mkFilterForTransactions(tx)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object EthereumTransactionDiff {
val resultEi = e.payload match {
case et: EthereumTransaction.Transfer =>
for (assetId <- et.tryResolveAsset(blockchain))
yield Diff.empty.copy(
yield Diff(
ethereumTransactionMeta = Map(
e.id() -> EthereumTransactionMeta(
EthereumTransactionMeta.Payload.Transfer(
Expand All @@ -31,7 +31,7 @@ object EthereumTransactionDiff {
case ei: EthereumTransaction.Invocation =>
for {
invocation <- ei.toInvokeScriptLike(e, blockchain)
} yield Diff.empty.copy(
} yield Diff(
ethereumTransactionMeta = Map(
e.id() -> EthereumTransactionMeta(
EthereumTransactionMeta.Payload.Invocation(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.wavesplatform.state.diffs

import com.wavesplatform.lang.ValidationError
import com.wavesplatform.state._
import com.wavesplatform.transaction.lease._
import com.wavesplatform.state.*
import com.wavesplatform.transaction.lease.*

object LeaseTransactionsDiff {
def lease(blockchain: Blockchain)(tx: LeaseTransaction): Either[ValidationError, Diff] =
DiffsCommon
.processLease(blockchain, tx.amount.value, tx.sender, tx.recipient, tx.fee.value, tx.id(), tx.id())
.map(_.copy(scriptsRun = DiffsCommon.countScriptRuns(blockchain, tx)))
.map(_.withScriptRuns(DiffsCommon.countScriptRuns(blockchain, tx)))

def leaseCancel(blockchain: Blockchain, time: Long)(tx: LeaseCancelTransaction): Either[ValidationError, Diff] =
DiffsCommon
.processLeaseCancel(blockchain, tx.sender, tx.fee.value, time, tx.leaseId, tx.id())
.map(_.copy(scriptsRun = DiffsCommon.countScriptRuns(blockchain, tx)))
.map(_.withScriptRuns(DiffsCommon.countScriptRuns(blockchain, tx)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import com.wavesplatform.transaction.smart.script.trace.{TraceStep, TracedResult
import com.wavesplatform.transaction.transfer.{MassTransferTransaction, TransferTransaction}
import play.api.libs.json.Json

import scala.collection.immutable.VectorMap

object TransactionDiffer {
def apply(prevBlockTs: Option[Long], currentBlockTs: Long, verify: Boolean = true)(
blockchain: Blockchain,
Expand Down Expand Up @@ -298,8 +296,8 @@ object TransactionDiffer {
case e: EthereumTransaction => EthereumTransactionDiff.meta(blockchain)(e)
case _ => Diff.empty
}
Diff(
transactions = VectorMap((tx.id(), NewTransactionInfo(tx, affectedAddresses, applied = false, spentComplexity))),
Diff.withTransactions(
Vector(NewTransactionInfo(tx, affectedAddresses, applied = false, spentComplexity)),
portfolios = portfolios,
scriptResults = scriptResult.fold(Map.empty[ByteStr, InvokeScriptResult])(sr => Map(tx.id() -> sr)),
scriptsComplexity = spentComplexity
Expand Down
Loading

0 comments on commit 6b2b035

Please sign in to comment.