diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java index 36266c60e9b..edce12cf32e 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java @@ -69,4 +69,9 @@ public boolean isEmpty() { public BftEvent poll(final long timeout, final TimeUnit unit) throws InterruptedException { return queue.poll(timeout, unit); } + + @Override + public String toString() { + return "BftEventQueue{" + "queue=" + queue + ", messageQueueLimit=" + messageQueueLimit + '}'; + } } diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java index a85b440cf61..53258838a56 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java @@ -69,6 +69,7 @@ public void run() { private Optional nextEvent() { try { + // LOG.debug("TODO SLD queue: " + incomingQueue); return Optional.ofNullable(incomingQueue.poll(500, TimeUnit.MILLISECONDS)); } catch (final InterruptedException interrupt) { // If the queue was interrupted propagate it and spin to check our shutdown status diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BlockTimer.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BlockTimer.java index 5649b69e8f1..0d204d2b322 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BlockTimer.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BlockTimer.java @@ -20,13 +20,20 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; import java.util.Optional; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** Class for starting and keeping organised block timers */ public class BlockTimer { + private static final Logger LOG = LoggerFactory.getLogger(BlockTimer.class); + private final ForksSchedule forksSchedule; private final BftExecutors bftExecutors; private Optional> currentTimerTask; @@ -76,9 +83,6 @@ public synchronized boolean isRunning() { */ public synchronized void startTimer( final ConsensusRoundIdentifier round, final BlockHeader chainHeadHeader) { - cancelTimer(); - - final long now = clock.millis(); // absolute time when the timer is supposed to expire final int blockPeriodSeconds = @@ -86,10 +90,26 @@ public synchronized void startTimer( final long minimumTimeBetweenBlocksMillis = blockPeriodSeconds * 1000L; final long expiryTime = chainHeadHeader.getTimestamp() * 1_000 + minimumTimeBetweenBlocksMillis; + startTimer(round, expiryTime); + } + + public synchronized void startTimer(final ConsensusRoundIdentifier round, final long expiryTime) { + cancelTimer(); + + final long now = clock.millis(); + + LOG.debug( + "*** TODO SLD | BlockTimer.startTimer() | cancelling existing timer and submitting new timerTask with expiryTime = {}", + Instant.ofEpochMilli(expiryTime).atZone(ZoneId.of("Australia/Brisbane"))); + if (expiryTime > now) { final long delay = expiryTime - now; - final Runnable newTimerRunnable = () -> queue.add(new BlockTimerExpiry(round)); + final Runnable newTimerRunnable = + () -> { + LOG.debug("TODO SLD BlockTimer expired -> new BlockTimerExpiry({})", round); + queue.add(new BlockTimerExpiry(round)); + }; final ScheduledFuture newTimerTask = bftExecutors.scheduleTask(newTimerRunnable, delay, TimeUnit.MILLISECONDS); diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/EventMultiplexer.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/EventMultiplexer.java index a7e91dff1ec..91715d3531c 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/EventMultiplexer.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/EventMultiplexer.java @@ -42,6 +42,7 @@ public void handleBftEvent(final BftEvent bftEvent) { eventHandler.handleMessageEvent(rxEvent); break; case ROUND_EXPIRY: + LOG.debug("TODO SLD received ROUND_EXPIRY"); final RoundExpiry roundExpiryEvent = (RoundExpiry) bftEvent; eventHandler.handleRoundExpiry(roundExpiryEvent); break; @@ -50,6 +51,7 @@ public void handleBftEvent(final BftEvent bftEvent) { eventHandler.handleNewBlockEvent(newChainHead); break; case BLOCK_TIMER_EXPIRY: + LOG.debug("TODO SLD received BLOCK_TIMER_EXPIRY"); final BlockTimerExpiry blockTimerExpiry = (BlockTimerExpiry) bftEvent; eventHandler.handleBlockTimerExpiry(blockTimerExpiry); break; diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/RoundTimer.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/RoundTimer.java index a500a1ad7ee..8510330f04f 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/RoundTimer.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/RoundTimer.java @@ -20,8 +20,13 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** Class for starting and keeping organised round timers */ public class RoundTimer { + + private static final Logger LOG = LoggerFactory.getLogger(RoundTimer.class); private final BftExecutors bftExecutors; private Optional> currentTimerTask; private final BftEventQueue queue; @@ -65,9 +70,19 @@ public synchronized boolean isRunning() { public synchronized void startTimer(final ConsensusRoundIdentifier round) { cancelTimer(); - final long expiryTime = baseExpiryMillis * (long) Math.pow(2, round.getRoundNumber()); + final long expiryTime = (baseExpiryMillis * (long) Math.pow(2, round.getRoundNumber())); + LOG.debug( + "*** TODO SLD | RoundTimer.startTimer() | cancelling existing timer and submitting new timerTask with expiryTime = " + + expiryTime); - final Runnable newTimerRunnable = () -> queue.add(new RoundExpiry(round)); + final Runnable newTimerRunnable = + () -> { + LOG.debug( + "*** TODO SLD | RoundTimer.startTimer() | time expired -> queue.add(new RoundExpiry(" + + round + + "))"); + queue.add(new RoundExpiry(round)); + }; final ScheduledFuture newTimerTask = bftExecutors.scheduleTask(newTimerRunnable, expiryTime, TimeUnit.MILLISECONDS); diff --git a/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftBlockHeightManager.java b/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftBlockHeightManager.java index 93772117837..b556b958766 100644 --- a/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftBlockHeightManager.java +++ b/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftBlockHeightManager.java @@ -27,6 +27,7 @@ import org.hyperledger.besu.consensus.qbft.payload.MessageFactory; import org.hyperledger.besu.consensus.qbft.validation.FutureRoundProposalMessageValidator; import org.hyperledger.besu.consensus.qbft.validation.MessageValidatorFactory; +import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.plugin.services.securitymodule.SecurityModuleException; @@ -38,6 +39,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import com.google.common.collect.Maps; import org.slf4j.Logger; @@ -104,34 +106,93 @@ public QbftBlockHeightManager( @Override public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIdentifier) { + LOG.debug("TODO SLD in handleBlockTimerExpiry"); if (currentRound.isPresent()) { // It is possible for the block timer to take longer than it should due to the precision of // the timer in Java and the OS. This means occasionally the proposal can arrive before the // block timer expiry and hence the round has already been set. There is no negative impact // on the protocol in this case. + LOG.debug("TODO SLD currentRound isPresent, returning."); return; } + LOG.debug("TODO SLD start new round"); startNewRound(0); final QbftRound qbftRound = currentRound.get(); + + logIf( + () -> !roundIdentifier.equals(qbftRound.getRoundIdentifier()), + "TODO SLD roundIdentifier does not equal qbftRound.getRoundIdentifier"); + + if (roundIdentifier.equals(qbftRound.getRoundIdentifier())) { + LOG.debug("TODO SLD round is current"); + buildBlockAndMaybePropose(roundIdentifier, qbftRound); + } else { + LOG.trace( + "Block timer expired for a round ({}) other than current ({})", + roundIdentifier, + qbftRound.getRoundIdentifier()); + } + } + + private void buildBlockAndMaybePropose( + final ConsensusRoundIdentifier roundIdentifier, final QbftRound qbftRound) { + // mining will be checked against round 0 as the current round is initialised to 0 above final boolean isProposer = finalState.isLocalNodeProposerForRound(qbftRound.getRoundIdentifier()); - if (isProposer) { - if (roundIdentifier.equals(qbftRound.getRoundIdentifier())) { - final long headerTimeStampSeconds = Math.round(clock.millis() / 1000D); - qbftRound.createAndSendProposalMessage(headerTimeStampSeconds); + final long headerTimeStampSeconds = Math.round(clock.millis() / 1000D); + final Block block = qbftRound.createBlock(headerTimeStampSeconds); + LOG.debug("TODO SLD created block {}", block); + final boolean blockHasTransactions = !block.getBody().getTransactions().isEmpty(); + LOG.debug( + "TODO SLD block.getTransactions.size() = {}", block.getBody().getTransactions().size()); + if (blockHasTransactions) { + logIf(() -> !isProposer, "TODO SLD I is NOT proposer"); + logIf(() -> isProposer, "TODO SLD I IS proposer"); + if (isProposer) { + LOG.debug("TODO SLD blockHasTransactions and I am proposer so send proposal"); + qbftRound.sendProposalMessage(block); + } + } else { + LOG.debug("TODO SLD EMPTY BLOCK DETECTED"); + long emptyBlockPeriodInSeconds = 60L; + final long nowInMillis = finalState.getClock().millis(); + final long nowInSeconds = nowInMillis / 1000; + LOG.debug( + "TODO SLD parentHeader.getTimestamp() + emptyBlockPeriodInSeconds = {} + {}", + parentHeader.getTimestamp(), + emptyBlockPeriodInSeconds); + final long emptyBlockPeriodExpiryTime = + parentHeader.getTimestamp() + emptyBlockPeriodInSeconds; + LOG.debug("TODO SLD isProposer = {}", isProposer); + LOG.debug( + "TODO SLD nowInSeconds > emptyBlockPeriodExpiryTime = {} > {}", + nowInSeconds, + emptyBlockPeriodExpiryTime); + if (nowInSeconds >= emptyBlockPeriodExpiryTime) { + if (isProposer) { + LOG.debug("TODO SLD emptyBlockPeriod expired and I am proposer so send proposal"); + qbftRound.sendProposalMessage(block); + } } else { - LOG.trace( - "Block timer expired for a round ({}) other than current ({})", - roundIdentifier, - qbftRound.getRoundIdentifier()); + finalState.getRoundTimer().cancelTimer(); + long blockPeriodInMillis = 5_000L; + final long newExpiry = nowInMillis + blockPeriodInMillis; + finalState.getBlockTimer().startTimer(roundIdentifier, newExpiry); + currentRound = Optional.empty(); } } } + private void logIf(final Supplier condition, final String message) { + if (condition.get()) { + LOG.debug(message); + } + } + @Override public void roundExpired(final RoundExpiry expire) { if (currentRound.isEmpty()) { diff --git a/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftRound.java b/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftRound.java index 20096976a9b..fc8fce2c28e 100644 --- a/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftRound.java +++ b/consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftRound.java @@ -95,6 +95,16 @@ public ConsensusRoundIdentifier getRoundIdentifier() { return roundState.getRoundIdentifier(); } + public Block createBlock(final long headerTimeStampSeconds) { + LOG.debug("Creating proposed block. round={}", roundState.getRoundIdentifier()); + return blockCreator.createBlock(headerTimeStampSeconds).getBlock(); + } + + public void sendProposalMessage(final Block block) { + LOG.trace("Creating proposed block blockHeader={}", block.getHeader()); + updateStateWithProposalAndTransmit(block, emptyList(), emptyList()); + } + public void createAndSendProposalMessage(final long headerTimeStampSeconds) { LOG.debug("Creating proposed block. round={}", roundState.getRoundIdentifier()); final Block block = blockCreator.createBlock(headerTimeStampSeconds).getBlock(); diff --git a/consensus/qbft/src/test/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftBlockHeightManagerTest.java b/consensus/qbft/src/test/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftBlockHeightManagerTest.java index 295703209fb..47ee29ae95e 100644 --- a/consensus/qbft/src/test/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftBlockHeightManagerTest.java +++ b/consensus/qbft/src/test/java/org/hyperledger/besu/consensus/qbft/statemachine/QbftBlockHeightManagerTest.java @@ -65,6 +65,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.BlockImporter; +import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.Util; import org.hyperledger.besu.ethereum.mainnet.BlockImportResult; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; @@ -107,6 +108,8 @@ public class QbftBlockHeightManagerTest { @Mock private FutureRoundProposalMessageValidator futureRoundProposalMessageValidator; @Mock private ValidatorMulticaster validatorMulticaster; + @Mock private Transaction transaction; + @Captor private ArgumentCaptor sentMessageArgCaptor; private final List
validators = Lists.newArrayList(); @@ -123,7 +126,9 @@ private void buildCreatedBlock() { headerTestFixture.extraData(bftExtraDataCodec.encode(extraData)); final BlockHeader header = headerTestFixture.buildHeader(); - createdBlock = new Block(header, new BlockBody(emptyList(), emptyList())); + // TODO SLD avoid emptyBlockPeriod logic + final List transactions = List.of(transaction); + createdBlock = new Block(header, new BlockBody(transactions, emptyList())); } @Before @@ -242,7 +247,9 @@ public void onBlockTimerExpiryRoundTimerIsStartedAndProposalMessageIsTransmitted messageValidatorFactory, messageFactory); - manager.handleBlockTimerExpiry(roundIdentifier); + manager.handleBlockTimerExpiry( + roundIdentifier); // TODO SLD test is passing because parent.timestamp is 0 so now > + // emptyBlockPeriodExpiry verify(messageTransmitter, atLeastOnce()) .multicastProposal(eq(roundIdentifier), any(), any(), any()); verify(messageTransmitter, atLeastOnce()).multicastPrepare(eq(roundIdentifier), any()); @@ -265,7 +272,9 @@ public void onBlockTimerExpiryRoundTimerIsStartedAndProposalMessageIsTransmitted messageValidatorFactory, messageFactory); - manager.handleBlockTimerExpiry(roundIdentifier); + manager.handleBlockTimerExpiry( + roundIdentifier); // TODO SLD passes for wrong reason: due to emptyBlockPeriod - add txs to + // block? verify(messageTransmitter, never()).multicastProposal(eq(roundIdentifier), any(), any(), any()); verify(messageTransmitter, never()).multicastPrepare(eq(roundIdentifier), any()); verify(roundTimer, times(1)).startTimer(roundIdentifier); @@ -343,7 +352,9 @@ public void onRoundTimerExpiryANewRoundIsCreatedWithAnIncrementedRoundNumber() { manager.handleBlockTimerExpiry(roundIdentifier); verify(roundFactory).createNewRound(any(), eq(0)); - manager.roundExpired(new RoundExpiry(roundIdentifier)); + manager.roundExpired( + new RoundExpiry(roundIdentifier)); // TODO SLD failing because currentRound.isEmpty during + // emptyBlockPeriod verify(roundFactory).createNewRound(any(), eq(1)); }