Skip to content

Commit

Permalink
txgen-mvar: pass NixServiceOptions to keepalive
Browse files Browse the repository at this point in the history
This took a fair amount of rearrangement to broaden the constant
environment in order to pass the keepalive interval in the
NixServiceOptions around. So a few different things happened:

I. create EnvConsts structure encompassing
  A. AsyncBenchmarkControl TVar (potentially changing to IORef)
  B. IOManager
  C. Maybe NixServiceOptions
  This moves the mutable reference in A. to the Reader environment from
  the State of the ExceptionT Env.Error (RWST EnvConsts () Env IO)
  ActionM monad. The reference stays constant though the referenced
  data changes.
II. pass EnvConsts to runScript and runSelftest
III. update Env.hs and NixService.hs accessors

Some of it represents changing a little of the design of the Env and
ActionM once again even after the prior commits, so a fair amount of
squashing commits that entirely redo earlier commits' changes and
rewriting commit messages will need to be done in the sequel.
  • Loading branch information
NadiaYvette committed May 23, 2024
1 parent 60b4254 commit c401430
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 64 deletions.
34 changes: 19 additions & 15 deletions bench/tx-generator/src/Cardano/Benchmarking/Command.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ where
#endif

import Cardano.Benchmarking.Compiler (compileOptions)
import Cardano.Benchmarking.LogTypes (AsyncBenchmarkControl (..))
import Cardano.Benchmarking.LogTypes (AsyncBenchmarkControl (..), EnvConsts (..))
import Cardano.Benchmarking.Script (parseScriptFileAeson, runScript)
import Cardano.Benchmarking.Script.Aeson (parseJSONFile, prettyPrint)
import Cardano.Benchmarking.Script.Env as Env (Env (Env, envThreads), mkNewEnv)
import Cardano.Benchmarking.Script.Env as Env (emptyEnv, newEnvConsts)
import Cardano.Benchmarking.Script.Selftest (runSelftest)
import Cardano.Benchmarking.Version as Version
import Cardano.TxGenerator.Setup.NixService
import Ouroboros.Network.NodeToClient (withIOManager)
import Ouroboros.Network.NodeToClient (IOManager, withIOManager)

import Prelude

Expand Down Expand Up @@ -66,41 +66,45 @@ data Command
| VersionCmd

runCommand :: IO ()
runCommand = withIOManager $ \iocp -> do
env <- installSignalHandler
runCommand = withIOManager runCommand'

runCommand' :: IOManager -> IO ()
runCommand' iocp = do
envConsts <- installSignalHandler
cmd <- customExecParser
(prefs showHelpOnEmpty)
(info commandParser mempty)
case cmd of
Json file -> do
script <- parseScriptFileAeson file
runScript env script iocp >>= handleError . fst
JsonHL file nodeConfigOverwrite cardanoTracerOverwrite -> do
opts <- parseJSONFile fromJSON file
Json actionFile -> do
script <- parseScriptFileAeson actionFile
runScript emptyEnv script envConsts >>= handleError . fst
JsonHL nixSvcOptsFile nodeConfigOverwrite cardanoTracerOverwrite -> do
opts <- parseJSONFile fromJSON nixSvcOptsFile
finalOpts <- mangleTracerConfig cardanoTracerOverwrite <$> mangleNodeConfig nodeConfigOverwrite opts
let consts = envConsts { envNixSvcOpts = Just finalOpts }

Prelude.putStrLn $
"--> initial options:\n" ++ show opts ++
"\n--> final options:\n" ++ show finalOpts

