Skip to content

Commit

Permalink
refactor: add observation module (#3232)
Browse files Browse the repository at this point in the history
  • Loading branch information
steve-chavez authored Feb 20, 2024
1 parent 32e1900 commit 6d506df
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 141 deletions.
1 change: 1 addition & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ library
PostgREST.Error
PostgREST.Logger
PostgREST.MediaType
PostgREST.Observation
PostgREST.Query
PostgREST.Query.QueryBuilder
PostgREST.Query.SqlFragment
Expand Down
20 changes: 10 additions & 10 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,32 @@ import qualified Data.ByteString.Lazy as LBS
import Network.Socket
import Network.Socket.ByteString

import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.Observation (Observation (..))

import qualified PostgREST.AppState as AppState
import qualified PostgREST.Config as Config

import Protolude
import Protolude.Partial (fromJust)

runAdmin :: AppConfig -> AppState -> Warp.Settings -> IO ()
runAdmin conf@AppConfig{configAdminServerPort} appState settings =
runAdmin :: AppConfig -> AppState -> Warp.Settings -> (Observation -> IO ()) -> IO ()
runAdmin conf@AppConfig{configAdminServerPort} appState settings observer =
whenJust (AppState.getSocketAdmin appState) $ \adminSocket -> do
AppState.logWithZTime appState $ "Admin server listening on port " <> show (fromIntegral (fromJust configAdminServerPort) :: Integer)
observer $ AdminStartObs configAdminServerPort
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
where
adminApp = admin appState conf
adminApp = admin appState conf observer

-- | PostgREST admin application
admin :: AppState.AppState -> AppConfig -> Wai.Application
admin appState appConfig req respond = do
admin :: AppState.AppState -> AppConfig -> (Observation -> IO ()) -> Wai.Application
admin appState appConfig observer req respond = do
isMainAppReachable <- isRight <$> reachMainApp (AppState.getSocketREST appState)
isSchemaCacheLoaded <- isJust <$> AppState.getSchemaCache appState
isConnectionUp <-
if configDbChannelEnabled appConfig
then AppState.getIsListenerOn appState
else isRight <$> AppState.usePool appState appConfig (SQL.sql "SELECT 1")
else isRight <$> AppState.usePool appState appConfig (SQL.sql "SELECT 1") observer

case Wai.pathInfo req of
["ready"] ->
Expand Down
47 changes: 25 additions & 22 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import PostgREST.Auth (AuthResult (..))
import PostgREST.Config (AppConfig (..))
import PostgREST.Config.PgVersion (PgVersion (..))
import PostgREST.Error (Error)
import PostgREST.Observation (Observation (..))
import PostgREST.Query (DbHandler)
import PostgREST.Response.Performance (ServerTiming (..),
serverTimingHeader)
Expand All @@ -66,26 +67,26 @@ import System.TimeIt (timeItT)

type Handler = ExceptT Error

run :: AppState -> IO ()
run appState = do
AppState.logWithZTime appState $ "Starting PostgREST " <> T.decodeUtf8 prettyVersion <> "..."
run :: AppState -> (Observation -> IO ()) -> IO ()
run appState observer = do
observer $ AppStartObs prettyVersion

conf@AppConfig{..} <- AppState.getConfig appState
AppState.connectionWorker appState -- Loads the initial SchemaCache
Unix.installSignalHandlers (AppState.getMainThreadId appState) (AppState.connectionWorker appState) (AppState.reReadConfig False appState)
Unix.installSignalHandlers (AppState.getMainThreadId appState) (AppState.connectionWorker appState) (AppState.reReadConfig False appState observer)
-- reload schema cache + config on NOTIFY
AppState.runListener conf appState
AppState.runListener conf appState observer

Admin.runAdmin conf appState $ serverSettings conf
Admin.runAdmin conf appState (serverSettings conf) observer

let app = postgrest conf appState (AppState.connectionWorker appState)
let app = postgrest conf appState (AppState.connectionWorker appState) observer

what <- case configServerUnixSocket of
Just path -> pure $ "unix socket " <> show path
case configServerUnixSocket of
Just path -> do
observer $ AppServerUnixObs path
Nothing -> do
port <- NS.socketPort $ AppState.getSocketREST appState
pure $ "port " <> show port
AppState.logWithZTime appState $ "Listening on " <> what
observer $ AppServerPortObs port

Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) app

