diff --git a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal index 09fe9b28a0..6eb44e49dc 100644 --- a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal +++ b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal @@ -56,6 +56,7 @@ library Ouroboros.Consensus.Node.ErrorPolicy Ouroboros.Consensus.Node.Exit Ouroboros.Consensus.Node.ExitPolicy + Ouroboros.Consensus.Node.GSM Ouroboros.Consensus.Node.Recovery Ouroboros.Consensus.Node.RethrowPolicy Ouroboros.Consensus.Node.Tracers @@ -69,6 +70,7 @@ library build-depends: , base >=4.14 && <4.19 , bytestring >=0.10 && <0.12 + , cardano-slotting , cborg ^>=0.2.2 , containers >=0.5 && <0.7 , contra-tracer diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index 9ffe6fa4fd..d26c3fd995 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -569,8 +569,9 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe (contramap (TraceLabelPeer them) (Node.chainSyncClientTracer (getTracers kernel))) (CsClient.defaultChainDbView (getChainDB kernel)) (getNodeCandidates kernel) + (getNodeIdlers kernel) them - version $ \varCandidate -> do + version $ \varCandidate (startIdling, stopIdling) -> do chainSyncTimeout <- genChainSyncTimeout (r, trailing) <- runPipelinedPeerWithLimits @@ -588,6 +589,8 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe , CsClient.controlMessageSTM , CsClient.headerMetricsTracer = TraceLabelPeer them `contramap` reportHeader , CsClient.varCandidate + , CsClient.startIdling + , CsClient.stopIdling } return (ChainSyncInitiatorResult r, trailing) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index 73711886dc..0facc23f4d 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -65,6 +65,7 @@ import Data.Hashable (Hashable) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe, isNothing) +import Data.Time (NominalDiffTime) import Data.Typeable (Typeable) import Ouroboros.Consensus.Block import Ouroboros.Consensus.BlockchainTime hiding (getSystemStart) @@ -81,6 +82,8 @@ import Ouroboros.Consensus.Node.DbLock import Ouroboros.Consensus.Node.DbMarker import Ouroboros.Consensus.Node.ErrorPolicy import Ouroboros.Consensus.Node.ExitPolicy +import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..)) +import qualified Ouroboros.Consensus.Node.GSM as GSM import Ouroboros.Consensus.Node.InitStorage import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Node.ProtocolInfo @@ -130,7 +133,7 @@ import System.FilePath (()) import System.FS.API (SomeHasFS (..)) import System.FS.API.Types import System.FS.IO (ioHasFS) -import System.Random (StdGen, newStdGen, randomIO, randomRIO) +import System.Random (StdGen, newStdGen, split, randomIO, randomRIO) {------------------------------------------------------------------------------- The arguments to the Consensus Layer node functionality @@ -254,6 +257,10 @@ data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk -- | node-to-client protocol versions to run. , llrnNodeToClientVersions :: Map NodeToClientVersion (BlockNodeToClientVersion blk) + -- | If the volatile tip is older than this, then the node will exit the + -- @CaughtUp@ state. + , llrnMaxCaughtUpAge :: NominalDiffTime + -- | Maximum clock skew , llrnMaxClockSkew :: ClockSkew } @@ -364,8 +371,15 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = , ChainDB.cdbVolatileDbValidation = ValidateAll } - chainDB <- openChainDB registry inFuture cfg initLedger - llrnChainDbArgsDefaults customiseChainDbArgs' + let finalChainDbArgs = + mkFinalChainDbArgs + registry + inFuture + cfg + initLedger + llrnChainDbArgsDefaults + customiseChainDbArgs' + chainDB <- ChainDB.openDB finalChainDbArgs continueWithCleanChainDB chainDB $ do btime <- @@ -384,17 +398,28 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = , hfbtMaxClockRewind = secondsToNominalDiffTime 20 } - nodeKernelArgs <- - fmap (nodeKernelArgsEnforceInvariants . llrnCustomiseNodeKernelArgs) $ - mkNodeKernelArgs - registry - llrnBfcSalt - llrnKeepAliveRng - cfg - rnTraceConsensus - btime - (InFutureCheck.realHeaderInFutureCheck llrnMaxClockSkew systemTime) - chainDB + nodeKernelArgs <- do + durationUntilTooOld <- GSM.realDurationUntilTooOld + (configLedger cfg) + (ledgerState <$> ChainDB.getCurrentLedger chainDB) + llrnMaxCaughtUpAge + systemTime + let gsmMarkerFileView = + case ChainDB.cdbHasFSGsmDB finalChainDbArgs of + SomeHasFS x -> GSM.realMarkerFileView chainDB x + fmap (nodeKernelArgsEnforceInvariants . llrnCustomiseNodeKernelArgs) + $ mkNodeKernelArgs + registry + llrnBfcSalt + llrnKeepAliveRng + cfg + rnTraceConsensus + btime + (InFutureCheck.realHeaderInFutureCheck llrnMaxClockSkew systemTime) + chainDB + llrnMaxCaughtUpAge + (Just durationUntilTooOld) + gsmMarkerFileView nodeKernel <- initNodeKernel nodeKernelArgs rnNodeKernelHook registry nodeKernel @@ -600,13 +625,25 @@ openChainDB -- ^ Customise the 'ChainDbArgs' -> m (ChainDB m blk) openChainDB registry inFuture cfg initLedger defArgs customiseArgs = - ChainDB.openDB args - where - args :: ChainDbArgs Identity m blk - args = customiseArgs $ - mkChainDbArgs registry inFuture cfg initLedger - (nodeImmutableDbChunkInfo (configStorage cfg)) - defArgs + ChainDB.openDB + $ mkFinalChainDbArgs registry inFuture cfg initLedger defArgs customiseArgs + +mkFinalChainDbArgs + :: forall m blk. (RunNode blk, IOLike m) + => ResourceRegistry m + -> CheckInFuture m blk + -> TopLevelConfig blk + -> ExtLedgerState blk + -- ^ Initial ledger + -> ChainDbArgs Defaults m blk + -> (ChainDbArgs Identity m blk -> ChainDbArgs Identity m blk) + -- ^ Customise the 'ChainDbArgs' + -> ChainDbArgs Identity m blk +mkFinalChainDbArgs registry inFuture cfg initLedger defArgs customiseArgs = + customiseArgs $ + mkChainDbArgs registry inFuture cfg initLedger + (nodeImmutableDbChunkInfo (configStorage cfg)) + defArgs mkChainDbArgs :: forall m blk. (RunNode blk, IOLike m) @@ -645,6 +682,9 @@ mkNodeKernelArgs -> BlockchainTime m -> InFutureCheck.SomeHeaderInFutureCheck m blk -> ChainDB m blk + -> NominalDiffTime + -> Maybe (GSM.WrapDurationUntilTooOld m blk) + -> GSM.MarkerFileView m -> m (NodeKernelArgs m addrNTN (ConnectionId addrNTC) blk) mkNodeKernelArgs registry @@ -655,6 +695,9 @@ mkNodeKernelArgs btime chainSyncFutureCheck chainDB + maxCaughtUpAge + gsmDurationUntilTooOld + gsmMarkerFileView = do return NodeKernelArgs { tracers @@ -669,6 +712,12 @@ mkNodeKernelArgs , miniProtocolParameters = defaultMiniProtocolParameters , blockFetchConfiguration = defaultBlockFetchConfiguration , keepAliveRng + , gsmArgs = GsmNodeKernelArgs { + gsmAntiThunderingHerd = snd (split keepAliveRng) + , gsmDurationUntilTooOld + , gsmMarkerFileView + , gsmMinCaughtUpDuration = maxCaughtUpAge + } } where defaultBlockFetchConfiguration :: BlockFetchConfiguration @@ -879,6 +928,7 @@ stdLowLevelRunNodeArgsIO RunNodeArgs{ rnProtocolInfo (supportedNodeToClientVersions (Proxy @blk)) , llrnWithCheckedDB = stdWithCheckedDB (Proxy @blk) srnDatabasePath networkMagic + , llrnMaxCaughtUpAge = secondsToNominalDiffTime $ 20 * 60 -- 20 min , llrnMaxClockSkew = InFuture.defaultClockSkew } diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs new file mode 100644 index 0000000000..47585be0bd --- /dev/null +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs @@ -0,0 +1,428 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE Rank2Types #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE ViewPatterns #-} + +-- | The Genesis State Machine decides whether the node is caught-up or not. +module Ouroboros.Consensus.Node.GSM ( + CandidateVersusSelection (..) + , DurationFromNow (..) + , GsmEntryPoints (..) + , GsmNodeKernelArgs (..) + , GsmView (..) + , MarkerFileView (..) + , WrapDurationUntilTooOld (..) + -- * Auxiliaries + , initializationLedgerJudgement + -- * Constructors + , realDurationUntilTooOld + , realGsmEntryPoints + , realMarkerFileView + ) where + +import qualified Cardano.Slotting.Slot as Slot +import qualified Control.Concurrent.Class.MonadSTM.TVar as LazySTM +import Control.Monad (join, unless, when) +import Control.Monad.Class.MonadSTM (MonadSTM, STM, atomically, + check, retry) +import Control.Monad.Class.MonadThrow (MonadThrow) +import Control.Monad.Class.MonadTimer (MonadTimer, registerDelay, + threadDelay) +import Data.Functor ((<&>)) +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set +import Data.Time (NominalDiffTime) +import qualified Ouroboros.Consensus.BlockchainTime.WallClock.Types as Clock +import qualified Ouroboros.Consensus.HardFork.Abstract as HardFork +import qualified Ouroboros.Consensus.HardFork.History as HardFork +import qualified Ouroboros.Consensus.HardFork.History.Qry as Qry +import qualified Ouroboros.Consensus.Ledger.Basics as L +import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) +import Ouroboros.Consensus.Util.NormalForm.StrictTVar (StrictTVar) +import qualified Ouroboros.Consensus.Util.NormalForm.StrictTVar as StrictSTM +import Ouroboros.Consensus.Util.Time (nominalDiffTimeToMicroseconds) +import Ouroboros.Network.PeerSelection.LedgerPeers.Type + (LedgerStateJudgement (..)) +import System.FS.API (HasFS, createDirectoryIfMissing, doesFileExist, + removeFile, withFile) +import System.FS.API.Types (AllowExisting (..), FsPath, OpenMode (..), + mkFsPath) +import System.Random (StdGen, uniformR) + +{------------------------------------------------------------------------------- + Interface +-------------------------------------------------------------------------------} + +data DurationFromNow = + After !NominalDiffTime + -- ^ INVARIANT positive + | + Already + -- ^ This value represents all non-positive durations, ie events from the + -- past + +data CandidateVersusSelection = + CandidateDoesNotIntersect + -- ^ The GSM assumes that this is ephemeral + -- + -- For example, the ChainSync client will either disconnect from the peer + -- or update the candidate to one that is not stale. It's also technically + -- possible that the selection is stale, which the ChainDB would also + -- resolve as soon as possible. + | + WhetherCandidateIsBetter !Bool + -- ^ Whether the candidate is better than the selection + deriving (Eq, Show) + +data GsmView m blk upstreamPeer selection candidate = GsmView { + antiThunderingHerd :: StdGen + -- ^ A initial seed used to randomly increase 'minCaughtUpDuration' by up + -- to 15% per transition from OnlyBootstrap to CaughtUp in order to avoid a + -- thundering herd phenemenon. + , + candidateOverSelection :: + selection -> candidate -> CandidateVersusSelection + , + durationUntilTooOld :: Maybe (selection -> m DurationFromNow) + -- ^ How long from now until the selection will be so old that the node + -- should exit the @CaughtUp@ state + -- + -- 'Nothing' means the selection can never become too old. + , + equivalent :: selection -> selection -> Bool + -- ^ Whether the two selections are equivalent for the purpose of the + -- Genesis State Machine + , + getChainSyncCandidates :: + STM m (Map.Map upstreamPeer (StrictTVar m candidate)) + -- ^ The latest candidates from the upstream ChainSync peers + , + getChainSyncIdlers :: STM m (Set.Set upstreamPeer) + -- ^ The ChainSync peers whose latest message claimed that they have no + -- subsequent headers + , + getCurrentSelection :: STM m selection + -- ^ The node's current selection + , + minCaughtUpDuration :: NominalDiffTime + -- ^ How long the node must stay in CaughtUp after transitioning to it from + -- OnlyBootstrap, regardless of the selection's age. + -- + -- See 'antiThunderingHerd'. + , + setCaughtUpPersistentMark :: Bool -> m () + -- ^ EG touch/delete the marker file on disk + , + varLedgerStateJudgement :: StrictTVar m LedgerStateJudgement + -- ^ EG update the TVar that the diffusion layer monitors + } + +-- | The two proper GSM states for boot strap peers +-- +-- See the @BootstrapPeersIER.md@ document for their specification. +data GsmEntryPoints m = GsmEntryPoints { + enterCaughtUp :: forall neverTerminates. m neverTerminates + -- ^ ASSUMPTION the marker file is present on disk, a la + -- @'setCaughtUpPersistentMark' True@ + -- + -- Thus this can be invoked at node start up after determining the marker + -- file is present (and the tip is still not stale) + , + enterOnlyBootstrap :: forall neverTerminates. m neverTerminates + -- ^ ASSUMPTION the marker file is absent on disk, a la + -- @'setCaughtUpPersistentMark' False@ + -- + -- Thus this can be invoked at node start up after determining the marker + -- file is absent. + } + +----- + +-- | Determine the initial 'LedgerStateJudgment' +-- +-- Also initializes the persistent marker file. +initializationLedgerJudgement :: + ( L.GetTip (L.LedgerState blk) + , Monad m + ) + => m (L.LedgerState blk) + -> Maybe (WrapDurationUntilTooOld m blk) + -- ^ 'Nothing' if @blk@ has no age limit + -> MarkerFileView m + -> m LedgerStateJudgement +initializationLedgerJudgement + getCurrentLedger + mbDurationUntilTooOld + markerFileView + = do + wasCaughtUp <- hasMarkerFile markerFileView + if not wasCaughtUp then pure TooOld else do + case mbDurationUntilTooOld of + Nothing -> return YoungEnough + Just wd -> do + sno <- L.getTipSlot <$> getCurrentLedger + getDurationUntilTooOld wd sno >>= \case + After{} -> return YoungEnough + Already -> do + removeMarkerFile markerFileView + return TooOld + +{------------------------------------------------------------------------------- + A real implementation +-------------------------------------------------------------------------------} + +-- | The actual GSM logic for boot strap peers +-- +-- See the @BootstrapPeersIER.md@ document for the specification of this logic. +realGsmEntryPoints :: forall m blk upstreamPeer selection candidate. + ( MonadTimer m + , Eq upstreamPeer + ) + => GsmView m blk upstreamPeer selection candidate + -> GsmEntryPoints m +realGsmEntryPoints gsmView = GsmEntryPoints { + enterCaughtUp + , + enterOnlyBootstrap + } + where + GsmView { + antiThunderingHerd + , + candidateOverSelection + , + durationUntilTooOld + , + equivalent + , + getChainSyncCandidates + , + getChainSyncIdlers + , + getCurrentSelection + , + minCaughtUpDuration + , + setCaughtUpPersistentMark + , + varLedgerStateJudgement + } = gsmView + + enterCaughtUp :: forall neverTerminates. m neverTerminates + enterCaughtUp = enterCaughtUp' antiThunderingHerd + + enterOnlyBootstrap :: forall neverTerminates. m neverTerminates + enterOnlyBootstrap = enterOnlyBootstrap' antiThunderingHerd + + enterCaughtUp' :: forall neverTerminates. StdGen -> m neverTerminates + enterCaughtUp' g = do + blockWhileCaughtUp + + setCaughtUpPersistentMark False + enterOnlyBootstrap' g + + enterOnlyBootstrap' :: StdGen -> forall neverTerminates. m neverTerminates + enterOnlyBootstrap' g = do + atomically $ StrictSTM.writeTVar varLedgerStateJudgement TooOld + blockUntilCaughtUp + atomically $ StrictSTM.writeTVar varLedgerStateJudgement YoungEnough + + setCaughtUpPersistentMark True + + -- When transitioning from OnlyBootstrap to CaughtUp, the node will + -- remain in CaughtUp for at least 'minCaughtUpDuration', regardless of + -- the selection's age. And randomly up to 15% longer. + let (bonus, !g') = uniformR (0, 15 :: Int) g + threadDelay + $ ceiling + $ nominalDiffTimeToMicroseconds minCaughtUpDuration + * + (1 + fromIntegral bonus / 100 :: Double) + + enterCaughtUp' g' + + blockWhileCaughtUp :: m () + blockWhileCaughtUp = + atomically getCurrentSelection >>= blockWhileCaughtUpHelper + + blockWhileCaughtUpHelper :: selection -> m () + blockWhileCaughtUpHelper selection = do + let computeDuration = case durationUntilTooOld of + Nothing -> pure Nothing + Just f -> Just <$> f selection + computeDuration >>= \case + Nothing -> atomically retry -- never unblock + Just Already -> pure () -- it's already too old + Just (After dur) -> do + let microSeconds = nominalDiffTimeToMicroseconds dur + varTimeoutExpired <- registerDelay (ceiling microSeconds) + + -- If the selection changes before the timeout expires, setup a + -- new timeout for the new tip and loop. + -- + -- Otherwise the timeout expired before the selection changed + -- (or they both happened after the previous attempt of this + -- STM transaction), so the node is no longer in @CaughtUp@. + join $ atomically $ do + expired <- LazySTM.readTVar varTimeoutExpired + if expired then pure (pure ()) else do + selection' <- getCurrentSelection + check $ not $ equivalent selection selection' + pure $ blockWhileCaughtUpHelper selection' + + blockUntilCaughtUp :: m () + blockUntilCaughtUp = do + varsCandidate <- atomically $ do + -- STAGE 1: all ChainSync clients report no subsequent headers + idlers <- getChainSyncIdlers + varsCandidate <- getChainSyncCandidates + check $ + 0 < Map.size varsCandidate + && Set.size idlers == Map.size varsCandidate + && idlers == Map.keysSet varsCandidate + + -- STAGE 2: no candidate is better than the node's current + -- selection + candidates <- traverse StrictSTM.readTVar varsCandidate + -- TODO Simon Marlow wrote "Never read an unbounded number of + -- TVars in a single transaction because the O(n) performance of + -- readTVar then gives O(n*n) for the whole transaction." Is it + -- possible that varsCandidate will be big enough that this STM + -- transaction won't terminate even when the node is actually + -- done syncing? + selection <- getCurrentSelection + let ok candidate = + WhetherCandidateIsBetter False + == candidateOverSelection selection candidate + check $ all ok candidates + + pure varsCandidate + + -- STAGE 3: the previous check wasn't so slow that the peer set changed + -- + -- If the node is actually done syncing, then this should rarely fail. + -- + -- TODO is this even unnecessary? + varsCandidate' <- atomically getChainSyncCandidates + when (Map.keys varsCandidate /= Map.keys varsCandidate') $ do + blockUntilCaughtUp + +{------------------------------------------------------------------------------- + A helper for constructing a real 'GsmView' +-------------------------------------------------------------------------------} + +newtype WrapDurationUntilTooOld m blk = DurationUntilTooOld { + getDurationUntilTooOld :: Slot.WithOrigin Slot.SlotNo -> m DurationFromNow + } + +-- | The real system's 'durationUntilTooOld' +realDurationUntilTooOld :: + ( HardFork.HasHardForkHistory blk + , MonadSTM m + ) + => L.LedgerConfig blk + -> STM m (L.LedgerState blk) + -> NominalDiffTime + -- ^ If the volatile tip is older than this, then the node will exit the + -- @CaughtUp@ state. + -- + -- Eg 'Ouroboros.Consensus.Node.llrnMaxCaughtUpAge' + -> Clock.SystemTime m + -> m (WrapDurationUntilTooOld m blk) +realDurationUntilTooOld lcfg getLedgerState maxCaughtUpAge systemTime = do + runner <- + HardFork.runWithCachedSummary + $ HardFork.hardForkSummary lcfg <$> getLedgerState + pure $ DurationUntilTooOld $ \woSlot -> do + now <- Clock.systemTimeCurrent systemTime + case woSlot of + Slot.Origin -> pure $ toDur now $ Clock.RelativeTime 0 + Slot.At slot -> do + let qry = Qry.slotToWallclock slot + atomically $ HardFork.cachedRunQuery runner qry <&> \case + Left Qry.PastHorizon{} -> Already + Right (onset, _slotLen) -> toDur now onset + where + toDur + (Clock.RelativeTime now) + (Clock.getRelativeTime -> (+ maxCaughtUpAge) -> limit) + = if limit <= now then Already else After (limit - now) + +{------------------------------------------------------------------------------- + A helper for constructing a real 'GsmView' + + TODO should these operations properly be part of the ChainDB? +-------------------------------------------------------------------------------} + +-- | A view on the GSM's /Caught-Up persistent marker/ file +-- +-- These comments constrain the result of 'realMarkerFile'; mock views in +-- testing are free to be different. +data MarkerFileView m = MarkerFileView { + hasMarkerFile :: m Bool + , + -- | Remove the marker file + -- + -- Will throw an 'FsResourceDoesNotExist' error when it does not exist. + removeMarkerFile :: m () + , + -- | Create the marker file + -- + -- Idempotent. + touchMarkerFile :: m () + } + +-- | The real system's 'MarkerFileView' +-- +-- The strict 'ChainDB' argument is unused, but its existence ensures there's +-- only one process using this file system. +realMarkerFileView :: + MonadThrow m + => ChainDB m blk + -> HasFS m h + -- ^ should be independent of other filesystems, eg @gsm/@ + -> MarkerFileView m +realMarkerFileView !_cdb hasFS = + MarkerFileView { + hasMarkerFile + , + removeMarkerFile = removeFile hasFS markerFile + , + touchMarkerFile = do + createDirectoryIfMissing hasFS True (mkFsPath []) + alreadyExists <- hasMarkerFile + unless alreadyExists $ + withFile hasFS markerFile (WriteMode MustBeNew) $ \_h -> + return () + } + where + hasMarkerFile = doesFileExist hasFS markerFile + +-- | The path to the GSM's /Caught-Up persistent marker/ inside its dedicated +-- 'HasFS' +-- +-- If the file is present on node initialization, then the node was in the +-- @CaughtUp@ state when it shut down. +markerFile :: FsPath +markerFile = mkFsPath ["CaughtUpMarker"] + +{------------------------------------------------------------------------------- + A helper for the NodeKernel +-------------------------------------------------------------------------------} + +-- | Arguments the NodeKernel has to take because of the GSM +data GsmNodeKernelArgs m blk = GsmNodeKernelArgs { + gsmAntiThunderingHerd :: StdGen + -- ^ See 'antiThunderingHerd' + , + gsmDurationUntilTooOld :: Maybe (WrapDurationUntilTooOld m blk) + -- ^ See 'durationTooOld' + , + gsmMarkerFileView :: MarkerFileView m + , + gsmMinCaughtUpDuration :: NominalDiffTime + -- ^ See 'minCaughtUpDuration' + } diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index 5158378e4c..2a9745864b 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -31,16 +31,20 @@ module Ouroboros.Consensus.NodeKernel ( import qualified Control.Concurrent.Class.MonadSTM as LazySTM import Control.DeepSeq (force) import Control.Monad +import Control.Monad.Class.MonadTimer (MonadTimer) import Control.Monad.Except import Control.Tracer import Data.Bifunctor (second) import Data.Data (Typeable) import Data.Foldable (traverse_) +import Data.Function (on) +import Data.Functor ((<&>)) import Data.Hashable (Hashable) import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import Data.Maybe (isJust, mapMaybe) import Data.Proxy +import Data.Set (Set) import qualified Data.Text as Text import Data.Void (Void) import Ouroboros.Consensus.Block hiding (blockMatchesHeader) @@ -58,6 +62,8 @@ import Ouroboros.Consensus.Mempool import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck (SomeHeaderInFutureCheck) +import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..)) +import qualified Ouroboros.Consensus.Node.GSM as GSM import Ouroboros.Consensus.Node.Run import Ouroboros.Consensus.Node.Tracers import Ouroboros.Consensus.Protocol.Abstract @@ -67,6 +73,8 @@ import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB import qualified Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment as InvalidBlockPunishment import Ouroboros.Consensus.Storage.ChainDB.Init (InitChainDB) import qualified Ouroboros.Consensus.Storage.ChainDB.Init as InitChainDB +import Ouroboros.Consensus.Util.AnchoredFragment + (preferAnchoredCandidate) import Ouroboros.Consensus.Util.EarlyExit import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.Orphans () @@ -78,6 +86,8 @@ import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.BlockFetch import Ouroboros.Network.NodeToNode (ConnectionId, MiniProtocolParameters (..)) +import Ouroboros.Network.PeerSelection.LedgerPeers.Type + (LedgerStateJudgement(..)) import Ouroboros.Network.PeerSharing (PeerSharingRegistry, newPeerSharingRegistry) import Ouroboros.Network.TxSubmission.Inbound @@ -95,36 +105,44 @@ import System.Random (StdGen) -- | Interface against running relay node data NodeKernel m addrNTN addrNTC blk = NodeKernel { -- | The 'ChainDB' of the node - getChainDB :: ChainDB m blk + getChainDB :: ChainDB m blk -- | The node's mempool - , getMempool :: Mempool m blk + , getMempool :: Mempool m blk -- | The node's top-level static configuration - , getTopLevelConfig :: TopLevelConfig blk + , getTopLevelConfig :: TopLevelConfig blk -- | The fetch client registry, used for the block fetch clients. - , getFetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m + , getFetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m -- | The fetch mode, used by diffusion. -- - , getFetchMode :: STM m FetchMode + , getFetchMode :: STM m FetchMode + + -- | The ledger judgement, used by diffusion. + -- + , getLedgerStateJudgement :: STM m LedgerStateJudgement -- | Read the current candidates - , getNodeCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk)))) + , getNodeCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk)))) + + -- | Read the set of peers that have claimed to have no subsequent + -- headers beyond their current candidate + , getNodeIdlers :: StrictTVar m (Set (ConnectionId addrNTN)) -- | Read the current peer sharing registry, used for interacting with -- the PeerSharing protocol - , getPeerSharingRegistry :: PeerSharingRegistry addrNTN m + , getPeerSharingRegistry :: PeerSharingRegistry addrNTN m -- | The node's tracers - , getTracers :: Tracers m (ConnectionId addrNTN) addrNTC blk + , getTracers :: Tracers m (ConnectionId addrNTN) addrNTC blk -- | Set block forging -- -- When set with the empty list '[]' block forging will be disabled. -- - , setBlockForging :: [BlockForging m blk] -> m () + , setBlockForging :: [BlockForging m blk] -> m () } -- | Arguments required when initializing a node @@ -141,11 +159,13 @@ data NodeKernelArgs m addrNTN addrNTC blk = NodeKernelArgs { , miniProtocolParameters :: MiniProtocolParameters , blockFetchConfiguration :: BlockFetchConfiguration , keepAliveRng :: StdGen + , gsmArgs :: GsmNodeKernelArgs m blk } initNodeKernel :: forall m addrNTN addrNTC blk. ( IOLike m + , MonadTimer m , RunNode blk , Ord addrNTN , Hashable addrNTN @@ -156,19 +176,61 @@ initNodeKernel initNodeKernel args@NodeKernelArgs { registry, cfg, tracers , chainDB, initChainDB , blockFetchConfiguration + , gsmArgs } = do -- using a lazy 'TVar', 'BlockForging' does not have a 'NoThunks' instance. blockForgingVar :: LazySTM.TMVar m [BlockForging m blk] <- LazySTM.newTMVarIO [] initChainDB (configStorage cfg) (InitChainDB.fromFull chainDB) st <- initInternalState args + let IS + { blockFetchInterface + , fetchClientRegistry + , mempool + , peerSharingRegistry + , varCandidates + , varIdlers + , varLedgerJudgement + } = st + + do let GsmNodeKernelArgs {..} = gsmArgs + let gsm = GSM.realGsmEntryPoints GSM.GsmView + { GSM.antiThunderingHerd = gsmAntiThunderingHerd + , GSM.candidateOverSelection = \(headers, _lst) candidate -> + case AF.intersectionPoint headers candidate of + Nothing -> GSM.CandidateDoesNotIntersect + Just{} -> + GSM.WhetherCandidateIsBetter + $ -- precondition requires intersection + preferAnchoredCandidate + (configBlock cfg) + headers + candidate + , GSM.durationUntilTooOld = + gsmDurationUntilTooOld + <&> \wd (_headers, lst) -> + GSM.getDurationUntilTooOld wd (getTipSlot lst) + , GSM.equivalent = (==) `on` (AF.headPoint . fst) + , GSM.getChainSyncCandidates = readTVar varCandidates + , GSM.getChainSyncIdlers = readTVar varIdlers + , GSM.getCurrentSelection = do + headers <- ChainDB.getCurrentChain chainDB + extLedgerState <- ChainDB.getCurrentLedger chainDB + return (headers, ledgerState extLedgerState) + , GSM.minCaughtUpDuration = gsmMinCaughtUpDuration + , GSM.setCaughtUpPersistentMark = \upd -> + (if upd then GSM.touchMarkerFile else GSM.removeMarkerFile) + gsmMarkerFileView + , GSM.varLedgerStateJudgement = varLedgerJudgement + } + judgment <- readTVarIO varLedgerJudgement + void $ forkLinkedThread registry "NodeKernel.GSM" $ case judgment of + TooOld -> GSM.enterOnlyBootstrap gsm + YoungEnough -> GSM.enterCaughtUp gsm void $ forkLinkedThread registry "NodeKernel.blockForging" $ blockForgingController st (LazySTM.takeTMVar blockForgingVar) - let IS { blockFetchInterface, fetchClientRegistry, varCandidates, - peerSharingRegistry, mempool } = st - -- Run the block fetch logic in the background. This will call -- 'addFetchedBlock' whenever a new block is downloaded. void $ forkLinkedThread registry "NodeKernel.blockFetchLogic" $ @@ -180,15 +242,17 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers blockFetchConfiguration return NodeKernel - { getChainDB = chainDB - , getMempool = mempool - , getTopLevelConfig = cfg - , getFetchClientRegistry = fetchClientRegistry - , getFetchMode = readFetchMode blockFetchInterface - , getNodeCandidates = varCandidates - , getPeerSharingRegistry = peerSharingRegistry - , getTracers = tracers - , setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a + { getChainDB = chainDB + , getMempool = mempool + , getTopLevelConfig = cfg + , getFetchClientRegistry = fetchClientRegistry + , getFetchMode = readFetchMode blockFetchInterface + , getLedgerStateJudgement = readTVar varLedgerJudgement + , getNodeCandidates = varCandidates + , getNodeIdlers = varIdlers + , getPeerSharingRegistry = peerSharingRegistry + , getTracers = tracers + , setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a } where blockForgingController :: InternalState m remotePeer localPeer blk @@ -216,8 +280,10 @@ data InternalState m addrNTN addrNTC blk = IS { , blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m , fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m , varCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk)))) + , varIdlers :: StrictTVar m (Set (ConnectionId addrNTN)) , mempool :: Mempool m blk , peerSharingRegistry :: PeerSharingRegistry addrNTN m + , varLedgerJudgement :: StrictTVar m LedgerStateJudgement } initInternalState @@ -232,8 +298,18 @@ initInternalState initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg , blockFetchSize, btime , mempoolCapacityOverride + , gsmArgs } = do + varLedgerJudgement <- do + let GsmNodeKernelArgs {..} = gsmArgs + j <- GSM.initializationLedgerJudgement + (atomically $ ledgerState <$> ChainDB.getCurrentLedger chainDB) + gsmDurationUntilTooOld + gsmMarkerFileView + newTVarIO j + varCandidates <- newTVarIO mempty + varIdlers <- newTVarIO mempty mempool <- openMempool registry (chainDBLedgerInterface chainDB) (configLedger cfg) diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index 8dd019d403..0399f2f057 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -73,6 +73,7 @@ import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.ExitPolicy +import qualified Ouroboros.Consensus.Node.GSM as GSM import Ouroboros.Consensus.Node.InitStorage import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Node.ProtocolInfo @@ -997,6 +998,12 @@ runThreadNetwork systemTime ThreadNetworkArgs -- blockfetch descision interval. , bfcSalt = 0 } + , durationUntilTooOld = Nothing + , gsmMarkerFileView = GSM.MarkerFileView { + touchMarkerFile = pure () + , removeMarkerFile = pure () + , hasMarkerFile = pure False + } } nodeKernel <- initNodeKernel nodeKernelArgs @@ -1458,9 +1465,10 @@ newNodeInfo = do (v1, m1) <- mk (v2, m2) <- mk (v3, m3) <- mk + (v4, m4) <- mk pure - ( NodeDBs v1 v2 v3 - , NodeDBs <$> m1 <*> m2 <*> m3 + ( NodeDBs v1 v2 v3 v4 + , NodeDBs <$> m1 <*> m2 <*> m3 <*> m4 ) pure diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs index f3eb1e3655..d2a919f285 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs @@ -103,6 +103,8 @@ basicChainSyncClient tracer cfg chainDbView varCandidate = , CSClient.controlMessageSTM = return Continue , CSClient.headerMetricsTracer = nullTracer , CSClient.varCandidate + , CSClient.startIdling = pure () + , CSClient.stopIdling = pure () } where dummyHeaderInFutureCheck :: diff --git a/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs b/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs index a67fb5b408..1f1123554d 100644 --- a/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs +++ b/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs @@ -134,6 +134,8 @@ oneBenchRun , CSClient.controlMessageSTM = return Continue , CSClient.headerMetricsTracer = nullTracer , CSClient.varCandidate + , CSClient.startIdling = pure () + , CSClient.stopIdling = pure () } server :: ChainSyncServer H (Point B) (Tip B) IO () diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index ef973f45c8..d75d47fff5 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -63,6 +63,8 @@ import Control.Tracer import Data.Kind (Type) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map +import Data.Set (Set) +import qualified Data.Set as Set import Data.Proxy import Data.Typeable import Data.Word (Word64) @@ -161,14 +163,28 @@ bracketChainSyncClient :: -> StrictTVar m (Map peer (StrictTVar m (AnchoredFragment (Header blk)))) -- ^ The candidate chains, we need the whole map because we -- (de)register nodes (@peer@). + -> StrictTVar m (Set peer) + -- ^ This ChainSync client should ensure that its peer is in this set while + -- and only while both of the following conditions are satisfied: the + -- peer's latest -- message has been fully processed (especially that its + -- candidate has been updated; previous argument) and its latest message + -- did not claim that it already has headers that extend its candidate. + -- + -- It's more important that the client is removed from the set promptly + -- than it is for the client to be added promptly, because of how this is + -- used by the GSM to determine that the node is done syncing. -> peer -> NodeToNodeVersion - -> (StrictTVar m (AnchoredFragment (Header blk)) -> m a) + -> ( StrictTVar m (AnchoredFragment (Header blk)) + -> (m (), m ()) + -> m a + ) -> m a bracketChainSyncClient tracer ChainDbView { getIsInvalidBlock } varCandidates + varIdling peer version body @@ -178,15 +194,20 @@ bracketChainSyncClient withWatcher "ChainSync.Client.rejectInvalidBlocks" (invalidBlockWatcher varCandidate) - $ body varCandidate + $ body + varCandidate + ( atomically $ modifyTVar varIdling $ Set.insert peer + , atomically $ modifyTVar varIdling $ Set.delete peer + ) where newCandidateVar = do varCandidate <- newTVarIO $ AF.Empty AF.AnchorGenesis atomically $ modifyTVar varCandidates $ Map.insert peer varCandidate return varCandidate - releaseCandidateVar _ = do - atomically $ modifyTVar varCandidates $ Map.delete peer + releaseCandidateVar _ = atomically $ do + modifyTVar varCandidates $ Map.delete peer + modifyTVar varIdling $ Set.delete peer invalidBlockWatcher varCandidate = invalidBlockRejector @@ -495,6 +516,12 @@ data DynamicEnv m blk = DynamicEnv { , controlMessageSTM :: ControlMessageSTM m , headerMetricsTracer :: HeaderMetricsTracer m , varCandidate :: StrictTVar m (AnchoredFragment (Header blk)) + , startIdling :: m () + -- ^ Insert the peer into the idling set argument of + -- 'bracketChainSyncClient' + , stopIdling :: m () + -- ^ Remove the peer from the idling set argument of + -- 'bracketChainSyncClient' } -- | General values collectively needed by the top-level entry points @@ -586,6 +613,10 @@ chainSyncClient cfgEnv dynEnv = getCurrentChain } = chainDbView + DynamicEnv { + stopIdling + } = dynEnv + mkIntEnv :: InFutureCheck.HeaderInFutureCheck m blk arrival judgment -> InternalEnv m blk arrival judgment @@ -629,7 +660,7 @@ chainSyncClient cfgEnv dynEnv = recvMsgRollForward = \_hdr _tip -> go n' s , recvMsgRollBackward = \_pt _tip -> go n' s } - in Stateful $ go n0 + in Stateful $ \s -> do stopIdling; go n0 s terminate :: ChainSyncClientResult @@ -812,7 +843,8 @@ findIntersectionTop cfgEnv dynEnv intEnv = disconnect $ InvalidIntersection intersection (ourTipFromChain ourFrag) theirTip - atomically $ writeTVar varCandidate theirFrag + atomically $ do + writeTVar varCandidate theirFrag let kis = assertKnownIntersectionInvariants (configConsensus cfg) $ KnownIntersectionState { @@ -855,6 +887,8 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv = DynamicEnv { controlMessageSTM , headerMetricsTracer + , startIdling + , stopIdling , varCandidate } = dynEnv @@ -930,22 +964,21 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv = requestNext kis mkPipelineDecision n theirTip candTipBlockNo = let theirTipBlockNo = getTipBlockNo (unTheir theirTip) decision = - runPipelineDecision - mkPipelineDecision - n - candTipBlockNo - theirTipBlockNo + runPipelineDecision + mkPipelineDecision + n + candTipBlockNo + theirTipBlockNo in case (n, decision) of (Zero, (Request, mkPipelineDecision')) -> SendMsgRequestNext + startIdling -- on MsgAwaitReply (handleNext kis mkPipelineDecision' Zero) - ( -- when we have to wait - return $ handleNext kis mkPipelineDecision' Zero - ) (_, (Pipeline, mkPipelineDecision')) -> SendMsgRequestNextPipelined + startIdling -- on MsgAwaitReply $ requestNext kis mkPipelineDecision' @@ -955,9 +988,10 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv = (Succ n', (CollectOrPipeline, mkPipelineDecision')) -> CollectResponse - ( Just + ( Just $ pure $ SendMsgRequestNextPipelined + startIdling -- on MsgAwaitReply $ requestNext kis mkPipelineDecision' @@ -979,6 +1013,7 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv = -> Consensus (ClientStNext n) blk m handleNext kis mkPipelineDecision n = ClientStNext { recvMsgRollForward = \hdr theirTip -> do + stopIdling traceWith tracer $ TraceDownloadedHeader hdr continueWithState kis $ rollForward @@ -988,6 +1023,7 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv = (Their theirTip) , recvMsgRollBackward = \intersection theirTip -> do + stopIdling let intersection' :: Point blk intersection' = castPoint intersection traceWith tracer $ TraceRolledBack intersection' diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs index ed3b09a085..a49ab3d40c 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs @@ -41,6 +41,7 @@ data ChainDbArgs f m blk = ChainDbArgs { cdbHasFSImmutableDB :: SomeHasFS m , cdbHasFSVolatileDB :: SomeHasFS m , cdbHasFSLgrDB :: SomeHasFS m + , cdbHasFSGsmDB :: SomeHasFS m -- Policy , cdbImmutableDbValidation :: ImmutableDB.ValidationPolicy @@ -93,6 +94,7 @@ data ChainDbSpecificArgs f m blk = ChainDbSpecificArgs { -- 'cdbsGcInterval'. , cdbsRegistry :: HKD f (ResourceRegistry m) , cdbsTracer :: Tracer m (TraceEvent blk) + , cdbsHasFSGsmDB :: SomeHasFS m } -- | Default arguments @@ -117,14 +119,18 @@ data ChainDbSpecificArgs f m blk = ChainDbSpecificArgs { -- have, because of batching) < the number of blocks sync in @gcInterval@. -- E.g., when syncing at 1k-2k blocks/s, this means 10k-20k blocks. During -- normal operation, we receive 1 block/20s, meaning at most 1 block. -defaultSpecificArgs :: Monad m => ChainDbSpecificArgs Defaults m blk -defaultSpecificArgs = ChainDbSpecificArgs { +defaultSpecificArgs :: + Monad m + => (RelativeMountPoint -> SomeHasFS m) + -> ChainDbSpecificArgs Defaults m blk +defaultSpecificArgs mkFS = ChainDbSpecificArgs { cdbsBlocksToAddSize = 10 , cdbsCheckInFuture = NoDefault , cdbsGcDelay = secondsToDiffTime 60 , cdbsGcInterval = secondsToDiffTime 10 , cdbsRegistry = NoDefault , cdbsTracer = nullTracer + , cdbsHasFSGsmDB = mkFS $ RelativeMountPoint "gsm" } -- | Default arguments @@ -142,7 +148,7 @@ defaultArgs mkFS diskPolicy = toChainDbArgs (ImmutableDB.defaultArgs immFS) (VolatileDB.defaultArgs volFS) (LgrDB.defaultArgs lgrFS diskPolicy) - defaultSpecificArgs + (defaultSpecificArgs mkFS) where immFS, volFS, lgrFS :: SomeHasFS m @@ -194,6 +200,7 @@ fromChainDbArgs ChainDbArgs{..} = ( , cdbsGcInterval = cdbGcInterval , cdbsCheckInFuture = cdbCheckInFuture , cdbsBlocksToAddSize = cdbBlocksToAddSize + , cdbsHasFSGsmDB = cdbHasFSGsmDB } ) @@ -215,6 +222,7 @@ toChainDbArgs ImmutableDB.ImmutableDbArgs {..} cdbHasFSImmutableDB = immHasFS , cdbHasFSVolatileDB = volHasFS , cdbHasFSLgrDB = lgrHasFS + , cdbHasFSGsmDB = cdbsHasFSGsmDB -- Policy , cdbImmutableDbValidation = immValidationPolicy , cdbVolatileDbValidation = volValidationPolicy diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Time.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Time.hs index b43a7f99e7..cf1e2b0cb0 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Time.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Time.hs @@ -1,10 +1,11 @@ module Ouroboros.Consensus.Util.Time ( multipleNominalDelay , nominalDelay + , nominalDiffTimeToMicroseconds , secondsToNominalDiffTime ) where -import Data.Time (DiffTime, NominalDiffTime) +import Data.Time (DiffTime, NominalDiffTime, nominalDiffTimeToSeconds) {------------------------------------------------------------------------------- Operations @@ -26,3 +27,13 @@ nominalDelay = realToFrac secondsToNominalDiffTime :: Double -> NominalDiffTime secondsToNominalDiffTime = realToFrac + +-- TODO property test for this +nominalDiffTimeToMicroseconds :: NominalDiffTime -> Double +nominalDiffTimeToMicroseconds dur = + pico / picoPerMicro + where + pico :: Double + pico = toEnum $ fromEnum $ nominalDiffTimeToSeconds dur + + picoPerMicro = 1000000 diff --git a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs index e8cff3773d..8947e6ab7a 100644 --- a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs +++ b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs @@ -48,6 +48,7 @@ data NodeDBs db = NodeDBs { nodeDBsImm :: db , nodeDBsVol :: db , nodeDBsLgr :: db + , nodeDBsGsm :: db } deriving (Functor, Foldable, Traversable) @@ -56,6 +57,7 @@ emptyNodeDBs = NodeDBs <$> uncheckedNewTVarM Mock.empty <*> uncheckedNewTVarM Mock.empty <*> uncheckedNewTVarM Mock.empty + <*> uncheckedNewTVarM Mock.empty -- | Minimal set of arguments for creating a ChainDB instance for testing purposes. data MinimalChainDbArgs m blk = MinimalChainDbArgs { @@ -86,6 +88,8 @@ fromMinimalChainDbArgs MinimalChainDbArgs {..} = ChainDbArgs { cdbHasFSImmutableDB = SomeHasFS $ simHasFS (nodeDBsImm mcdbNodeDBs') , cdbHasFSVolatileDB = SomeHasFS $ simHasFS (nodeDBsVol mcdbNodeDBs') , cdbHasFSLgrDB = SomeHasFS $ simHasFS (nodeDBsLgr mcdbNodeDBs') + , cdbHasFSGsmDB = SomeHasFS $ simHasFS (nodeDBsGsm mcdbNodeDBs') + , cdbImmutableDbValidation = ImmutableDB.ValidateAllChunks , cdbVolatileDbValidation = VolatileDB.ValidateAll , cdbMaxBlocksPerFile = VolatileDB.mkBlocksPerFile 4 diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs index 1a473b04ca..7be59f34ac 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs @@ -346,7 +346,8 @@ runChainSync skew securityParam (ClientUpdates clientUpdates) (ServerUpdates serverUpdates) -- Set up the client - varCandidates <- uncheckedNewTVarM Map.empty + varCandidates <- uncheckedNewTVarM mempty + varIdlers <- uncheckedNewTVarM mempty varClientState <- uncheckedNewTVarM Genesis varClientResult <- uncheckedNewTVarM Nothing varKnownInvalid <- uncheckedNewTVarM mempty @@ -401,10 +402,11 @@ runChainSync skew securityParam (ClientUpdates clientUpdates) -- client's and server's clock as the tolerable clock skew. client :: StrictTVar m (AnchoredFragment (Header TestBlock)) + -> (m (), m ()) -> Consensus ChainSyncClientPipelined TestBlock m - client varCandidate = + client varCandidate (startIdling, stopIdling) = chainSyncClient ConfigEnv { chainDbView @@ -419,6 +421,8 @@ runChainSync skew securityParam (ClientUpdates clientUpdates) , controlMessageSTM = return Continue , headerMetricsTracer = nullTracer , varCandidate + , startIdling + , stopIdling } -- Set up the server @@ -489,13 +493,14 @@ runChainSync skew securityParam (ClientUpdates clientUpdates) chainSyncTracer chainDbView varCandidates + varIdlers serverId - maxBound $ \varCandidate -> do + maxBound $ \varCandidate idlingSignals -> do atomically $ modifyTVar varFinalCandidates $ Map.insert serverId varCandidate result <- runPipelinedPeer protocolTracer codecChainSyncId clientChannel $ - chainSyncClientPeerPipelined $ client varCandidate + chainSyncClientPeerPipelined $ client varCandidate idlingSignals atomically $ writeTVar varClientResult (Just (ClientFinished result)) return () `catchAlsoLinked` \ex -> do