From 2bb6bfa62d557c24e73f68d25b0d70aaea575e89 Mon Sep 17 00:00:00 2001 From: Le Karasique Date: Thu, 13 May 2021 17:17:59 +0300 Subject: [PATCH] NODE-2320: Fix invalid liquid rollbacks (blockchain-updates) (1.2.x) (#3473) --- .../events/BlockchainUpdatesSpec.scala | 109 +++++++++++++----- .../state/BlockchainUpdaterImpl.scala | 1 - .../com/wavesplatform/history/Domain.scala | 25 ++-- 3 files changed, 87 insertions(+), 48 deletions(-) diff --git a/grpc-server/src/test/scala/com/wavesplatform/events/BlockchainUpdatesSpec.scala b/grpc-server/src/test/scala/com/wavesplatform/events/BlockchainUpdatesSpec.scala index 69b224698ad..d42b323ae52 100644 --- a/grpc-server/src/test/scala/com/wavesplatform/events/BlockchainUpdatesSpec.scala +++ b/grpc-server/src/test/scala/com/wavesplatform/events/BlockchainUpdatesSpec.scala @@ -4,8 +4,8 @@ import java.nio.file.Files import scala.concurrent.{Await, Promise} import scala.concurrent.duration._ +import scala.reflect.ClassTag -import com.google.protobuf.ByteString import com.wavesplatform.account.Address import com.wavesplatform.api.common.CommonBlocksApi import com.wavesplatform.block.{Block, MicroBlock} @@ -20,7 +20,6 @@ import com.wavesplatform.events.protobuf.serde._ import com.wavesplatform.events.repo.UpdatesRepoImpl import com.wavesplatform.features.BlockchainFeatures import com.wavesplatform.history.Domain -import com.wavesplatform.protobuf.Amount import com.wavesplatform.settings.{Constants, FunctionalitySettings, TestFunctionalitySettings, WavesSettings} import com.wavesplatform.state.{AssetDescription, Blockchain, EmptyDataEntry, Height, LeaseBalance, StringDataEntry} import com.wavesplatform.state.diffs.BlockDiffer @@ -35,13 +34,15 @@ import org.scalatest.{FreeSpec, Matchers} import org.scalatest.concurrent.ScalaFutures class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with ScalaFutures with PathMockFactory { - var currentSettings: WavesSettings = domainSettingsWithFS(TestFunctionalitySettings.withFeatures( - BlockchainFeatures.BlockReward, - BlockchainFeatures.NG, - BlockchainFeatures.SmartAccounts, - BlockchainFeatures.DataTransaction, - BlockchainFeatures.FeeSponsorship - )) + var currentSettings: WavesSettings = domainSettingsWithFS( + TestFunctionalitySettings.withFeatures( + BlockchainFeatures.BlockReward, + BlockchainFeatures.NG, + BlockchainFeatures.SmartAccounts, + BlockchainFeatures.DataTransaction, + BlockchainFeatures.FeeSponsorship + ) + ) def currentFS: FunctionalitySettings = currentSettings.blockchainSettings.functionalitySettings @@ -69,8 +70,32 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with } } + "should survive invalid micro rollback" in withDomainAndRepo { case (d, repo) => + d.appendKeyBlock() + val sub = repo.createSubscription(SubscribeRequest(1)) + val mb1Id = d.appendMicroBlock(TxHelpers.transfer()) + val mb2Id = d.appendMicroBlock(TxHelpers.transfer()) + d.appendMicroBlock(TxHelpers.transfer()) + + d.blockchain.removeAfter(mb1Id) // Should not do anything + d.appendKeyBlock(mb2Id) + + sub.cancel() + val result = sub.futureValue.map(_.toUpdate) + result should matchPattern { + case Seq( + E.Block(1, _), + E.Micro(1, _), + E.Micro(1, _), + E.Micro(1, _), + E.MicroRollback(1, `mb2Id`), + E.Block(2, _) + ) => + } + } + "should include correct waves amount" in withNEmptyBlocksSubscription() { result => - val balances = result.map(_.getAppend.getBlock.updatedWavesAmount) + val balances = result.collect { case b: BlockAppended => b.updatedWavesAmount } balances shouldBe Seq(10000000000000000L, 10000000600000000L, 10000001200000000L) } @@ -102,17 +127,25 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with PaymentTransaction.create(TxHelpers.defaultSigner, TxHelpers.secondAddress, 100, 100000, TxHelpers.timestamp).explicitGet() d.appendBlock(tx) } { results => - val reward = 600000000 + val reward = 600000000 val genesisAmount = Constants.TotalWaves * Constants.UnitsInWave + reward - val genesis = results.head.getAppend.transactionStateUpdates.head.balances.head - genesis.address shouldBe ByteString.copyFrom(TxHelpers.defaultAddress.bytes) - genesis.getAmountAfter shouldBe Amount(ByteString.EMPTY, genesisAmount) - genesis.amountBefore shouldBe reward - - val payment = results.last.getAppend.transactionStateUpdates.head.balances.last - payment.address shouldBe ByteString.copyFrom(TxHelpers.secondAddress.bytes) - payment.getAmountAfter shouldBe Amount(ByteString.EMPTY, 100) - payment.amountBefore shouldBe 0 + val genesis = results.head match { + case bu: BlockAppended => bu.transactionStateUpdates.head.balances.head + case _ => ??? + } + genesis.address shouldBe TxHelpers.defaultAddress + genesis.after shouldBe genesisAmount + genesis.before shouldBe reward + genesis.asset shouldBe Waves + + val payment = results.last match { + case bu: BlockAppended => bu.transactionStateUpdates.last.balances.last + case _ => ??? + } + payment.address shouldBe TxHelpers.secondAddress + payment.after shouldBe 100 + payment.before shouldBe 0 + payment.asset shouldBe Waves }) "should fail stream with invalid range" in { @@ -140,7 +173,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with withGenerateSubscription() { d => d.appendBlock(issue) } { events => - val event = events.last.vanilla.get.asInstanceOf[BlockAppended] + val event = events.last.asInstanceOf[BlockAppended] val issued = event.transactionStateUpdates.head.assets issued shouldBe Seq(AssetStateUpdate(issue.assetId, None, Some(description))) event.referencedAssets shouldBe Seq(AssetInfo(issue.assetId, description.decimals, description.name.toStringUtf8)) @@ -174,7 +207,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with d.appendKeyBlock() d.rollbackTo(1) } { events => - val rollback: RollbackResult = events.last.vanilla.get.asInstanceOf[RollbackCompleted].rollbackResult + val rollback: RollbackResult = events.collectFirst { case r: RollbackCompleted => r.rollbackResult }.get rollback.removedTransactionIds shouldBe Seq(data, reissue, issue, lease, transfer).map(_.id()) rollback.removedBlocks should have length 1 @@ -204,12 +237,11 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with withGenerateSubscription() { d => d.appendKeyBlock() - d.appendMicroBlock(TxHelpers.transfer()) + val firstMicroId = d.appendMicroBlock(TxHelpers.transfer()) d.appendMicroBlock(transfer, lease, issue, reissue, data) - d.rollbackMicros() + d.appendKeyBlock(firstMicroId) } { events => - import com.wavesplatform.events.protobuf.serde._ - val rollback: RollbackResult = events.last.vanilla.get.asInstanceOf[MicroBlockRollbackCompleted].rollbackResult + val rollback: RollbackResult = events.collectFirst { case r: MicroBlockRollbackCompleted => r.rollbackResult }.get rollback.removedTransactionIds shouldBe Seq(data, reissue, issue, lease, transfer).map(_.id()) rollback.removedBlocks shouldBe empty @@ -270,7 +302,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with } def withGenerateSubscription(request: SubscribeRequest = SubscribeRequest.of(1, Int.MaxValue))(generateBlocks: Domain => Unit)( - f: Seq[protobuf.BlockchainUpdated] => Unit + f: Seq[BlockchainUpdated] => Unit ): Unit = { withDomainAndRepo { (d, repo) => d.appendBlock(TxHelpers.genesis(TxHelpers.defaultSigner.toAddress, Constants.TotalWaves * Constants.UnitsInWave)) @@ -281,7 +313,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with subscription.cancel() val result = Await.result(subscription, 20 seconds) - f(result.map(_.getUpdate)) + f(result.map(_.toUpdate)) } } @@ -300,7 +332,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with } def withNEmptyBlocksSubscription(count: Int = 2, request: SubscribeRequest = SubscribeRequest.of(1, Int.MaxValue))( - f: Seq[protobuf.BlockchainUpdated] => Unit + f: Seq[BlockchainUpdated] => Unit ): Unit = withGenerateSubscription(request)(d => for (_ <- 1 to count) d.appendBlock())(f) def withRepo[T](blocksApi: CommonBlocksApi = stub[CommonBlocksApi])(f: (UpdatesRepoImpl, BlockchainUpdateTriggers) => T): T = { @@ -364,4 +396,23 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with implicit def asGrpcService(updatesRepoImpl: UpdatesRepoImpl): BlockchainUpdatesApiGrpcImpl = new BlockchainUpdatesApiGrpcImpl(updatesRepoImpl) + + // Matchers + private[this] object E { + class EventMatcher[T: ClassTag] { + def unapply(bu: BlockchainUpdated): Option[(Int, ByteStr)] = bu match { + case ba if implicitly[ClassTag[T]].runtimeClass.isInstance(bu) => Some((ba.height, ba.id)) + case _ => None + } + } + + object Block extends EventMatcher[BlockAppended] + object Micro extends EventMatcher[MicroBlockAppended] + object Rollback extends EventMatcher[RollbackCompleted] + object MicroRollback extends EventMatcher[MicroBlockRollbackCompleted] + } + + implicit class ProtoSubscribeEventOps(e: com.wavesplatform.events.api.grpc.protobuf.SubscribeEvent) { + def toUpdate: BlockchainUpdated = e.getUpdate.vanilla.get + } } diff --git a/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala b/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala index 964f7ec69f6..0b062dd7124 100644 --- a/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala +++ b/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala @@ -425,7 +425,6 @@ class BlockchainUpdaterImpl( val result = prevNgState match { case Some(ng) if ng.contains(blockId) => log.trace("Resetting liquid block, no rollback necessary") - blockchainUpdateTriggers.onMicroBlockRollback(this, blockId) Right(Seq.empty) case Some(ng) if ng.base.id() == blockId => log.trace("Discarding liquid block, no rollback necessary") diff --git a/node/src/test/scala/com/wavesplatform/history/Domain.scala b/node/src/test/scala/com/wavesplatform/history/Domain.scala index f9a7e8dd1b4..aaeaba34270 100644 --- a/node/src/test/scala/com/wavesplatform/history/Domain.scala +++ b/node/src/test/scala/com/wavesplatform/history/Domain.scala @@ -4,8 +4,8 @@ import cats.syntax.option._ import com.wavesplatform.account.Address import com.wavesplatform.api.BlockMeta import com.wavesplatform.api.common.{AddressPortfolio, AddressTransactions, CommonBlocksApi} -import com.wavesplatform.block.Block.BlockId import com.wavesplatform.block.{Block, MicroBlock} +import com.wavesplatform.block.Block.BlockId import com.wavesplatform.common.state.ByteStr import com.wavesplatform.common.utils.EitherExt2 import com.wavesplatform.database @@ -13,8 +13,8 @@ import com.wavesplatform.database.{DBExt, Keys, LevelDBWriter} import com.wavesplatform.events.BlockchainUpdateTriggers import com.wavesplatform.lang.ValidationError import com.wavesplatform.state._ -import com.wavesplatform.transaction.Asset.IssuedAsset import com.wavesplatform.transaction.{BlockchainUpdater, _} +import com.wavesplatform.transaction.Asset.IssuedAsset import org.iq80.leveldb.DB case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWriter: LevelDBWriter) { @@ -43,7 +43,7 @@ case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWrite def appendMicroBlock(b: MicroBlock): BlockId = blockchainUpdater.processMicroBlock(b).explicitGet() - def lastBlockId: ByteStr = blockchainUpdater.lastBlockId.get + def lastBlockId: ByteStr = blockchainUpdater.lastBlockId.getOrElse(randomSig) def carryFee: Long = blockchainUpdater.carryFee @@ -77,13 +77,13 @@ case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWrite lastBlock } - def appendKeyBlock(): Block = { - val block = createBlock(Block.NgBlockVersion, Nil) + def appendKeyBlock(ref: ByteStr = lastBlockId): Block = { + val block = createBlock(Block.NgBlockVersion, Nil, ref) appendBlock(block) lastBlock } - def appendMicroBlock(txs: Transaction*): Unit = { + def appendMicroBlock(txs: Transaction*): BlockId = { val lastBlock = this.lastBlock val block = lastBlock.copy(transactionData = lastBlock.transactionData ++ txs) val signature = com.wavesplatform.crypto.sign(defaultSigner.privateKey, block.bodyBytes()) @@ -96,18 +96,7 @@ case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWrite blockchainUpdater.removeAfter(blockId).explicitGet() } - def rollbackMicros(offset: Int = 1): Unit = { - val blockId = - blockchainUpdater.microblockIds - .drop(offset) - .headOption - .getOrElse(throw new IllegalStateException("Insufficient count of microblocks")) - - blockchainUpdater.removeAfter(blockId).explicitGet() - } - - def createBlock(version: Byte, txs: Seq[Transaction]): Block = { - val reference = blockchainUpdater.lastBlockId.getOrElse(randomSig) + def createBlock(version: Byte, txs: Seq[Transaction], reference: ByteStr = lastBlockId): Block = { val timestamp = System.currentTimeMillis() Block .buildAndSign(