Expand All @@ -97,8 +98,8 @@ serverSettings AppConfig{..} =
& setServerName ("postgrest/" <> prettyVersion)

-- | PostgREST application
postgrest :: AppConfig -> AppState.AppState -> IO () -> Wai.Application
postgrest conf appState connWorker =
postgrest :: AppConfig -> AppState.AppState -> IO () -> (Observation -> IO ()) -> Wai.Application
postgrest conf appState connWorker observer =
traceHeaderMiddleware conf .
Cors.middleware (configServerCorsAllowedOrigins conf) .
Auth.middleware appState .
Expand All @@ -115,7 +116,7 @@ postgrest conf appState connWorker =
let
eitherResponse :: IO (Either Error Wai.Response)
eitherResponse =
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req observer

response <- either Error.errorResponseFor identity <$> eitherResponse
-- Launch the connWorker when the connection is down. The postgrest
Expand All @@ -134,8 +135,9 @@ postgrestResponse
-> PgVersion
-> AuthResult
-> Wai.Request
-> (Observation -> IO ())
-> Handler IO Wai.Response
postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@AuthResult{..} req = do
postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@AuthResult{..} req observer = do
sCache <-
case maybeSchemaCache of
Just sCache ->
Expand All @@ -151,22 +153,23 @@ postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@
ApiRequest.userApiRequest conf req body sCache

let jwtTime = if configServerTimingEnabled then Auth.getJwtDur req else Nothing
handleRequest authResult conf appState (Just authRole /= configDbAnonRole) configDbPreparedStatements pgVer apiRequest sCache jwtTime parseTime
handleRequest authResult conf appState (Just authRole /= configDbAnonRole) configDbPreparedStatements pgVer apiRequest sCache jwtTime parseTime observer

runDbHandler :: AppState.AppState -> AppConfig -> SQL.IsolationLevel -> SQL.Mode -> Bool -> Bool -> DbHandler b -> Handler IO b
runDbHandler appState config isoLvl mode authenticated prepared handler = do
runDbHandler :: AppState.AppState -> AppConfig -> SQL.IsolationLevel -> SQL.Mode -> Bool -> Bool -> (Observation -> IO ()) -> DbHandler b -> Handler IO b
runDbHandler appState config isoLvl mode authenticated prepared observer handler = do
dbResp <- lift $ do
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction
AppState.usePool appState config . transaction isoLvl mode $ runExceptT handler
AppState.usePool appState config (transaction isoLvl mode $ runExceptT handler) observer

resp <-
liftEither . mapLeft Error.PgErr $
mapLeft (Error.PgError authenticated) dbResp

liftEither resp

handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool -> PgVersion -> ApiRequest -> SchemaCache -> Maybe Double -> Maybe Double -> Handler IO Wai.Response
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache jwtTime parseTime =
handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool -> PgVersion -> ApiRequest -> SchemaCache ->
Maybe Double -> Maybe Double -> (Observation -> IO ()) -> Handler IO Wai.Response
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache jwtTime parseTime observer =
case (iAction, iTarget) of
(ActionRead headersOnly, TargetIdent identifier) -> do
(planTime', wrPlan) <- withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
Expand Down Expand Up @@ -231,7 +234,7 @@ handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@A
roleSettings = fromMaybe mempty (HM.lookup authRole $ configRoleSettings conf)
roleIsoLvl = HM.findWithDefault SQL.ReadCommitted authRole $ configRoleIsoLvl conf
runQuery isoLvl funcSets mode query =
runDbHandler appState conf isoLvl mode authenticated prepared $ do
runDbHandler appState conf isoLvl mode authenticated prepared observer $ do
Query.setPgLocals conf authClaims authRole (HM.toList roleSettings) funcSets apiReq
Query.runPreReq conf
query
Expand Down
Loading

0 comments on commit 6d506df

Please sign in to comment.