Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline mode API with cabal flag #52

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/haskell-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
timeout-minutes:
60
container:
image: buildpack-deps:bionic
image: buildpack-deps:jammy
services:
postgres:
image: postgres:14
Expand Down
9 changes: 9 additions & 0 deletions postgresql-libpq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ flag use-pkg-config
default: False
manual: True

-- If true, use libpq bindings for Pipeline Mode, otherwise throw
-- errors on usage. It requires libpq >=14
flag enable-pipeline-mode
default: True
manual: False

library
default-language: Haskell2010
hs-source-dirs: src
Expand Down Expand Up @@ -76,6 +82,9 @@ library
if os(windows)
build-depends: Win32 >=2.2.0.2 && <2.14

if flag(enable-pipeline-mode)
cpp-options: -DHASKELL_LIBPQ_PIPELINE_MODE

if flag(use-pkg-config)
pkgconfig-depends: libpq >=10.22

Expand Down
97 changes: 96 additions & 1 deletion src/Database/PostgreSQL/LibPQ.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ module Database.PostgreSQL.LibPQ
, FlushStatus(..)
, flush

-- * Pipeline Mode
-- $pipeline
, PipelineStatus(..)
, pipelineStatus
, enterPipelineMode
, exitPipelineMode
, pipelineSync
, sendFlushRequest
, isEnabledPipeline

-- * Cancelling Queries in Progress
-- $cancel
, Cancel
Expand Down Expand Up @@ -1636,6 +1646,92 @@ flush connection =
1 -> return FlushWriting
_ -> return FlushFailed

-- $pipelinemode
-- These functions control behaviour in pipeline mode.
--
-- Pipeline mode allows applications to send a query
-- without having to read the result of the previously
-- sent query. Taking advantage of the pipeline mode,
-- a client will wait less for the server, since multiple
-- queries/results can be sent/received in
-- a single network transaction.

