Skip to content

Commit

Permalink
Revert "[inferno-vc] Cached client and server logging. More granular …
Browse files Browse the repository at this point in the history
…cache locks. Cleanup" (#144)

Reverts #143
  • Loading branch information
smurphy8 authored Oct 16, 2024
1 parent 43ddac5 commit 10057a9
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 184 deletions.
7 changes: 0 additions & 7 deletions inferno-vc/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
# Revision History for inferno-vc
*Note*: we use https://pvp.haskell.org/ (MAJOR.MAJOR.MINOR.PATCH)

## 0.3.8.0 -- 2024-10-16
* Added logging to cached client to see hits and misses
* Added logging to server to see what scriptIds are being used to request
fetchObjects and fetchObjectClosureHashes
* Made locks on cache more granular and only fetch a single upstream object per
request

## 0.3.7.1 -- 2024-09-23
* Fix overflowing threadDelay on armv7l

Expand Down
3 changes: 1 addition & 2 deletions inferno-vc/inferno-vc.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: >=1.10
name: inferno-vc
version: 0.3.8.0
version: 0.3.7.1
synopsis: Version control server for Inferno
description: A version control server for Inferno scripts
category: DSL,Scripting
Expand Down Expand Up @@ -69,7 +69,6 @@ library
, QuickCheck
, stm
, unbounded-delays
, random-shuffle

default-language: Haskell2010
default-extensions:
Expand Down
213 changes: 76 additions & 137 deletions inferno-vc/src/Inferno/VersionControl/Client/Cached.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import Control.Concurrent.STM
retry,
writeTVar,
)
import Control.Monad (forM, forM_, guard)
import Control.Monad.Catch (MonadMask, bracket_, tryJust)
import Control.Monad (forM, forM_)
import Control.Monad.Catch (MonadMask, bracket_)
import Control.Monad.Error.Lens (throwing)
import Control.Monad.Except (MonadError (..))
import Control.Monad.IO.Class (MonadIO (..))
Expand All @@ -27,14 +27,15 @@ import Data.Aeson (FromJSON, ToJSON, eitherDecodeStrict, encode)
import qualified Data.ByteString as B
import qualified Data.ByteString.Base64.URL as Base64
import qualified Data.ByteString.Char8 as Char8
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.ByteString.Lazy as BL
import Data.Either (partitionEithers)
import Data.Generics.Product (HasType, getTyped)
import Data.Generics.Sum (AsType (..))
import Data.List (foldl')
import qualified Data.Map as Map
import qualified Data.Set as Set
import GHC.Generics (Generic)
import qualified Inferno.VersionControl.Client as VCClient
import Inferno.VersionControl.Log (VCCacheTrace (..))
import Inferno.VersionControl.Operations.Error (VCStoreError (..))
import Inferno.VersionControl.Server (VCServerError)
import Inferno.VersionControl.Types
Expand All @@ -43,72 +44,44 @@ import Inferno.VersionControl.Types
VCObjectHash (..),
vcObjectHashToByteString,
)
import Plow.Logging (IOTracer (..), traceWith)
import Servant.Client (ClientEnv, ClientError)
import Servant.Typed.Error (TypedClientM, runTypedClientM)
import System.AtomicWrite.Writer.LazyByteString (atomicWriteFile)
import System.Directory (createDirectoryIfMissing)
import System.Directory (createDirectoryIfMissing, doesFileExist)
import System.FilePath.Posix ((</>))
import System.IO.Error (isDoesNotExistError)
import System.Random.Shuffle (shuffleM)

data VCCacheEnv = VCCacheEnv
{ cachePath :: FilePath,
cacheObjInFlight :: TVar (Set.Set VCObjectHash),
cacheDepInFlight :: TVar (Set.Set VCObjectHash),
tracer :: IOTracer VCCacheTrace
cacheInFlight :: TVar (Set.Set VCObjectHash)
}
deriving (Generic)

