diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala index d53d1cf88b..5f895c14d6 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala @@ -17,7 +17,7 @@ trait MicroBlockMiner { account: KeyPair, accumulatedBlock: Block, restTotalConstraint: MiningConstraint, - lastMicroBlock: Long + prevMicroBlockTs: Long ): Task[Unit] } diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala index de83f87765..f6b0f764b5 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala @@ -45,15 +45,15 @@ class MicroBlockMinerImpl( account: KeyPair, accumulatedBlock: Block, restTotalConstraint: MiningConstraint, - lastMicroBlock: Long + prevMicroBlockTs: Long ): Task[Unit] = - generateOneMicroBlockTask(account, accumulatedBlock, restTotalConstraint, lastMicroBlock) + generateOneMicroBlockTask(account, accumulatedBlock, restTotalConstraint, prevMicroBlockTs) .flatMap { case res @ Success(newBlock, newConstraint) => Task.defer(generateMicroBlockSequence(account, newBlock, newConstraint, res.nanoTime)) case Retry => Task - .defer(generateMicroBlockSequence(account, accumulatedBlock, restTotalConstraint, lastMicroBlock)) + .defer(generateMicroBlockSequence(account, accumulatedBlock, restTotalConstraint, prevMicroBlockTs)) .delayExecution(1 second) case Stop => setDebugState(MinerDebugInfo.MiningBlocks) @@ -65,7 +65,7 @@ class MicroBlockMinerImpl( account: KeyPair, accumulatedBlock: Block, restTotalConstraint: MiningConstraint, - lastMicroBlock: Long + prevMicroBlockTs: Long ): Task[MicroBlockMiningResult] = { val packTask = Task.cancelable[(Option[Seq[Transaction]], MiningConstraint, Option[ByteStr])] { cb => @volatile var cancelled = false @@ -93,8 +93,8 @@ class MicroBlockMinerImpl( ) ) ) - log.trace(s"Finished pack for ${accumulatedBlock.id()}") val updatedTotalConstraint = updatedMdConstraint.head + log.trace(s"Finished pack for ${accumulatedBlock.id()}, updated total constraint: $updatedTotalConstraint") cb.onSuccess((unconfirmed, updatedTotalConstraint, stateHash)) } Task.eval { @@ -104,27 +104,25 @@ class MicroBlockMinerImpl( packTask.flatMap { case (Some(unconfirmed), updatedTotalConstraint, stateHash) if unconfirmed.nonEmpty => - val delay = { - val delay = System.nanoTime() - lastMicroBlock - val requiredDelay = settings.microBlockInterval.toNanos - if (delay >= requiredDelay) Duration.Zero else (requiredDelay - delay).nanos - } - for { - _ <- Task.now(if (delay > Duration.Zero) log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock")) - _ <- Task.sleep(delay) - _ = log.trace(s"Generating microBlock for ${account.toAddress}, constraints: $updatedTotalConstraint") blocks <- forgeBlocks(account, accumulatedBlock, unconfirmed, stateHash) .leftWiden[Throwable] .liftTo[Task] (signedBlock, microBlock) = blocks - blockId <- appendMicroBlock(microBlock) - _ = BlockStats.mined(microBlock, blockId) - _ <- broadcastMicroBlock(account, microBlock, blockId) - } yield { + delay = { + val delay = System.nanoTime() - prevMicroBlockTs + val requiredDelay = settings.microBlockInterval.toNanos + if (delay >= requiredDelay) Duration.Zero else (requiredDelay - delay).nanos + } + _ <- + if (delay > Duration.Zero) { + log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock") + Task.sleep(delay) + } else Task.unit + _ <- appendMicroBlock(microBlock, account) + } yield if (updatedTotalConstraint.isFull) Stop else Success(signedBlock, updatedTotalConstraint) - } case (_, updatedTotalConstraint, _) => if (updatedTotalConstraint.isFull) { @@ -142,23 +140,27 @@ class MicroBlockMinerImpl( } } - private def broadcastMicroBlock(account: KeyPair, microBlock: MicroBlock, blockId: BlockId): Task[Unit] = - Task(if (allChannels != null) allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference))) - - private def appendMicroBlock(microBlock: MicroBlock): Task[BlockId] = - MicroblockAppender(blockchainUpdater, utx, appenderScheduler)(microBlock, None) - .flatMap { - case Left(err) => Task.raiseError(MicroBlockAppendError(microBlock, err)) - case Right(v) => Task.now(v) - } + private def appendMicroBlock(microBlock: MicroBlock, account: KeyPair): Task[BlockId] = + MicroblockAppender(blockchainUpdater, utx, appenderScheduler)(microBlock, None).flatMap { + case Left(err) => Task.raiseError(MicroBlockAppendError(microBlock, err)) + case Right(blockId) => + Task.evalAsync { + BlockStats.mined(microBlock, blockId) + if (allChannels != null) { + allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference)) + } + blockId + } + }.uncancelable private def forgeBlocks( account: KeyPair, accumulatedBlock: Block, - unconfirmed: Seq[Transaction], + packedTxs: Seq[Transaction], stateHash: Option[ByteStr] ): Either[MicroBlockMiningError, (Block, MicroBlock)] = microBlockBuildTimeStats.measureSuccessful { + log.trace(s"Forging microBlock for ${account.toAddress}") for { signedBlock <- Block .buildAndSign( @@ -167,7 +169,7 @@ class MicroBlockMinerImpl( reference = accumulatedBlock.header.reference, baseTarget = accumulatedBlock.header.baseTarget, generationSignature = accumulatedBlock.header.generationSignature, - txs = accumulatedBlock.transactionData ++ unconfirmed, + txs = accumulatedBlock.transactionData ++ packedTxs, signer = account, featureVotes = accumulatedBlock.header.featureVotes, rewardVote = accumulatedBlock.header.rewardVote, @@ -176,7 +178,7 @@ class MicroBlockMinerImpl( ) .leftMap(BlockBuildError) microBlock <- MicroBlock - .buildAndSign(signedBlock.header.version, account, unconfirmed, accumulatedBlock.id(), signedBlock.signature, stateHash) + .buildAndSign(signedBlock.header.version, account, packedTxs, accumulatedBlock.id(), signedBlock.signature, stateHash) .leftMap(MicroBlockBuildError) } yield (signedBlock, microBlock) } diff --git a/node/src/main/scala/com/wavesplatform/network/messages.scala b/node/src/main/scala/com/wavesplatform/network/messages.scala index 896b906246..f5b09c7413 100644 --- a/node/src/main/scala/com/wavesplatform/network/messages.scala +++ b/node/src/main/scala/com/wavesplatform/network/messages.scala @@ -80,9 +80,9 @@ case class MicroBlockInv(sender: PublicKey, totalBlockId: ByteStr, reference: By } object MicroBlockInv { - def apply(sender: KeyPair, totalBlockRef: ByteStr, prevBlockRef: ByteStr): MicroBlockInv = { - val signature = crypto.sign(sender.privateKey, sender.toAddress.bytes ++ totalBlockRef.arr ++ prevBlockRef.arr) - new MicroBlockInv(sender.publicKey, totalBlockRef, prevBlockRef, signature) + def apply(sender: KeyPair, totalBlockId: ByteStr, prevBlockRef: ByteStr): MicroBlockInv = { + val signature = crypto.sign(sender.privateKey, sender.toAddress.bytes ++ totalBlockId.arr ++ prevBlockRef.arr) + new MicroBlockInv(sender.publicKey, totalBlockId, prevBlockRef, signature) } }