Skip to content

Commit

Permalink
feat: connection pool metrics in admin server
Browse files Browse the repository at this point in the history
  • Loading branch information
steve-chavez committed Apr 23, 2024
1 parent 6abfff5 commit 66885b8
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
- #2676, Performance improvement on bulk json inserts, around 10% increase on requests per second by removing `json_typeof` from write queries - @steve-chavez
- #3214, Log connection pool events on log-level=info - @steve-chavez
- #3435, Add log-level=debug, for development purposes - @steve-chavez
- #1526, Add `/metrics` endpoint on admin server - @steve-chavez
- Exposes connection pool metrics

### Fixed

Expand Down
2 changes: 2 additions & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ library
PostgREST.Error
PostgREST.Logger
PostgREST.MediaType
PostgREST.Metrics
PostgREST.Observation
PostgREST.Query
PostgREST.Query.QueryBuilder
Expand Down Expand Up @@ -124,6 +125,7 @@ library
, optparse-applicative >= 0.13 && < 0.19
, parsec >= 3.1.11 && < 3.2
, postgresql-libpq >= 0.10
, prometheus-client >= 1.1.1 && < 1.2.0
, protolude >= 0.3.1 && < 0.4
, regex-tdfa >= 1.2.2 && < 1.4
, retry >= 0.7.4 && < 0.10
Expand Down
5 changes: 5 additions & 0 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import Network.Socket.ByteString

import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.Metrics (metricsToText)
import PostgREST.Observation (Observation (..))

import qualified PostgREST.AppState as AppState
import qualified PostgREST.Config as Config


import Protolude

runAdmin :: AppConfig -> AppState -> Warp.Settings -> IO ()
Expand Down Expand Up @@ -56,6 +58,9 @@ admin appState appConfig req respond = do
["schema_cache"] -> do
sCache <- AppState.getSchemaCache appState
respond $ Wai.responseLBS HTTP.status200 [] (maybe mempty JSON.encode sCache)
["metrics"] -> do
mets <- metricsToText
respond $ Wai.responseLBS HTTP.status200 [] mets
_ ->
respond $ Wai.responseLBS HTTP.status404 [] mempty

Expand Down
25 changes: 15 additions & 10 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Socket as NS
import qualified PostgREST.Error as Error
import qualified PostgREST.Logger as Logger
import qualified PostgREST.Metrics as Metrics
import PostgREST.Observation
import PostgREST.Version (prettyVersion)
import System.TimeIt (timeItT)
Expand Down Expand Up @@ -111,25 +112,28 @@ data AppState = AppState
, stateSocketREST :: NS.Socket
-- | Network socket for the admin UI
, stateSocketAdmin :: Maybe NS.Socket
-- | Logger state
, stateLogger :: Logger.LoggerState
-- | Observation handler
, stateObserver :: ObservationHandler
, stateLogger :: Logger.LoggerState
, stateMetrics :: Metrics.MetricsState
}

type AppSockets = (NS.Socket, Maybe NS.Socket)


init :: AppConfig -> IO AppState
init conf@AppConfig{configLogLevel} = do
loggerState <- Logger.init
let observer = Logger.observationLogger loggerState configLogLevel
init conf@AppConfig{configLogLevel, configDbPoolSize} = do
loggerState <- Logger.init
metricsState <- Metrics.init configDbPoolSize
let observer = liftA2 (>>) (Logger.observationLogger loggerState configLogLevel) (Metrics.observationMetrics metricsState)

pool <- initPool conf observer
(sock, adminSock) <- initSockets conf
state' <- initWithPool (sock, adminSock) pool conf loggerState observer
state' <- initWithPool (sock, adminSock) pool conf loggerState metricsState observer
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock}

initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> Logger.LoggerState -> ObservationHandler -> IO AppState
initWithPool (sock, adminSock) pool conf loggerState observer = do
initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> ObservationHandler -> IO AppState
initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do

appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
Expand All @@ -145,8 +149,9 @@ initWithPool (sock, adminSock) pool conf loggerState observer = do
<*> C.newCache Nothing
<*> pure sock
<*> pure adminSock
<*> pure loggerState
<*> pure observer
<*> pure loggerState
<*> pure metricsState

