Skip to content

Commit

Permalink
Hotfixes from PR 1572
Browse files Browse the repository at this point in the history
  • Loading branch information
noonio committed Nov 21, 2024
1 parent 8582200 commit 719025f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 38 deletions.
23 changes: 12 additions & 11 deletions hydra-node/src/Hydra/API/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Hydra.Prelude hiding (TVar, readTVar, seq)
import Cardano.Ledger.Core (PParams)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO, readTVar)
import Control.Exception (IOException)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
Expand Down Expand Up @@ -98,9 +98,12 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt
headIdP <- mkProjection Nothing (output <$> timedOutputEvents) projectInitializingHeadId
pendingDepositsP <- mkProjection [] (output <$> timedOutputEvents) projectPendingDeposits

-- NOTE: we need to reverse the list because we store history in a reversed
-- list in memory but in order on disk
history <- newTVarIO (reverse timedOutputEvents)
nextSeqVar <- newTVarIO 0
let nextSeq = do
seq <- readTVar nextSeqVar
modifyTVar' nextSeqVar (+ 1)
pure seq

(notifyServerRunning, waitForServerRunning) <- setupServerNotification

let serverSettings =
Expand All @@ -117,15 +120,15 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt
. simpleCors
$ websocketsOr
defaultConnectionOptions
(wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel serverOutputFilter)
(wsApp party tracer nextSeq callback headStatusP headIdP snapshotUtxoP responseChannel serverOutputFilter)
(httpApp tracer chain env pparams (atomically $ getLatest commitInfoP) (atomically $ getLatest snapshotUtxoP) (atomically $ getLatest pendingDepositsP) callback)
)
( do
waitForServerRunning
action $
Server
{ sendOutput = \output -> do
timedOutput <- appendToHistory history output
timedOutput <- persistOutput nextSeq output
atomically $ do
update headStatusP output
update commitInfoP output
Expand All @@ -152,13 +155,11 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt
_ ->
runSettings settings app

appendToHistory history output = do
persistOutput nextSeq output = do
time <- getCurrentTime
timedOutput <- atomically $ do
seq <- nextSequenceNumber history
let timedOutput = TimedServerOutput{output, time, seq}
modifyTVar' history (timedOutput :)
pure timedOutput
seq <- nextSeq
pure TimedServerOutput{output, time, seq}
append timedOutput
pure timedOutput

Expand Down
37 changes: 19 additions & 18 deletions hydra-node/src/Hydra/API/WSServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ wsApp ::
IsChainState tx =>
Party ->
Tracer IO APIServerLog ->
TVar [TimedServerOutput tx] ->
STM IO Natural ->
(ClientInput tx -> IO ()) ->
-- | Read model to enhance 'Greetings' messages with 'HeadStatus'.
Projection STM.STM (ServerOutput tx) HeadStatus ->
Expand All @@ -65,7 +65,7 @@ wsApp ::
ServerOutputFilter tx ->
PendingConnection ->
IO ()
wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel ServerOutputFilter{txContainsAddr} pending = do
wsApp party tracer nextSeq callback headStatusP headIdP snapshotUtxoP responseChannel ServerOutputFilter{txContainsAddr} pending = do
traceWith tracer NewAPIConnection
let path = requestPath $ pendingRequest pending
queryParams <- uriQuery <$> mkURIBs path
Expand All @@ -74,20 +74,21 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh

let outConfig = mkServerOutputConfig queryParams

-- FIXME: No support of history forwarding anymore (disabled because of memory growing too much)
-- api client can decide if they want to see the past history of server outputs
unless (shouldNotServeHistory queryParams) $
forwardHistory con outConfig
-- unless (shouldNotServeHistory queryParams) $
-- forwardHistory con outConfig

forwardGreetingOnly con
-- forwardGreetingOnly con

withPingThread con 30 (pure ()) $
race_ (receiveInputs con) (sendOutputs chan con outConfig)
where
-- NOTE: We will add a 'Greetings' message on each API server start. This is
-- important to make sure the latest configured 'party' is reaching the
-- client.
forwardGreetingOnly con = do
seq <- atomically $ nextSequenceNumber history
sendGreetings con = do
seq <- atomically nextSeq
headStatus <- atomically getLatestHeadStatus
hydraHeadId <- atomically getLatestHeadId
snapshotUtxo <- atomically getLatestSnapshotUtxo
Expand Down Expand Up @@ -134,11 +135,11 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh
(QueryParam key _) | key == [queryKey|address|] -> True
_other -> False

shouldNotServeHistory qp =
flip any qp $ \case
(QueryParam key val)
| key == [queryKey|history|] -> val == [queryValue|no|]
_other -> False
-- shouldNotServeHistory qp =
-- flip any qp $ \case
-- (QueryParam key val)
-- | key == [queryKey|history|] -> val == [queryValue|no|]
-- _other -> False

sendOutputs chan con outConfig@ServerOutputConfig{addressInTx} = forever $ do
response <- STM.atomically $ readTChan chan
Expand All @@ -161,16 +162,16 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh
-- message to memory
let clientInput = decodeUtf8With lenientDecode $ toStrict msg
time <- getCurrentTime
seq <- atomically $ nextSequenceNumber history
seq <- atomically nextSeq
let timedOutput = TimedServerOutput{output = InvalidInput @tx e clientInput, time, seq}
sendTextData con $ Aeson.encode timedOutput
traceWith tracer (APIInvalidInput e clientInput)

forwardHistory con ServerOutputConfig{addressInTx} = do
rawHist <- STM.atomically (readTVar history)
let hist = filter (isAddressInTx addressInTx) rawHist
let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs
sendTextDatas con $ foldl' encodeAndReverse [] hist
-- forwardHistory con ServerOutputConfig{addressInTx} = do
-- rawHist <- STM.atomically (readTVar history)
-- let hist = filter (isAddressInTx addressInTx) rawHist
-- let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs
-- sendTextDatas con $ foldl' encodeAndReverse [] hist

isAddressInTx addressInTx tx =
case addressInTx of
Expand Down
18 changes: 9 additions & 9 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,13 @@ import Cardano.Binary (serialize')
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
import Control.Concurrent.Class.MonadSTM (
MonadSTM (readTQueue, writeTQueue),
modifyTVar',
newTQueueIO,
newTVarIO,
readTVarIO,
writeTVar,
)
import Control.Tracer (Tracer)
import Data.IntMap qualified as IMap
import Data.Sequence.Strict ((|>))
import Data.Sequence.Strict qualified as Seq
import Data.Vector (
Vector,
Expand Down Expand Up @@ -221,7 +219,7 @@ withReliability ::
-- | Underlying network component providing consuming and sending channels.
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a ->
NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do
withReliability tracer MessagePersistence{saveAcks, loadAcks, loadMessages} me otherParties withRawNetwork callback action = do
acksCache <- loadAcks >>= newTVarIO
sentMessages <- loadMessages >>= newTVarIO . Seq.fromList
resendQ <- newTQueueIO
Expand All @@ -235,15 +233,17 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa

allParties = fromList $ sort $ me : otherParties

reliableBroadcast sentMessages ourIndex acksCache Network{broadcast} =
reliableBroadcast _sentMessages ourIndex acksCache Network{broadcast} =
action $
Network
{ broadcast = \msg ->
case msg of
Data{} -> do
localCounter <- atomically $ cacheMessage msg >> incrementAckCounter
saveAcks localCounter
appendMessage msg
-- localCounter <- atomically $ cacheMessage msg >> incrementAckCounter
-- saveAcks localCounter
-- appendMessage msg
localCounter <- atomically $ do
incrementAckCounter
traceWith tracer BroadcastCounter{ourIndex, localCounter}
broadcast $ ReliableMsg localCounter msg
Ping{} -> do
Expand All @@ -259,8 +259,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa
writeTVar acksCache newAcks
pure newAcks

cacheMessage msg =
modifyTVar' sentMessages (|> msg)
-- cacheMessage msg =
-- modifyTVar' sentMessages (|> msg)

reliableCallback acksCache sentMessages resend ourIndex =
NetworkCallback $ \(Authenticated (ReliableMsg acknowledged payload) party) -> do
Expand Down

0 comments on commit 719025f

Please sign in to comment.