ifEnabledPipeline :: IO a -> IO a
ifEnabledPipeline m =
if isEnabledPipeline
then m
else error "pipeline mode is disabled"
{-# INLINE ifEnabledPipeline #-}

-- | Returns the current pipeline mode status of the connection.
pipelineStatus :: Connection
-> IO PipelineStatus
pipelineStatus connection = ifEnabledPipeline $ do
stat <- withConn connection c_PQpipelineStatus
maybe
(fail $ "Unknown pipeline status " ++ show stat)
return
(fromCInt stat)

-- | Causes a connection to enter pipeline mode
-- if it is currently idle or already in pipeline mode.
--
-- Returns 'True' for success. Returns 'False' and has no effect
-- if the connection is not currently idle, i.e.,
-- it has a result ready, or it is waiting for more
-- input from the server, etc.
-- This function does not actually send anything to the server,
-- it just changes the libpq connection state.
enterPipelineMode :: Connection
-> IO Bool
enterPipelineMode connection = ifEnabledPipeline $
enumFromConn connection c_PQenterPipelineMode

-- | Causes a connection to exit pipeline mode
-- if it is currently in pipeline mode with
-- an empty queue and no pending results.
--
-- Returns 'True' for success.
-- Returns 'True' and takes no action if not in pipeline mode.
-- If the current statement isn't finished processing,
-- or 'getResult' has not been called to collect results
-- from all previously sent query, returns True
-- (in which case, use 'errorMessage' to get more
-- information about the failure).
exitPipelineMode :: Connection
-> IO Bool
exitPipelineMode connection = ifEnabledPipeline $
enumFromConn connection c_PQexitPipelineMode

-- | Marks a synchronization point in a pipeline by sending
-- a sync message and flushing the send buffer.
-- This serves as the delimiter of an implicit transaction
-- and an error recovery point.
--
-- Returns 'True' for success.
-- Returns 'False' if the connection is not in pipeline mode or
-- sending a sync message failed.
pipelineSync :: Connection
-> IO Bool
pipelineSync connection = ifEnabledPipeline $
enumFromConn connection c_PQpipelineSync

-- | Sends a request for the server to flush its output buffer.
--
-- Returns 'True' for success. Returns 'False' on any failure.
--
-- The server flushes its output buffer automatically
-- as a result of 'pipelineSync' being called,
-- or on any request when not in pipeline mode;
-- this function is useful to cause the server to flush
-- its output buffer in pipeline mode without establishing
-- a synchronization point.
-- Note that the request is not itself flushed
-- to the server automatically; use 'flush' if necessary.
sendFlushRequest :: Connection
-> IO Bool
sendFlushRequest connection = ifEnabledPipeline $
enumFromConn connection c_PQsendFlushRequest

-- $cancel
-- A client application can request cancellation of a command that is
Expand Down Expand Up @@ -1724,7 +1820,6 @@ notifies connection =
c_PQfreemem mn
return result


-- $control
-- These functions control miscellaneous details of libpq's behavior.

Expand Down
10 changes: 10 additions & 0 deletions src/Database/PostgreSQL/LibPQ/Compat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,13 @@ mkPS fp off len = BS (plusForeignPtr fp off) len
mkPS fp off len = PS fp off len
#endif
{-# INLINE mkPS #-}

-- | Shows that compiled code was linked with LibPQ supporting Pipeline Mode.
-- LibPQ version >= 14.
isEnabledPipeline :: Bool
#if HASKELL_LIBPQ_PIPELINE_MODE
isEnabledPipeline = True
#else
isEnabledPipeline = False
#endif
{-# INLINE isEnabledPipeline #-}
66 changes: 54 additions & 12 deletions src/Database/PostgreSQL/LibPQ/Enums.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,38 @@ data ExecStatus
| NonfatalError -- ^ A nonfatal error (a notice or
-- warning) occurred.
| FatalError -- ^ A fatal error occurred.
| SingleTuple -- ^ The PGresult contains a single result tuple
| SingleTuple -- ^ The 'Result' contains a single result tuple
-- from the current command. This status occurs
-- only when single-row mode has been selected
-- for the query.
| PipelineSync -- ^ The 'Result' represents a synchronization
-- point in pipeline mode, requested by
-- 'pipelineSync'. This status occurs only
-- when pipeline mode has been selected.
| PipelineAbort -- ^ The 'Result' represents a pipeline that
-- has received an error from the server.
-- 'getResult' must be called repeatedly,
-- and each time it will return this status
-- code until the end of the current pipeline,
-- at which point it will return 'PipelineSync'
-- and normal processing can resume.
deriving (Eq, Show)

instance FromCInt ExecStatus where
fromCInt (#const PGRES_EMPTY_QUERY) = Just EmptyQuery
fromCInt (#const PGRES_COMMAND_OK) = Just CommandOk
fromCInt (#const PGRES_TUPLES_OK) = Just TuplesOk
fromCInt (#const PGRES_COPY_OUT) = Just CopyOut
fromCInt (#const PGRES_COPY_IN) = Just CopyIn
fromCInt (#const PGRES_COPY_BOTH) = Just CopyBoth
fromCInt (#const PGRES_BAD_RESPONSE) = Just BadResponse
fromCInt (#const PGRES_NONFATAL_ERROR) = Just NonfatalError
fromCInt (#const PGRES_FATAL_ERROR) = Just FatalError
fromCInt (#const PGRES_SINGLE_TUPLE) = Just SingleTuple
fromCInt (#const PGRES_EMPTY_QUERY) = Just EmptyQuery
fromCInt (#const PGRES_COMMAND_OK) = Just CommandOk
fromCInt (#const PGRES_TUPLES_OK) = Just TuplesOk
fromCInt (#const PGRES_COPY_OUT) = Just CopyOut
fromCInt (#const PGRES_COPY_IN) = Just CopyIn
fromCInt (#const PGRES_COPY_BOTH) = Just CopyBoth
fromCInt (#const PGRES_BAD_RESPONSE) = Just BadResponse
fromCInt (#const PGRES_NONFATAL_ERROR) = Just NonfatalError
fromCInt (#const PGRES_FATAL_ERROR) = Just FatalError
fromCInt (#const PGRES_SINGLE_TUPLE) = Just SingleTuple
#if HASKELL_LIBPQ_PIPELINE_MODE
fromCInt (#const PGRES_PIPELINE_SYNC) = Just PipelineSync
fromCInt (#const PGRES_PIPELINE_ABORTED) = Just PipelineAbort
#endif
fromCInt _ = Nothing

instance ToCInt ExecStatus where
Expand All @@ -67,6 +82,16 @@ instance ToCInt ExecStatus where
toCInt NonfatalError = (#const PGRES_NONFATAL_ERROR)
toCInt FatalError = (#const PGRES_FATAL_ERROR)
toCInt SingleTuple = (#const PGRES_SINGLE_TUPLE)
#if HASKELL_LIBPQ_PIPELINE_MODE
toCInt PipelineSync = (#const PGRES_PIPELINE_SYNC)
#else
toCInt PipelineSync = error "pipeline mode is disabled"
#endif
#if HASKELL_LIBPQ_PIPELINE_MODE
toCInt PipelineAbort = (#const PGRES_PIPELINE_ABORTED)
#else
toCInt PipelineSync = error "pipeline mode is disabled"
#endif


data FieldCode
Expand Down Expand Up @@ -230,7 +255,7 @@ instance FromCInt ConnStatus where
fromCInt (#const CONNECTION_SSL_STARTUP) = return ConnectionSSLStartup
-- fromCInt (#const CONNECTION_NEEDED) = return ConnectionNeeded
fromCInt _ = Nothing


data TransactionStatus
= TransIdle -- ^ currently idle
Expand Down Expand Up @@ -263,6 +288,23 @@ instance FromCInt Format where
fromCInt 1 = Just Binary
fromCInt _ = Nothing

data PipelineStatus
= PipelineOn -- ^ The 'Connection' is in pipeline mode.
| PipelineOff -- ^ The 'Connection' is /not/ in pipeline mode.
| PipelineAborted -- ^ The 'Connection' is in pipeline mode and an error
-- occurred while processing the current pipeline. The
-- aborted flag is cleared when 'getResult' returns a
-- result with status 'PipelineSync'.
deriving (Eq, Show)

instance FromCInt PipelineStatus where
#if HASKELL_LIBPQ_PIPELINE_MODE
fromCInt (#const PQ_PIPELINE_ON) = return PipelineOn
fromCInt (#const PQ_PIPELINE_OFF) = return PipelineOff
fromCInt (#const PQ_PIPELINE_ABORTED) = return PipelineAborted
#endif
fromCInt _ = Nothing

-------------------------------------------------------------------------------
-- System.IO enumerations
-------------------------------------------------------------------------------
Expand Down
22 changes: 21 additions & 1 deletion src/Database/PostgreSQL/LibPQ/FFI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ foreign import capi "hs-libpq.h PQputCopyData"

foreign import capi "hs-libpq.h PQputCopyEnd"
c_PQputCopyEnd :: Ptr PGconn -> CString -> IO CInt

-- TODO: GHC #22043
foreign import ccall "hs-libpq.h PQgetCopyData"
c_PQgetCopyData :: Ptr PGconn -> Ptr (Ptr Word8) -> CInt -> IO CInt
Expand Down Expand Up @@ -302,6 +302,26 @@ foreign import capi "hs-libpq.h &PQfreemem"
foreign import capi "hs-libpq.h PQfreemem"
c_PQfreemem :: Ptr a -> IO ()

-- requires libpq >= 14
foreign import capi "hs-libpq.h PQpipelineStatus"
c_PQpipelineStatus :: Ptr PGconn -> IO CInt

-- requires libpq >= 14
foreign import capi "hs-libpq.h PQenterPipelineMode"
c_PQenterPipelineMode :: Ptr PGconn -> IO CInt

-- requires libpq >= 14
foreign import capi "hs-libpq.h PQexitPipelineMode"
c_PQexitPipelineMode :: Ptr PGconn -> IO CInt

-- requires libpq >= 14
foreign import capi "hs-libpq.h PQpipelineSync"
c_PQpipelineSync :: Ptr PGconn -> IO CInt

-- requires libpq >= 14
foreign import capi "hs-libpq.h PQsendFlushRequest"
c_PQsendFlushRequest :: Ptr PGconn -> IO CInt

-------------------------------------------------------------------------------
-- FFI imports: noticebuffers
-------------------------------------------------------------------------------
Expand Down
41 changes: 39 additions & 2 deletions test/Smoke.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Main (main) where

import Control.Monad (unless)
import Control.Monad (unless, when)
import Database.PostgreSQL.LibPQ
import Data.Foldable (toList)
import System.Environment (getEnvironment)
Expand All @@ -11,7 +11,9 @@ import qualified Data.ByteString.Char8 as BS8
main :: IO ()
main = do
libpqVersion >>= print
withConnstring smoke
withConnstring $ \connstring -> do
smoke connstring
when isEnabledPipeline $ testPipeline connstring

withConnstring :: (BS8.ByteString -> IO ()) -> IO ()
withConnstring kont = do
Expand Down Expand Up @@ -53,3 +55,38 @@ smoke connstring = do
unless (s == ConnectionOk) exitFailure

finish conn

testPipeline :: BS8.ByteString -> IO ()
testPipeline connstring = do
conn <- connectdb connstring

pipelineStatus conn >>= print

setnonblocking conn True `shouldReturn` True
enterPipelineMode conn `shouldReturn` True
pipelineStatus conn `shouldReturn` PipelineOn
sendQueryParams conn (BS8.pack "select 1") [] Text `shouldReturn` True
sendQueryParams conn (BS8.pack "select 2") [] Text `shouldReturn` True
pipelineSync conn `shouldReturn` True

Just r1 <- getResult conn
resultStatus r1 `shouldReturn` TuplesOk
getvalue r1 0 0 `shouldReturn` Just (BS8.pack "1")
Nothing <- getResult conn

Just r2 <- getResult conn
getvalue r2 0 0 `shouldReturn` Just (BS8.pack "2")
Nothing <- getResult conn

Just r3 <- getResult conn
resultStatus r3 `shouldReturn` PipelineSync

finish conn
where
shouldBe r value =
unless (r == value) $ do
print $ "expected " <> show value <> ", got " <> show r
exitFailure
shouldReturn action value = do
r <- action
r `shouldBe` value