From b69196dcf96bea4376034e34955f506018755cad Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 24 Jul 2024 14:42:27 +0800 Subject: [PATCH 1/9] su --- src/frontend/src/binder/fetch_cursor.rs | 44 +++++++++++++++++++ src/frontend/src/binder/mod.rs | 1 + src/frontend/src/binder/statement.rs | 6 +++ src/frontend/src/handler/fetch_cursor.rs | 54 ++++++++++++++++++++++-- src/frontend/src/handler/util.rs | 2 +- 5 files changed, 102 insertions(+), 5 deletions(-) create mode 100644 src/frontend/src/binder/fetch_cursor.rs diff --git a/src/frontend/src/binder/fetch_cursor.rs b/src/frontend/src/binder/fetch_cursor.rs new file mode 100644 index 0000000000000..73b6f1b3509c1 --- /dev/null +++ b/src/frontend/src/binder/fetch_cursor.rs @@ -0,0 +1,44 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::Schema; +use risingwave_pb::ddl_service::alter_name_request::Object; +use risingwave_sqlparser::ast::ObjectName; +use crate::error::Result; + +use crate::Binder; + +#[derive(Debug, Clone)] +pub struct BoundFetchCursor { + pub cursor_name: String, + + pub count: u32, + + pub returning_schema: Option, +} + +impl Binder { + pub fn bind_fetch_cursor( + &mut self, + cursor_name: String, + count: u32, + returning_schema: Option, + ) -> Result { + Ok(BoundFetchCursor{ + cursor_name, + count, + returning_schema, + }) + } +} \ No newline at end of file diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 8b526a78d53f4..3a97567ecffe5 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -41,6 +41,7 @@ mod statement; mod struct_field; mod update; mod values; +pub mod fetch_cursor; pub use bind_context::{BindContext, Clause, LateralBindContext}; pub use delete::BoundDelete; diff --git a/src/frontend/src/binder/statement.rs b/src/frontend/src/binder/statement.rs index 764ebfca8783f..4be560874ac93 100644 --- a/src/frontend/src/binder/statement.rs +++ b/src/frontend/src/binder/statement.rs @@ -17,6 +17,7 @@ use risingwave_common::catalog::Field; use risingwave_sqlparser::ast::Statement; use super::delete::BoundDelete; +use super::fetch_cursor::BoundFetchCursor; use super::update::BoundUpdate; use crate::binder::{Binder, BoundInsert, BoundQuery}; use crate::error::Result; @@ -28,6 +29,7 @@ pub enum BoundStatement { Delete(Box), Update(Box), Query(Box), + FetchCursor(Box) } impl BoundStatement { @@ -46,6 +48,9 @@ impl BoundStatement { .as_ref() .map_or(vec![], |s| s.fields().into()), BoundStatement::Query(q) => q.schema().fields().into(), + BoundStatement::FetchCursor(f) => f.returning_schema + .as_ref() + .map_or(vec![], |s| s.fields().into()), } } } @@ -99,6 +104,7 @@ impl RewriteExprsRecursive for BoundStatement { BoundStatement::Delete(inner) => inner.rewrite_exprs_recursive(rewriter), BoundStatement::Update(inner) => inner.rewrite_exprs_recursive(rewriter), BoundStatement::Query(inner) => inner.rewrite_exprs_recursive(rewriter), + BoundStatement::FetchCursor(_) => {}, } } } diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs index 05305a9657b1a..4573bc7450873 100644 --- a/src/frontend/src/handler/fetch_cursor.rs +++ b/src/frontend/src/handler/fetch_cursor.rs @@ -15,18 +15,25 @@ use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Row; -use risingwave_sqlparser::ast::FetchCursorStatement; +use risingwave_common::bail_not_implemented; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::DataType; +use risingwave_sqlparser::ast::{FetchCursorStatement, Statement}; +use super::extended_handle::{PrepareStatement, PreparedResult}; +use super::query::BoundResult; use super::RwPgResponse; +use crate::binder::fetch_cursor::BoundFetchCursor; +use crate::binder::BoundStatement; use crate::error::Result; use crate::handler::HandlerArgs; use crate::{Binder, PgResponseStream}; pub async fn handle_fetch_cursor( - handle_args: HandlerArgs, + handler_args: HandlerArgs, stmt: FetchCursorStatement, ) -> Result { - let session = handle_args.session.clone(); + let session = handler_args.session.clone(); let db_name = session.database(); let (_, cursor_name) = Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?; @@ -34,7 +41,7 @@ pub async fn handle_fetch_cursor( let cursor_manager = session.get_cursor_manager(); let (rows, pg_descs) = cursor_manager - .get_rows_with_cursor(cursor_name, stmt.count, handle_args) + .get_rows_with_cursor(cursor_name, stmt.count, handler_args) .await?; Ok(build_fetch_cursor_response(rows, pg_descs)) } @@ -45,3 +52,42 @@ fn build_fetch_cursor_response(rows: Vec, pg_descs: Vec) .values(PgResponseStream::from(rows), pg_descs) .into() } + +pub async fn handle_parse( + handler_args: HandlerArgs, + statement: Statement, + specific_param_types: Vec>, +) -> Result { + if let Statement::FetchCursor { stmt } = &statement{ + let session = handler_args.session.clone(); + let db_name = session.database(); + let (_, cursor_name) = + Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?; + let mut binder = Binder::new_with_param_types(&session, specific_param_types); + let desc = session.get_cursor_manager().get_desc_with_cursor(cursor_name.clone(), handler_args).await?; + let schema = if desc.is_empty(){ + None + }else{ + let fields = desc.into_iter().map(|p|from_pg_field(p)).collect::>>()?; + Some(Schema::new(fields)) + }; + + let bound = binder.bind_fetch_cursor(cursor_name,stmt.count,schema)?; + + let bound_result = BoundResult{ + stmt_type: StatementType::FETCH_CURSOR, + must_dist: false, + bound: BoundStatement::FetchCursor(Box::new(bound)), + param_types: binder.export_param_types()?, + parsed_params: None, + dependent_relations: binder.included_relations(), + }; + let result = PreparedResult{ + statement, + bound_result, + }; + Ok(PrepareStatement::Prepared(result)) + }else{ + bail_not_implemented!("unsupported statement {:?}", statement) + } +} diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 73b52b977c7a4..7f56605d0f17b 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -36,7 +36,7 @@ use risingwave_sqlparser::ast::{ TableFactor, TableWithJoins, }; -use crate::error::{ErrorCode, Result as RwResult}; +use crate::error::{ErrorCode, Result as RwResult, RwError}; use crate::session::{current, SessionImpl}; pin_project! { From 65bc9c570dead0de20c8571c13851b883488b0e5 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 24 Jul 2024 17:28:17 +0800 Subject: [PATCH 2/9] support --- src/frontend/src/binder/fetch_cursor.rs | 2 -- src/frontend/src/handler/extended_handle.rs | 6 ++-- src/frontend/src/handler/fetch_cursor.rs | 31 +++++++++++++++++---- src/frontend/src/handler/privilege.rs | 1 + src/frontend/src/handler/util.rs | 7 ++++- src/frontend/src/planner/statement.rs | 1 + src/frontend/src/session.rs | 6 ++-- src/utils/pgwire/src/pg_protocol.rs | 10 +++---- src/utils/pgwire/src/pg_server.rs | 4 +-- 9 files changed, 48 insertions(+), 20 deletions(-) diff --git a/src/frontend/src/binder/fetch_cursor.rs b/src/frontend/src/binder/fetch_cursor.rs index 73b6f1b3509c1..3670a8ac5b4c5 100644 --- a/src/frontend/src/binder/fetch_cursor.rs +++ b/src/frontend/src/binder/fetch_cursor.rs @@ -13,8 +13,6 @@ // limitations under the License. use risingwave_common::catalog::Schema; -use risingwave_pb::ddl_service::alter_name_request::Object; -use risingwave_sqlparser::ast::ObjectName; use crate::error::Result; use crate::Binder; diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs index b497b1164d144..928f453b1f485 100644 --- a/src/frontend/src/handler/extended_handle.rs +++ b/src/frontend/src/handler/extended_handle.rs @@ -23,7 +23,7 @@ use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{CreateSink, Query, Statement}; use super::query::BoundResult; -use super::{handle, query, HandlerArgs, RwPgResponse}; +use super::{fetch_cursor, handle, query, HandlerArgs, RwPgResponse}; use crate::error::Result; use crate::session::SessionImpl; @@ -82,7 +82,7 @@ impl std::fmt::Display for PortalResult { } } -pub fn handle_parse( +pub async fn handle_parse( session: Arc, statement: Statement, specific_param_types: Vec>, @@ -97,6 +97,7 @@ pub fn handle_parse( | Statement::Update { .. } => { query::handle_parse(handler_args, statement, specific_param_types) } + Statement::FetchCursor { .. } => fetch_cursor::handle_parse(handler_args, statement, specific_param_types).await, Statement::CreateView { query, .. } => { if have_parameter_in_query(query) { bail_not_implemented!("CREATE VIEW with parameters"); @@ -184,6 +185,7 @@ pub async fn handle_execute(session: Arc, portal: Portal) -> Result | Statement::Insert { .. } | Statement::Delete { .. } | Statement::Update { .. } => query::handle_execute(handler_args, portal).await, + Statement::FetchCursor { .. } => fetch_cursor::handle_fetch_cursor_execute(handler_args, portal).await, _ => unreachable!(), } } diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs index 4573bc7450873..1ca6b0cf68dbc 100644 --- a/src/frontend/src/handler/fetch_cursor.rs +++ b/src/frontend/src/handler/fetch_cursor.rs @@ -16,19 +16,40 @@ use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Row; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{FetchCursorStatement, Statement}; -use super::extended_handle::{PrepareStatement, PreparedResult}; +use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult}; use super::query::BoundResult; +use super::util::from_pg_field; use super::RwPgResponse; -use crate::binder::fetch_cursor::BoundFetchCursor; use crate::binder::BoundStatement; use crate::error::Result; use crate::handler::HandlerArgs; use crate::{Binder, PgResponseStream}; +pub async fn handle_fetch_cursor_execute( + handler_args: HandlerArgs, + portal_result: PortalResult, +) -> Result { + if let PortalResult{ + statement: Statement::FetchCursor { stmt }, + bound_result: BoundResult{ + bound: BoundStatement::FetchCursor(fetch_cursor),..}, + .. + } = portal_result { + match fetch_cursor.returning_schema { + Some(_) => { + handle_fetch_cursor(handler_args, stmt).await + }, + None => { + Ok(build_fetch_cursor_response(vec![], vec![]))}, + } + } else { + bail_not_implemented!("unsupported portal {}", portal_result) + } +} pub async fn handle_fetch_cursor( handler_args: HandlerArgs, stmt: FetchCursorStatement, @@ -63,8 +84,9 @@ pub async fn handle_parse( let db_name = session.database(); let (_, cursor_name) = Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?; - let mut binder = Binder::new_with_param_types(&session, specific_param_types); let desc = session.get_cursor_manager().get_desc_with_cursor(cursor_name.clone(), handler_args).await?; + + let mut binder = Binder::new_with_param_types(&session, specific_param_types); let schema = if desc.is_empty(){ None }else{ @@ -73,7 +95,6 @@ pub async fn handle_parse( }; let bound = binder.bind_fetch_cursor(cursor_name,stmt.count,schema)?; - let bound_result = BoundResult{ stmt_type: StatementType::FETCH_CURSOR, must_dist: false, diff --git a/src/frontend/src/handler/privilege.rs b/src/frontend/src/handler/privilege.rs index d26d1d6d4785c..ef6a11c0827da 100644 --- a/src/frontend/src/handler/privilege.rs +++ b/src/frontend/src/handler/privilege.rs @@ -115,6 +115,7 @@ pub(crate) fn resolve_privileges(stmt: &BoundStatement) -> Vec objects.push(object); } BoundStatement::Query(ref query) => objects.extend(resolve_query_privileges(query)), + BoundStatement::FetchCursor(_) => unimplemented!(), }; objects } diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 7f56605d0f17b..b8ff3309f9ed7 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -35,8 +35,9 @@ use risingwave_sqlparser::ast::{ CompatibleSourceSchema, ConnectorSchema, ObjectName, Query, Select, SelectItem, SetExpr, TableFactor, TableWithJoins, }; +use thiserror_ext::AsReport; -use crate::error::{ErrorCode, Result as RwResult, RwError}; +use crate::error::{ErrorCode, Result as RwResult}; use crate::session::{current, SessionImpl}; pin_project! { @@ -189,6 +190,10 @@ pub fn to_pg_field(f: &Field) -> PgFieldDescriptor { ) } +pub fn from_pg_field(f: PgFieldDescriptor) -> RwResult { + Ok(Field::with_name(DataType::from_oid(f.get_type_oid()).map_err(|e| ErrorCode::BindError(e.to_report_string()))?, f.get_name())) +} + #[easy_ext::ext(SourceSchemaCompatExt)] impl CompatibleSourceSchema { /// Convert `self` to [`ConnectorSchema`] and warn the user if the syntax is deprecated. diff --git a/src/frontend/src/planner/statement.rs b/src/frontend/src/planner/statement.rs index 0eed65e2df7e6..eee4c34fdd715 100644 --- a/src/frontend/src/planner/statement.rs +++ b/src/frontend/src/planner/statement.rs @@ -24,6 +24,7 @@ impl Planner { BoundStatement::Delete(d) => self.plan_delete(*d), BoundStatement::Update(u) => self.plan_update(*u), BoundStatement::Query(q) => self.plan_query(*q), + BoundStatement::FetchCursor(_) => unimplemented!(), } } } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 7dffb5d34bea0..f1fb3bd7425c1 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -1312,13 +1312,13 @@ impl Session for SessionImpl { self.id } - fn parse( + async fn parse( self: Arc, statement: Option, params_types: Vec>, ) -> std::result::Result { Ok(if let Some(statement) = statement { - handle_parse(self, statement, params_types)? + handle_parse(self, statement, params_types).await? } else { PrepareStatement::Empty }) @@ -1451,7 +1451,7 @@ fn infer(bound: Option, stmt: Statement) -> Result Ok(bound + | Statement::Update { .. } | Statement::FetchCursor { .. } => Ok(bound .unwrap() .output_fields() .iter() diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index d700e39757df1..ad933787439d5 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -414,7 +414,7 @@ where FeMessage::CancelQuery(m) => self.process_cancel_msg(m)?, FeMessage::Terminate => self.process_terminate(), FeMessage::Parse(m) => { - if let Err(err) = self.process_parse_msg(m) { + if let Err(err) = self.process_parse_msg(m).await { self.ignore_util_sync = true; return Err(err); } @@ -681,16 +681,16 @@ where self.is_terminate = true; } - fn process_parse_msg(&mut self, msg: FeParseMessage) -> PsqlResult<()> { + async fn process_parse_msg(&mut self, msg: FeParseMessage) -> PsqlResult<()> { let sql = cstr_to_str(&msg.sql_bytes).unwrap(); record_sql_in_span(sql, self.redact_sql_option_keywords.clone()); let session = self.session.clone().unwrap(); let statement_name = cstr_to_str(&msg.statement_name).unwrap().to_string(); - self.inner_process_parse_msg(session, sql, statement_name, msg.type_ids) + self.inner_process_parse_msg(session, sql, statement_name, msg.type_ids).await } - fn inner_process_parse_msg( + async fn inner_process_parse_msg( &mut self, session: Arc, sql: &str, @@ -737,6 +737,7 @@ where let prepare_statement = session .parse(stmt, param_types) + .await .map_err(PsqlError::ExtendedPrepareError)?; if statement_name.is_empty() { @@ -850,7 +851,6 @@ where .unwrap() .describe_statement(prepare_statement) .map_err(PsqlError::Uncategorized)?; - self.stream .write_no_flush(&BeMessage::ParameterDescription( ¶m_types.iter().map(|t| t.to_oid()).collect_vec(), diff --git a/src/utils/pgwire/src/pg_server.rs b/src/utils/pgwire/src/pg_server.rs index 840f21dda1be2..4b0b8657ac59f 100644 --- a/src/utils/pgwire/src/pg_server.rs +++ b/src/utils/pgwire/src/pg_server.rs @@ -85,7 +85,7 @@ pub trait Session: Send + Sync { self: Arc, sql: Option, params_types: Vec>, - ) -> Result; + ) -> impl Future> + Send; // TODO: maybe this function should be async and return the notice more timely /// try to take the current notices from the session @@ -424,7 +424,7 @@ mod tests { .into()) } - fn parse( + async fn parse( self: Arc, _sql: Option, _params_types: Vec>, From fe5a1d281e787bb33774d3c9b940b843b90042c3 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 24 Jul 2024 20:38:08 +0800 Subject: [PATCH 3/9] save --- src/frontend/src/handler/declare_cursor.rs | 7 +- src/frontend/src/handler/fetch_cursor.rs | 10 +- src/frontend/src/handler/util.rs | 4 - src/frontend/src/session/cursor_manager.rs | 111 ++++++++++++--------- 4 files changed, 70 insertions(+), 62 deletions(-) diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index 25e146fa714ce..c70aee2934f35 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -14,6 +14,7 @@ use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::catalog::Field; use risingwave_common::util::epoch::Epoch; use risingwave_sqlparser::ast::{DeclareCursorStatement, ObjectName, Query, Since, Statement}; @@ -124,12 +125,14 @@ async fn handle_declare_query_cursor( pub async fn create_stream_for_cursor_stmt( handle_args: HandlerArgs, stmt: Statement, -) -> Result<(PgResponseStream, Vec)> { +) -> Result<(PgResponseStream, Vec)> { let session = handle_args.session.clone(); let plan_fragmenter_result = { let context = OptimizerContext::from_handler_args(handle_args); let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?; gen_batch_plan_fragmenter(&session, plan_result)? }; - create_stream(session, plan_fragmenter_result, vec![]).await + let fields = plan_fragmenter_result.schema.fields.clone(); + let (row_stream, _) = create_stream(session, plan_fragmenter_result, vec![]).await?; + Ok((row_stream,fields)) } diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs index 1ca6b0cf68dbc..fea9967c89980 100644 --- a/src/frontend/src/handler/fetch_cursor.rs +++ b/src/frontend/src/handler/fetch_cursor.rs @@ -22,7 +22,6 @@ use risingwave_sqlparser::ast::{FetchCursorStatement, Statement}; use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult}; use super::query::BoundResult; -use super::util::from_pg_field; use super::RwPgResponse; use crate::binder::BoundStatement; use crate::error::Result; @@ -84,15 +83,10 @@ pub async fn handle_parse( let db_name = session.database(); let (_, cursor_name) = Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?; - let desc = session.get_cursor_manager().get_desc_with_cursor(cursor_name.clone(), handler_args).await?; + let fields = session.get_cursor_manager().get_fields_with_cursor(cursor_name.clone()).await?; let mut binder = Binder::new_with_param_types(&session, specific_param_types); - let schema = if desc.is_empty(){ - None - }else{ - let fields = desc.into_iter().map(|p|from_pg_field(p)).collect::>>()?; - Some(Schema::new(fields)) - }; + let schema = Some(Schema::new(fields)); let bound = binder.bind_fetch_cursor(cursor_name,stmt.count,schema)?; let bound_result = BoundResult{ diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index b8ff3309f9ed7..b706d705803e8 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -190,10 +190,6 @@ pub fn to_pg_field(f: &Field) -> PgFieldDescriptor { ) } -pub fn from_pg_field(f: PgFieldDescriptor) -> RwResult { - Ok(Field::with_name(DataType::from_oid(f.get_type_oid()).map_err(|e| ErrorCode::BindError(e.to_report_string()))?, f.get_name())) -} - #[easy_ext::ext(SourceSchemaCompatExt)] impl CompatibleSourceSchema { /// Convert `self` to [`ConnectorSchema`] and warn the user if the syntax is deprecated. diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index bcd1aa11ec749..8145469c2b13b 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -34,12 +34,13 @@ use crate::catalog::TableId; use crate::error::{ErrorCode, Result}; use crate::handler::declare_cursor::create_stream_for_cursor_stmt; use crate::handler::query::{create_stream, gen_batch_plan_fragmenter, BatchQueryPlanResult}; -use crate::handler::util::{convert_logstore_u64_to_unix_millis, gen_query_from_table_name}; +use crate::handler::util::{convert_logstore_u64_to_unix_millis, gen_query_from_table_name, to_pg_field}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::{generic, BatchLogSeqScan}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::PlanRoot; use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog}; +use risingwave_common::catalog::Field; pub enum Cursor { Subscription(SubscriptionCursor), @@ -56,16 +57,24 @@ impl Cursor { Cursor::Query(cursor) => cursor.next(count).await, } } + pub async fn get_fields( + &mut self, + ) -> Vec { + match self { + Cursor::Subscription(cursor) => cursor.fields.clone(), + Cursor::Query(cursor) => cursor.pg_descs.clone(), + } + } } pub struct QueryCursor { row_stream: PgResponseStream, - pg_descs: Vec, + pg_descs: Vec, remaining_rows: VecDeque, } impl QueryCursor { - pub fn new(row_stream: PgResponseStream, pg_descs: Vec) -> Result { + pub fn new(row_stream: PgResponseStream, pg_descs: Vec) -> Result { Ok(Self { row_stream, pg_descs, @@ -91,13 +100,14 @@ impl QueryCursor { // min with 100 to avoid allocating too many memory at once. let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); let mut cur = 0; + let desc = self.pg_descs.iter().map(|f| to_pg_field(f)).collect(); while cur < count && let Some(row) = self.next_once().await? { cur += 1; ans.push(row); } - Ok((ans, self.pg_descs.clone())) + Ok((ans, desc)) } } @@ -122,9 +132,9 @@ enum State { // It is returned from the batch execution. row_stream: PgResponseStream, - // The pg descs to from the batch query read. - // It is returned from the batch execution. - pg_descs: Vec, + // // The pg descs to from the batch query read. + // // It is returned from the batch execution. + // pg_descs: Vec, // A cache to store the remaining rows from the row stream. remaining_rows: VecDeque, @@ -140,6 +150,7 @@ pub struct SubscriptionCursor { dependent_table_id: TableId, cursor_need_drop_time: Instant, state: State, + fields: Vec, } impl SubscriptionCursor { @@ -150,17 +161,21 @@ impl SubscriptionCursor { dependent_table_id: TableId, handle_args: &HandlerArgs, ) -> Result { - let state = if let Some(start_timestamp) = start_timestamp { - State::InitLogStoreQuery { + let (state,fields) = if let Some(start_timestamp) = start_timestamp { + let table_catalog = handle_args + .session.get_table_by_id(&dependent_table_id)?; + let fields = table_catalog.columns.iter().map(|c| Field::with_name(c.data_type().clone(), c.name())).collect(); + let fields = Self::build_desc(fields, true); + (State::InitLogStoreQuery { seek_timestamp: start_timestamp, expected_timestamp: None, - } + },fields) } else { // The query stream needs to initiated on cursor creation to make sure // future fetch on the cursor starts from the snapshot when the cursor is declared. // // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch? - let (row_stream, pg_descs) = + let (row_stream, fields) = Self::initiate_query(None, &dependent_table_id, handle_args.clone()).await?; let pinned_epoch = handle_args .session @@ -177,14 +192,13 @@ impl SubscriptionCursor { .0; let start_timestamp = pinned_epoch; - State::Fetch { + (State::Fetch { from_snapshot: true, rw_timestamp: start_timestamp, row_stream, - pg_descs, remaining_rows: VecDeque::new(), expected_timestamp: None, - } + },fields) }; let cursor_need_drop_time = @@ -195,14 +209,14 @@ impl SubscriptionCursor { dependent_table_id, cursor_need_drop_time, state, + fields, }) } async fn next_row( &mut self, handle_args: &HandlerArgs, - expected_pg_descs: &Vec, - ) -> Result<(Option, Vec)> { + ) -> Result> { loop { match &mut self.state { State::InitLogStoreQuery { @@ -238,17 +252,15 @@ impl SubscriptionCursor { from_snapshot, rw_timestamp, row_stream, - pg_descs: pg_descs.clone(), remaining_rows, expected_timestamp, }; - if (!expected_pg_descs.is_empty()) && expected_pg_descs.ne(&pg_descs) { - // If the user alters the table upstream of the sub, there will be different descs here. - // So we should output data for different descs in two separate batches - return Ok((None, vec![])); + if self.fields.ne(&pg_descs){ + self.fields = pg_descs; + return Ok(None); } } - Ok((None, _)) => return Ok((None, vec![])), + Ok((None, _)) => return Ok(None), Err(e) => { self.state = State::Invalid; return Err(e); @@ -259,7 +271,6 @@ impl SubscriptionCursor { from_snapshot, rw_timestamp, row_stream, - pg_descs, remaining_rows, expected_timestamp, } => { @@ -273,15 +284,9 @@ impl SubscriptionCursor { // 1. Fetch the next row let new_row = row.take(); if from_snapshot { - return Ok(( - Some(Row::new(Self::build_row(new_row, None)?)), - pg_descs.clone(), - )); + return Ok(Some(Row::new(Self::build_row(new_row, None)?))); } else { - return Ok(( - Some(Row::new(Self::build_row(new_row, Some(rw_timestamp))?)), - pg_descs.clone(), - )); + return Ok(Some(Row::new(Self::build_row(new_row, Some(rw_timestamp))?))); } } else { // 2. Reach EOF for the current query. @@ -324,12 +329,11 @@ impl SubscriptionCursor { let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); let mut cur = 0; - let mut pg_descs_ans = vec![]; + let desc = self.fields.iter().map(|f| to_pg_field(f)).collect(); while cur < count { - let (row, descs_ans) = self.next_row(&handle_args, &pg_descs_ans).await?; + let row = self.next_row(&handle_args).await?; match row { Some(row) => { - pg_descs_ans = descs_ans; cur += 1; ans.push(row); } @@ -339,7 +343,7 @@ impl SubscriptionCursor { } } - Ok((ans, pg_descs_ans)) + Ok((ans, desc)) } async fn get_next_rw_timestamp( @@ -379,7 +383,7 @@ impl SubscriptionCursor { rw_timestamp: Option, dependent_table_id: &TableId, handle_args: HandlerArgs, - ) -> Result<(PgResponseStream, Vec)> { + ) -> Result<(PgResponseStream, Vec)> { let session = handle_args.clone().session; let table_catalog = session.get_table_by_id(dependent_table_id)?; let (row_stream, pg_descs) = if let Some(rw_timestamp) = rw_timestamp { @@ -394,7 +398,9 @@ impl SubscriptionCursor { rw_timestamp, )?, )?; - create_stream(session, plan_fragmenter_result, vec![]).await? + let fields = plan_fragmenter_result.schema.fields.clone(); + let (row_stream, _) = create_stream(session, plan_fragmenter_result, vec![]).await?; + (row_stream,fields) } else { let subscription_from_table_name = ObjectName(vec![Ident::from(table_catalog.name.as_ref())]); @@ -437,20 +443,18 @@ impl SubscriptionCursor { } pub fn build_desc( - mut descs: Vec, + mut descs: Vec, from_snapshot: bool, - ) -> Vec { + ) -> Vec { if from_snapshot { - descs.push(PgFieldDescriptor::new( - "op".to_owned(), - DataType::Int16.to_oid(), - DataType::Int16.type_len(), + descs.push(Field::with_name( + DataType::Int16, + "op" )); } - descs.push(PgFieldDescriptor::new( - "rw_timestamp".to_owned(), - DataType::Int64.to_oid(), - DataType::Int64.type_len(), + descs.push(Field::with_name( + DataType::Int64, + "rw_timestamp" )); descs } @@ -554,7 +558,7 @@ impl CursorManager { &self, cursor_name: ObjectName, row_stream: PgResponseStream, - pg_descs: Vec, + pg_descs: Vec, ) -> Result<()> { let cursor = QueryCursor::new(row_stream, pg_descs)?; self.cursor_map @@ -602,4 +606,15 @@ impl CursorManager { Err(ErrorCode::ItemNotFound(format!("Cannot find cursor `{}`", cursor_name)).into()) } } + + pub async fn get_fields_with_cursor( + &self, + cursor_name: String, + ) -> Result> { + if let Some(cursor) = self.cursor_map.lock().await.get_mut(&cursor_name) { + Ok(cursor.get_fields().await) + } else { + Err(ErrorCode::ItemNotFound(format!("Cannot find cursor `{}`", cursor_name)).into()) + } + } } From cf3a2a0bfcc8438dcc6a3797bda05baa1ec05cec Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 26 Jul 2024 11:31:45 +0800 Subject: [PATCH 4/9] support --- src/frontend/src/binder/fetch_cursor.rs | 6 +- src/frontend/src/binder/mod.rs | 2 +- src/frontend/src/binder/statement.rs | 11 +- src/frontend/src/handler/declare_cursor.rs | 5 +- src/frontend/src/handler/extended_handle.rs | 8 +- src/frontend/src/handler/fetch_cursor.rs | 50 ++--- src/frontend/src/handler/mod.rs | 16 +- src/frontend/src/handler/util.rs | 13 +- src/frontend/src/session.rs | 3 +- src/frontend/src/session/cursor_manager.rs | 191 +++++++++++++------- src/utils/pgwire/src/pg_protocol.rs | 5 +- 11 files changed, 198 insertions(+), 112 deletions(-) diff --git a/src/frontend/src/binder/fetch_cursor.rs b/src/frontend/src/binder/fetch_cursor.rs index 3670a8ac5b4c5..50b48f631fcac 100644 --- a/src/frontend/src/binder/fetch_cursor.rs +++ b/src/frontend/src/binder/fetch_cursor.rs @@ -13,8 +13,8 @@ // limitations under the License. use risingwave_common::catalog::Schema; -use crate::error::Result; +use crate::error::Result; use crate::Binder; #[derive(Debug, Clone)] @@ -33,10 +33,10 @@ impl Binder { count: u32, returning_schema: Option, ) -> Result { - Ok(BoundFetchCursor{ + Ok(BoundFetchCursor { cursor_name, count, returning_schema, }) } -} \ No newline at end of file +} diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 3a97567ecffe5..8b2fa39e0d4ce 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -31,6 +31,7 @@ mod bind_param; mod create; mod delete; mod expr; +pub mod fetch_cursor; mod for_system; mod insert; mod query; @@ -41,7 +42,6 @@ mod statement; mod struct_field; mod update; mod values; -pub mod fetch_cursor; pub use bind_context::{BindContext, Clause, LateralBindContext}; pub use delete::BoundDelete; diff --git a/src/frontend/src/binder/statement.rs b/src/frontend/src/binder/statement.rs index 4be560874ac93..af390e082e018 100644 --- a/src/frontend/src/binder/statement.rs +++ b/src/frontend/src/binder/statement.rs @@ -29,7 +29,7 @@ pub enum BoundStatement { Delete(Box), Update(Box), Query(Box), - FetchCursor(Box) + FetchCursor(Box), } impl BoundStatement { @@ -48,9 +48,10 @@ impl BoundStatement { .as_ref() .map_or(vec![], |s| s.fields().into()), BoundStatement::Query(q) => q.schema().fields().into(), - BoundStatement::FetchCursor(f) => f.returning_schema - .as_ref() - .map_or(vec![], |s| s.fields().into()), + BoundStatement::FetchCursor(f) => f + .returning_schema + .as_ref() + .map_or(vec![], |s| s.fields().into()), } } } @@ -104,7 +105,7 @@ impl RewriteExprsRecursive for BoundStatement { BoundStatement::Delete(inner) => inner.rewrite_exprs_recursive(rewriter), BoundStatement::Update(inner) => inner.rewrite_exprs_recursive(rewriter), BoundStatement::Query(inner) => inner.rewrite_exprs_recursive(rewriter), - BoundStatement::FetchCursor(_) => {}, + BoundStatement::FetchCursor(_) => {} } } } diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index c70aee2934f35..6f072c8b64993 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::Field; use risingwave_common::util::epoch::Epoch; @@ -133,6 +132,6 @@ pub async fn create_stream_for_cursor_stmt( gen_batch_plan_fragmenter(&session, plan_result)? }; let fields = plan_fragmenter_result.schema.fields.clone(); - let (row_stream, _) = create_stream(session, plan_fragmenter_result, vec![]).await?; - Ok((row_stream,fields)) + let (row_stream, _) = create_stream(session, plan_fragmenter_result, vec![]).await?; + Ok((row_stream, fields)) } diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs index 928f453b1f485..ea51b45035fb4 100644 --- a/src/frontend/src/handler/extended_handle.rs +++ b/src/frontend/src/handler/extended_handle.rs @@ -97,7 +97,9 @@ pub async fn handle_parse( | Statement::Update { .. } => { query::handle_parse(handler_args, statement, specific_param_types) } - Statement::FetchCursor { .. } => fetch_cursor::handle_parse(handler_args, statement, specific_param_types).await, + Statement::FetchCursor { .. } => { + fetch_cursor::handle_parse(handler_args, statement, specific_param_types).await + } Statement::CreateView { query, .. } => { if have_parameter_in_query(query) { bail_not_implemented!("CREATE VIEW with parameters"); @@ -185,7 +187,9 @@ pub async fn handle_execute(session: Arc, portal: Portal) -> Result | Statement::Insert { .. } | Statement::Delete { .. } | Statement::Update { .. } => query::handle_execute(handler_args, portal).await, - Statement::FetchCursor { .. } => fetch_cursor::handle_fetch_cursor_execute(handler_args, portal).await, + Statement::FetchCursor { .. } => { + fetch_cursor::handle_fetch_cursor_execute(handler_args, portal).await + } _ => unreachable!(), } } diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs index fea9967c89980..d339e3e7a1acb 100644 --- a/src/frontend/src/handler/fetch_cursor.rs +++ b/src/frontend/src/handler/fetch_cursor.rs @@ -14,7 +14,7 @@ use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; -use pgwire::types::Row; +use pgwire::types::{Format, Row}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; @@ -32,18 +32,20 @@ pub async fn handle_fetch_cursor_execute( handler_args: HandlerArgs, portal_result: PortalResult, ) -> Result { - if let PortalResult{ + if let PortalResult { statement: Statement::FetchCursor { stmt }, - bound_result: BoundResult{ - bound: BoundStatement::FetchCursor(fetch_cursor),..}, + bound_result: + BoundResult { + bound: BoundStatement::FetchCursor(fetch_cursor), + .. + }, + result_formats, .. - } = portal_result { + } = portal_result + { match fetch_cursor.returning_schema { - Some(_) => { - handle_fetch_cursor(handler_args, stmt).await - }, - None => { - Ok(build_fetch_cursor_response(vec![], vec![]))}, + Some(_) => handle_fetch_cursor(handler_args, stmt, &result_formats).await, + None => Ok(build_fetch_cursor_response(vec![], vec![])), } } else { bail_not_implemented!("unsupported portal {}", portal_result) @@ -52,6 +54,7 @@ pub async fn handle_fetch_cursor_execute( pub async fn handle_fetch_cursor( handler_args: HandlerArgs, stmt: FetchCursorStatement, + formats: &Vec, ) -> Result { let session = handler_args.session.clone(); let db_name = session.database(); @@ -61,7 +64,7 @@ pub async fn handle_fetch_cursor( let cursor_manager = session.get_cursor_manager(); let (rows, pg_descs) = cursor_manager - .get_rows_with_cursor(cursor_name, stmt.count, handler_args) + .get_rows_with_cursor(cursor_name, stmt.count, handler_args, formats) .await?; Ok(build_fetch_cursor_response(rows, pg_descs)) } @@ -78,18 +81,21 @@ pub async fn handle_parse( statement: Statement, specific_param_types: Vec>, ) -> Result { - if let Statement::FetchCursor { stmt } = &statement{ + if let Statement::FetchCursor { stmt } = &statement { let session = handler_args.session.clone(); - let db_name = session.database(); - let (_, cursor_name) = - Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?; - let fields = session.get_cursor_manager().get_fields_with_cursor(cursor_name.clone()).await?; - - let mut binder = Binder::new_with_param_types(&session, specific_param_types); + let db_name = session.database(); + let (_, cursor_name) = + Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?; + let fields = session + .get_cursor_manager() + .get_fields_with_cursor(cursor_name.clone()) + .await?; + + let mut binder = Binder::new_with_param_types(&session, specific_param_types); let schema = Some(Schema::new(fields)); - let bound = binder.bind_fetch_cursor(cursor_name,stmt.count,schema)?; - let bound_result = BoundResult{ + let bound = binder.bind_fetch_cursor(cursor_name, stmt.count, schema)?; + let bound_result = BoundResult { stmt_type: StatementType::FETCH_CURSOR, must_dist: false, bound: BoundStatement::FetchCursor(Box::new(bound)), @@ -97,12 +103,12 @@ pub async fn handle_parse( parsed_params: None, dependent_relations: binder.included_relations(), }; - let result = PreparedResult{ + let result = PreparedResult { statement, bound_result, }; Ok(PrepareStatement::Prepared(result)) - }else{ + } else { bail_not_implemented!("unsupported statement {:?}", statement) } } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index f8beeedb19438..c0286d5a58cad 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -147,13 +147,25 @@ pub enum PgResponseStream { DistributedQuery(DataChunkToRowSetAdapter), Rows(BoxStream<'static, RowSetResult>), } +impl PgResponseStream { + pub fn set_formats(&mut self, formats: Vec) { + match self { + PgResponseStream::LocalQuery(inner) => inner.set_formats(formats), + PgResponseStream::DistributedQuery(inner) => inner.set_formats(formats), + PgResponseStream::Rows(_) => {} + } + } +} impl Stream for PgResponseStream { type Item = std::result::Result, BoxedError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { - PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx), + PgResponseStream::LocalQuery(inner) => { + println!("poll_next LocalQuery,{:?}", inner.formats); + inner.poll_next_unpin(cx) + } PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx), PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx), } @@ -401,7 +413,7 @@ pub async fn handle( declare_cursor::handle_declare_cursor(handler_args, stmt).await } Statement::FetchCursor { stmt } => { - fetch_cursor::handle_fetch_cursor(handler_args, stmt).await + fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await } Statement::CloseCursor { stmt } => { close_cursor::handle_close_cursor(handler_args, stmt).await diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index b706d705803e8..fd6631382685c 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -35,7 +35,6 @@ use risingwave_sqlparser::ast::{ CompatibleSourceSchema, ConnectorSchema, ObjectName, Query, Select, SelectItem, SetExpr, TableFactor, TableWithJoins, }; -use thiserror_ext::AsReport; use crate::error::{ErrorCode, Result as RwResult}; use crate::session::{current, SessionImpl}; @@ -54,14 +53,14 @@ pin_project! { #[pin] chunk_stream: VS, column_types: Vec, - formats: Vec, + pub formats: Vec, session_data: StaticSessionData, } } // Static session data frozen at the time of the creation of the stream -struct StaticSessionData { - timezone: String, +pub struct StaticSessionData { + pub timezone: String, } impl DataChunkToRowSetAdapter @@ -84,6 +83,10 @@ where session_data, } } + + pub fn set_formats(&mut self, formats: Vec) { + self.formats = formats; + } } impl Stream for DataChunkToRowSetAdapter @@ -111,7 +114,7 @@ where } /// Format scalars according to postgres convention. -fn pg_value_format( +pub fn pg_value_format( data_type: &DataType, d: ScalarRefImpl<'_>, format: Format, diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index f1fb3bd7425c1..3c3511f47006f 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -1451,7 +1451,8 @@ fn infer(bound: Option, stmt: Statement) -> Result Ok(bound + | Statement::Update { .. } + | Statement::FetchCursor { .. } => Ok(bound .unwrap() .output_fields() .iter() diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 8145469c2b13b..c87a4a0ac2891 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -23,7 +23,8 @@ use fixedbitset::FixedBitSet; use futures::StreamExt; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::StatementType; -use pgwire::types::Row; +use pgwire::types::{Format, Row}; +use risingwave_common::catalog::Field; use risingwave_common::session_config::QueryMode; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; @@ -34,13 +35,15 @@ use crate::catalog::TableId; use crate::error::{ErrorCode, Result}; use crate::handler::declare_cursor::create_stream_for_cursor_stmt; use crate::handler::query::{create_stream, gen_batch_plan_fragmenter, BatchQueryPlanResult}; -use crate::handler::util::{convert_logstore_u64_to_unix_millis, gen_query_from_table_name, to_pg_field}; +use crate::handler::util::{ + convert_logstore_u64_to_unix_millis, gen_query_from_table_name, pg_value_format, to_pg_field, + StaticSessionData, +}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::{generic, BatchLogSeqScan}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::PlanRoot; use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog}; -use risingwave_common::catalog::Field; pub enum Cursor { Subscription(SubscriptionCursor), @@ -51,33 +54,33 @@ impl Cursor { &mut self, count: u32, handle_args: HandlerArgs, + formats: &Vec, ) -> Result<(Vec, Vec)> { match self { - Cursor::Subscription(cursor) => cursor.next(count, handle_args).await, - Cursor::Query(cursor) => cursor.next(count).await, + Cursor::Subscription(cursor) => cursor.next(count, handle_args, formats).await, + Cursor::Query(cursor) => cursor.next(count, formats).await, } } - pub async fn get_fields( - &mut self, - ) -> Vec { + + pub fn get_fields(&mut self) -> Vec { match self { Cursor::Subscription(cursor) => cursor.fields.clone(), - Cursor::Query(cursor) => cursor.pg_descs.clone(), + Cursor::Query(cursor) => cursor.fields.clone(), } } } pub struct QueryCursor { row_stream: PgResponseStream, - pg_descs: Vec, + fields: Vec, remaining_rows: VecDeque, } impl QueryCursor { - pub fn new(row_stream: PgResponseStream, pg_descs: Vec) -> Result { + pub fn new(row_stream: PgResponseStream, fields: Vec) -> Result { Ok(Self { row_stream, - pg_descs, + fields, remaining_rows: VecDeque::::new(), }) } @@ -95,12 +98,17 @@ impl QueryCursor { Ok(Some(row)) } - pub async fn next(&mut self, count: u32) -> Result<(Vec, Vec)> { + pub async fn next( + &mut self, + count: u32, + formats: &Vec, + ) -> Result<(Vec, Vec)> { // `FETCH NEXT` is equivalent to `FETCH 1`. // min with 100 to avoid allocating too many memory at once. let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); let mut cur = 0; - let desc = self.pg_descs.iter().map(|f| to_pg_field(f)).collect(); + let desc = self.fields.iter().map(to_pg_field).collect(); + self.row_stream.set_formats(formats.clone()); while cur < count && let Some(row) = self.next_once().await? { @@ -132,10 +140,6 @@ enum State { // It is returned from the batch execution. row_stream: PgResponseStream, - // // The pg descs to from the batch query read. - // // It is returned from the batch execution. - // pg_descs: Vec, - // A cache to store the remaining rows from the row stream. remaining_rows: VecDeque, @@ -161,15 +165,21 @@ impl SubscriptionCursor { dependent_table_id: TableId, handle_args: &HandlerArgs, ) -> Result { - let (state,fields) = if let Some(start_timestamp) = start_timestamp { - let table_catalog = handle_args - .session.get_table_by_id(&dependent_table_id)?; - let fields = table_catalog.columns.iter().map(|c| Field::with_name(c.data_type().clone(), c.name())).collect(); + let (state, fields) = if let Some(start_timestamp) = start_timestamp { + let table_catalog = handle_args.session.get_table_by_id(&dependent_table_id)?; + let fields = table_catalog + .columns + .iter() + .map(|c| Field::with_name(c.data_type().clone(), c.name())) + .collect(); let fields = Self::build_desc(fields, true); - (State::InitLogStoreQuery { - seek_timestamp: start_timestamp, - expected_timestamp: None, - },fields) + ( + State::InitLogStoreQuery { + seek_timestamp: start_timestamp, + expected_timestamp: None, + }, + fields, + ) } else { // The query stream needs to initiated on cursor creation to make sure // future fetch on the cursor starts from the snapshot when the cursor is declared. @@ -192,13 +202,16 @@ impl SubscriptionCursor { .0; let start_timestamp = pinned_epoch; - (State::Fetch { - from_snapshot: true, - rw_timestamp: start_timestamp, - row_stream, - remaining_rows: VecDeque::new(), - expected_timestamp: None, - },fields) + ( + State::Fetch { + from_snapshot: true, + rw_timestamp: start_timestamp, + row_stream, + remaining_rows: VecDeque::new(), + expected_timestamp: None, + }, + fields, + ) }; let cursor_need_drop_time = @@ -216,6 +229,7 @@ impl SubscriptionCursor { async fn next_row( &mut self, handle_args: &HandlerArgs, + formats: &Vec, ) -> Result> { loop { match &mut self.state { @@ -236,12 +250,14 @@ impl SubscriptionCursor { .await { Ok((Some(rw_timestamp), expected_timestamp)) => { - let (mut row_stream, pg_descs) = Self::initiate_query( + let (mut row_stream, fields) = Self::initiate_query( Some(rw_timestamp), &self.dependent_table_id, handle_args.clone(), ) .await?; + Self::set_formats(&mut row_stream, formats, &from_snapshot); + self.cursor_need_drop_time = Instant::now() + Duration::from_secs(self.subscription.retention_seconds); let mut remaining_rows = VecDeque::new(); @@ -255,8 +271,8 @@ impl SubscriptionCursor { remaining_rows, expected_timestamp, }; - if self.fields.ne(&pg_descs){ - self.fields = pg_descs; + if self.fields.ne(&fields) { + self.fields = fields; return Ok(None); } } @@ -274,6 +290,9 @@ impl SubscriptionCursor { remaining_rows, expected_timestamp, } => { + let session_data = StaticSessionData { + timezone: handle_args.session.config().timezone(), + }; let from_snapshot = *from_snapshot; let rw_timestamp = *rw_timestamp; @@ -284,9 +303,19 @@ impl SubscriptionCursor { // 1. Fetch the next row let new_row = row.take(); if from_snapshot { - return Ok(Some(Row::new(Self::build_row(new_row, None)?))); + return Ok(Some(Row::new(Self::build_row( + new_row, + None, + formats, + &session_data, + )?))); } else { - return Ok(Some(Row::new(Self::build_row(new_row, Some(rw_timestamp))?))); + return Ok(Some(Row::new(Self::build_row( + new_row, + Some(rw_timestamp), + formats, + &session_data, + )?))); } } else { // 2. Reach EOF for the current query. @@ -319,6 +348,7 @@ impl SubscriptionCursor { &mut self, count: u32, handle_args: HandlerArgs, + formats: &Vec, ) -> Result<(Vec, Vec)> { if Instant::now() > self.cursor_need_drop_time { return Err(ErrorCode::InternalError( @@ -329,9 +359,17 @@ impl SubscriptionCursor { let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); let mut cur = 0; - let desc = self.fields.iter().map(|f| to_pg_field(f)).collect(); + let desc = self.fields.iter().map(to_pg_field).collect(); + if let State::Fetch { + from_snapshot, + row_stream, + .. + } = &mut self.state + { + Self::set_formats(row_stream, formats, from_snapshot); + } while cur < count { - let row = self.next_row(&handle_args).await?; + let row = self.next_row(&handle_args, formats).await?; match row { Some(row) => { cur += 1; @@ -386,7 +424,7 @@ impl SubscriptionCursor { ) -> Result<(PgResponseStream, Vec)> { let session = handle_args.clone().session; let table_catalog = session.get_table_by_id(dependent_table_id)?; - let (row_stream, pg_descs) = if let Some(rw_timestamp) = rw_timestamp { + let (row_stream, fields) = if let Some(rw_timestamp) = rw_timestamp { let context = OptimizerContext::from_handler_args(handle_args); let plan_fragmenter_result = gen_batch_plan_fragmenter( &session, @@ -400,7 +438,7 @@ impl SubscriptionCursor { )?; let fields = plan_fragmenter_result.schema.fields.clone(); let (row_stream, _) = create_stream(session, plan_fragmenter_result, vec![]).await?; - (row_stream,fields) + (row_stream, fields) } else { let subscription_from_table_name = ObjectName(vec![Ident::from(table_catalog.name.as_ref())]); @@ -411,7 +449,7 @@ impl SubscriptionCursor { }; Ok(( row_stream, - Self::build_desc(pg_descs, rw_timestamp.is_none()), + Self::build_desc(fields, rw_timestamp.is_none()), )) } @@ -430,32 +468,40 @@ impl SubscriptionCursor { pub fn build_row( mut row: Vec>, rw_timestamp: Option, + formats: &Vec, + session_data: &StaticSessionData, ) -> Result>> { + println!("param_types: {:?}", formats); + let row_len = row.len(); let new_row = if let Some(rw_timestamp) = rw_timestamp { - vec![Some(Bytes::from( - convert_logstore_u64_to_unix_millis(rw_timestamp).to_string(), - ))] + let rw_timestamp_formats = formats.get(row_len).unwrap_or(&Format::Text); + let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp); + let rw_timestamp = pg_value_format( + &DataType::Int64, + risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64), + *rw_timestamp_formats, + session_data, + )?; + vec![Some(rw_timestamp)] } else { - vec![Some(Bytes::from(1i16.to_string())), None] + let op_formats = formats.get(row_len).unwrap_or(&Format::Text); + let op = pg_value_format( + &DataType::Int16, + risingwave_common::types::ScalarRefImpl::Int16(1_i16), + *op_formats, + session_data, + )?; + vec![Some(op), None] }; row.extend(new_row); Ok(row) } - pub fn build_desc( - mut descs: Vec, - from_snapshot: bool, - ) -> Vec { + pub fn build_desc(mut descs: Vec, from_snapshot: bool) -> Vec { if from_snapshot { - descs.push(Field::with_name( - DataType::Int16, - "op" - )); + descs.push(Field::with_name(DataType::Int16, "op")); } - descs.push(Field::with_name( - DataType::Int64, - "rw_timestamp" - )); + descs.push(Field::with_name(DataType::Int64, "rw_timestamp")); descs } @@ -512,6 +558,19 @@ impl SubscriptionCursor { dependent_relations: table_catalog.dependent_relations.clone(), }) } + + pub fn set_formats( + row_stream: &mut PgResponseStream, + formats: &Vec, + from_snapshot: &bool, + ) { + let mut formats = formats.clone(); + formats.pop(); + if *from_snapshot { + formats.pop(); + } + row_stream.set_formats(formats); + } } #[derive(Default)] @@ -558,9 +617,9 @@ impl CursorManager { &self, cursor_name: ObjectName, row_stream: PgResponseStream, - pg_descs: Vec, + fields: Vec, ) -> Result<()> { - let cursor = QueryCursor::new(row_stream, pg_descs)?; + let cursor = QueryCursor::new(row_stream, fields)?; self.cursor_map .lock() .await @@ -599,20 +658,18 @@ impl CursorManager { cursor_name: String, count: u32, handle_args: HandlerArgs, + formats: &Vec, ) -> Result<(Vec, Vec)> { if let Some(cursor) = self.cursor_map.lock().await.get_mut(&cursor_name) { - cursor.next(count, handle_args).await + cursor.next(count, handle_args, formats).await } else { Err(ErrorCode::ItemNotFound(format!("Cannot find cursor `{}`", cursor_name)).into()) } } - pub async fn get_fields_with_cursor( - &self, - cursor_name: String, - ) -> Result> { + pub async fn get_fields_with_cursor(&self, cursor_name: String) -> Result> { if let Some(cursor) = self.cursor_map.lock().await.get_mut(&cursor_name) { - Ok(cursor.get_fields().await) + Ok(cursor.get_fields()) } else { Err(ErrorCode::ItemNotFound(format!("Cannot find cursor `{}`", cursor_name)).into()) } diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index ad933787439d5..f605bb3c26017 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -687,7 +687,8 @@ where let session = self.session.clone().unwrap(); let statement_name = cstr_to_str(&msg.statement_name).unwrap().to_string(); - self.inner_process_parse_msg(session, sql, statement_name, msg.type_ids).await + self.inner_process_parse_msg(session, sql, statement_name, msg.type_ids) + .await } async fn inner_process_parse_msg( @@ -851,6 +852,7 @@ where .unwrap() .describe_statement(prepare_statement) .map_err(PsqlError::Uncategorized)?; + println!("param_types: {:?}", row_descriptions); self.stream .write_no_flush(&BeMessage::ParameterDescription( ¶m_types.iter().map(|t| t.to_oid()).collect_vec(), @@ -871,6 +873,7 @@ where .describe_portal(portal) .map_err(PsqlError::Uncategorized)?; + println!("param_types: {:?}", row_descriptions); if row_descriptions.is_empty() { // According https://www.postgresql.org/docs/current/protocol-flow.html#:~:text=The%20response%20is%20a%20RowDescri[…]0a%20query%20that%20will%20return%20rows%3B, // return NoData message if the statement is not a query. From b2083cbe2f669b9248b4c0297498b7a6f449997e Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 26 Jul 2024 13:37:05 +0800 Subject: [PATCH 5/9] fmt --- src/frontend/src/handler/mod.rs | 5 +---- src/frontend/src/session/cursor_manager.rs | 9 +++++---- src/utils/pgwire/src/pg_protocol.rs | 2 -- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index c0286d5a58cad..4af196dcc2fa5 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -162,10 +162,7 @@ impl Stream for PgResponseStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { - PgResponseStream::LocalQuery(inner) => { - println!("poll_next LocalQuery,{:?}", inner.formats); - inner.poll_next_unpin(cx) - } + PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx), PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx), PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx), } diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index c87a4a0ac2891..406c4f92aee11 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -154,6 +154,8 @@ pub struct SubscriptionCursor { dependent_table_id: TableId, cursor_need_drop_time: Instant, state: State, + // fields will be set in the table's catalog when the cursor is created, + // and will be reset each time it is created row_stream, this is to avoid changes in the catalog due to alter. fields: Vec, } @@ -447,10 +449,7 @@ impl SubscriptionCursor { ))); create_stream_for_cursor_stmt(handle_args, query_stmt).await? }; - Ok(( - row_stream, - Self::build_desc(fields, rw_timestamp.is_none()), - )) + Ok((row_stream, Self::build_desc(fields, rw_timestamp.is_none()))) } async fn try_refill_remaining_rows( @@ -559,6 +558,8 @@ impl SubscriptionCursor { }) } + // In the beginning (declare cur), we will give it an empty formats, + // this formats is not a real, when we fetch, We fill it with the formats returned from the pg client. pub fn set_formats( row_stream: &mut PgResponseStream, formats: &Vec, diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index f605bb3c26017..72b99f6d50d64 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -852,7 +852,6 @@ where .unwrap() .describe_statement(prepare_statement) .map_err(PsqlError::Uncategorized)?; - println!("param_types: {:?}", row_descriptions); self.stream .write_no_flush(&BeMessage::ParameterDescription( ¶m_types.iter().map(|t| t.to_oid()).collect_vec(), @@ -873,7 +872,6 @@ where .describe_portal(portal) .map_err(PsqlError::Uncategorized)?; - println!("param_types: {:?}", row_descriptions); if row_descriptions.is_empty() { // According https://www.postgresql.org/docs/current/protocol-flow.html#:~:text=The%20response%20is%20a%20RowDescri[…]0a%20query%20that%20will%20return%20rows%3B, // return NoData message if the statement is not a query. From 0061093bc69806acae686754cf407043818dc24f Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 29 Jul 2024 13:17:40 +0800 Subject: [PATCH 6/9] fix ci --- e2e_test/subscription/main.py | 4 ++++ src/frontend/src/session/cursor_manager.rs | 1 + 2 files changed, 5 insertions(+) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index caa1d3a141c09..5ed2b30eaedf7 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -249,6 +249,8 @@ def test_cursor_with_table_alter(): row = execute_query("fetch next from cur",conn) check_rows_data([1,2],row[0],1) row = execute_query("fetch next from cur",conn) + assert(row == []) + row = execute_query("fetch next from cur",conn) check_rows_data([4,4,4],row[0],1) execute_insert("insert into t1 values(5,5,5)",conn) execute_insert("flush",conn) @@ -258,6 +260,8 @@ def test_cursor_with_table_alter(): execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) + assert(row == []) + row = execute_query("fetch next from cur",conn) check_rows_data([6,6],row[0],1) drop_table_subscription() diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 406c4f92aee11..baa8f5482e379 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -172,6 +172,7 @@ impl SubscriptionCursor { let fields = table_catalog .columns .iter() + .filter(|c| !c.is_hidden) .map(|c| Field::with_name(c.data_type().clone(), c.name())) .collect(); let fields = Self::build_desc(fields, true); From 681dac6ad2398de045462698caed1bd079c1ae69 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 1 Aug 2024 14:06:19 +0800 Subject: [PATCH 7/9] fix comm --- src/frontend/src/handler/declare_cursor.rs | 53 ++++++-- src/frontend/src/handler/mod.rs | 9 -- src/frontend/src/handler/query.rs | 5 +- src/frontend/src/session/cursor_manager.rs | 140 ++++++++++++++++----- 4 files changed, 152 insertions(+), 55 deletions(-) diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index 6f072c8b64993..a4974530cfe50 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -12,18 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::Field; +use risingwave_common::session_config::QueryMode; use risingwave_common::util::epoch::Epoch; use risingwave_sqlparser::ast::{DeclareCursorStatement, ObjectName, Query, Since, Statement}; -use super::query::{gen_batch_plan_by_statement, gen_batch_plan_fragmenter}; +use super::query::{ + gen_batch_plan_by_statement, gen_batch_plan_fragmenter, BatchPlanFragmenterResult, +}; use super::util::convert_unix_millis_to_logstore_u64; use super::RwPgResponse; use crate::error::{ErrorCode, Result}; -use crate::handler::query::create_stream; +use crate::handler::query::{distribute_execute, local_execute}; use crate::handler::HandlerArgs; -use crate::{Binder, OptimizerContext, PgResponseStream}; +use crate::session::cursor_manager::CursorDataChunkStream; +use crate::session::SessionImpl; +use crate::{Binder, OptimizerContext}; pub async fn handle_declare_cursor( handle_args: HandlerArgs, @@ -111,12 +118,12 @@ async fn handle_declare_query_cursor( cursor_name: ObjectName, query: Box, ) -> Result { - let (row_stream, pg_descs) = + let (chunk_stream, fields) = create_stream_for_cursor_stmt(handle_args.clone(), Statement::Query(query)).await?; handle_args .session .get_cursor_manager() - .add_query_cursor(cursor_name, row_stream, pg_descs) + .add_query_cursor(cursor_name, chunk_stream, fields) .await?; Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR)) } @@ -124,14 +131,42 @@ async fn handle_declare_query_cursor( pub async fn create_stream_for_cursor_stmt( handle_args: HandlerArgs, stmt: Statement, -) -> Result<(PgResponseStream, Vec)> { +) -> Result<(CursorDataChunkStream, Vec)> { let session = handle_args.session.clone(); let plan_fragmenter_result = { let context = OptimizerContext::from_handler_args(handle_args); let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?; gen_batch_plan_fragmenter(&session, plan_result)? }; - let fields = plan_fragmenter_result.schema.fields.clone(); - let (row_stream, _) = create_stream(session, plan_fragmenter_result, vec![]).await?; - Ok((row_stream, fields)) + create_chunk_stream_for_cursor(session, plan_fragmenter_result).await +} + +pub async fn create_chunk_stream_for_cursor( + session: Arc, + plan_fragmenter_result: BatchPlanFragmenterResult, +) -> Result<(CursorDataChunkStream, Vec)> { + let BatchPlanFragmenterResult { + plan_fragmenter, + query_mode, + schema, + .. + } = plan_fragmenter_result; + + let can_timeout_cancel = true; + + let query = plan_fragmenter.generate_complete_query().await?; + tracing::trace!("Generated query after plan fragmenter: {:?}", &query); + + Ok(( + match query_mode { + QueryMode::Auto => unreachable!(), + QueryMode::Local => CursorDataChunkStream::LocalDataChunk(Some( + local_execute(session.clone(), query, can_timeout_cancel).await?, + )), + QueryMode::Distributed => CursorDataChunkStream::DistributedDataChunk(Some( + distribute_execute(session.clone(), query, can_timeout_cancel).await?, + )), + }, + schema.fields.clone(), + )) } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 4af196dcc2fa5..dbc7da91b4800 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -147,15 +147,6 @@ pub enum PgResponseStream { DistributedQuery(DataChunkToRowSetAdapter), Rows(BoxStream<'static, RowSetResult>), } -impl PgResponseStream { - pub fn set_formats(&mut self, formats: Vec) { - match self { - PgResponseStream::LocalQuery(inner) => inner.set_formats(formats), - PgResponseStream::DistributedQuery(inner) => inner.set_formats(formats), - PgResponseStream::Rows(_) => {} - } - } -} impl Stream for PgResponseStream { type Item = std::result::Result, BoxedError>; diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index bdb32b590300b..de60743e47173 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -516,7 +516,7 @@ async fn execute( .into()) } -async fn distribute_execute( +pub async fn distribute_execute( session: Arc, query: Query, can_timeout_cancel: bool, @@ -538,8 +538,7 @@ async fn distribute_execute( .map_err(|err| err.into()) } -#[expect(clippy::unused_async)] -async fn local_execute( +pub async fn local_execute( session: Arc, query: Query, can_timeout_cancel: bool, diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index baa8f5482e379..f787b3e7c1df0 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::mem; use core::time::Duration; use std::collections::{HashMap, VecDeque}; use std::rc::Rc; @@ -25,6 +26,7 @@ use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::StatementType; use pgwire::types::{Format, Row}; use risingwave_common::catalog::Field; +use risingwave_common::error::BoxedError; use risingwave_common::session_config::QueryMode; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; @@ -33,18 +35,70 @@ use super::SessionImpl; use crate::catalog::subscription_catalog::SubscriptionCatalog; use crate::catalog::TableId; use crate::error::{ErrorCode, Result}; -use crate::handler::declare_cursor::create_stream_for_cursor_stmt; -use crate::handler::query::{create_stream, gen_batch_plan_fragmenter, BatchQueryPlanResult}; +use crate::handler::declare_cursor::{ + create_chunk_stream_for_cursor, create_stream_for_cursor_stmt, +}; +use crate::handler::query::{gen_batch_plan_fragmenter, BatchQueryPlanResult}; use crate::handler::util::{ convert_logstore_u64_to_unix_millis, gen_query_from_table_name, pg_value_format, to_pg_field, - StaticSessionData, + DataChunkToRowSetAdapter, StaticSessionData, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::{generic, BatchLogSeqScan}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::PlanRoot; +use crate::scheduler::{DistributedQueryStream, LocalQueryStream}; use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog}; +pub enum CursorDataChunkStream { + LocalDataChunk(Option), + DistributedDataChunk(Option), + PgResponse(PgResponseStream), +} + +impl CursorDataChunkStream { + pub fn init_row_stream( + &mut self, + fields: &Vec, + formats: &Vec, + session: Arc, + ) { + let columns_type = fields.iter().map(|f| f.data_type().clone()).collect(); + match self { + CursorDataChunkStream::LocalDataChunk(data_chunk) => { + let data_chunk = mem::take(data_chunk).unwrap(); + let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new( + data_chunk, + columns_type, + formats.clone(), + session, + )); + *self = CursorDataChunkStream::PgResponse(row_stream); + } + CursorDataChunkStream::DistributedDataChunk(data_chunk) => { + let data_chunk = mem::take(data_chunk).unwrap(); + let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new( + data_chunk, + columns_type, + formats.clone(), + session, + )); + *self = CursorDataChunkStream::PgResponse(row_stream); + } + _ => {} + } + } + + pub async fn next(&mut self) -> Result, BoxedError>>> { + match self { + CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await), + _ => Err(ErrorCode::InternalError( + "Only 'CursorDataChunkStream' can call next and return rows".to_string(), + ) + .into()), + } + } +} pub enum Cursor { Subscription(SubscriptionCursor), Query(QueryCursor), @@ -58,7 +112,7 @@ impl Cursor { ) -> Result<(Vec, Vec)> { match self { Cursor::Subscription(cursor) => cursor.next(count, handle_args, formats).await, - Cursor::Query(cursor) => cursor.next(count, formats).await, + Cursor::Query(cursor) => cursor.next(count, formats, handle_args).await, } } @@ -71,15 +125,15 @@ impl Cursor { } pub struct QueryCursor { - row_stream: PgResponseStream, + chunk_stream: CursorDataChunkStream, fields: Vec, remaining_rows: VecDeque, } impl QueryCursor { - pub fn new(row_stream: PgResponseStream, fields: Vec) -> Result { + pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec) -> Result { Ok(Self { - row_stream, + chunk_stream, fields, remaining_rows: VecDeque::::new(), }) @@ -87,7 +141,7 @@ impl QueryCursor { pub async fn next_once(&mut self) -> Result> { while self.remaining_rows.is_empty() { - let rows = self.row_stream.next().await; + let rows = self.chunk_stream.next().await?; let rows = match rows { None => return Ok(None), Some(row) => row?, @@ -102,13 +156,16 @@ impl QueryCursor { &mut self, count: u32, formats: &Vec, + handle_args: HandlerArgs, ) -> Result<(Vec, Vec)> { // `FETCH NEXT` is equivalent to `FETCH 1`. // min with 100 to avoid allocating too many memory at once. + let session = handle_args.session; let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); let mut cur = 0; let desc = self.fields.iter().map(to_pg_field).collect(); - self.row_stream.set_formats(formats.clone()); + self.chunk_stream + .init_row_stream(&self.fields, formats, session); while cur < count && let Some(row) = self.next_once().await? { @@ -138,7 +195,7 @@ enum State { // The row stream to from the batch query read. // It is returned from the batch execution. - row_stream: PgResponseStream, + chunk_stream: CursorDataChunkStream, // A cache to store the remaining rows from the row stream. remaining_rows: VecDeque, @@ -155,7 +212,7 @@ pub struct SubscriptionCursor { cursor_need_drop_time: Instant, state: State, // fields will be set in the table's catalog when the cursor is created, - // and will be reset each time it is created row_stream, this is to avoid changes in the catalog due to alter. + // and will be reset each time it is created chunk_stream, this is to avoid changes in the catalog due to alter. fields: Vec, } @@ -188,7 +245,7 @@ impl SubscriptionCursor { // future fetch on the cursor starts from the snapshot when the cursor is declared. // // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch? - let (row_stream, fields) = + let (chunk_stream, fields) = Self::initiate_query(None, &dependent_table_id, handle_args.clone()).await?; let pinned_epoch = handle_args .session @@ -209,7 +266,7 @@ impl SubscriptionCursor { State::Fetch { from_snapshot: true, rw_timestamp: start_timestamp, - row_stream, + chunk_stream, remaining_rows: VecDeque::new(), expected_timestamp: None, }, @@ -253,24 +310,30 @@ impl SubscriptionCursor { .await { Ok((Some(rw_timestamp), expected_timestamp)) => { - let (mut row_stream, fields) = Self::initiate_query( + let (mut chunk_stream, fields) = Self::initiate_query( Some(rw_timestamp), &self.dependent_table_id, handle_args.clone(), ) .await?; - Self::set_formats(&mut row_stream, formats, &from_snapshot); + Self::init_row_stream( + &mut chunk_stream, + formats, + &from_snapshot, + &self.fields, + handle_args.session.clone(), + ); self.cursor_need_drop_time = Instant::now() + Duration::from_secs(self.subscription.retention_seconds); let mut remaining_rows = VecDeque::new(); - Self::try_refill_remaining_rows(&mut row_stream, &mut remaining_rows) + Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows) .await?; // Transition to the Fetch state self.state = State::Fetch { from_snapshot, rw_timestamp, - row_stream, + chunk_stream, remaining_rows, expected_timestamp, }; @@ -289,7 +352,7 @@ impl SubscriptionCursor { State::Fetch { from_snapshot, rw_timestamp, - row_stream, + chunk_stream, remaining_rows, expected_timestamp, } => { @@ -300,7 +363,7 @@ impl SubscriptionCursor { let rw_timestamp = *rw_timestamp; // Try refill remaining rows - Self::try_refill_remaining_rows(row_stream, remaining_rows).await?; + Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?; if let Some(row) = remaining_rows.pop_front() { // 1. Fetch the next row @@ -365,11 +428,17 @@ impl SubscriptionCursor { let desc = self.fields.iter().map(to_pg_field).collect(); if let State::Fetch { from_snapshot, - row_stream, + chunk_stream, .. } = &mut self.state { - Self::set_formats(row_stream, formats, from_snapshot); + Self::init_row_stream( + chunk_stream, + formats, + from_snapshot, + &self.fields, + handle_args.session.clone(), + ); } while cur < count { let row = self.next_row(&handle_args, formats).await?; @@ -424,10 +493,10 @@ impl SubscriptionCursor { rw_timestamp: Option, dependent_table_id: &TableId, handle_args: HandlerArgs, - ) -> Result<(PgResponseStream, Vec)> { + ) -> Result<(CursorDataChunkStream, Vec)> { let session = handle_args.clone().session; let table_catalog = session.get_table_by_id(dependent_table_id)?; - let (row_stream, fields) = if let Some(rw_timestamp) = rw_timestamp { + let (chunk_stream, fields) = if let Some(rw_timestamp) = rw_timestamp { let context = OptimizerContext::from_handler_args(handle_args); let plan_fragmenter_result = gen_batch_plan_fragmenter( &session, @@ -439,9 +508,7 @@ impl SubscriptionCursor { rw_timestamp, )?, )?; - let fields = plan_fragmenter_result.schema.fields.clone(); - let (row_stream, _) = create_stream(session, plan_fragmenter_result, vec![]).await?; - (row_stream, fields) + create_chunk_stream_for_cursor(session, plan_fragmenter_result).await? } else { let subscription_from_table_name = ObjectName(vec![Ident::from(table_catalog.name.as_ref())]); @@ -450,15 +517,18 @@ impl SubscriptionCursor { ))); create_stream_for_cursor_stmt(handle_args, query_stmt).await? }; - Ok((row_stream, Self::build_desc(fields, rw_timestamp.is_none()))) + Ok(( + chunk_stream, + Self::build_desc(fields, rw_timestamp.is_none()), + )) } async fn try_refill_remaining_rows( - row_stream: &mut PgResponseStream, + chunk_stream: &mut CursorDataChunkStream, remaining_rows: &mut VecDeque, ) -> Result<()> { if remaining_rows.is_empty() - && let Some(row_set) = row_stream.next().await + && let Some(row_set) = chunk_stream.next().await? { remaining_rows.extend(row_set?); } @@ -561,17 +631,19 @@ impl SubscriptionCursor { // In the beginning (declare cur), we will give it an empty formats, // this formats is not a real, when we fetch, We fill it with the formats returned from the pg client. - pub fn set_formats( - row_stream: &mut PgResponseStream, + pub fn init_row_stream( + chunk_stream: &mut CursorDataChunkStream, formats: &Vec, from_snapshot: &bool, + fields: &Vec, + session: Arc, ) { let mut formats = formats.clone(); formats.pop(); if *from_snapshot { formats.pop(); } - row_stream.set_formats(formats); + chunk_stream.init_row_stream(fields, &formats, session); } } @@ -618,10 +690,10 @@ impl CursorManager { pub async fn add_query_cursor( &self, cursor_name: ObjectName, - row_stream: PgResponseStream, + chunk_stream: CursorDataChunkStream, fields: Vec, ) -> Result<()> { - let cursor = QueryCursor::new(row_stream, fields)?; + let cursor = QueryCursor::new(chunk_stream, fields)?; self.cursor_map .lock() .await From 5e4ad8fe549f8d5912fcaa9586d79a3ecd183024 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 1 Aug 2024 14:09:36 +0800 Subject: [PATCH 8/9] fix comm --- src/frontend/src/session/cursor_manager.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index f787b3e7c1df0..f017e9f225659 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -541,7 +541,6 @@ impl SubscriptionCursor { formats: &Vec, session_data: &StaticSessionData, ) -> Result>> { - println!("param_types: {:?}", formats); let row_len = row.len(); let new_row = if let Some(rw_timestamp) = rw_timestamp { let rw_timestamp_formats = formats.get(row_len).unwrap_or(&Format::Text); From 72b7fbeee82436526b3c5b15061caf50d7f293b3 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 6 Aug 2024 12:17:21 +0800 Subject: [PATCH 9/9] fix ci --- e2e_test/subscription/main.py | 1 + src/frontend/src/handler/util.rs | 4 ---- src/frontend/src/session/cursor_manager.rs | 5 ++++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index 5ed2b30eaedf7..fa89c9697d40c 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -328,6 +328,7 @@ def test_rebuild_table(): check_rows_data([1,1],row[0],1) check_rows_data([1,1],row[1],4) check_rows_data([1,100],row[2],3) + drop_table_subscription() if __name__ == "__main__": test_cursor_snapshot() diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index fd6631382685c..0531ce5a65284 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -83,10 +83,6 @@ where session_data, } } - - pub fn set_formats(&mut self, formats: Vec) { - self.formats = formats; - } } impl Stream for DataChunkToRowSetAdapter diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index f017e9f225659..390428f09bea3 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -638,11 +638,14 @@ impl SubscriptionCursor { session: Arc, ) { let mut formats = formats.clone(); + let mut fields = fields.clone(); formats.pop(); + fields.pop(); if *from_snapshot { formats.pop(); + fields.pop(); } - chunk_stream.init_row_stream(fields, &formats, session); + chunk_stream.init_row_stream(&fields, &formats, session); } }