Skip to content

Commit

Permalink
Fix dropping schema cache reload notifications
Browse files Browse the repository at this point in the history
* test: bad schema reload
* refactor: DRY using the "extra" lib
* refactor: move worker funtions inside AppState
* Also rename Workers.hs to Admin.hs
  • Loading branch information
steve-chavez authored Jun 2, 2023
1 parent 14be3fb commit a852b76
Show file tree
Hide file tree
Showing 15 changed files with 460 additions and 409 deletions.
10 changes: 4 additions & 6 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,14 @@ It builds the OpenAPI response using the schema cache.

This module provides functions to deal with JWT authorization.

### Workers.hs

This spawns threads which are used to execute concurrent jobs.

Jobs include connection recovery, a listener for the PostgreSQL LISTEN command, and an admin server.

### SchemaCache.hs

This queries the PostgreSQL system catalogs and caches the metadata into a SchemaCache type,

### AppState.hs

The state of the App which is kept across requests.

This spawns threads which are used to execute concurrent jobs.

Jobs include connection recover and a listener for the PostgreSQL LISTEN command.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ This project adheres to [Semantic Versioning](http://semver.org/).
+ New config option `db-pre-config`(empty by default)
+ Allows using the in-database configuration without SUPERUSER

### Fixed

- #2791, Fix dropping schema cache reload notifications - @steve-chavez

## [11.0.1] - 2023-04-27

### Fixed
Expand Down
5 changes: 3 additions & 2 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ library
default-extensions: OverloadedStrings
NoImplicitPrelude
hs-source-dirs: src
exposed-modules: PostgREST.App
exposed-modules: PostgREST.Admin
PostgREST.App
PostgREST.AppState
PostgREST.Auth
PostgREST.CLI
Expand Down Expand Up @@ -70,7 +71,6 @@ library
PostgREST.Response.OpenAPI
PostgREST.Response.GucHeader
PostgREST.Version
PostgREST.Workers
other-modules: Paths_postgrest
build-depends: base >= 4.9 && < 4.17
, HTTP >= 4000.3.7 && < 4000.5
Expand All @@ -86,6 +86,7 @@ library
, contravariant-extras >= 0.3.3 && < 0.4
, cookie >= 0.4.2 && < 0.5
, either >= 4.4.1 && < 5.1
, extra >= 1.7.0 && < 2.0
, fuzzyset >= 0.2.3
, gitrev >= 1.2 && < 1.4
, hasql >= 1.6.1.1 && < 1.7
Expand Down
87 changes: 87 additions & 0 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}

module PostgREST.Admin
( runAdmin
) where

import qualified Data.Text as T
import qualified Hasql.Session as SQL
import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp

import Control.Monad.Extra (whenJust)

import Network.Socket
import Network.Socket.ByteString

import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))

import qualified PostgREST.AppState as AppState

import Protolude

runAdmin :: AppConfig -> AppState -> Warp.Settings -> IO ()
runAdmin conf@AppConfig{configAdminServerPort} appState settings =
whenJust configAdminServerPort $ \adminPort -> do
AppState.logWithZTime appState $ "Admin server listening on port " <> show adminPort
void . forkIO $ Warp.runSettings (settings & Warp.setPort adminPort) adminApp
where
adminApp = admin appState conf

-- | PostgREST admin application
admin :: AppState.AppState -> AppConfig -> Wai.Application
admin appState appConfig req respond = do
isMainAppReachable <- any isRight <$> reachMainApp appConfig
isSchemaCacheLoaded <- isJust <$> AppState.getSchemaCache appState
isConnectionUp <-
if configDbChannelEnabled appConfig
then AppState.getIsListenerOn appState
else isRight <$> AppState.usePool appState (SQL.sql "SELECT 1")

case Wai.pathInfo req of
["ready"] ->
respond $ Wai.responseLBS (if isMainAppReachable && isConnectionUp && isSchemaCacheLoaded then HTTP.status200 else HTTP.status503) [] mempty
["live"] ->
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status503) [] mempty
_ ->
respond $ Wai.responseLBS HTTP.status404 [] mempty

-- Try to connect to the main app socket
-- Note that it doesn't even send a valid HTTP request, we just want to check that the main app is accepting connections
-- The code for resolving the "*4", "!4", "*6", "!6", "*" special values is taken from
-- https://hackage.haskell.org/package/streaming-commons-0.2.2.4/docs/src/Data.Streaming.Network.html#bindPortGenEx
reachMainApp :: AppConfig -> IO [Either IOException ()]
reachMainApp AppConfig{..} =
case configServerUnixSocket of
Just path -> do
sock <- socket AF_UNIX Stream 0
(:[]) <$> try (do
connect sock $ SockAddrUnix path
withSocketsDo $ bracket (pure sock) close sendEmpty)
Nothing -> do
let
host | configServerHost `elem` ["*4", "!4", "*6", "!6", "*"] = Nothing
| otherwise = Just configServerHost
filterAddrs xs =
case configServerHost of
"*4" -> ipv4Addrs xs ++ ipv6Addrs xs
"!4" -> ipv4Addrs xs
"*6" -> ipv6Addrs xs ++ ipv4Addrs xs
"!6" -> ipv6Addrs xs
_ -> xs
ipv4Addrs = filter ((/=) AF_INET6 . addrFamily)
ipv6Addrs = filter ((==) AF_INET6 . addrFamily)

addrs <- getAddrInfo (Just $ defaultHints { addrSocketType = Stream }) (T.unpack <$> host) (Just . show $ configServerPort)
tryAddr `traverse` filterAddrs addrs
where
sendEmpty sock = void $ send sock mempty
tryAddr :: AddrInfo -> IO (Either IOException ())
tryAddr addr = do
sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
try $ do
connect sock $ addrAddress addr
withSocketsDo $ bracket (pure sock) close sendEmpty
10 changes: 5 additions & 5 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import qualified Hasql.Transaction.Sessions as SQL
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp

import qualified PostgREST.Admin as Admin
import qualified PostgREST.ApiRequest as ApiRequest
import qualified PostgREST.ApiRequest.Types as ApiRequestTypes
import qualified PostgREST.AppState as AppState
Expand All @@ -43,7 +44,6 @@ import qualified PostgREST.Logger as Logger
import qualified PostgREST.Plan as Plan
import qualified PostgREST.Query as Query
import qualified PostgREST.Response as Response
import qualified PostgREST.Workers as Workers

import PostgREST.ApiRequest (Action (..), ApiRequest (..),
Mutation (..), Target (..))
Expand All @@ -68,14 +68,14 @@ type SocketRunner = Warp.Settings -> Wai.Application -> FileMode -> FilePath ->
run :: SignalHandlerInstaller -> Maybe SocketRunner -> AppState -> IO ()
run installHandlers maybeRunWithSocket appState = do
conf@AppConfig{..} <- AppState.getConfig appState
Workers.connectionWorker appState -- Loads the initial SchemaCache
AppState.connectionWorker appState -- Loads the initial SchemaCache
installHandlers appState
-- reload schema cache + config on NOTIFY
Workers.runListener conf appState
AppState.runListener conf appState

Workers.runAdmin conf appState $ serverSettings conf
Admin.runAdmin conf appState $ serverSettings conf

let app = postgrest conf appState (Workers.connectionWorker appState)
let app = postgrest conf appState (AppState.connectionWorker appState)

case configServerUnixSocket of
Just socket ->
Expand Down
Loading

0 comments on commit a852b76

Please sign in to comment.