Skip to content

Commit

Permalink
WIP: pipeline mode
Browse files Browse the repository at this point in the history
  • Loading branch information
d-frey committed Dec 16, 2024
1 parent 0797cbd commit 26db90e
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 35 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ It has no dependencies beyond [`libpq`➚](https://www.postgresql.org/docs/curre

## Introduction

The library provides support for database connections, transactions, nested transactions, prepared statements, large objects, connection pools, high-speed bulk data transfer, and more.
The library provides support for database connections, transactions, nested transactions, prepared statements, large objects, connection pools, pipeline mode, high-speed bulk data transfer, and more.
An extensible traits mechanism is used to convert C++ types into SQL statement parameters, and conversely to convert query results into arbitrary C++ types.
The following example shows the basic look and feel of the library.

Expand Down
61 changes: 29 additions & 32 deletions src/lib/pq/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,32 +166,33 @@ namespace tao::pq
const auto end = m_connection->timeout_end( start );

auto result = m_connection->get_result( end );
if( result ) {
switch( PQresultStatus( result.get() ) ) {
case PGRES_COPY_IN:
m_connection->put_copy_end( "unexpected COPY FROM statement" );
break;

case PGRES_COPY_OUT:
m_connection->cancel();
m_connection->clear_copy_data( end );
m_connection->clear_results( end );
throw std::runtime_error( "unexpected COPY TO statement" );

case PGRES_SINGLE_TUPLE:
if( !result ) {
throw std::runtime_error( "unable to obtain result" );
}

switch( PQresultStatus( result.get() ) ) {
case PGRES_COPY_IN:
m_connection->put_copy_end( "unexpected COPY FROM statement" );
break;

case PGRES_COPY_OUT:
m_connection->cancel();
m_connection->clear_copy_data( end );
m_connection->clear_results( end );
throw std::runtime_error( "unexpected COPY TO statement" );

case PGRES_SINGLE_TUPLE:
#if defined( LIBPQ_HAS_CHUNK_MODE )
case PGRES_TUPLES_CHUNK:
case PGRES_TUPLES_CHUNK:
#endif
return pq::result( result.release() );
return pq::result( result.release() );

default:;
}
while( auto next = m_connection->get_result( end ) ) {
result = std::move( next );
}
default:;
}
else {
throw std::runtime_error( "unable to obtain result" );

if( const auto next = m_connection->get_result( end ) ) {
const auto status = PQresultStatus( next.get() );
throw std::runtime_error( std::format( "unexpected result status: {}", PQresStatus( status ) ) );
}

return pq::result( result.release() );
Expand All @@ -203,17 +204,13 @@ namespace tao::pq
const auto end = m_connection->timeout_end( start );

auto result = m_connection->get_result( end );
if( result ) {
const auto status = PQresultStatus( result.get() );
if( status == PGRES_PIPELINE_SYNC ) {
return;
}
else {
throw std::runtime_error( std::format( "unexpected result status: {}", static_cast< int >( status ) ) );
}
if( !result ) {
throw std::runtime_error( "unable to obtain result" );
}
while( auto next = m_connection->get_result( end ) ) {
result = std::move( next );

const auto status = PQresultStatus( result.get() );
if( status != PGRES_PIPELINE_SYNC ) {
throw std::runtime_error( std::format( "unexpected result status: {}", PQresStatus( status ) ) );
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/test/pq/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ namespace
// execute an SQL statement
connection->execute( "DROP TABLE IF EXISTS tao_connection_test" );

TEST_THROWS( connection->direct()->get_result() );

// execution of empty statements fails
TEST_THROWS( connection->execute( "" ) );

Expand Down
40 changes: 38 additions & 2 deletions src/test/pq/pipeline_mode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@ namespace
connection->enter_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::on );

connection->enter_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::on );
connection->exit_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::off );

{
auto tr = connection->direct();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::off );

connection->enter_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::on );

tr->send( "SELECT 42" );
tr->send( "SELECT 1234" );
connection->pipeline_sync();
Expand All @@ -45,6 +50,37 @@ namespace
connection->pipeline_sync();

TEST_ASSERT( tr->get_result().as< int >() == 1234 );
tr->consume_pipeline_sync();

TEST_ASSERT( tr->get_result().as< int >() == 1701 );
tr->consume_pipeline_sync();

TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::on );
connection->exit_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::off );

tr->commit();
}

{
auto tr = connection->transaction();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::off );

connection->enter_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::on );

tr->send( "SELECT 42" );
tr->send( "SELECT 1234" );
connection->pipeline_sync();

TEST_ASSERT( tr->get_result().as< int >() == 42 );

tr->send( "SELECT 1701" );
connection->pipeline_sync();

TEST_ASSERT( tr->get_result().as< int >() == 1234 );
tr->consume_pipeline_sync();

TEST_ASSERT( tr->get_result().as< int >() == 1701 );
tr->consume_pipeline_sync();

Expand Down

0 comments on commit 26db90e

Please sign in to comment.