Skip to content

Commit

Permalink
NODE-2320: Fix invalid liquid rollbacks (blockchain-updates) (1.2.x) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Karasiq authored May 13, 2021
1 parent 8b44f02 commit 2bb6bfa
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import java.nio.file.Files

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.reflect.ClassTag

import com.google.protobuf.ByteString
import com.wavesplatform.account.Address
import com.wavesplatform.api.common.CommonBlocksApi
import com.wavesplatform.block.{Block, MicroBlock}
Expand All @@ -20,7 +20,6 @@ import com.wavesplatform.events.protobuf.serde._
import com.wavesplatform.events.repo.UpdatesRepoImpl
import com.wavesplatform.features.BlockchainFeatures
import com.wavesplatform.history.Domain
import com.wavesplatform.protobuf.Amount
import com.wavesplatform.settings.{Constants, FunctionalitySettings, TestFunctionalitySettings, WavesSettings}
import com.wavesplatform.state.{AssetDescription, Blockchain, EmptyDataEntry, Height, LeaseBalance, StringDataEntry}
import com.wavesplatform.state.diffs.BlockDiffer
Expand All @@ -35,13 +34,15 @@ import org.scalatest.{FreeSpec, Matchers}
import org.scalatest.concurrent.ScalaFutures

class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with ScalaFutures with PathMockFactory {
var currentSettings: WavesSettings = domainSettingsWithFS(TestFunctionalitySettings.withFeatures(
BlockchainFeatures.BlockReward,
BlockchainFeatures.NG,
BlockchainFeatures.SmartAccounts,
BlockchainFeatures.DataTransaction,
BlockchainFeatures.FeeSponsorship
))
var currentSettings: WavesSettings = domainSettingsWithFS(
TestFunctionalitySettings.withFeatures(
BlockchainFeatures.BlockReward,
BlockchainFeatures.NG,
BlockchainFeatures.SmartAccounts,
BlockchainFeatures.DataTransaction,
BlockchainFeatures.FeeSponsorship
)
)

def currentFS: FunctionalitySettings = currentSettings.blockchainSettings.functionalitySettings

Expand Down Expand Up @@ -69,8 +70,32 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with
}
}

"should survive invalid micro rollback" in withDomainAndRepo { case (d, repo) =>
d.appendKeyBlock()
val sub = repo.createSubscription(SubscribeRequest(1))
val mb1Id = d.appendMicroBlock(TxHelpers.transfer())
val mb2Id = d.appendMicroBlock(TxHelpers.transfer())
d.appendMicroBlock(TxHelpers.transfer())

d.blockchain.removeAfter(mb1Id) // Should not do anything
d.appendKeyBlock(mb2Id)

sub.cancel()
val result = sub.futureValue.map(_.toUpdate)
result should matchPattern {
case Seq(
E.Block(1, _),
E.Micro(1, _),
E.Micro(1, _),
E.Micro(1, _),
E.MicroRollback(1, `mb2Id`),
E.Block(2, _)
) =>
}
}

"should include correct waves amount" in withNEmptyBlocksSubscription() { result =>
val balances = result.map(_.getAppend.getBlock.updatedWavesAmount)
val balances = result.collect { case b: BlockAppended => b.updatedWavesAmount }
balances shouldBe Seq(10000000000000000L, 10000000600000000L, 10000001200000000L)
}

Expand Down Expand Up @@ -102,17 +127,25 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with
PaymentTransaction.create(TxHelpers.defaultSigner, TxHelpers.secondAddress, 100, 100000, TxHelpers.timestamp).explicitGet()
d.appendBlock(tx)
} { results =>
val reward = 600000000
val reward = 600000000
val genesisAmount = Constants.TotalWaves * Constants.UnitsInWave + reward
val genesis = results.head.getAppend.transactionStateUpdates.head.balances.head
genesis.address shouldBe ByteString.copyFrom(TxHelpers.defaultAddress.bytes)
genesis.getAmountAfter shouldBe Amount(ByteString.EMPTY, genesisAmount)
genesis.amountBefore shouldBe reward

val payment = results.last.getAppend.transactionStateUpdates.head.balances.last
payment.address shouldBe ByteString.copyFrom(TxHelpers.secondAddress.bytes)
payment.getAmountAfter shouldBe Amount(ByteString.EMPTY, 100)
payment.amountBefore shouldBe 0
val genesis = results.head match {
case bu: BlockAppended => bu.transactionStateUpdates.head.balances.head
case _ => ???
}
genesis.address shouldBe TxHelpers.defaultAddress
genesis.after shouldBe genesisAmount
genesis.before shouldBe reward
genesis.asset shouldBe Waves

val payment = results.last match {
case bu: BlockAppended => bu.transactionStateUpdates.last.balances.last
case _ => ???
}
payment.address shouldBe TxHelpers.secondAddress
payment.after shouldBe 100
payment.before shouldBe 0
payment.asset shouldBe Waves
})

