From 4bef0868a4272877e5f8c188a5036e66a1e38c05 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Thu, 30 May 2024 16:09:04 +0800 Subject: [PATCH] feat(frontend): support fetch n from subscription cursor (#16764) --- e2e_test/subscription/main.py | 113 ++++++++++++++---- src/frontend/src/handler/declare_cursor.rs | 3 +- .../optimizer/plan_node/generic/log_scan.rs | 13 ++ src/frontend/src/session/cursor_manager.rs | 74 +++++++----- src/utils/pgwire/src/pg_field_descriptor.rs | 2 +- 5 files changed, 153 insertions(+), 52 deletions(-) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index c7fcc56a35ac..3ffaefd02cee 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -33,8 +33,7 @@ def execute_insert(sql,conn): conn.commit() cur.close() -def check_rows_data(expect_vec,rows,status): - row = rows[0] +def check_rows_data(expect_vec,row,status): value_len = len(row) for index, value in enumerate(row): if index == value_len - 1: @@ -56,7 +55,7 @@ def test_cursor_snapshot(): execute_insert("declare cur subscription cursor for sub",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row,1) + check_rows_data([1,2],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -75,7 +74,7 @@ def test_cursor_snapshot_log_store(): execute_insert("declare cur subscription cursor for sub",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row,1) + check_rows_data([1,2],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("insert into t1 values(4,4)",conn) @@ -83,9 +82,9 @@ def test_cursor_snapshot_log_store(): execute_insert("insert into t1 values(5,5)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) row = execute_query("fetch next from cur",conn) - check_rows_data([5,5],row,1) + check_rows_data([5,5],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -109,11 +108,11 @@ def test_cursor_since_begin(): execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) row = execute_query("fetch next from cur",conn) - check_rows_data([5,5],row,1) + check_rows_data([5,5],row[0],1) row = execute_query("fetch next from cur",conn) - check_rows_data([6,6],row,1) + check_rows_data([6,6],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -138,7 +137,7 @@ def test_cursor_since_now(): execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([6,6],row,1) + check_rows_data([6,6],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -164,27 +163,27 @@ def test_cursor_since_rw_timestamp(): row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_1 = row[0][valuelen - 1] - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_2 = row[0][valuelen - 1] - 1 - check_rows_data([5,5],row,1) + check_rows_data([5,5],row[0],1) row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_3 = row[0][valuelen - 1] + 1 - check_rows_data([6,6],row,1) + check_rows_data([6,6],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([5,5],row,1) + check_rows_data([5,5],row[0],1) execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn) @@ -206,7 +205,7 @@ def test_cursor_op(): execute_insert("declare cur subscription cursor for sub",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row,1) + check_rows_data([1,2],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] @@ -215,24 +214,96 @@ def test_cursor_op(): execute_insert("update t1 set v2 = 10 where v1 = 4",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,4) + check_rows_data([4,4],row[0],4) row = execute_query("fetch next from cur",conn) - check_rows_data([4,10],row,3) + check_rows_data([4,10],row[0],3) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("delete from t1 where v1 = 4",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,10],row,2) + check_rows_data([4,10],row[0],2) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() +def test_cursor_with_table_alter(): + print(f"test_cursor_with_table_alter") + create_table_subscription() + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + execute_insert("declare cur subscription cursor for sub",conn) + execute_insert("alter table t1 add v3 int",conn) + execute_insert("insert into t1 values(4,4,4)",conn) + execute_insert("flush",conn) + row = execute_query("fetch next from cur",conn) + check_rows_data([1,2],row[0],1) + row = execute_query("fetch next from cur",conn) + check_rows_data([4,4,4],row[0],1) + execute_insert("insert into t1 values(5,5,5)",conn) + execute_insert("flush",conn) + row = execute_query("fetch next from cur",conn) + check_rows_data([5,5,5],row[0],1) + execute_insert("alter table t1 drop column v2",conn) + execute_insert("insert into t1 values(6,6)",conn) + execute_insert("flush",conn) + row = execute_query("fetch next from cur",conn) + check_rows_data([6,6],row[0],1) + drop_table_subscription() + +def test_cursor_fetch_n(): + print(f"test_cursor_with_table_alter") + create_table_subscription() + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + execute_insert("declare cur subscription cursor for sub",conn) + execute_insert("insert into t1 values(4,4)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(5,5)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(6,6)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(7,7)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(8,8)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(9,9)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(10,10)",conn) + execute_insert("flush",conn) + execute_insert("update t1 set v2 = 100 where v1 = 10",conn) + execute_insert("flush",conn) + row = execute_query("fetch 6 from cur",conn) + assert len(row) == 6 + check_rows_data([1,2],row[0],1) + check_rows_data([4,4],row[1],1) + check_rows_data([5,5],row[2],1) + check_rows_data([6,6],row[3],1) + check_rows_data([7,7],row[4],1) + check_rows_data([8,8],row[5],1) + row = execute_query("fetch 6 from cur",conn) + assert len(row) == 4 + check_rows_data([9,9],row[0],1) + check_rows_data([10,10],row[1],1) + check_rows_data([10,10],row[2],4) + check_rows_data([10,100],row[3],3) + drop_table_subscription() + if __name__ == "__main__": test_cursor_snapshot() test_cursor_op() @@ -240,3 +311,5 @@ def test_cursor_op(): test_cursor_since_rw_timestamp() test_cursor_since_now() test_cursor_since_begin() + test_cursor_with_table_alter() + test_cursor_fetch_n() diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index 6bd4e300ec0f..25e146fa714c 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -58,7 +58,6 @@ async fn handle_declare_subscription_cursor( let cursor_from_subscription_name = sub_name.0.last().unwrap().real_value().clone(); let subscription = session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?; - let table = session.get_table_by_id(&subscription.dependent_table_id)?; // Start the first query of cursor, which includes querying the table and querying the subscription's logstore let start_rw_timestamp = match rw_timestamp { Some(risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp)) => { @@ -81,8 +80,8 @@ async fn handle_declare_subscription_cursor( .add_subscription_cursor( cursor_name.clone(), start_rw_timestamp, + subscription.dependent_table_id, subscription, - table, &handle_args, ) .await?; diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs index cd5ddebdc072..498d4a44b0fc 100644 --- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs @@ -141,6 +141,19 @@ impl LogScan { Schema { fields } } + pub(crate) fn schema_without_table_name(&self) -> Schema { + let mut fields: Vec<_> = self + .output_col_idx + .iter() + .map(|tb_idx| { + let col = &self.table_desc.columns[*tb_idx]; + Field::from(col) + }) + .collect(); + fields.push(Field::with_name(OP_TYPE, OP_NAME)); + Schema { fields } + } + pub(crate) fn ctx(&self) -> OptimizerContextRef { self.ctx.clone() } diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 13eaec03b166..46eca3beb996 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -30,6 +30,7 @@ use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; use super::SessionImpl; use crate::catalog::subscription_catalog::SubscriptionCatalog; +use crate::catalog::TableId; use crate::error::{ErrorCode, Result}; use crate::handler::declare_cursor::create_stream_for_cursor_stmt; use crate::handler::query::{create_stream, gen_batch_plan_fragmenter, BatchQueryPlanResult}; @@ -136,7 +137,7 @@ enum State { pub struct SubscriptionCursor { cursor_name: String, subscription: Arc, - table: Arc, + dependent_table_id: TableId, cursor_need_drop_time: Instant, state: State, } @@ -146,7 +147,7 @@ impl SubscriptionCursor { cursor_name: String, start_timestamp: Option, subscription: Arc, - table: Arc, + dependent_table_id: TableId, handle_args: &HandlerArgs, ) -> Result { let state = if let Some(start_timestamp) = start_timestamp { @@ -160,7 +161,7 @@ impl SubscriptionCursor { // // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch? let (row_stream, pg_descs) = - Self::initiate_query(None, &table, handle_args.clone()).await?; + Self::initiate_query(None, &dependent_table_id, handle_args.clone()).await?; let pinned_epoch = handle_args .session .get_pinned_snapshot() @@ -191,15 +192,16 @@ impl SubscriptionCursor { Ok(Self { cursor_name, subscription, - table, + dependent_table_id, cursor_need_drop_time, state, }) } - pub async fn next_row( + async fn next_row( &mut self, - handle_args: HandlerArgs, + handle_args: &HandlerArgs, + expected_pg_descs: &Vec, ) -> Result<(Option, Vec)> { loop { match &mut self.state { @@ -212,7 +214,7 @@ impl SubscriptionCursor { // Initiate a new batch query to continue fetching match Self::get_next_rw_timestamp( *seek_timestamp, - self.table.id.table_id, + self.dependent_table_id.table_id, *expected_timestamp, handle_args.clone(), ) @@ -221,7 +223,7 @@ impl SubscriptionCursor { Ok((Some(rw_timestamp), expected_timestamp)) => { let (mut row_stream, pg_descs) = Self::initiate_query( Some(rw_timestamp), - &self.table, + &self.dependent_table_id, handle_args.clone(), ) .await?; @@ -235,10 +237,15 @@ impl SubscriptionCursor { from_snapshot, rw_timestamp, row_stream, - pg_descs, + pg_descs: pg_descs.clone(), remaining_rows, expected_timestamp, }; + if (!expected_pg_descs.is_empty()) && expected_pg_descs.ne(&pg_descs) { + // If the user alters the table upstream of the sub, there will be different descs here. + // So we should output data for different descs in two separate batches + return Ok((None, vec![])); + } } Ok((None, _)) => return Ok((None, vec![])), Err(e) => { @@ -313,20 +320,25 @@ impl SubscriptionCursor { ) .into()); } - // `FETCH NEXT` is equivalent to `FETCH 1`. - if count != 1 { - Err(crate::error::ErrorCode::InternalError( - "FETCH count with subscription is not supported".to_string(), - ) - .into()) - } else { - let (row, pg_descs) = self.next_row(handle_args).await?; - if let Some(row) = row { - Ok((vec![row], pg_descs)) - } else { - Ok((vec![], pg_descs)) + + let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); + let mut cur = 0; + let mut pg_descs_ans = vec![]; + while cur < count { + let (row, descs_ans) = self.next_row(&handle_args, &pg_descs_ans).await?; + match row { + Some(row) => { + pg_descs_ans = descs_ans; + cur += 1; + ans.push(row); + } + None => { + break; + } } } + + Ok((ans, pg_descs_ans)) } async fn get_next_rw_timestamp( @@ -358,16 +370,17 @@ impl SubscriptionCursor { async fn initiate_query( rw_timestamp: Option, - table_catalog: &TableCatalog, + dependent_table_id: &TableId, handle_args: HandlerArgs, ) -> Result<(PgResponseStream, Vec)> { + let session = handle_args.clone().session; + let table_catalog = session.get_table_by_id(dependent_table_id)?; let (row_stream, pg_descs) = if let Some(rw_timestamp) = rw_timestamp { - let context = OptimizerContext::from_handler_args(handle_args.clone()); - let session = handle_args.session; + let context = OptimizerContext::from_handler_args(handle_args); let plan_fragmenter_result = gen_batch_plan_fragmenter( &session, Self::create_batch_plan_for_cursor( - table_catalog, + &table_catalog, &session, context.into(), rw_timestamp, @@ -458,7 +471,11 @@ impl SubscriptionCursor { new_epoch, ); let batch_log_seq_scan = BatchLogSeqScan::new(core); - let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema().len()); + let schema = batch_log_seq_scan + .core() + .schema_without_table_name() + .clone(); + let out_fields = FixedBitSet::from_iter(0..schema.len()); let out_names = batch_log_seq_scan.core().column_names(); // Here we just need a plan_root to call the method, only out_fields and out_names will be used let plan_root = PlanRoot::new_with_batch_plan( @@ -468,7 +485,6 @@ impl SubscriptionCursor { out_fields, out_names, ); - let schema = batch_log_seq_scan.core().schema().clone(); let (batch_log_seq_scan, query_mode) = match session.config().query_mode() { QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local), QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local), @@ -497,15 +513,15 @@ impl CursorManager { &self, cursor_name: String, start_timestamp: Option, + dependent_table_id: TableId, subscription: Arc, - table: Arc, handle_args: &HandlerArgs, ) -> Result<()> { let cursor = SubscriptionCursor::new( cursor_name.clone(), start_timestamp, subscription, - table, + dependent_table_id, handle_args, ) .await?; diff --git a/src/utils/pgwire/src/pg_field_descriptor.rs b/src/utils/pgwire/src/pg_field_descriptor.rs index 0b33c5743c10..82d75c78f795 100644 --- a/src/utils/pgwire/src/pg_field_descriptor.rs +++ b/src/utils/pgwire/src/pg_field_descriptor.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct PgFieldDescriptor { name: String, table_oid: i32,