-- | Prevents thundering-herd problem by locking the key 'k' so only a single
-- 'fetchAndSave' runs concurrently
withSingleConcurrentFetch ::
(MonadMask m, MonadIO m, Ord k) =>
-- | A 'TVar' holding the set of keys which are currently being fetched
TVar (Set.Set k) ->
-- | A function that returns Just the value being cached if found locally or
-- Nothing if it hasn't being cached yet. No lock is taken when calling this
-- function
(k -> m (Maybe a)) ->
-- | A function that fetches the value and caches it. A lock is taken so only
-- a single call per key to this function runs concurrently
(k -> m a) ->
-- | The key associated to the value being cached
k ->
m a
withSingleConcurrentFetch keySetRef check fetchAndSave key =
check key >>= \case
Just x -> pure x
Nothing ->
bracket_ acquire release $
-- check again because another thread may have fetched while we were
-- blocked
check key >>= \case
Just x -> pure x
Nothing -> fetchAndSave key
-- | Makes sure only one thread at a time fetches the closure for certain
-- VCObjectHashes
withInFlight :: (MonadMask m, MonadIO m) => VCCacheEnv -> [VCObjectHash] -> m a -> m a
withInFlight VCCacheEnv {cacheInFlight} hashes = bracket_ acquire release
where
acquire = liftIO $ atomically $ do
inFlight <- readTVar keySetRef
if key `Set.member` inFlight
inFlight <- readTVar cacheInFlight
if any (`Set.member` inFlight) hashes
then retry
else do
writeTVar keySetRef $! key `Set.insert` inFlight
writeTVar cacheInFlight $! foldl' (flip Set.insert) inFlight hashes
release = liftIO $ atomically $ do
inFlight <- readTVar keySetRef
writeTVar keySetRef $! key `Set.delete` inFlight
inFlight <- readTVar cacheInFlight
writeTVar cacheInFlight $! foldl' (flip Set.delete) inFlight hashes

data CachedVCClientError
= ClientVCStoreError VCServerError
| ClientServantError ClientError
| LocalVCStoreError VCStoreError
deriving (Show, Generic)

initVCCachedClient :: FilePath -> IOTracer VCCacheTrace -> IO VCCacheEnv
initVCCachedClient cachePath tracer = do
initVCCachedClient :: FilePath -> IO VCCacheEnv
initVCCachedClient cachePath = do
createDirectoryIfMissing True $ cachePath </> "deps"
cacheObjInFlight <- newTVarIO mempty
cacheDepInFlight <- newTVarIO mempty
pure VCCacheEnv {cachePath, cacheObjInFlight, cacheDepInFlight, tracer}
cacheInFlight <- newTVarIO mempty
pure VCCacheEnv {cachePath, cacheInFlight}