debWorker <-
let decisecond = 100000 in
Expand All @@ -156,7 +161,7 @@ initWithPool (sock, adminSock) pool conf loggerState observer = do
, debounceEdge = leadingEdge -- runs the worker at the start and the end
}

return appState { debouncedConnectionWorker = debWorker }
return appState { debouncedConnectionWorker = debWorker}

destroy :: AppState -> IO ()
destroy = destroyPool
Expand Down
4 changes: 4 additions & 0 deletions src/PostgREST/Logger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ observationLogger loggerState logLevel obs = case obs of
o@(SchemaCacheLoadedObs _) -> do
when (logLevel >= LogDebug) $ do
logWithZTime loggerState $ observationMessage o
PoolRequest ->
pure ()
PoolRequestFullfilled ->
pure ()
o ->
logWithZTime loggerState $ observationMessage o

Expand Down
55 changes: 55 additions & 0 deletions src/PostgREST/Metrics.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
module PostgREST.Metrics
( init
, MetricsState (..)
, observationMetrics
, metricsToText
) where

import qualified Data.ByteString.Lazy as LBS
import qualified Hasql.Pool.Observation as SQL

import qualified Prometheus as Prom

import PostgREST.Observation

import Protolude

data MetricsState = MetricsState
{ poolTimeouts :: Prom.Counter
, poolAvailable :: Prom.Gauge
, poolWaiting :: Prom.Gauge
, poolMaxSize :: Prom.Gauge
}

init :: Int -> IO MetricsState
init poolMaxSize = do
timeouts <- Prom.register $ Prom.counter (Prom.Info "pgrst_db_pool_timeouts_total" "The total number of pool connection timeouts")
available <- Prom.register $ Prom.gauge (Prom.Info "pgrst_db_pool_available" "Available connections in the pool")
waiting <- Prom.register $ Prom.gauge (Prom.Info "pgrst_db_pool_waiting" "Requests waiting to acquire a pool connection")
maxSize <- Prom.register $ Prom.gauge (Prom.Info "pgrst_db_pool_max" "Max pool connections")
Prom.setGauge maxSize (fromIntegral poolMaxSize)
pure $ MetricsState timeouts available waiting maxSize

observationMetrics :: MetricsState -> ObservationHandler
observationMetrics MetricsState{poolTimeouts, poolAvailable, poolWaiting} obs = case obs of
(PoolAcqTimeoutObs _) -> do
Prom.incCounter poolTimeouts
(HasqlPoolObs (SQL.ConnectionObservation _ status)) -> case status of
SQL.ReadyForUseConnectionStatus -> do
Prom.incGauge poolAvailable
SQL.InUseConnectionStatus -> do
Prom.decGauge poolAvailable
SQL.TerminatedConnectionStatus _ -> do
Prom.decGauge poolAvailable
SQL.ConnectingConnectionStatus -> pure ()
PoolRequest ->
Prom.incGauge poolWaiting
PoolRequestFullfilled ->
Prom.decGauge poolWaiting
_ ->
pure ()

metricsToText :: IO LBS.ByteString
metricsToText = Prom.exportMetricsAsText
3 changes: 3 additions & 0 deletions src/PostgREST/Observation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ data Observation
| QueryErrorCodeHighObs SQL.UsageError
| PoolAcqTimeoutObs SQL.UsageError
| HasqlPoolObs SQL.Observation
| PoolRequest
| PoolRequestFullfilled

type ObservationHandler = Observation -> IO ()

