diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml index 87c94fa..136796d 100644 --- a/.github/workflows/haskell-ci.yml +++ b/.github/workflows/haskell-ci.yml @@ -27,7 +27,7 @@ jobs: timeout-minutes: 60 container: - image: buildpack-deps:bionic + image: buildpack-deps:jammy services: postgres: image: postgres:14 diff --git a/postgresql-libpq.cabal b/postgresql-libpq.cabal index 29544dd..64eb163 100644 --- a/postgresql-libpq.cabal +++ b/postgresql-libpq.cabal @@ -39,6 +39,12 @@ flag use-pkg-config default: False manual: True +-- If true, use libpq bindings for Pipeline Mode, otherwise throw +-- errors on usage. It requires libpq >=14 +flag enable-pipeline-mode + default: True + manual: False + library default-language: Haskell2010 hs-source-dirs: src @@ -76,6 +82,9 @@ library if os(windows) build-depends: Win32 >=2.2.0.2 && <2.14 + if flag(enable-pipeline-mode) + cpp-options: -DHASKELL_LIBPQ_PIPELINE_MODE + if flag(use-pkg-config) pkgconfig-depends: libpq >=10.22 diff --git a/src/Database/PostgreSQL/LibPQ.hs b/src/Database/PostgreSQL/LibPQ.hs index aeb3e89..596d685 100644 --- a/src/Database/PostgreSQL/LibPQ.hs +++ b/src/Database/PostgreSQL/LibPQ.hs @@ -171,6 +171,16 @@ module Database.PostgreSQL.LibPQ , FlushStatus(..) , flush + -- * Pipeline Mode + -- $pipeline + , PipelineStatus(..) + , pipelineStatus + , enterPipelineMode + , exitPipelineMode + , pipelineSync + , sendFlushRequest + , isEnabledPipeline + -- * Cancelling Queries in Progress -- $cancel , Cancel @@ -1636,6 +1646,92 @@ flush connection = 1 -> return FlushWriting _ -> return FlushFailed +-- $pipelinemode +-- These functions control behaviour in pipeline mode. +-- +-- Pipeline mode allows applications to send a query +-- without having to read the result of the previously +-- sent query. Taking advantage of the pipeline mode, +-- a client will wait less for the server, since multiple +-- queries/results can be sent/received in +-- a single network transaction. + +ifEnabledPipeline :: IO a -> IO a +ifEnabledPipeline m = + if isEnabledPipeline + then m + else error "pipeline mode is disabled" +{-# INLINE ifEnabledPipeline #-} + +-- | Returns the current pipeline mode status of the connection. +pipelineStatus :: Connection + -> IO PipelineStatus +pipelineStatus connection = ifEnabledPipeline $ do + stat <- withConn connection c_PQpipelineStatus + maybe + (fail $ "Unknown pipeline status " ++ show stat) + return + (fromCInt stat) + +-- | Causes a connection to enter pipeline mode +-- if it is currently idle or already in pipeline mode. +-- +-- Returns 'True' for success. Returns 'False' and has no effect +-- if the connection is not currently idle, i.e., +-- it has a result ready, or it is waiting for more +-- input from the server, etc. +-- This function does not actually send anything to the server, +-- it just changes the libpq connection state. +enterPipelineMode :: Connection + -> IO Bool +enterPipelineMode connection = ifEnabledPipeline $ + enumFromConn connection c_PQenterPipelineMode + +-- | Causes a connection to exit pipeline mode +-- if it is currently in pipeline mode with +-- an empty queue and no pending results. +-- +-- Returns 'True' for success. +-- Returns 'True' and takes no action if not in pipeline mode. +-- If the current statement isn't finished processing, +-- or 'getResult' has not been called to collect results +-- from all previously sent query, returns True +-- (in which case, use 'errorMessage' to get more +-- information about the failure). +exitPipelineMode :: Connection + -> IO Bool +exitPipelineMode connection = ifEnabledPipeline $ + enumFromConn connection c_PQexitPipelineMode + +-- | Marks a synchronization point in a pipeline by sending +-- a sync message and flushing the send buffer. +-- This serves as the delimiter of an implicit transaction +-- and an error recovery point. +-- +-- Returns 'True' for success. +-- Returns 'False' if the connection is not in pipeline mode or +-- sending a sync message failed. +pipelineSync :: Connection + -> IO Bool +pipelineSync connection = ifEnabledPipeline $ + enumFromConn connection c_PQpipelineSync + +-- | Sends a request for the server to flush its output buffer. +-- +-- Returns 'True' for success. Returns 'False' on any failure. +-- +-- The server flushes its output buffer automatically +-- as a result of 'pipelineSync' being called, +-- or on any request when not in pipeline mode; +-- this function is useful to cause the server to flush +-- its output buffer in pipeline mode without establishing +-- a synchronization point. +-- Note that the request is not itself flushed +-- to the server automatically; use 'flush' if necessary. +sendFlushRequest :: Connection + -> IO Bool +sendFlushRequest connection = ifEnabledPipeline $ + enumFromConn connection c_PQsendFlushRequest -- $cancel -- A client application can request cancellation of a command that is @@ -1724,7 +1820,6 @@ notifies connection = c_PQfreemem mn return result - -- $control -- These functions control miscellaneous details of libpq's behavior. diff --git a/src/Database/PostgreSQL/LibPQ/Compat.hs b/src/Database/PostgreSQL/LibPQ/Compat.hs index 4bdd48a..e313b8e 100644 --- a/src/Database/PostgreSQL/LibPQ/Compat.hs +++ b/src/Database/PostgreSQL/LibPQ/Compat.hs @@ -25,3 +25,13 @@ mkPS fp off len = BS (plusForeignPtr fp off) len mkPS fp off len = PS fp off len #endif {-# INLINE mkPS #-} + +-- | Shows that compiled code was linked with LibPQ supporting Pipeline Mode. +-- LibPQ version >= 14. +isEnabledPipeline :: Bool +#if HASKELL_LIBPQ_PIPELINE_MODE +isEnabledPipeline = True +#else +isEnabledPipeline = False +#endif +{-# INLINE isEnabledPipeline #-} diff --git a/src/Database/PostgreSQL/LibPQ/Enums.hsc b/src/Database/PostgreSQL/LibPQ/Enums.hsc index 2c995dd..d7ceeeb 100644 --- a/src/Database/PostgreSQL/LibPQ/Enums.hsc +++ b/src/Database/PostgreSQL/LibPQ/Enums.hsc @@ -37,23 +37,38 @@ data ExecStatus | NonfatalError -- ^ A nonfatal error (a notice or -- warning) occurred. | FatalError -- ^ A fatal error occurred. - | SingleTuple -- ^ The PGresult contains a single result tuple + | SingleTuple -- ^ The 'Result' contains a single result tuple -- from the current command. This status occurs -- only when single-row mode has been selected -- for the query. + | PipelineSync -- ^ The 'Result' represents a synchronization + -- point in pipeline mode, requested by + -- 'pipelineSync'. This status occurs only + -- when pipeline mode has been selected. + | PipelineAbort -- ^ The 'Result' represents a pipeline that + -- has received an error from the server. + -- 'getResult' must be called repeatedly, + -- and each time it will return this status + -- code until the end of the current pipeline, + -- at which point it will return 'PipelineSync' + -- and normal processing can resume. deriving (Eq, Show) instance FromCInt ExecStatus where - fromCInt (#const PGRES_EMPTY_QUERY) = Just EmptyQuery - fromCInt (#const PGRES_COMMAND_OK) = Just CommandOk - fromCInt (#const PGRES_TUPLES_OK) = Just TuplesOk - fromCInt (#const PGRES_COPY_OUT) = Just CopyOut - fromCInt (#const PGRES_COPY_IN) = Just CopyIn - fromCInt (#const PGRES_COPY_BOTH) = Just CopyBoth - fromCInt (#const PGRES_BAD_RESPONSE) = Just BadResponse - fromCInt (#const PGRES_NONFATAL_ERROR) = Just NonfatalError - fromCInt (#const PGRES_FATAL_ERROR) = Just FatalError - fromCInt (#const PGRES_SINGLE_TUPLE) = Just SingleTuple + fromCInt (#const PGRES_EMPTY_QUERY) = Just EmptyQuery + fromCInt (#const PGRES_COMMAND_OK) = Just CommandOk + fromCInt (#const PGRES_TUPLES_OK) = Just TuplesOk + fromCInt (#const PGRES_COPY_OUT) = Just CopyOut + fromCInt (#const PGRES_COPY_IN) = Just CopyIn + fromCInt (#const PGRES_COPY_BOTH) = Just CopyBoth + fromCInt (#const PGRES_BAD_RESPONSE) = Just BadResponse + fromCInt (#const PGRES_NONFATAL_ERROR) = Just NonfatalError + fromCInt (#const PGRES_FATAL_ERROR) = Just FatalError + fromCInt (#const PGRES_SINGLE_TUPLE) = Just SingleTuple + #if HASKELL_LIBPQ_PIPELINE_MODE + fromCInt (#const PGRES_PIPELINE_SYNC) = Just PipelineSync + fromCInt (#const PGRES_PIPELINE_ABORTED) = Just PipelineAbort + #endif fromCInt _ = Nothing instance ToCInt ExecStatus where @@ -67,6 +82,16 @@ instance ToCInt ExecStatus where toCInt NonfatalError = (#const PGRES_NONFATAL_ERROR) toCInt FatalError = (#const PGRES_FATAL_ERROR) toCInt SingleTuple = (#const PGRES_SINGLE_TUPLE) + #if HASKELL_LIBPQ_PIPELINE_MODE + toCInt PipelineSync = (#const PGRES_PIPELINE_SYNC) + #else + toCInt PipelineSync = error "pipeline mode is disabled" + #endif + #if HASKELL_LIBPQ_PIPELINE_MODE + toCInt PipelineAbort = (#const PGRES_PIPELINE_ABORTED) + #else + toCInt PipelineSync = error "pipeline mode is disabled" + #endif data FieldCode @@ -230,7 +255,7 @@ instance FromCInt ConnStatus where fromCInt (#const CONNECTION_SSL_STARTUP) = return ConnectionSSLStartup -- fromCInt (#const CONNECTION_NEEDED) = return ConnectionNeeded fromCInt _ = Nothing - + data TransactionStatus = TransIdle -- ^ currently idle @@ -263,6 +288,23 @@ instance FromCInt Format where fromCInt 1 = Just Binary fromCInt _ = Nothing +data PipelineStatus + = PipelineOn -- ^ The 'Connection' is in pipeline mode. + | PipelineOff -- ^ The 'Connection' is /not/ in pipeline mode. + | PipelineAborted -- ^ The 'Connection' is in pipeline mode and an error + -- occurred while processing the current pipeline. The + -- aborted flag is cleared when 'getResult' returns a + -- result with status 'PipelineSync'. + deriving (Eq, Show) + +instance FromCInt PipelineStatus where + #if HASKELL_LIBPQ_PIPELINE_MODE + fromCInt (#const PQ_PIPELINE_ON) = return PipelineOn + fromCInt (#const PQ_PIPELINE_OFF) = return PipelineOff + fromCInt (#const PQ_PIPELINE_ABORTED) = return PipelineAborted + #endif + fromCInt _ = Nothing + ------------------------------------------------------------------------------- -- System.IO enumerations ------------------------------------------------------------------------------- diff --git a/src/Database/PostgreSQL/LibPQ/FFI.hs b/src/Database/PostgreSQL/LibPQ/FFI.hs index 0693a59..37224f8 100644 --- a/src/Database/PostgreSQL/LibPQ/FFI.hs +++ b/src/Database/PostgreSQL/LibPQ/FFI.hs @@ -118,7 +118,7 @@ foreign import capi "hs-libpq.h PQputCopyData" foreign import capi "hs-libpq.h PQputCopyEnd" c_PQputCopyEnd :: Ptr PGconn -> CString -> IO CInt - + -- TODO: GHC #22043 foreign import ccall "hs-libpq.h PQgetCopyData" c_PQgetCopyData :: Ptr PGconn -> Ptr (Ptr Word8) -> CInt -> IO CInt @@ -302,6 +302,26 @@ foreign import capi "hs-libpq.h &PQfreemem" foreign import capi "hs-libpq.h PQfreemem" c_PQfreemem :: Ptr a -> IO () +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQpipelineStatus" + c_PQpipelineStatus :: Ptr PGconn -> IO CInt + +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQenterPipelineMode" + c_PQenterPipelineMode :: Ptr PGconn -> IO CInt + +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQexitPipelineMode" + c_PQexitPipelineMode :: Ptr PGconn -> IO CInt + +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQpipelineSync" + c_PQpipelineSync :: Ptr PGconn -> IO CInt + +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQsendFlushRequest" + c_PQsendFlushRequest :: Ptr PGconn -> IO CInt + ------------------------------------------------------------------------------- -- FFI imports: noticebuffers ------------------------------------------------------------------------------- diff --git a/test/Smoke.hs b/test/Smoke.hs index 1802d4c..457c281 100644 --- a/test/Smoke.hs +++ b/test/Smoke.hs @@ -1,6 +1,6 @@ module Main (main) where -import Control.Monad (unless) +import Control.Monad (unless, when) import Database.PostgreSQL.LibPQ import Data.Foldable (toList) import System.Environment (getEnvironment) @@ -11,7 +11,9 @@ import qualified Data.ByteString.Char8 as BS8 main :: IO () main = do libpqVersion >>= print - withConnstring smoke + withConnstring $ \connstring -> do + smoke connstring + when isEnabledPipeline $ testPipeline connstring withConnstring :: (BS8.ByteString -> IO ()) -> IO () withConnstring kont = do @@ -53,3 +55,38 @@ smoke connstring = do unless (s == ConnectionOk) exitFailure finish conn + +testPipeline :: BS8.ByteString -> IO () +testPipeline connstring = do + conn <- connectdb connstring + + pipelineStatus conn >>= print + + setnonblocking conn True `shouldReturn` True + enterPipelineMode conn `shouldReturn` True + pipelineStatus conn `shouldReturn` PipelineOn + sendQueryParams conn (BS8.pack "select 1") [] Text `shouldReturn` True + sendQueryParams conn (BS8.pack "select 2") [] Text `shouldReturn` True + pipelineSync conn `shouldReturn` True + + Just r1 <- getResult conn + resultStatus r1 `shouldReturn` TuplesOk + getvalue r1 0 0 `shouldReturn` Just (BS8.pack "1") + Nothing <- getResult conn + + Just r2 <- getResult conn + getvalue r2 0 0 `shouldReturn` Just (BS8.pack "2") + Nothing <- getResult conn + + Just r3 <- getResult conn + resultStatus r3 `shouldReturn` PipelineSync + + finish conn + where + shouldBe r value = + unless (r == value) $ do + print $ "expected " <> show value <> ", got " <> show r + exitFailure + shouldReturn action value = do + r <- action + r `shouldBe` value