fetchVCObjectClosure ::
( MonadError err m,
Expand All @@ -130,116 +103,82 @@ fetchVCObjectClosure ::
VCObjectHash ->
m (Map.Map VCObjectHash (VCMeta a g VCObject))
fetchVCObjectClosure fetchVCObjects remoteFetchVCObjectClosureHashes objHash = do
VCCacheEnv {cacheObjInFlight, cacheDepInFlight} <- asks getTyped
deps <- withSingleConcurrentFetch cacheDepInFlight maybeReadCachedClosureHashes (fetchAndCacheClosureHashes remoteFetchVCObjectClosureHashes) objHash
-- shuffle scriptIds to improve concurrent performance when cache is cold
shuffledDeps <- liftIO $ shuffleM $ objHash : deps
mconcat
<$> mapM (withSingleConcurrentFetch cacheObjInFlight maybeReadCachedVCObject (fetchAndCacheVCObject fetchVCObjects)) shuffledDeps
env@VCCacheEnv {cachePath} <- asks getTyped
deps <-
withInFlight env [objHash] $
liftIO (doesFileExist $ cachePath </> "deps" </> show objHash) >>= \case
False -> do
deps <- liftServantClient $ remoteFetchVCObjectClosureHashes objHash
liftIO
$ atomicWriteFile
(cachePath </> "deps" </> show objHash)
$ BL.concat [BL.fromStrict (vcObjectHashToByteString h) <> "\n" | h <- deps]
pure deps
True -> fetchVCObjectClosureHashes objHash
withInFlight env deps $ do
(nonLocalHashes, localHashes) <-
partitionEithers
<$> forM
(objHash : deps)
( \depHash -> do
liftIO (doesFileExist $ cachePath </> show depHash) >>= \case
True -> pure $ Right depHash
False -> pure $ Left depHash
)
localObjs <-
Map.fromList
<$> forM
localHashes
( \h ->
(h,) <$> fetchVCObjectUnsafe h
)

maybeReadCachedClosureHashes ::
nonLocalObjs <- liftServantClient $ fetchVCObjects nonLocalHashes
forM_ (Map.toList nonLocalObjs) $ \(h, o) ->
liftIO $ atomicWriteFile (cachePath </> show h) $ encode o
pure $ localObjs `Map.union` nonLocalObjs

fetchVCObjectClosureHashes ::
( MonadError err m,
HasType VCCacheEnv env,
AsType VCStoreError err,
MonadReader env m,
MonadIO m,
MonadMask m
MonadReader env m,
AsType VCStoreError err,
HasType VCCacheEnv env
) =>
VCObjectHash ->
m (Maybe [VCObjectHash])
maybeReadCachedClosureHashes objHash = do
VCCacheEnv {tracer} <- asks getTyped
tryJust (guard . isDoesNotExistError) readCachedClosureHashes >>= \case
Right deps ->
Just deps <$ traceWith tracer (VCCacheDepsHit objHash)
Left _ ->
Nothing <$ traceWith tracer (VCCacheDepsMiss objHash)
where
readCachedClosureHashes = do
path <- cachedDepsPath objHash
deps <- filter (not . B.null) . Char8.lines <$> liftIO (B.readFile path)
forM deps $ \dep -> do
decoded <-
either (const $ throwing _Typed $ InvalidHash $ Char8.unpack dep) pure $
Base64.decode dep
maybe (throwing _Typed $ InvalidHash $ Char8.unpack dep) (pure . VCObjectHash) $
digestFromByteString decoded
m [VCObjectHash]
fetchVCObjectClosureHashes h = do
VCCacheEnv {cachePath} <- asks getTyped
let fp = cachePath </> "deps" </> show h
readVCObjectHashTxt fp

fetchAndCacheClosureHashes ::
readVCObjectHashTxt ::
( MonadError err m,
HasType VCCacheEnv env,
HasType ClientEnv env,
AsType VCServerError err,
AsType ClientError err,
MonadReader env m,
AsType VCStoreError err,
MonadIO m
) =>
(VCObjectHash -> VCClient.ClientMWithVCStoreError [VCObjectHash]) ->
VCObjectHash ->
FilePath ->
m [VCObjectHash]
fetchAndCacheClosureHashes remoteFetchVCObjectClosureHashes objHash = do
deps <- liftServantClient $ remoteFetchVCObjectClosureHashes objHash
path <- cachedDepsPath objHash
liftIO $
atomicWriteFile path $
BL.unlines $
map (BL.fromStrict . vcObjectHashToByteString) deps
pure deps
readVCObjectHashTxt fp = do
deps <- filter (not . B.null) . Char8.lines <$> liftIO (B.readFile fp)
forM deps $ \dep -> do
decoded <- either (const $ throwing _Typed $ InvalidHash $ Char8.unpack dep) pure $ Base64.decode dep
maybe (throwing _Typed $ InvalidHash $ Char8.unpack dep) (pure . VCObjectHash) $ digestFromByteString decoded

maybeReadCachedVCObject ::
fetchVCObjectUnsafe ::
( MonadReader r m,
HasType VCCacheEnv r,
MonadError e m,
AsType VCStoreError e,
MonadIO m,
MonadMask m,
FromJSON b
) =>
VCObjectHash ->
m (Maybe (Map.Map VCObjectHash b))
maybeReadCachedVCObject objHash = do
VCCacheEnv {tracer} <- asks getTyped
tryJust (guard . isDoesNotExistError) readCachedVCObject >>= \case
Left _ ->
Nothing <$ traceWith tracer (VCCacheMiss objHash)
Right obj ->
Just (Map.singleton objHash obj) <$ traceWith tracer (VCCacheHit objHash)
where
readCachedVCObject = do
path <- cachedObjPath objHash
either (throwing _Typed . CouldNotDecodeObject objHash) pure
=<< liftIO (eitherDecodeStrict <$> Char8.readFile path)

fetchAndCacheVCObject ::
( MonadError err m,
HasType VCCacheEnv env,
HasType ClientEnv env,
AsType VCServerError err,
AsType ClientError err,
MonadReader env m,
MonadIO m,
ToJSON a,
ToJSON g
) =>
([VCObjectHash] -> VCClient.ClientMWithVCStoreError (Map.Map VCObjectHash (VCMeta a g VCObject))) ->
VCObjectHash ->
m (Map.Map VCObjectHash (VCMeta a g VCObject))
fetchAndCacheVCObject fetchVCObjects objHash = do
objs <- liftServantClient $ fetchVCObjects [objHash]
forM_ (Map.toList objs) $ \(h, o) -> do
path <- cachedObjPath h
liftIO $ atomicWriteFile path $ encode o
pure objs

cachedDepsPath :: (MonadReader r m, HasType VCCacheEnv r) => VCObjectHash -> m FilePath
cachedDepsPath objHash = do
VCCacheEnv {cachePath} <- asks getTyped
pure $ cachePath </> "deps" </> show objHash

cachedObjPath :: (MonadReader r m, HasType VCCacheEnv r) => VCObjectHash -> m FilePath
cachedObjPath objHash = do
m b
fetchVCObjectUnsafe h = do
VCCacheEnv {cachePath} <- asks getTyped
pure $ cachePath </> show objHash
let fp = cachePath </> show h
either (throwing _Typed . CouldNotDecodeObject h) pure =<< liftIO (eitherDecodeStrict <$> Char8.readFile fp)

liftServantClient ::
( MonadError e m,
Expand Down
28 changes: 2 additions & 26 deletions inferno-vc/src/Inferno/VersionControl/Log.hs
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
{-# LANGUAGE OverloadedStrings #-}

module Inferno.VersionControl.Log
( VCServerTrace (..),
VCCacheTrace (..),
vcServerTraceToText,
vcCacheTraceToText,
)
where
module Inferno.VersionControl.Log where

import Data.Text (Text, intercalate, pack)
import Data.Text (Text, pack)
import Inferno.VersionControl.Operations.Error (VCStoreError, vcStoreErrorToString)
import Inferno.VersionControl.Types (VCObjectHash)

data VCServerTrace
= ThrownVCStoreError VCStoreError
Expand All @@ -21,8 +14,6 @@ data VCServerTrace
| ReadJSON FilePath
| ReadTxt FilePath
| DeleteFile FilePath
| VCFetchObjects [VCObjectHash]
| VCFetchObjectClosureHashes VCObjectHash

vcServerTraceToText :: VCServerTrace -> Text
vcServerTraceToText = \case
Expand All @@ -34,18 +25,3 @@ vcServerTraceToText = \case
ThrownVCStoreError e -> pack (vcStoreErrorToString e)
ThrownVCOtherError e -> "Other server error: " <> e
DeleteFile fp -> "Deleting file: " <> pack fp
VCFetchObjects objs -> "FetchObjects " <> intercalate ", " (map (pack . show) objs)
VCFetchObjectClosureHashes obj -> "FetchObjectClosureHashes " <> pack (show obj)

data VCCacheTrace
= VCCacheHit VCObjectHash
| VCCacheMiss VCObjectHash
| VCCacheDepsHit VCObjectHash
| VCCacheDepsMiss VCObjectHash

vcCacheTraceToText :: VCCacheTrace -> Text
vcCacheTraceToText = \case
VCCacheHit h -> "VC Cache hit " <> pack (show h)
VCCacheMiss h -> "VC Cache miss " <> pack (show h)
VCCacheDepsHit h -> "VC Cache deps hit " <> pack (show h)
VCCacheDepsMiss h -> "VC Cache deps miss " <> pack (show h)
Loading

0 comments on commit 10057a9

Please sign in to comment.