Skip to content

Commit

Permalink
WIP: pipeline mode
Browse files Browse the repository at this point in the history
  • Loading branch information
d-frey committed Dec 19, 2024
1 parent 6cffecb commit 0b8d84f
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 23 deletions.
5 changes: 0 additions & 5 deletions include/tao/pq/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ namespace tao::pq
[[nodiscard]] auto subtransaction() -> std::shared_ptr< transaction >;
[[nodiscard]] auto pipeline() -> std::shared_ptr< pq::pipeline >;

void set_single_row_mode();
#if defined( LIBPQ_HAS_CHUNK_MODE )
void set_chunk_mode( const int rows );
#endif

template< parameter_type... As >
auto execute( const internal::zsv statement, As&&... as )
{
Expand Down
5 changes: 5 additions & 0 deletions include/tao/pq/transaction_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ namespace tao::pq
send_params( statement, p.m_size, p.m_types, p.m_values, p.m_lengths, p.m_formats );
}

void set_single_row_mode();
#if defined( LIBPQ_HAS_CHUNK_MODE )
void set_chunk_mode( const int rows );
#endif

[[nodiscard]] auto get_result( const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now() ) -> result;
void consume_pipeline_sync( const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now() );
};
Expand Down
18 changes: 0 additions & 18 deletions src/lib/pq/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,24 +128,6 @@ namespace tao::pq
return std::make_shared< pq::pipeline >( m_connection );
}

void transaction::set_single_row_mode()
{
check_current_transaction();
if( PQsetSingleRowMode( m_connection->underlying_raw_ptr() ) == 0 ) {
throw std::runtime_error( "unable to switch to single row mode" );
}
}

#if defined( LIBPQ_HAS_CHUNK_MODE )
void transaction::set_chunk_mode( const int rows )
{
check_current_transaction();
if( PQsetChunkedRowsMode( m_connection->underlying_raw_ptr(), rows ) == 0 ) {
throw std::runtime_error( "unable to switch to chunk mode" );
}
}
#endif

void transaction::commit()
{
check_current_transaction();
Expand Down
18 changes: 18 additions & 0 deletions src/lib/pq/transaction_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,24 @@ namespace tao::pq
m_connection->send_params( statement, n_params, types, values, lengths, formats );
}

void transaction_base::set_single_row_mode()
{
check_current_transaction();
if( PQsetSingleRowMode( m_connection->underlying_raw_ptr() ) == 0 ) {
throw std::runtime_error( "unable to switch to single row mode" );
}
}

#if defined( LIBPQ_HAS_CHUNK_MODE )
void transaction_base::set_chunk_mode( const int rows )
{
check_current_transaction();
if( PQsetChunkedRowsMode( m_connection->underlying_raw_ptr(), rows ) == 0 ) {
throw std::runtime_error( "unable to switch to chunk mode" );
}
}
#endif

auto transaction_base::get_result( const std::chrono::steady_clock::time_point start ) -> result
{
check_current_transaction();
Expand Down
36 changes: 36 additions & 0 deletions src/test/pq/pipeline_mode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,42 @@ namespace

pl->finish();
}

{
connection->execute( "DROP TABLE IF EXISTS tao_pipeline_mode" );
connection->execute( "CREATE TABLE tao_pipeline_mode ( name TEXT PRIMARY KEY, age INTEGER NOT NULL )" );

connection->prepare( "insert_user", "INSERT INTO tao_pipeline_mode ( name, age ) VALUES ( $1, $2 )" );

auto pl = connection->pipeline();

pl->send( "insert_user", "Daniel", 42 );
pl->send( "insert_user", "Tom", 41 );
pl->send( "insert_user", "Jerry", 29 );
pl->sync();

TEST_ASSERT( pl->get_result().rows_affected() == 1 ); // daniel
TEST_ASSERT( pl->get_result().rows_affected() == 1 ); // tom
TEST_ASSERT( pl->get_result().rows_affected() == 1 ); // jerry
TEST_EXECUTE( pl->consume_sync() );

pl->send( "SELECT name, age FROM tao_pipeline_mode" );
pl->send( "SELECT name, age FROM tao_pipeline_mode" );
pl->sync();

pl->set_single_row_mode();

TEST_ASSERT( pl->get_result().size() == 1 );
TEST_ASSERT( pl->get_result().size() == 1 );
TEST_ASSERT( pl->get_result().size() == 1 );
TEST_ASSERT( pl->get_result().empty() );

TEST_ASSERT( pl->get_result().size() == 3 );

TEST_EXECUTE( pl->consume_sync() );

pl->finish();
}
}

} // namespace
Expand Down

0 comments on commit 0b8d84f

Please sign in to comment.