case compileOptions finalOpts of
Right script -> runScript env script iocp >>= handleError . fst
Right script -> runScript emptyEnv script consts >>= handleError . fst
err -> die $ "tx-generator:Cardano.Command.runCommand JsonHL: " ++ show err
Compile file -> do
o <- parseJSONFile fromJSON file
case compileOptions o of
Right script -> BSL.putStr $ prettyPrint script
Left err -> die $ "tx-generator:Cardano.Command.runCommand Compile: " ++ show err
Selftest outFile -> runSelftest env iocp outFile >>= handleError
Selftest outFile -> runSelftest emptyEnv envConsts outFile >>= handleError
VersionCmd -> runVersionCommand
where
handleError :: Show a => Either a b -> IO ()
handleError = \case
Right _ -> exitSuccess
Left err -> die $ "tx-generator:Cardano.Command.runCommand handleError: " ++ show err
installSignalHandler :: IO Env
installSignalHandler :: IO EnvConsts
installSignalHandler = do
env@Env { .. } <- STM.atomically mkNewEnv
envConsts@EnvConsts { .. } <- STM.atomically $ newEnvConsts iocp Nothing
abc <- STM.atomically $ STM.readTVar envThreads
_ <- pure abc
#ifdef UNIX
Expand Down Expand Up @@ -137,7 +141,7 @@ runCommand = withIOManager $ \iocp -> do
Fold.forM_ [Sig.sigINT, Sig.sigTERM] $ \sig ->
Sig.installHandler sig signalHandler $ Just fullSignalSet
#endif
pure env
pure envConsts

mangleNodeConfig :: Maybe FilePath -> NixServiceOptions -> IO NixServiceOptions
mangleNodeConfig fp opts = case (getNodeConfigFile opts, fp) of
Expand Down
2 changes: 1 addition & 1 deletion bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import Cardano.Benchmarking.Wallet (TxStream)
import Cardano.Logging
import Cardano.Node.Configuration.NodeAddress
import Cardano.Prelude
import Cardano.TxGenerator.Setup.NixService
import Cardano.TxGenerator.Setup.NixService as Nix (NodeDescription (..))
import Cardano.TxGenerator.Types (NumberOfTxs, TPSRate, TxGenError (..))

