From 7efbfec692911de7ea218e1484d989c2dd004cdf Mon Sep 17 00:00:00 2001 From: Robert Vollmert Date: Wed, 1 Mar 2023 11:55:41 +0100 Subject: [PATCH 1/5] Add pipeline mode API https://www.postgresql.org/docs/current/libpq-pipeline-mode.html --- src/Database/PostgreSQL/LibPQ.hs | 39 +++++++++++++++++++ src/Database/PostgreSQL/LibPQ/Enums.hsc | 52 +++++++++++++++++++------ src/Database/PostgreSQL/LibPQ/FFI.hs | 15 +++++++ 3 files changed, 95 insertions(+), 11 deletions(-) diff --git a/src/Database/PostgreSQL/LibPQ.hs b/src/Database/PostgreSQL/LibPQ.hs index aeb3e89..517a183 100644 --- a/src/Database/PostgreSQL/LibPQ.hs +++ b/src/Database/PostgreSQL/LibPQ.hs @@ -171,6 +171,15 @@ module Database.PostgreSQL.LibPQ , FlushStatus(..) , flush + -- * Pipeline Mode + -- $pipeline + , PipelineStatus(..) + , pipelineStatus + , enterPipelineMode + , exitPipelineMode + , pipelineSync + , sendFlushRequest + -- * Cancelling Queries in Progress -- $cancel , Cancel @@ -1637,6 +1646,36 @@ flush connection = _ -> return FlushFailed +pipelineStatus :: Connection + -> IO PipelineStatus +pipelineStatus connection = do + stat <- withConn connection c_PQpipelineStatus + maybe + (fail $ "Unknown pipeline status " ++ show stat) + return + (fromCInt stat) + +enterPipelineMode :: Connection + -> IO Bool +enterPipelineMode connection = + enumFromConn connection c_PQenterPipelineMode + +exitPipelineMode :: Connection + -> IO Bool +exitPipelineMode connection = + enumFromConn connection c_PQexitPipelineMode + +pipelineSync :: Connection + -> IO Bool +pipelineSync connection = + enumFromConn connection c_PQpipelineSync + +sendFlushRequest :: Connection + -> IO Bool +sendFlushRequest connection = + enumFromConn connection c_PQsendFlushRequest + + -- $cancel -- A client application can request cancellation of a command that is -- still being processed by the server, using the functions described diff --git a/src/Database/PostgreSQL/LibPQ/Enums.hsc b/src/Database/PostgreSQL/LibPQ/Enums.hsc index 2c995dd..5d13e43 100644 --- a/src/Database/PostgreSQL/LibPQ/Enums.hsc +++ b/src/Database/PostgreSQL/LibPQ/Enums.hsc @@ -37,23 +37,36 @@ data ExecStatus | NonfatalError -- ^ A nonfatal error (a notice or -- warning) occurred. | FatalError -- ^ A fatal error occurred. - | SingleTuple -- ^ The PGresult contains a single result tuple + | SingleTuple -- ^ The 'Result' contains a single result tuple -- from the current command. This status occurs -- only when single-row mode has been selected -- for the query. + | PipelineSync -- ^ The 'Result' represents a synchronization + -- point in pipeline mode, requested by + -- 'pipelineSync'. This status occurs only + -- when pipeline mode has been selected. + | PipelineAbort -- ^ The 'Result' represents a pipeline that + -- has received an error from the server. + -- 'getResult' must be called repeatedly, + -- and each time it will return this status + -- code until the end of the current pipeline, + -- at which point it will return 'PipelineSync' + -- and normal processing can resume. deriving (Eq, Show) instance FromCInt ExecStatus where - fromCInt (#const PGRES_EMPTY_QUERY) = Just EmptyQuery - fromCInt (#const PGRES_COMMAND_OK) = Just CommandOk - fromCInt (#const PGRES_TUPLES_OK) = Just TuplesOk - fromCInt (#const PGRES_COPY_OUT) = Just CopyOut - fromCInt (#const PGRES_COPY_IN) = Just CopyIn - fromCInt (#const PGRES_COPY_BOTH) = Just CopyBoth - fromCInt (#const PGRES_BAD_RESPONSE) = Just BadResponse - fromCInt (#const PGRES_NONFATAL_ERROR) = Just NonfatalError - fromCInt (#const PGRES_FATAL_ERROR) = Just FatalError - fromCInt (#const PGRES_SINGLE_TUPLE) = Just SingleTuple + fromCInt (#const PGRES_EMPTY_QUERY) = Just EmptyQuery + fromCInt (#const PGRES_COMMAND_OK) = Just CommandOk + fromCInt (#const PGRES_TUPLES_OK) = Just TuplesOk + fromCInt (#const PGRES_COPY_OUT) = Just CopyOut + fromCInt (#const PGRES_COPY_IN) = Just CopyIn + fromCInt (#const PGRES_COPY_BOTH) = Just CopyBoth + fromCInt (#const PGRES_BAD_RESPONSE) = Just BadResponse + fromCInt (#const PGRES_NONFATAL_ERROR) = Just NonfatalError + fromCInt (#const PGRES_FATAL_ERROR) = Just FatalError + fromCInt (#const PGRES_SINGLE_TUPLE) = Just SingleTuple + fromCInt (#const PGRES_PIPELINE_SYNC) = Just PipelineSync + fromCInt (#const PGRES_PIPELINE_ABORTED) = Just PipelineAbort fromCInt _ = Nothing instance ToCInt ExecStatus where @@ -67,6 +80,8 @@ instance ToCInt ExecStatus where toCInt NonfatalError = (#const PGRES_NONFATAL_ERROR) toCInt FatalError = (#const PGRES_FATAL_ERROR) toCInt SingleTuple = (#const PGRES_SINGLE_TUPLE) + toCInt PipelineSync = (#const PGRES_PIPELINE_SYNC) + toCInt PipelineAbort = (#const PGRES_PIPELINE_ABORTED) data FieldCode @@ -263,6 +278,21 @@ instance FromCInt Format where fromCInt 1 = Just Binary fromCInt _ = Nothing +data PipelineStatus + = PipelineOn -- ^ The 'Connection' is in pipeline mode. + | PipelineOff -- ^ The 'Connection' is /not/ in pipeline mode. + | PipelineAborted -- ^ The 'Connection' is in pipeline mode and an error + -- occurred while processing the current pipeline. The + -- aborted flag is cleared when 'getResult' returns a + -- result with status 'PipelineSync'. + deriving (Eq, Show) + +instance FromCInt PipelineStatus where + fromCInt (#const PQ_PIPELINE_ON) = return PipelineOn + fromCInt (#const PQ_PIPELINE_OFF) = return PipelineOff + fromCInt (#const PQ_PIPELINE_ABORTED) = return PipelineAborted + fromCInt _ = Nothing + ------------------------------------------------------------------------------- -- System.IO enumerations ------------------------------------------------------------------------------- diff --git a/src/Database/PostgreSQL/LibPQ/FFI.hs b/src/Database/PostgreSQL/LibPQ/FFI.hs index 0693a59..5bc11ac 100644 --- a/src/Database/PostgreSQL/LibPQ/FFI.hs +++ b/src/Database/PostgreSQL/LibPQ/FFI.hs @@ -302,6 +302,21 @@ foreign import capi "hs-libpq.h &PQfreemem" foreign import capi "hs-libpq.h PQfreemem" c_PQfreemem :: Ptr a -> IO () +foreign import capi "hs-libpq.h PQpipelineStatus" + c_PQpipelineStatus :: Ptr PGconn -> IO CInt + +foreign import capi "hs-libpq.h PQenterPipelineMode" + c_PQenterPipelineMode :: Ptr PGconn -> IO CInt + +foreign import capi "hs-libpq.h PQexitPipelineMode" + c_PQexitPipelineMode :: Ptr PGconn -> IO CInt + +foreign import capi "hs-libpq.h PQpipelineSync" + c_PQpipelineSync :: Ptr PGconn -> IO CInt + +foreign import capi "hs-libpq.h PQsendFlushRequest" + c_PQsendFlushRequest :: Ptr PGconn -> IO CInt + ------------------------------------------------------------------------------- -- FFI imports: noticebuffers ------------------------------------------------------------------------------- From 1d26ce42088c107fe9bc8ca2086ad191b4a042ae Mon Sep 17 00:00:00 2001 From: Robert Vollmert Date: Wed, 1 Mar 2023 12:30:28 +0100 Subject: [PATCH 2/5] Test pipelineStatus in smoke test --- test/Smoke.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Smoke.hs b/test/Smoke.hs index 1802d4c..21adffa 100644 --- a/test/Smoke.hs +++ b/test/Smoke.hs @@ -48,6 +48,7 @@ smoke connstring = do transactionStatus conn >>= print protocolVersion conn >>= print serverVersion conn >>= print + pipelineStatus conn >>= print s <- status conn unless (s == ConnectionOk) exitFailure From f25f1c0f6f961455689fe8d0cd79804d2b86db1c Mon Sep 17 00:00:00 2001 From: Robert Vollmert Date: Wed, 1 Mar 2023 13:11:17 +0100 Subject: [PATCH 3/5] Test pipeline mode API --- test/Smoke.hs | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/test/Smoke.hs b/test/Smoke.hs index 21adffa..88e036f 100644 --- a/test/Smoke.hs +++ b/test/Smoke.hs @@ -11,7 +11,9 @@ import qualified Data.ByteString.Char8 as BS8 main :: IO () main = do libpqVersion >>= print - withConnstring smoke + withConnstring $ \connstring -> do + smoke connstring + testPipeline connstring withConnstring :: (BS8.ByteString -> IO ()) -> IO () withConnstring kont = do @@ -54,3 +56,36 @@ smoke connstring = do unless (s == ConnectionOk) exitFailure finish conn + +testPipeline :: BS8.ByteString -> IO () +testPipeline connstring = do + conn <- connectdb connstring + + setnonblocking conn True `shouldReturn` True + enterPipelineMode conn `shouldReturn` True + pipelineStatus conn `shouldReturn` PipelineOn + sendQueryParams conn (BS8.pack "select 1") [] Text `shouldReturn` True + sendQueryParams conn (BS8.pack "select 2") [] Text `shouldReturn` True + pipelineSync conn `shouldReturn` True + + Just r1 <- getResult conn + resultStatus r1 `shouldReturn` TuplesOk + getvalue r1 0 0 `shouldReturn` Just (BS8.pack "1") + Nothing <- getResult conn + + Just r2 <- getResult conn + getvalue r2 0 0 `shouldReturn` Just (BS8.pack "2") + Nothing <- getResult conn + + Just r3 <- getResult conn + resultStatus r3 `shouldReturn` PipelineSync + + finish conn + where + shouldBe r value = + unless (r == value) $ do + print $ "expected " <> show value <> ", got " <> show r + exitFailure + shouldReturn action value = do + r <- action + r `shouldBe` value From b4cc8421d00d18babd351cfc56acce13dda53f65 Mon Sep 17 00:00:00 2001 From: Robert Vollmert Date: Fri, 17 Mar 2023 18:21:48 +0100 Subject: [PATCH 4/5] ci: regenerate with --distribution jammy --- .github/workflows/haskell-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml index 87c94fa..136796d 100644 --- a/.github/workflows/haskell-ci.yml +++ b/.github/workflows/haskell-ci.yml @@ -27,7 +27,7 @@ jobs: timeout-minutes: 60 container: - image: buildpack-deps:bionic + image: buildpack-deps:jammy services: postgres: image: postgres:14 From 30b69e55855cabf3356e186c30a1756b4b0c6c95 Mon Sep 17 00:00:00 2001 From: "Serge S. Gulin" Date: Mon, 6 Nov 2023 23:25:50 +0300 Subject: [PATCH 5/5] Add Pipeline Mode cabal flag enabled by default --- postgresql-libpq.cabal | 9 ++++ src/Database/PostgreSQL/LibPQ.hs | 70 ++++++++++++++++++++++--- src/Database/PostgreSQL/LibPQ/Compat.hs | 10 ++++ src/Database/PostgreSQL/LibPQ/Enums.hsc | 14 ++++- src/Database/PostgreSQL/LibPQ/FFI.hs | 27 ++++++---- test/Smoke.hs | 7 +-- 6 files changed, 115 insertions(+), 22 deletions(-) diff --git a/postgresql-libpq.cabal b/postgresql-libpq.cabal index 29544dd..64eb163 100644 --- a/postgresql-libpq.cabal +++ b/postgresql-libpq.cabal @@ -39,6 +39,12 @@ flag use-pkg-config default: False manual: True +-- If true, use libpq bindings for Pipeline Mode, otherwise throw +-- errors on usage. It requires libpq >=14 +flag enable-pipeline-mode + default: True + manual: False + library default-language: Haskell2010 hs-source-dirs: src @@ -76,6 +82,9 @@ library if os(windows) build-depends: Win32 >=2.2.0.2 && <2.14 + if flag(enable-pipeline-mode) + cpp-options: -DHASKELL_LIBPQ_PIPELINE_MODE + if flag(use-pkg-config) pkgconfig-depends: libpq >=10.22 diff --git a/src/Database/PostgreSQL/LibPQ.hs b/src/Database/PostgreSQL/LibPQ.hs index 517a183..596d685 100644 --- a/src/Database/PostgreSQL/LibPQ.hs +++ b/src/Database/PostgreSQL/LibPQ.hs @@ -179,6 +179,7 @@ module Database.PostgreSQL.LibPQ , exitPipelineMode , pipelineSync , sendFlushRequest + , isEnabledPipeline -- * Cancelling Queries in Progress -- $cancel @@ -1645,37 +1646,93 @@ flush connection = 1 -> return FlushWriting _ -> return FlushFailed +-- $pipelinemode +-- These functions control behaviour in pipeline mode. +-- +-- Pipeline mode allows applications to send a query +-- without having to read the result of the previously +-- sent query. Taking advantage of the pipeline mode, +-- a client will wait less for the server, since multiple +-- queries/results can be sent/received in +-- a single network transaction. + +ifEnabledPipeline :: IO a -> IO a +ifEnabledPipeline m = + if isEnabledPipeline + then m + else error "pipeline mode is disabled" +{-# INLINE ifEnabledPipeline #-} +-- | Returns the current pipeline mode status of the connection. pipelineStatus :: Connection -> IO PipelineStatus -pipelineStatus connection = do +pipelineStatus connection = ifEnabledPipeline $ do stat <- withConn connection c_PQpipelineStatus maybe (fail $ "Unknown pipeline status " ++ show stat) return (fromCInt stat) +-- | Causes a connection to enter pipeline mode +-- if it is currently idle or already in pipeline mode. +-- +-- Returns 'True' for success. Returns 'False' and has no effect +-- if the connection is not currently idle, i.e., +-- it has a result ready, or it is waiting for more +-- input from the server, etc. +-- This function does not actually send anything to the server, +-- it just changes the libpq connection state. enterPipelineMode :: Connection -> IO Bool -enterPipelineMode connection = +enterPipelineMode connection = ifEnabledPipeline $ enumFromConn connection c_PQenterPipelineMode +-- | Causes a connection to exit pipeline mode +-- if it is currently in pipeline mode with +-- an empty queue and no pending results. +-- +-- Returns 'True' for success. +-- Returns 'True' and takes no action if not in pipeline mode. +-- If the current statement isn't finished processing, +-- or 'getResult' has not been called to collect results +-- from all previously sent query, returns True +-- (in which case, use 'errorMessage' to get more +-- information about the failure). exitPipelineMode :: Connection -> IO Bool -exitPipelineMode connection = +exitPipelineMode connection = ifEnabledPipeline $ enumFromConn connection c_PQexitPipelineMode +-- | Marks a synchronization point in a pipeline by sending +-- a sync message and flushing the send buffer. +-- This serves as the delimiter of an implicit transaction +-- and an error recovery point. +-- +-- Returns 'True' for success. +-- Returns 'False' if the connection is not in pipeline mode or +-- sending a sync message failed. pipelineSync :: Connection -> IO Bool -pipelineSync connection = +pipelineSync connection = ifEnabledPipeline $ enumFromConn connection c_PQpipelineSync +-- | Sends a request for the server to flush its output buffer. +-- +-- Returns 'True' for success. Returns 'False' on any failure. +-- +-- The server flushes its output buffer automatically +-- as a result of 'pipelineSync' being called, +-- or on any request when not in pipeline mode; +-- this function is useful to cause the server to flush +-- its output buffer in pipeline mode without establishing +-- a synchronization point. +-- Note that the request is not itself flushed +-- to the server automatically; use 'flush' if necessary. sendFlushRequest :: Connection -> IO Bool -sendFlushRequest connection = +sendFlushRequest connection = ifEnabledPipeline $ enumFromConn connection c_PQsendFlushRequest - -- $cancel -- A client application can request cancellation of a command that is -- still being processed by the server, using the functions described @@ -1763,7 +1820,6 @@ notifies connection = c_PQfreemem mn return result - -- $control -- These functions control miscellaneous details of libpq's behavior. diff --git a/src/Database/PostgreSQL/LibPQ/Compat.hs b/src/Database/PostgreSQL/LibPQ/Compat.hs index 4bdd48a..e313b8e 100644 --- a/src/Database/PostgreSQL/LibPQ/Compat.hs +++ b/src/Database/PostgreSQL/LibPQ/Compat.hs @@ -25,3 +25,13 @@ mkPS fp off len = BS (plusForeignPtr fp off) len mkPS fp off len = PS fp off len #endif {-# INLINE mkPS #-} + +-- | Shows that compiled code was linked with LibPQ supporting Pipeline Mode. +-- LibPQ version >= 14. +isEnabledPipeline :: Bool +#if HASKELL_LIBPQ_PIPELINE_MODE +isEnabledPipeline = True +#else +isEnabledPipeline = False +#endif +{-# INLINE isEnabledPipeline #-} diff --git a/src/Database/PostgreSQL/LibPQ/Enums.hsc b/src/Database/PostgreSQL/LibPQ/Enums.hsc index 5d13e43..d7ceeeb 100644 --- a/src/Database/PostgreSQL/LibPQ/Enums.hsc +++ b/src/Database/PostgreSQL/LibPQ/Enums.hsc @@ -65,8 +65,10 @@ instance FromCInt ExecStatus where fromCInt (#const PGRES_NONFATAL_ERROR) = Just NonfatalError fromCInt (#const PGRES_FATAL_ERROR) = Just FatalError fromCInt (#const PGRES_SINGLE_TUPLE) = Just SingleTuple + #if HASKELL_LIBPQ_PIPELINE_MODE fromCInt (#const PGRES_PIPELINE_SYNC) = Just PipelineSync fromCInt (#const PGRES_PIPELINE_ABORTED) = Just PipelineAbort + #endif fromCInt _ = Nothing instance ToCInt ExecStatus where @@ -80,8 +82,16 @@ instance ToCInt ExecStatus where toCInt NonfatalError = (#const PGRES_NONFATAL_ERROR) toCInt FatalError = (#const PGRES_FATAL_ERROR) toCInt SingleTuple = (#const PGRES_SINGLE_TUPLE) + #if HASKELL_LIBPQ_PIPELINE_MODE toCInt PipelineSync = (#const PGRES_PIPELINE_SYNC) + #else + toCInt PipelineSync = error "pipeline mode is disabled" + #endif + #if HASKELL_LIBPQ_PIPELINE_MODE toCInt PipelineAbort = (#const PGRES_PIPELINE_ABORTED) + #else + toCInt PipelineSync = error "pipeline mode is disabled" + #endif data FieldCode @@ -245,7 +255,7 @@ instance FromCInt ConnStatus where fromCInt (#const CONNECTION_SSL_STARTUP) = return ConnectionSSLStartup -- fromCInt (#const CONNECTION_NEEDED) = return ConnectionNeeded fromCInt _ = Nothing - + data TransactionStatus = TransIdle -- ^ currently idle @@ -288,9 +298,11 @@ data PipelineStatus deriving (Eq, Show) instance FromCInt PipelineStatus where + #if HASKELL_LIBPQ_PIPELINE_MODE fromCInt (#const PQ_PIPELINE_ON) = return PipelineOn fromCInt (#const PQ_PIPELINE_OFF) = return PipelineOff fromCInt (#const PQ_PIPELINE_ABORTED) = return PipelineAborted + #endif fromCInt _ = Nothing ------------------------------------------------------------------------------- diff --git a/src/Database/PostgreSQL/LibPQ/FFI.hs b/src/Database/PostgreSQL/LibPQ/FFI.hs index 5bc11ac..37224f8 100644 --- a/src/Database/PostgreSQL/LibPQ/FFI.hs +++ b/src/Database/PostgreSQL/LibPQ/FFI.hs @@ -118,7 +118,7 @@ foreign import capi "hs-libpq.h PQputCopyData" foreign import capi "hs-libpq.h PQputCopyEnd" c_PQputCopyEnd :: Ptr PGconn -> CString -> IO CInt - + -- TODO: GHC #22043 foreign import ccall "hs-libpq.h PQgetCopyData" c_PQgetCopyData :: Ptr PGconn -> Ptr (Ptr Word8) -> CInt -> IO CInt @@ -302,20 +302,25 @@ foreign import capi "hs-libpq.h &PQfreemem" foreign import capi "hs-libpq.h PQfreemem" c_PQfreemem :: Ptr a -> IO () -foreign import capi "hs-libpq.h PQpipelineStatus" - c_PQpipelineStatus :: Ptr PGconn -> IO CInt +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQpipelineStatus" + c_PQpipelineStatus :: Ptr PGconn -> IO CInt -foreign import capi "hs-libpq.h PQenterPipelineMode" - c_PQenterPipelineMode :: Ptr PGconn -> IO CInt +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQenterPipelineMode" + c_PQenterPipelineMode :: Ptr PGconn -> IO CInt -foreign import capi "hs-libpq.h PQexitPipelineMode" - c_PQexitPipelineMode :: Ptr PGconn -> IO CInt +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQexitPipelineMode" + c_PQexitPipelineMode :: Ptr PGconn -> IO CInt -foreign import capi "hs-libpq.h PQpipelineSync" - c_PQpipelineSync :: Ptr PGconn -> IO CInt +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQpipelineSync" + c_PQpipelineSync :: Ptr PGconn -> IO CInt -foreign import capi "hs-libpq.h PQsendFlushRequest" - c_PQsendFlushRequest :: Ptr PGconn -> IO CInt +-- requires libpq >= 14 +foreign import capi "hs-libpq.h PQsendFlushRequest" + c_PQsendFlushRequest :: Ptr PGconn -> IO CInt ------------------------------------------------------------------------------- -- FFI imports: noticebuffers diff --git a/test/Smoke.hs b/test/Smoke.hs index 88e036f..457c281 100644 --- a/test/Smoke.hs +++ b/test/Smoke.hs @@ -1,6 +1,6 @@ module Main (main) where -import Control.Monad (unless) +import Control.Monad (unless, when) import Database.PostgreSQL.LibPQ import Data.Foldable (toList) import System.Environment (getEnvironment) @@ -13,7 +13,7 @@ main = do libpqVersion >>= print withConnstring $ \connstring -> do smoke connstring - testPipeline connstring + when isEnabledPipeline $ testPipeline connstring withConnstring :: (BS8.ByteString -> IO ()) -> IO () withConnstring kont = do @@ -50,7 +50,6 @@ smoke connstring = do transactionStatus conn >>= print protocolVersion conn >>= print serverVersion conn >>= print - pipelineStatus conn >>= print s <- status conn unless (s == ConnectionOk) exitFailure @@ -61,6 +60,8 @@ testPipeline :: BS8.ByteString -> IO () testPipeline connstring = do conn <- connectdb connstring + pipelineStatus conn >>= print + setnonblocking conn True `shouldReturn` True enterPipelineMode conn `shouldReturn` True pipelineStatus conn `shouldReturn` PipelineOn