From 35315b6e79a0caec5ee3f6e446be68d1b4a4cfd1 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 15 May 2024 11:45:08 +0800 Subject: [PATCH 1/4] support --- src/frontend/src/session/cursor_manager.rs | 36 +++++++++++++-------- src/utils/pgwire/src/pg_field_descriptor.rs | 2 +- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 13eaec03b166..c9cafdaac4ae 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -199,7 +199,7 @@ impl SubscriptionCursor { pub async fn next_row( &mut self, - handle_args: HandlerArgs, + handle_args: &HandlerArgs, ) -> Result<(Option, Vec)> { loop { match &mut self.state { @@ -313,20 +313,30 @@ impl SubscriptionCursor { ) .into()); } - // `FETCH NEXT` is equivalent to `FETCH 1`. - if count != 1 { - Err(crate::error::ErrorCode::InternalError( - "FETCH count with subscription is not supported".to_string(), - ) - .into()) - } else { - let (row, pg_descs) = self.next_row(handle_args).await?; - if let Some(row) = row { - Ok((vec![row], pg_descs)) - } else { - Ok((vec![], pg_descs)) + + let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); + let mut cur = 0; + let mut pg_descs_ans = vec![]; + while cur < count + { + let (row, pg_descs) = self.next_row(&handle_args).await?; + if pg_descs_ans.is_empty(){ + pg_descs_ans = pg_descs; + }else{ + break; + } + match row{ + Some(row) => { + cur += 1; + ans.push(row); + } + None => { + break; + } } } + + Ok((ans, pg_descs_ans)) } async fn get_next_rw_timestamp( diff --git a/src/utils/pgwire/src/pg_field_descriptor.rs b/src/utils/pgwire/src/pg_field_descriptor.rs index 0b33c5743c10..c10eeacd539f 100644 --- a/src/utils/pgwire/src/pg_field_descriptor.rs +++ b/src/utils/pgwire/src/pg_field_descriptor.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[derive(Debug, Clone)] +#[derive(Debug, Clone,PartialEq, Eq)] pub struct PgFieldDescriptor { name: String, table_oid: i32, From af3c25db08772e751ed45e1694cf089e4532cb81 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 15 May 2024 14:43:42 +0800 Subject: [PATCH 2/4] fix --- e2e_test/subscription/main.py | 113 ++++++++++++++---- src/frontend/src/handler/declare_cursor.rs | 3 +- .../optimizer/plan_node/generic/log_scan.rs | 13 ++ src/frontend/src/session/cursor_manager.rs | 78 ++++++++---- src/utils/pgwire/src/pg_field_descriptor.rs | 2 +- 5 files changed, 165 insertions(+), 44 deletions(-) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index c7fcc56a35ac..3ffaefd02cee 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -33,8 +33,7 @@ def execute_insert(sql,conn): conn.commit() cur.close() -def check_rows_data(expect_vec,rows,status): - row = rows[0] +def check_rows_data(expect_vec,row,status): value_len = len(row) for index, value in enumerate(row): if index == value_len - 1: @@ -56,7 +55,7 @@ def test_cursor_snapshot(): execute_insert("declare cur subscription cursor for sub",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row,1) + check_rows_data([1,2],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -75,7 +74,7 @@ def test_cursor_snapshot_log_store(): execute_insert("declare cur subscription cursor for sub",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row,1) + check_rows_data([1,2],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("insert into t1 values(4,4)",conn) @@ -83,9 +82,9 @@ def test_cursor_snapshot_log_store(): execute_insert("insert into t1 values(5,5)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) row = execute_query("fetch next from cur",conn) - check_rows_data([5,5],row,1) + check_rows_data([5,5],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -109,11 +108,11 @@ def test_cursor_since_begin(): execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) row = execute_query("fetch next from cur",conn) - check_rows_data([5,5],row,1) + check_rows_data([5,5],row[0],1) row = execute_query("fetch next from cur",conn) - check_rows_data([6,6],row,1) + check_rows_data([6,6],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -138,7 +137,7 @@ def test_cursor_since_now(): execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([6,6],row,1) + check_rows_data([6,6],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -164,27 +163,27 @@ def test_cursor_since_rw_timestamp(): row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_1 = row[0][valuelen - 1] - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_2 = row[0][valuelen - 1] - 1 - check_rows_data([5,5],row,1) + check_rows_data([5,5],row[0],1) row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_3 = row[0][valuelen - 1] + 1 - check_rows_data([6,6],row,1) + check_rows_data([6,6],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([5,5],row,1) + check_rows_data([5,5],row[0],1) execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn) @@ -206,7 +205,7 @@ def test_cursor_op(): execute_insert("declare cur subscription cursor for sub",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row,1) + check_rows_data([1,2],row[0],1) row = execute_query("fetch next from cur",conn) assert row == [] @@ -215,24 +214,96 @@ def test_cursor_op(): execute_insert("update t1 set v2 = 10 where v1 = 4",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,1) + check_rows_data([4,4],row[0],1) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row,4) + check_rows_data([4,4],row[0],4) row = execute_query("fetch next from cur",conn) - check_rows_data([4,10],row,3) + check_rows_data([4,10],row[0],3) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("delete from t1 where v1 = 4",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,10],row,2) + check_rows_data([4,10],row[0],2) row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() +def test_cursor_with_table_alter(): + print(f"test_cursor_with_table_alter") + create_table_subscription() + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + execute_insert("declare cur subscription cursor for sub",conn) + execute_insert("alter table t1 add v3 int",conn) + execute_insert("insert into t1 values(4,4,4)",conn) + execute_insert("flush",conn) + row = execute_query("fetch next from cur",conn) + check_rows_data([1,2],row[0],1) + row = execute_query("fetch next from cur",conn) + check_rows_data([4,4,4],row[0],1) + execute_insert("insert into t1 values(5,5,5)",conn) + execute_insert("flush",conn) + row = execute_query("fetch next from cur",conn) + check_rows_data([5,5,5],row[0],1) + execute_insert("alter table t1 drop column v2",conn) + execute_insert("insert into t1 values(6,6)",conn) + execute_insert("flush",conn) + row = execute_query("fetch next from cur",conn) + check_rows_data([6,6],row[0],1) + drop_table_subscription() + +def test_cursor_fetch_n(): + print(f"test_cursor_with_table_alter") + create_table_subscription() + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + execute_insert("declare cur subscription cursor for sub",conn) + execute_insert("insert into t1 values(4,4)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(5,5)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(6,6)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(7,7)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(8,8)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(9,9)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(10,10)",conn) + execute_insert("flush",conn) + execute_insert("update t1 set v2 = 100 where v1 = 10",conn) + execute_insert("flush",conn) + row = execute_query("fetch 6 from cur",conn) + assert len(row) == 6 + check_rows_data([1,2],row[0],1) + check_rows_data([4,4],row[1],1) + check_rows_data([5,5],row[2],1) + check_rows_data([6,6],row[3],1) + check_rows_data([7,7],row[4],1) + check_rows_data([8,8],row[5],1) + row = execute_query("fetch 6 from cur",conn) + assert len(row) == 4 + check_rows_data([9,9],row[0],1) + check_rows_data([10,10],row[1],1) + check_rows_data([10,10],row[2],4) + check_rows_data([10,100],row[3],3) + drop_table_subscription() + if __name__ == "__main__": test_cursor_snapshot() test_cursor_op() @@ -240,3 +311,5 @@ def test_cursor_op(): test_cursor_since_rw_timestamp() test_cursor_since_now() test_cursor_since_begin() + test_cursor_with_table_alter() + test_cursor_fetch_n() diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index 6bd4e300ec0f..25e146fa714c 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -58,7 +58,6 @@ async fn handle_declare_subscription_cursor( let cursor_from_subscription_name = sub_name.0.last().unwrap().real_value().clone(); let subscription = session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?; - let table = session.get_table_by_id(&subscription.dependent_table_id)?; // Start the first query of cursor, which includes querying the table and querying the subscription's logstore let start_rw_timestamp = match rw_timestamp { Some(risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp)) => { @@ -81,8 +80,8 @@ async fn handle_declare_subscription_cursor( .add_subscription_cursor( cursor_name.clone(), start_rw_timestamp, + subscription.dependent_table_id, subscription, - table, &handle_args, ) .await?; diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs index cd5ddebdc072..498d4a44b0fc 100644 --- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs @@ -141,6 +141,19 @@ impl LogScan { Schema { fields } } + pub(crate) fn schema_without_table_name(&self) -> Schema { + let mut fields: Vec<_> = self + .output_col_idx + .iter() + .map(|tb_idx| { + let col = &self.table_desc.columns[*tb_idx]; + Field::from(col) + }) + .collect(); + fields.push(Field::with_name(OP_TYPE, OP_NAME)); + Schema { fields } + } + pub(crate) fn ctx(&self) -> OptimizerContextRef { self.ctx.clone() } diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index c9cafdaac4ae..d8ed2a604d47 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::mem; use core::time::Duration; use std::collections::{HashMap, VecDeque}; use std::rc::Rc; @@ -30,6 +31,7 @@ use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; use super::SessionImpl; use crate::catalog::subscription_catalog::SubscriptionCatalog; +use crate::catalog::TableId; use crate::error::{ErrorCode, Result}; use crate::handler::declare_cursor::create_stream_for_cursor_stmt; use crate::handler::query::{create_stream, gen_batch_plan_fragmenter, BatchQueryPlanResult}; @@ -136,9 +138,11 @@ enum State { pub struct SubscriptionCursor { cursor_name: String, subscription: Arc, - table: Arc, + dependent_table_id: TableId, cursor_need_drop_time: Instant, state: State, + cache_seek_row: Option, + cache_seek_descs: Vec, } impl SubscriptionCursor { @@ -146,7 +150,7 @@ impl SubscriptionCursor { cursor_name: String, start_timestamp: Option, subscription: Arc, - table: Arc, + dependent_table_id: TableId, handle_args: &HandlerArgs, ) -> Result { let state = if let Some(start_timestamp) = start_timestamp { @@ -160,7 +164,7 @@ impl SubscriptionCursor { // // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch? let (row_stream, pg_descs) = - Self::initiate_query(None, &table, handle_args.clone()).await?; + Self::initiate_query(None, &dependent_table_id, handle_args.clone()).await?; let pinned_epoch = handle_args .session .get_pinned_snapshot() @@ -191,15 +195,42 @@ impl SubscriptionCursor { Ok(Self { cursor_name, subscription, - table, + dependent_table_id, cursor_need_drop_time, state, + cache_seek_row: None, + cache_seek_descs: vec![], }) } + pub async fn seek_descs( + &mut self, + handle_args: &HandlerArgs, + ) -> Result> { + if self.cache_seek_descs.is_empty() { + let (row, descs) = self.next_row_inner(handle_args).await?; + self.cache_seek_row = row; + self.cache_seek_descs = descs; + } + Ok(self.cache_seek_descs.clone()) + } + pub async fn next_row( &mut self, handle_args: &HandlerArgs, + ) -> Result<(Option, Vec)> { + if self.cache_seek_descs.is_empty() { + self.next_row_inner(handle_args).await + } else { + let descs = mem::take(&mut self.cache_seek_descs); + let row = self.cache_seek_row.take(); + Ok((row, descs)) + } + } + + async fn next_row_inner( + &mut self, + handle_args: &HandlerArgs, ) -> Result<(Option, Vec)> { loop { match &mut self.state { @@ -212,7 +243,7 @@ impl SubscriptionCursor { // Initiate a new batch query to continue fetching match Self::get_next_rw_timestamp( *seek_timestamp, - self.table.id.table_id, + self.dependent_table_id.table_id, *expected_timestamp, handle_args.clone(), ) @@ -221,7 +252,7 @@ impl SubscriptionCursor { Ok((Some(rw_timestamp), expected_timestamp)) => { let (mut row_stream, pg_descs) = Self::initiate_query( Some(rw_timestamp), - &self.table, + &self.dependent_table_id, handle_args.clone(), ) .await?; @@ -313,19 +344,19 @@ impl SubscriptionCursor { ) .into()); } - + let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); let mut cur = 0; let mut pg_descs_ans = vec![]; - while cur < count - { - let (row, pg_descs) = self.next_row(&handle_args).await?; - if pg_descs_ans.is_empty(){ + while cur < count { + let pg_descs = self.seek_descs(&handle_args).await?; + if pg_descs_ans.is_empty() { pg_descs_ans = pg_descs; - }else{ + } else if !pg_descs_ans.eq(&pg_descs) { break; } - match row{ + let (row, _) = self.next_row(&handle_args).await?; + match row { Some(row) => { cur += 1; ans.push(row); @@ -368,16 +399,17 @@ impl SubscriptionCursor { async fn initiate_query( rw_timestamp: Option, - table_catalog: &TableCatalog, + dependent_table_id: &TableId, handle_args: HandlerArgs, ) -> Result<(PgResponseStream, Vec)> { + let session = handle_args.clone().session; + let table_catalog = session.get_table_by_id(dependent_table_id)?; let (row_stream, pg_descs) = if let Some(rw_timestamp) = rw_timestamp { - let context = OptimizerContext::from_handler_args(handle_args.clone()); - let session = handle_args.session; + let context = OptimizerContext::from_handler_args(handle_args); let plan_fragmenter_result = gen_batch_plan_fragmenter( &session, Self::create_batch_plan_for_cursor( - table_catalog, + &table_catalog, &session, context.into(), rw_timestamp, @@ -468,7 +500,8 @@ impl SubscriptionCursor { new_epoch, ); let batch_log_seq_scan = BatchLogSeqScan::new(core); - let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema().len()); + let out_fields = + FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema_without_table_name().len()); let out_names = batch_log_seq_scan.core().column_names(); // Here we just need a plan_root to call the method, only out_fields and out_names will be used let plan_root = PlanRoot::new_with_batch_plan( @@ -478,7 +511,10 @@ impl SubscriptionCursor { out_fields, out_names, ); - let schema = batch_log_seq_scan.core().schema().clone(); + let schema = batch_log_seq_scan + .core() + .schema_without_table_name() + .clone(); let (batch_log_seq_scan, query_mode) = match session.config().query_mode() { QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local), QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local), @@ -507,15 +543,15 @@ impl CursorManager { &self, cursor_name: String, start_timestamp: Option, + dependent_table_id: TableId, subscription: Arc, - table: Arc, handle_args: &HandlerArgs, ) -> Result<()> { let cursor = SubscriptionCursor::new( cursor_name.clone(), start_timestamp, subscription, - table, + dependent_table_id, handle_args, ) .await?; diff --git a/src/utils/pgwire/src/pg_field_descriptor.rs b/src/utils/pgwire/src/pg_field_descriptor.rs index c10eeacd539f..82d75c78f795 100644 --- a/src/utils/pgwire/src/pg_field_descriptor.rs +++ b/src/utils/pgwire/src/pg_field_descriptor.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[derive(Debug, Clone,PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct PgFieldDescriptor { name: String, table_oid: i32, From ddf47468beeb4f5695d0e54efeb98bf4987bfe89 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 30 May 2024 14:33:03 +0800 Subject: [PATCH 3/4] fix comm --- src/frontend/src/session/cursor_manager.rs | 59 +++++----------------- 1 file changed, 14 insertions(+), 45 deletions(-) diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index d8ed2a604d47..4d6350ca7f6a 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::mem; use core::time::Duration; use std::collections::{HashMap, VecDeque}; use std::rc::Rc; @@ -141,8 +140,6 @@ pub struct SubscriptionCursor { dependent_table_id: TableId, cursor_need_drop_time: Instant, state: State, - cache_seek_row: Option, - cache_seek_descs: Vec, } impl SubscriptionCursor { @@ -198,39 +195,13 @@ impl SubscriptionCursor { dependent_table_id, cursor_need_drop_time, state, - cache_seek_row: None, - cache_seek_descs: vec![], }) } - pub async fn seek_descs( - &mut self, - handle_args: &HandlerArgs, - ) -> Result> { - if self.cache_seek_descs.is_empty() { - let (row, descs) = self.next_row_inner(handle_args).await?; - self.cache_seek_row = row; - self.cache_seek_descs = descs; - } - Ok(self.cache_seek_descs.clone()) - } - - pub async fn next_row( - &mut self, - handle_args: &HandlerArgs, - ) -> Result<(Option, Vec)> { - if self.cache_seek_descs.is_empty() { - self.next_row_inner(handle_args).await - } else { - let descs = mem::take(&mut self.cache_seek_descs); - let row = self.cache_seek_row.take(); - Ok((row, descs)) - } - } - - async fn next_row_inner( + async fn next_row( &mut self, handle_args: &HandlerArgs, + expected_pg_descs: &Vec, ) -> Result<(Option, Vec)> { loop { match &mut self.state { @@ -266,10 +237,14 @@ impl SubscriptionCursor { from_snapshot, rw_timestamp, row_stream, - pg_descs, + pg_descs: pg_descs.clone(), remaining_rows, expected_timestamp, }; + if (!expected_pg_descs.is_empty()) && expected_pg_descs.ne(&pg_descs) { + println!("expected_pg_descs{:?},{:?}", expected_pg_descs, pg_descs); + return Ok((None, vec![])); + } } Ok((None, _)) => return Ok((None, vec![])), Err(e) => { @@ -349,15 +324,10 @@ impl SubscriptionCursor { let mut cur = 0; let mut pg_descs_ans = vec![]; while cur < count { - let pg_descs = self.seek_descs(&handle_args).await?; - if pg_descs_ans.is_empty() { - pg_descs_ans = pg_descs; - } else if !pg_descs_ans.eq(&pg_descs) { - break; - } - let (row, _) = self.next_row(&handle_args).await?; + let (row, descs_ans) = self.next_row(&handle_args, &pg_descs_ans).await?; match row { Some(row) => { + pg_descs_ans = descs_ans; cur += 1; ans.push(row); } @@ -500,8 +470,11 @@ impl SubscriptionCursor { new_epoch, ); let batch_log_seq_scan = BatchLogSeqScan::new(core); - let out_fields = - FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema_without_table_name().len()); + let schema = batch_log_seq_scan + .core() + .schema_without_table_name() + .clone(); + let out_fields = FixedBitSet::from_iter(0..schema.len()); let out_names = batch_log_seq_scan.core().column_names(); // Here we just need a plan_root to call the method, only out_fields and out_names will be used let plan_root = PlanRoot::new_with_batch_plan( @@ -511,10 +484,6 @@ impl SubscriptionCursor { out_fields, out_names, ); - let schema = batch_log_seq_scan - .core() - .schema_without_table_name() - .clone(); let (batch_log_seq_scan, query_mode) = match session.config().query_mode() { QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local), QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local), From 8b247842ddb0a9eec649d3407f92633e43aafbdb Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 30 May 2024 15:33:26 +0800 Subject: [PATCH 4/4] fix comm --- src/frontend/src/session/cursor_manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 4d6350ca7f6a..46eca3beb996 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -242,7 +242,8 @@ impl SubscriptionCursor { expected_timestamp, }; if (!expected_pg_descs.is_empty()) && expected_pg_descs.ne(&pg_descs) { - println!("expected_pg_descs{:?},{:?}", expected_pg_descs, pg_descs); + // If the user alters the table upstream of the sub, there will be different descs here. + // So we should output data for different descs in two separate batches return Ok((None, vec![])); } }