"should fail stream with invalid range" in {
Expand Down Expand Up @@ -140,7 +173,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with
withGenerateSubscription() { d =>
d.appendBlock(issue)
} { events =>
val event = events.last.vanilla.get.asInstanceOf[BlockAppended]
val event = events.last.asInstanceOf[BlockAppended]
val issued = event.transactionStateUpdates.head.assets
issued shouldBe Seq(AssetStateUpdate(issue.assetId, None, Some(description)))
event.referencedAssets shouldBe Seq(AssetInfo(issue.assetId, description.decimals, description.name.toStringUtf8))
Expand Down Expand Up @@ -174,7 +207,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with
d.appendKeyBlock()
d.rollbackTo(1)
} { events =>
val rollback: RollbackResult = events.last.vanilla.get.asInstanceOf[RollbackCompleted].rollbackResult
val rollback: RollbackResult = events.collectFirst { case r: RollbackCompleted => r.rollbackResult }.get
rollback.removedTransactionIds shouldBe Seq(data, reissue, issue, lease, transfer).map(_.id())
rollback.removedBlocks should have length 1

Expand Down Expand Up @@ -204,12 +237,11 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with

withGenerateSubscription() { d =>
d.appendKeyBlock()
d.appendMicroBlock(TxHelpers.transfer())
val firstMicroId = d.appendMicroBlock(TxHelpers.transfer())
d.appendMicroBlock(transfer, lease, issue, reissue, data)
d.rollbackMicros()
d.appendKeyBlock(firstMicroId)
} { events =>
import com.wavesplatform.events.protobuf.serde._
val rollback: RollbackResult = events.last.vanilla.get.asInstanceOf[MicroBlockRollbackCompleted].rollbackResult
val rollback: RollbackResult = events.collectFirst { case r: MicroBlockRollbackCompleted => r.rollbackResult }.get
rollback.removedTransactionIds shouldBe Seq(data, reissue, issue, lease, transfer).map(_.id())
rollback.removedBlocks shouldBe empty

Expand Down Expand Up @@ -270,7 +302,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with
}

def withGenerateSubscription(request: SubscribeRequest = SubscribeRequest.of(1, Int.MaxValue))(generateBlocks: Domain => Unit)(
f: Seq[protobuf.BlockchainUpdated] => Unit
f: Seq[BlockchainUpdated] => Unit
): Unit = {
withDomainAndRepo { (d, repo) =>
d.appendBlock(TxHelpers.genesis(TxHelpers.defaultSigner.toAddress, Constants.TotalWaves * Constants.UnitsInWave))
Expand All @@ -281,7 +313,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with
subscription.cancel()

val result = Await.result(subscription, 20 seconds)
f(result.map(_.getUpdate))
f(result.map(_.toUpdate))
}
}

Expand All @@ -300,7 +332,7 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with
}

def withNEmptyBlocksSubscription(count: Int = 2, request: SubscribeRequest = SubscribeRequest.of(1, Int.MaxValue))(
f: Seq[protobuf.BlockchainUpdated] => Unit
f: Seq[BlockchainUpdated] => Unit
): Unit = withGenerateSubscription(request)(d => for (_ <- 1 to count) d.appendBlock())(f)

def withRepo[T](blocksApi: CommonBlocksApi = stub[CommonBlocksApi])(f: (UpdatesRepoImpl, BlockchainUpdateTriggers) => T): T = {
Expand Down Expand Up @@ -364,4 +396,23 @@ class BlockchainUpdatesSpec extends FreeSpec with Matchers with WithDomain with

implicit def asGrpcService(updatesRepoImpl: UpdatesRepoImpl): BlockchainUpdatesApiGrpcImpl =
new BlockchainUpdatesApiGrpcImpl(updatesRepoImpl)

// Matchers
private[this] object E {
class EventMatcher[T: ClassTag] {
def unapply(bu: BlockchainUpdated): Option[(Int, ByteStr)] = bu match {
case ba if implicitly[ClassTag[T]].runtimeClass.isInstance(bu) => Some((ba.height, ba.id))
case _ => None
}
}

object Block extends EventMatcher[BlockAppended]
object Micro extends EventMatcher[MicroBlockAppended]
object Rollback extends EventMatcher[RollbackCompleted]
object MicroRollback extends EventMatcher[MicroBlockRollbackCompleted]
}

implicit class ProtoSubscribeEventOps(e: com.wavesplatform.events.api.grpc.protobuf.SubscribeEvent) {
def toUpdate: BlockchainUpdated = e.getUpdate.vanilla.get
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ class BlockchainUpdaterImpl(
val result = prevNgState match {
case Some(ng) if ng.contains(blockId) =>
log.trace("Resetting liquid block, no rollback necessary")
blockchainUpdateTriggers.onMicroBlockRollback(this, blockId)
Right(Seq.empty)
case Some(ng) if ng.base.id() == blockId =>
log.trace("Discarding liquid block, no rollback necessary")
Expand Down
25 changes: 7 additions & 18 deletions node/src/test/scala/com/wavesplatform/history/Domain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import cats.syntax.option._
import com.wavesplatform.account.Address
import com.wavesplatform.api.BlockMeta
import com.wavesplatform.api.common.{AddressPortfolio, AddressTransactions, CommonBlocksApi}
import com.wavesplatform.block.Block.BlockId
import com.wavesplatform.block.{Block, MicroBlock}
import com.wavesplatform.block.Block.BlockId
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.common.utils.EitherExt2
import com.wavesplatform.database
import com.wavesplatform.database.{DBExt, Keys, LevelDBWriter}
import com.wavesplatform.events.BlockchainUpdateTriggers
import com.wavesplatform.lang.ValidationError
import com.wavesplatform.state._
import com.wavesplatform.transaction.Asset.IssuedAsset
import com.wavesplatform.transaction.{BlockchainUpdater, _}
import com.wavesplatform.transaction.Asset.IssuedAsset
import org.iq80.leveldb.DB

case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWriter: LevelDBWriter) {
Expand Down Expand Up @@ -43,7 +43,7 @@ case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWrite

def appendMicroBlock(b: MicroBlock): BlockId = blockchainUpdater.processMicroBlock(b).explicitGet()

def lastBlockId: ByteStr = blockchainUpdater.lastBlockId.get
def lastBlockId: ByteStr = blockchainUpdater.lastBlockId.getOrElse(randomSig)

def carryFee: Long = blockchainUpdater.carryFee

Expand Down Expand Up @@ -77,13 +77,13 @@ case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWrite
lastBlock
}

def appendKeyBlock(): Block = {
val block = createBlock(Block.NgBlockVersion, Nil)
def appendKeyBlock(ref: ByteStr = lastBlockId): Block = {
val block = createBlock(Block.NgBlockVersion, Nil, ref)
appendBlock(block)
lastBlock
}

def appendMicroBlock(txs: Transaction*): Unit = {
def appendMicroBlock(txs: Transaction*): BlockId = {
val lastBlock = this.lastBlock
val block = lastBlock.copy(transactionData = lastBlock.transactionData ++ txs)
val signature = com.wavesplatform.crypto.sign(defaultSigner.privateKey, block.bodyBytes())
Expand All @@ -96,18 +96,7 @@ case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWrite
blockchainUpdater.removeAfter(blockId).explicitGet()
}

def rollbackMicros(offset: Int = 1): Unit = {
val blockId =
blockchainUpdater.microblockIds
.drop(offset)
.headOption
.getOrElse(throw new IllegalStateException("Insufficient count of microblocks"))

blockchainUpdater.removeAfter(blockId).explicitGet()
}

def createBlock(version: Byte, txs: Seq[Transaction]): Block = {
val reference = blockchainUpdater.lastBlockId.getOrElse(randomSig)
def createBlock(version: Byte, txs: Seq[Transaction], reference: ByteStr = lastBlockId): Block = {
val timestamp = System.currentTimeMillis()
Block
.buildAndSign(
Expand Down

0 comments on commit 2bb6bfa

Please sign in to comment.