import Prelude (String)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
Expand Down Expand Up @@ -46,7 +47,7 @@ import Ouroboros.Network.KeepAlive
import Ouroboros.Network.Magic
import Ouroboros.Network.Mux (MiniProtocolCb (..), MuxMode (..),
OuroborosApplication (..), OuroborosBundle, RunMiniProtocol (..))
import Ouroboros.Network.NodeToClient (IOManager, chainSyncPeerNull)
import Ouroboros.Network.NodeToClient (chainSyncPeerNull)
import Ouroboros.Network.NodeToNode (NetworkConnectTracers (..))
import qualified Ouroboros.Network.NodeToNode as NtN
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
Expand All @@ -64,14 +65,15 @@ import Ouroboros.Network.Protocol.PeerSharing.Client (PeerSharingClien

import Ouroboros.Network.Snocket (socketSnocket)

import Cardano.Benchmarking.LogTypes (SendRecvConnect, SendRecvTxSubmission2)
import Cardano.Benchmarking.LogTypes (EnvConsts (..), SendRecvConnect, SendRecvTxSubmission2)
import Cardano.TxGenerator.Setup.NixService (getKeepaliveTimeout')

type CardanoBlock = Consensus.CardanoBlock StandardCrypto
type ConnectClient = AddrInfo -> TxSubmissionClient (GenTxId CardanoBlock) (GenTx CardanoBlock) IO () -> IO ()

benchmarkConnectTxSubmit
:: forall blk. (blk ~ CardanoBlock, RunNode blk )
=> IOManager
=> EnvConsts
-> Tracer IO SendRecvConnect
-> Tracer IO SendRecvTxSubmission2
-> CodecConfig CardanoBlock
Expand All @@ -82,9 +84,9 @@ benchmarkConnectTxSubmit
-- ^ the particular txSubmission peer
-> IO ()

benchmarkConnectTxSubmit ioManager handshakeTracer submissionTracer codecConfig networkMagic remoteAddr myTxSubClient =
benchmarkConnectTxSubmit EnvConsts { .. } handshakeTracer submissionTracer codecConfig networkMagic remoteAddr myTxSubClient =
NtN.connectTo
(socketSnocket ioManager)
(socketSnocket envIOManager)
NetworkConnectTracers {
nctMuxTracer = mempty,
nctHandshakeTracer = handshakeTracer
Expand Down Expand Up @@ -178,7 +180,7 @@ benchmarkConnectTxSubmit ioManager handshakeTracer submissionTracer codecConfig
mempty
keepAliveRng
(continueForever (Proxy :: Proxy IO)) them peerGSVMap
(KeepAliveInterval 10)
(KeepAliveInterval $ getKeepaliveTimeout' envNixSvcOpts)

-- the null block fetch client
blockFetchClientNull
Expand Down
24 changes: 20 additions & 4 deletions bench/tx-generator/src/Cardano/Benchmarking/LogTypes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@

module Cardano.Benchmarking.LogTypes
( AsyncBenchmarkControl (..)
, BenchTracers(..)
, NodeToNodeSubmissionTrace(..)
, BenchTracers (..)
, EnvConsts (..)
, NodeToNodeSubmissionTrace (..)
, SendRecvConnect
, SendRecvTxSubmission2
, SubmissionSummary(..)
, TraceBenchTxSubmit(..)
, SubmissionSummary (..)
, TraceBenchTxSubmit (..)
) where

import Cardano.Api
Expand All @@ -33,9 +34,11 @@ import Cardano.Tracing.OrphanInstances.Consensus ()
import Cardano.Tracing.OrphanInstances.Network ()
import Cardano.Tracing.OrphanInstances.Shelley ()
import Cardano.TxGenerator.PlutusContext (PlutusBudgetSummary)
import Cardano.TxGenerator.Setup.NixService (NixServiceOptions (..))
import Cardano.TxGenerator.Types (TPSRate)
import Ouroboros.Consensus.Ledger.SupportsMempool (GenTx, GenTxId)
import Ouroboros.Network.Driver (TraceSendRecv (..))
import Ouroboros.Network.IOManager (IOManager)
import Ouroboros.Network.NodeToNode (NodeToNodeVersion, RemoteConnectionId)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake)
import Ouroboros.Network.Protocol.TxSubmission2.Type (TxSubmission2)
Expand All @@ -44,6 +47,7 @@ import Prelude

import qualified Codec.CBOR.Term as CBOR
import qualified Control.Concurrent.Async as Async (Async)
import qualified Control.Concurrent.STM as STM (TVar)
import Data.Text
import Data.Time.Clock (DiffTime, NominalDiffTime)
import GHC.Generics
Expand All @@ -61,6 +65,18 @@ data AsyncBenchmarkControl =
-- ^ IO action to shut down the feeder thread.
}

data EnvConsts =
EnvConsts
{ envIOManager :: IOManager
, envThreads :: STM.TVar (Maybe AsyncBenchmarkControl)
-- ^ The reference needs to be a constant, but the referred-to data
-- (`AsyncBenchmarkControl`) needs to be able to be initialized.
-- This could in principle be an `IORef` instead of a `STM.TVar`.
, envNixSvcOpts :: Maybe NixServiceOptions
-- ^ There are situations `NixServiceOptions` won't be available and
-- defaults will have to be used.
}

data BenchTracers =
BenchTracers
{ btTxSubmit_ :: Trace IO (TraceBenchTxSubmit TxId)
Expand Down
15 changes: 7 additions & 8 deletions bench/tx-generator/src/Cardano/Benchmarking/Script.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import Cardano.Benchmarking.LogTypes
import Cardano.Benchmarking.Script.Action
import Cardano.Benchmarking.Script.Aeson (parseScriptFileAeson)
import Cardano.Benchmarking.Script.Core (setProtocolParameters)
import qualified Cardano.Benchmarking.Script.Env as Env (ActionM, Env (Env, envThreads),
Error (TxGenError), getEnvThreads, runActionMEnv, traceError)
import qualified Cardano.Benchmarking.Script.Env as Env (ActionM, Env (..), Error (TxGenError),
getEnvThreads, runActionMEnv, traceError)
import Cardano.Benchmarking.Script.Types
import qualified Cardano.TxGenerator.Types as Types (TxGenError (..))
import Ouroboros.Network.NodeToClient (IOManager)

import Prelude

Expand All @@ -33,19 +32,19 @@ import System.Mem (performGC)

type Script = [Action]

runScript :: Env.Env -> Script -> IOManager -> IO (Either Env.Error (), AsyncBenchmarkControl)
runScript env script iom = do
runScript :: Env.Env -> Script -> EnvConsts -> IO (Either Env.Error (), AsyncBenchmarkControl)
runScript env script constants@EnvConsts { .. } = do
result <- go
performGC
threadDelay $ 150 * 1_000
return result
where
go :: IO (Either Env.Error (), AsyncBenchmarkControl)
go = Env.runActionMEnv env execScript iom >>= \case
go = Env.runActionMEnv env execScript constants >>= \case
(Right abc, env', ()) -> do
cleanup env' shutDownLogging
pure (Right (), abc)
(Left err, env'@Env.Env { .. }, ()) -> do
(Left err, env', ()) -> do
cleanup env' (Env.traceError (show err) >> shutDownLogging)
abcMaybe <- STM.atomically $ STM.readTVar envThreads
case abcMaybe of
Expand All @@ -55,7 +54,7 @@ runScript env script iom = do
, "AsyncBenchmarkControl uninitialized" ]
where
cleanup :: Env.Env -> Env.ActionM () -> IO ()
cleanup env' acts = void $ Env.runActionMEnv env' acts iom
cleanup env' acts = void $ Env.runActionMEnv env' acts constants
execScript :: Env.ActionM AsyncBenchmarkControl
execScript = do
setProtocolParameters QueryLocalNode
Expand Down
5 changes: 3 additions & 2 deletions bench/tx-generator/src/Cardano/Benchmarking/Script/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import Prelude

import Control.Concurrent (threadDelay)
import Control.Monad
import Control.Monad.Trans.RWS.Strict (ask)
import "contra-tracer" Control.Tracer (Tracer (..))
import Data.ByteString.Lazy.Char8 as BSL (writeFile)
import Data.Ratio ((%))
Expand Down Expand Up @@ -136,9 +137,9 @@ getConnectClient = do
(Testnet networkMagic) <- getEnvNetworkId
protocol <- getEnvProtocol
void $ return $ btSubmission2_ tracers
ioManager <- askIOManager
envConsts <- lift ask
return $ benchmarkConnectTxSubmit
ioManager
envConsts
(Tracer $ traceWith (btConnect_ tracers))
mempty -- (btSubmission2_ tracers)
(protocolToCodecConfig protocol)
Expand Down
39 changes: 23 additions & 16 deletions bench/tx-generator/src/Cardano/Benchmarking/Script/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
Expand All @@ -28,13 +28,16 @@ ran into circular dependency issues during the above transition.
-}
module Cardano.Benchmarking.Script.Env (
ActionM
, Env (Env, envThreads)
, Env (..)
, Error (..)
, mkNewEnv
, emptyEnv
, newEnvConsts
, runActionMEnv
, liftTxGenError
, liftIOSafe
, askIOManager
, askNixSvcOpts
, askEnvThreads
, traceDebug
, traceError
, traceBenchTxSubmit
Expand Down Expand Up @@ -72,6 +75,7 @@ import Cardano.Ledger.Crypto (StandardCrypto)
import Cardano.Logging
import Cardano.Node.Protocol.Types (SomeConsensusProtocol)
import Cardano.TxGenerator.PlutusContext (PlutusBudgetSummary)
import Cardano.TxGenerator.Setup.NixService as Nix (NixServiceOptions)
import Cardano.TxGenerator.Types (TxGenError (..))
import Ouroboros.Network.NodeToClient (IOManager)

Expand Down Expand Up @@ -102,11 +106,9 @@ data Env = Env { -- | 'Cardano.Api.ProtocolParameters' is ultimately
, envNetworkId :: Maybe NetworkId
, envSocketPath :: Maybe FilePath
, envKeys :: Map String (SigningKey PaymentKey)
, envThreads :: STM.TVar (Maybe AsyncBenchmarkControl)
, envWallets :: Map String WalletRef
, envSummary :: Maybe PlutusBudgetSummary
}

-- | `Env` uses `Maybe` to represent values that might be uninitialized.
-- This being empty means `Nothing` is used across the board, along with
-- all of the `Map.Map` structures being `Map.empty`.
Expand All @@ -118,24 +120,22 @@ emptyEnv = Env { protoParams = Nothing
, envProtocol = Nothing
, envNetworkId = Nothing
, envSocketPath = Nothing
-- This never escapes: it's always overridden.
, envThreads = undefined
, envWallets = Map.empty
, envSummary = Nothing
}

mkNewEnv :: STM Env
mkNewEnv = do
newEnvConsts :: IOManager -> Maybe Nix.NixServiceOptions -> STM Tracer.EnvConsts
newEnvConsts envIOManager envNixSvcOpts = do
envThreads <- STM.newTVar Nothing
pure emptyEnv { envThreads }
pure Tracer.EnvConsts { .. }

-- | This abbreviates an `ExceptT` and `RWST` with particular types
-- used as parameters.
type ActionM a = ExceptT Error (RWST IOManager () Env IO) a
type ActionM a = ExceptT Error (RWST Tracer.EnvConsts () Env IO) a

-- | This runs an `ActionM` starting with the `Env` being passed.
runActionMEnv :: Env -> ActionM ret -> IOManager -> IO (Either Error ret, Env, ())
runActionMEnv env action iom = RWS.runRWST (runExceptT action) iom env
runActionMEnv :: Env -> ActionM ret -> Tracer.EnvConsts -> IO (Either Error ret, Env, ())
runActionMEnv env action envConsts = RWS.runRWST (runExceptT action) envConsts env

-- | 'Error' adds two cases to 'Cardano.TxGenerator.Types.TxGenError'
-- which in turn wraps 'Cardano.Api.Error' implicit contexts to a
Expand Down Expand Up @@ -166,7 +166,14 @@ liftIOSafe a = liftIO a >>= either liftTxGenError pure

-- | Accessor for the `IOManager` reader monad aspect of the `RWST`.
askIOManager :: ActionM IOManager
askIOManager = lift RWS.ask
askIOManager = lift $ RWS.asks Tracer.envIOManager

-- | Accessor for the `NixServiceOptions` reader monad aspect of the `RWST`.
askNixSvcOpts :: ActionM (Maybe Nix.NixServiceOptions)
askNixSvcOpts = lift $ RWS.asks Tracer.envNixSvcOpts

askEnvThreads :: ActionM (STM.TVar (Maybe AsyncBenchmarkControl))
askEnvThreads = lift $ RWS.asks Tracer.envThreads

-- | Helper to modify `Env` record fields.
modifyEnv :: (Env -> Env) -> ActionM ()
Expand Down Expand Up @@ -203,7 +210,7 @@ setEnvSocketPath val = modifyEnv (\e -> e { envSocketPath = Just val })
-- | Write accessor for `envThreads`.
setEnvThreads :: AsyncBenchmarkControl -> ActionM ()
setEnvThreads abc = do
abcTVar <- lift $ RWS.gets envThreads
abcTVar <- lift $ RWS.asks Tracer.envThreads
liftIO do STM.atomically $ abcTVar `STM.writeTVar` Just abc

-- | Write accessor for `envWallets`.
Expand Down Expand Up @@ -260,7 +267,7 @@ getEnvSocketPath = File <$> getEnvVal envSocketPath "SocketPath"
-- | Read accessor for `envThreads`.
getEnvThreads :: ActionM (Maybe AsyncBenchmarkControl)
getEnvThreads = do
abcTVar <- lift $ RWS.gets envThreads
abcTVar <- lift $ RWS.asks Tracer.envThreads
liftIO do STM.atomically $ STM.readTVar abcTVar

-- | Read accessor for `envWallets`.
Expand Down
Loading

0 comments on commit c401430

Please sign in to comment.