Expand Down Expand Up @@ -125,6 +127,7 @@ observationMessage = \case
SQL.ReleaseConnectionTerminationReason -> "release"
SQL.NetworkErrorConnectionTerminationReason _ -> "network error" -- usage error is already logged, no need to repeat the same message.
)
_ -> mempty
where
showMillis :: Double -> Text
showMillis x = toS $ showFFloat (Just 1) (x * 1000) ""
Expand Down
7 changes: 7 additions & 0 deletions src/PostgREST/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import PostgREST.Config (AppConfig (..),
import PostgREST.Config.PgVersion (PgVersion (..))
import PostgREST.Error (Error)
import PostgREST.MediaType (MediaType (..))
import PostgREST.Observation (Observation (..))
import PostgREST.Plan (ActionPlan (..),
CallReadPlan (..),
CrudPlan (..),
Expand Down Expand Up @@ -77,10 +78,16 @@ data QueryResult
runQuery :: AppState.AppState -> AppConfig -> AuthResult -> ApiRequest -> ActionPlan -> SchemaCache -> PgVersion -> Bool -> ExceptT Error IO QueryResult
runQuery _ _ _ _ (NoDb x) _ _ _ = pure $ NoDbResult x
runQuery appState config AuthResult{..} apiReq (Db plan) sCache pgVer authenticated = do
let observer = AppState.getObserver appState

lift $ observer PoolRequest

dbResp <- lift $ do
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction
AppState.usePool appState (transaction isoLvl txMode $ runExceptT dbHandler)

lift $ observer PoolRequestFullfilled

resp <-
liftEither . mapLeft Error.PgErr $
mapLeft (Error.PgError authenticated) dbResp
Expand Down
12 changes: 12 additions & 0 deletions test/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1427,3 +1427,15 @@ def test_multiple_func_settings(defaultenv):
response = postgrest.session.post("/rpc/multiple_func_settings_test")

assert response.text == '[{"work_mem":"5000kB","statement_timeout":"10s"}]'


def test_admin_metrics(defaultenv):
"Should get metrics from the admin endpoint"

with run(env=defaultenv, port=freeport()) as postgrest:
response = postgrest.admin.get("/metrics")
assert response.status_code == 200
assert "pgrst_db_pool_max" in response.text
assert "pgrst_db_pool_waiting" in response.text
assert "pgrst_db_pool_available" in response.text
assert "pgrst_db_pool_timeouts_total" in response.text
21 changes: 11 additions & 10 deletions test/spec/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import SpecHelper

import qualified PostgREST.AppState as AppState
import qualified PostgREST.Logger as Logger
import qualified PostgREST.Metrics as Metrics

import qualified Feature.Auth.AsymmetricJwtSpec
import qualified Feature.Auth.AudienceJwtSecretSpec
Expand Down Expand Up @@ -70,7 +71,6 @@ import qualified Feature.RpcPreRequestGucsSpec

main :: IO ()
main = do
let observer = const $ pure ()
pool <- P.acquire $ P.settings
[ P.size 3
, P.acquisitionTimeout 10
Expand All @@ -85,22 +85,22 @@ main = do
baseSchemaCache <- loadSCache pool testCfg
sockets <- AppState.initSockets testCfg
loggerState <- Logger.init
metricsState <- Metrics.init (configDbPoolSize testCfg)

let
-- For tests that run with the same refSchemaCache
app config = do
appState <- AppState.initWithPool sockets pool config loggerState observer
initApp sCache config = do
appState <- AppState.initWithPool sockets pool config loggerState metricsState (const $ pure ())
AppState.putPgVersion appState actualPgVersion
AppState.putSchemaCache appState (Just baseSchemaCache)
AppState.putSchemaCache appState (Just sCache)
return ((), postgrest (configLogLevel config) appState (pure ()))

-- For tests that run with a different SchemaCache(depends on configSchemas)
-- For tests that run with the same schema cache
app = initApp baseSchemaCache

-- For tests that run with a different SchemaCache (depends on configSchemas)
appDbs config = do
customSchemaCache <- loadSCache pool config
appState <- AppState.initWithPool sockets pool config loggerState observer
AppState.putPgVersion appState actualPgVersion
AppState.putSchemaCache appState (Just customSchemaCache)
return ((), postgrest (configLogLevel config) appState (pure ()))
initApp customSchemaCache config

let withApp = app testCfg
maxRowsApp = app testMaxRowsCfg
Expand Down Expand Up @@ -280,3 +280,4 @@ main = do
where
loadSCache pool conf =
either (panic.show) id <$> P.use pool (HT.transaction HT.ReadCommitted HT.Read $ querySchemaCache conf)

0 comments on commit 66885b8

Please sign in to comment.