diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index 84b3363626138..9c205e418222f 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -96,6 +96,11 @@ sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt' sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt' sqllogictest -p 4566 -d test './e2e_test/database/test.slt' +echo "--- e2e, $mode, subscription" +python3 -m pip install --break-system-packages psycopg2-binary +sqllogictest -p 4566 -d dev './e2e_test/subscription/check_sql_statement.slt' +python3 ./e2e_test/subscription/main.py + echo "--- e2e, $mode, Apache Superset" sqllogictest -p 4566 -d dev './e2e_test/superset/*.slt' --junit "batch-${profile}" diff --git a/e2e_test/batch/transaction/cursor.slt b/e2e_test/batch/transaction/cursor.slt index ae0ac3706e3df..124224c401365 100644 --- a/e2e_test/batch/transaction/cursor.slt +++ b/e2e_test/batch/transaction/cursor.slt @@ -30,7 +30,7 @@ DECLARE test_cursor CURSOR FOR SELECT * FROM test where a > 2 ORDER BY a; -statement error cursor "test_cursor" already exists +statement error DECLARE test_cursor CURSOR FOR SELECT * FROM test where a > 2; @@ -40,7 +40,7 @@ DECLARE test_cursor CURSOR FOR SELECT * FROM test_non_existent where a > 2; -statement error cursor "test_cursor_non_existent" does not exist +statement error FETCH NEXT from test_cursor_non_existent; query II @@ -57,13 +57,13 @@ query II FETCH NEXT from test_cursor; ---- -statement error cursor "test_cursor_non_existent" does not exist +statement error CLOSE test_cursor_non_existent; statement ok CLOSE test_cursor; -statement error cursor "test_cursor" does not exist +statement error FETCH NEXT from test_cursor; statement ok diff --git a/e2e_test/subscription/check_sql_statement.slt b/e2e_test/subscription/check_sql_statement.slt new file mode 100644 index 0000000000000..7b7d9488495cf --- /dev/null +++ b/e2e_test/subscription/check_sql_statement.slt @@ -0,0 +1,50 @@ +statement ok +create table t1 (v1 int, 
v2 int, v3 int);
+
+statement ok
+insert into t1 values (1,2), (2,3);
+
+statement ok
+create subscription sub from t1 with(retention = '1D');
+
+statement ok
+declare cur subscription cursor for sub;
+
+statement ok
+declare cur1 subscription cursor for sub since now();
+
+statement ok
+declare cur2 subscription cursor for sub since proctime();
+
+statement ok
+declare cur3 subscription cursor for sub since begin();
+
+statement error
+declare cur4 subscription cursor for sub since 1;
+
+statement error
+declare cur5 subscription cursor for sub since asd;
+
+statement error
+declare cur6 subscription cursor for sub since 18446744073709551615;
+
+statement error
+declare cur subscription cursor for sub;
+
+statement ok
+close cur;
+
+statement ok
+close cur1;
+
+statement ok
+close cur2;
+
+statement ok
+close cur3;
+
+statement ok
+drop subscription sub;
+
+statement ok
+drop table t1;
\ No newline at end of file
diff --git a/e2e_test/subscription/create_table_and_subscription.slt b/e2e_test/subscription/create_table_and_subscription.slt
new file mode 100644
index 0000000000000..94039f98b11cc
--- /dev/null
+++ b/e2e_test/subscription/create_table_and_subscription.slt
@@ -0,0 +1,8 @@
+statement ok
+create table t1 (v1 int, v2 int);
+
+statement ok
+insert into t1 values (1,2);
+
+statement ok
+create subscription sub from t1 with(retention = '1D');
\ No newline at end of file
diff --git a/e2e_test/subscription/drop_table_and_subscription.slt b/e2e_test/subscription/drop_table_and_subscription.slt
new file mode 100644
index 0000000000000..0df183a5b7793
--- /dev/null
+++ b/e2e_test/subscription/drop_table_and_subscription.slt
@@ -0,0 +1,5 @@
+statement ok
+drop subscription sub;
+
+statement ok
+drop table t1;
\ No newline at end of file
diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py
new file mode 100644
index 0000000000000..f8e78813801f2
--- /dev/null
+++ b/e2e_test/subscription/main.py
@@ -0,0 +1,259 @@
+import subprocess
+import psycopg2
+import time
+
+
+def execute_slt(slt):
+    """Run one sqllogictest file against the local `dev` database, then wait 3 seconds."""
+    if slt is None or slt == "":
+        return
+    # Pass an argument list instead of a shell string so the path is never shell-parsed.
+    cmd = ["sqllogictest", "-p", "4566", "-d", "dev", slt]
+    print(f"Command line is [{' '.join(cmd)}]")
+    subprocess.run(cmd, check=True)
+    time.sleep(3)
+
+def create_table_subscription():
+    execute_slt("./e2e_test/subscription/create_table_and_subscription.slt")
+
+def drop_table_subscription():
+    execute_slt("./e2e_test/subscription/drop_table_and_subscription.slt")
+
+def execute_query(sql,conn):
+    """Execute `sql` on `conn`, commit, and return every fetched row."""
+    cur = conn.cursor()
+    cur.execute(sql)
+    conn.commit()
+    rows = cur.fetchall()
+    cur.close()
+    return rows
+
+def execute_insert(sql,conn):
+    """Execute a statement whose result rows are not needed on `conn`, then commit."""
+    cur = conn.cursor()
+    cur.execute(sql)
+    conn.commit()
+    cur.close()
+
+def check_rows_data(expect_vec,rows,status):
+    """Check the first row of `rows` as fetched from a subscription cursor.
+
+    Column 0 is skipped, column 1 must equal `status`, and the remaining
+    columns must equal `expect_vec` position by position.
+    """
+    # Fail with a readable message instead of an IndexError when the fetch is empty.
+    assert rows, f"expect row {expect_vec} with status {status} but got no rows"
+    row = rows[0]
+    for index, value in enumerate(row):
+        if index == 0:
+            continue
+        if index == 1:
+            assert value == status,f"expect {status} but got {value}"
+            continue
+        assert value == expect_vec[index-2],f"expect {expect_vec[index-2]} but got {value}"
+
+def test_cursor_snapshot():
+    """Declare a cursor without `since`: expect row (1,2) with status 1, then an empty fetch."""
+    print(f"test_cursor_snapshot")
+    create_table_subscription()
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    execute_insert("declare cur subscription cursor for sub",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([1,2],row,1)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+    execute_insert("close cur",conn)
+    conn.close()
+    drop_table_subscription()
+
+
+def test_cursor_snapshot_log_store():
+    """After row (1,2) is consumed, rows inserted later are fetched in insertion order."""
+    print(f"test_cursor_snapshot_log_store")
+    create_table_subscription()
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    execute_insert("declare cur subscription cursor for sub",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([1,2],row,1)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+    execute_insert("insert into t1 values(4,4)",conn)
+    execute_insert("flush",conn)
+    execute_insert("insert into t1 values(5,5)",conn)
+    execute_insert("flush",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([4,4],row,1)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([5,5],row,1)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+    execute_insert("close cur",conn)
+    conn.close()
+    drop_table_subscription()
+
+def test_cursor_since_begin():
+    """With `since begin()`, expect (4,4), (5,5) and (6,6) in order, then an empty fetch."""
+    print(f"test_cursor_since_begin")
+    create_table_subscription()
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    execute_insert("insert into t1 values(4,4)",conn)
+    execute_insert("flush",conn)
+    execute_insert("insert into t1 values(5,5)",conn)
+    execute_insert("flush",conn)
+    execute_insert("declare cur subscription cursor for sub since begin()",conn)
+    execute_insert("insert into t1 values(6,6)",conn)
+    execute_insert("flush",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([4,4],row,1)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([5,5],row,1)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([6,6],row,1)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+    execute_insert("close cur",conn)
+    conn.close()
+    drop_table_subscription()
+
+def test_cursor_since_now():
+    """With `since now()`, expect only (6,6), the row inserted after the declare."""
+    print(f"test_cursor_since_now")
+    create_table_subscription()
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    execute_insert("insert into t1 values(4,4)",conn)
+    execute_insert("flush",conn)
+    execute_insert("insert into t1 values(5,5)",conn)
+    execute_insert("flush",conn)
+    execute_insert("declare cur subscription cursor for sub since now()",conn)
+    time.sleep(2)
+    execute_insert("insert into t1 values(6,6)",conn)
+    execute_insert("flush",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([6,6],row,1)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+    execute_insert("close cur",conn)
+    conn.close()
+    drop_table_subscription()
+
+def test_cursor_since_rw_timestamp():
+    """Re-declare the cursor `since` timestamps taken from column 0 of earlier fetches."""
+    print(f"test_cursor_since_rw_timestamp")
+    create_table_subscription()
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    execute_insert("insert into t1 values(4,4)",conn)
+    execute_insert("flush",conn)
+    execute_insert("insert into t1 values(5,5)",conn)
+    execute_insert("flush",conn)
+    execute_insert("declare cur subscription cursor for sub since begin()",conn)
+    execute_insert("insert into t1 values(6,6)",conn)
+    execute_insert("flush",conn)
+    row = execute_query("fetch next from cur",conn)
+    rw_timestamp_1 = row[0][0]
+    check_rows_data([4,4],row,1)
+    row = execute_query("fetch next from cur",conn)
+    rw_timestamp_2 = row[0][0] - 1
+    check_rows_data([5,5],row,1)
+    row = execute_query("fetch next from cur",conn)
+    rw_timestamp_3 = row[0][0] + 1
+    check_rows_data([6,6],row,1)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+    execute_insert("close cur",conn)
+
+    execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([4,4],row,1)
+    execute_insert("close cur",conn)
+
+    execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([5,5],row,1)
+    execute_insert("close cur",conn)
+
+    execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+    execute_insert("close cur",conn)
+    conn.close()
+
+    drop_table_subscription()
+
+def test_cursor_op():
+    """Insert yields status 1, update yields 4 then 3, delete yields 2."""
+    print(f"test_cursor_op")
+    create_table_subscription()
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    execute_insert("declare cur subscription cursor for sub",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([1,2],row,1)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+
+    execute_insert("insert into t1 values(4,4)",conn)
+    execute_insert("flush",conn)
+    execute_insert("update t1 set v2 = 10 where v1 = 4",conn)
+    execute_insert("flush",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([4,4],row,1)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([4,4],row,4)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([4,10],row,3)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+
+    execute_insert("delete from t1 where v1 = 4",conn)
+    execute_insert("flush",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([4,10],row,2)
+    row = execute_query("fetch next from cur",conn)
+    assert row == []
+
+    execute_insert("close cur",conn)
+    conn.close()
+    drop_table_subscription()
+
+if __name__ == "__main__":
+    test_cursor_snapshot()
+    test_cursor_op()
+    test_cursor_snapshot_log_store()
+    test_cursor_since_rw_timestamp()
+    test_cursor_since_now()
+    test_cursor_since_begin()
diff --git a/proto/catalog.proto b/proto/catalog.proto
index 4c7380079e661..a3fcd1ab2a638 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -195,6 +195,7 @@ message Subscription {
   optional string created_at_cluster_version = 16;
 
   string subscription_from_name = 17;
+  optional string subscription_internal_table_name = 18;
 }
 
 message Connection {
diff --git a/src/common/src/catalog/internal_table.rs b/src/common/src/catalog/internal_table.rs
index 1e991db6975f1..72e377b04d417 100644
--- a/src/common/src/catalog/internal_table.rs
+++ b/src/common/src/catalog/internal_table.rs
@@ -43,6 +43,17 @@ pub fn valid_table_name(table_name: &str) -> bool {
     !INTERNAL_TABLE_NAME.is_match(table_name)
 }
 
+/// Returns whether `table_name` contains the internal-table pattern
+/// `__internal_<subscription_name>_<digits>_subscription_<digits>`.
+pub fn is_subscription_internal_table(subscription_name: &str, table_name: &str) -> bool {
+    // Escape the name so regex metacharacters in it are matched literally and cannot
+    // make `Regex::new` fail.
+    let pattern = format!(
+        r"__internal_{}_(\d+)_subscription_(\d+)",
+        regex::escape(subscription_name)
+    );
+    Regex::new(&pattern).unwrap().is_match(table_name)
+}
+ pub fn get_dist_key_in_pk_indices>( dist_key_indices: &[I], pk_indices: &[I], diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index faab5ddab2ee0..a3bb869b55279 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -199,7 +199,7 @@ pub fn visit_stream_node_tables_inner( // Subscription NodeBody::Subscription(node) => { - // A Subscription should have a state table. + // A Subscription should have a log store optional!(node.log_store_table, "Subscription") } diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 05518670b7d5d..ca44fb2b76e8c 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -29,8 +29,8 @@ use anyhow::{anyhow, bail, Result}; pub use resolve_id::*; use risingwave_frontend::handler::util::SourceSchemaCompatExt; use risingwave_frontend::handler::{ - close_cursor, create_index, create_mv, create_schema, create_source, create_table, create_view, - declare_cursor, drop_table, explain, fetch_cursor, variable, HandlerArgs, + create_index, create_mv, create_schema, create_source, create_table, create_view, drop_table, + explain, variable, HandlerArgs, }; use risingwave_frontend::session::SessionImpl; use risingwave_frontend::test_utils::{create_proto_file, get_explain_output, LocalFrontend}; @@ -572,16 +572,6 @@ impl TestCase { create_schema::handle_create_schema(handler_args, schema_name, if_not_exists) .await?; } - Statement::DeclareCursor { cursor_name, query } => { - declare_cursor::handle_declare_cursor(handler_args, cursor_name, *query) - .await?; - } - Statement::FetchCursor { cursor_name, count } => { - fetch_cursor::handle_fetch_cursor(handler_args, cursor_name, count).await?; - } - Statement::CloseCursor { cursor_name } => { - close_cursor::handle_close_cursor(handler_args, cursor_name).await?; - } _ => return Err(anyhow!("Unsupported statement type")), } 
} diff --git a/src/frontend/src/catalog/subscription_catalog.rs b/src/frontend/src/catalog/subscription_catalog.rs index 3ded154cf055b..1409948e07bd9 100644 --- a/src/frontend/src/catalog/subscription_catalog.rs +++ b/src/frontend/src/catalog/subscription_catalog.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::str::FromStr; use std::collections::{BTreeMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::{ColumnCatalog, TableId, UserId, OBJECT_ID_PLACEHOLDER}; +use risingwave_common::types::Interval; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::{PbStreamJobStatus, PbSubscription}; +use thiserror_ext::AsReport; use super::OwnedByUserCatalog; +use crate::error::{ErrorCode, Result}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] #[cfg_attr(test, derive(Default))] @@ -67,6 +71,7 @@ pub struct SubscriptionCatalog { pub created_at_cluster_version: Option, pub initialized_at_cluster_version: Option, + pub subscription_internal_table_name: Option, } #[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq, Ord)] @@ -98,10 +103,31 @@ impl SubscriptionCatalog { self } + pub fn get_retention_seconds(&self) -> Result { + let retention_seconds_str = self.properties.get("retention").ok_or_else(|| { + ErrorCode::InternalError("Subscription retention time not set.".to_string()) + })?; + let retention_seconds = (Interval::from_str(retention_seconds_str) + .map_err(|err| { + ErrorCode::InternalError(format!( + "Retention needs to be set in Interval format: {:?}", + err.to_report_string() + )) + })? 
+ .epoch_in_micros() + / 1000000) as u64; + + Ok(retention_seconds) + } + pub fn create_sql(&self) -> String { self.definition.clone() } + pub fn get_log_store_name(&self) -> String { + self.subscription_internal_table_name.clone().unwrap() + } + pub fn to_proto(&self) -> PbSubscription { assert!(!self.dependent_relations.is_empty()); PbSubscription { @@ -130,6 +156,7 @@ impl SubscriptionCatalog { stream_job_status: PbStreamJobStatus::Creating.into(), initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), created_at_cluster_version: self.created_at_cluster_version.clone(), + subscription_internal_table_name: self.subscription_internal_table_name.clone(), } } } @@ -165,6 +192,7 @@ impl From<&PbSubscription> for SubscriptionCatalog { initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from), created_at_cluster_version: prost.created_at_cluster_version.clone(), initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(), + subscription_internal_table_name: prost.subscription_internal_table_name.clone(), } } } diff --git a/src/frontend/src/handler/close_cursor.rs b/src/frontend/src/handler/close_cursor.rs index 14a5537aea0ec..1678b85f85358 100644 --- a/src/frontend/src/handler/close_cursor.rs +++ b/src/frontend/src/handler/close_cursor.rs @@ -13,20 +13,25 @@ // limitations under the License. 
use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_sqlparser::ast::ObjectName; +use risingwave_sqlparser::ast::CloseCursorStatement; use super::RwPgResponse; use crate::error::Result; use crate::handler::HandlerArgs; +use crate::Binder; pub async fn handle_close_cursor( - handler_args: HandlerArgs, - cursor_name: Option, + handle_args: HandlerArgs, + stmt: CloseCursorStatement, ) -> Result { - if let Some(name) = cursor_name { - handler_args.session.drop_cursor(name).await?; + let session = handle_args.session.clone(); + let cursor_manager = session.get_cursor_manager(); + let db_name = session.database(); + if let Some(cursor_name) = stmt.cursor_name { + let (_, cursor_name) = Binder::resolve_schema_qualified_name(db_name, cursor_name.clone())?; + cursor_manager.remove_cursor(cursor_name).await?; } else { - handler_args.session.drop_all_cursors().await; + cursor_manager.remove_all_cursor().await; } Ok(PgResponse::empty_result(StatementType::CLOSE_CURSOR)) } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 1cba0e21fc245..bed409de178f1 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -38,13 +38,13 @@ use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode}; use risingwave_sqlparser::ast::{ - ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, Format, ObjectName, Query, - Select, SelectItem, SetExpr, Statement, TableFactor, TableWithJoins, + ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, Format, Query, Statement, }; use risingwave_sqlparser::parser::Parser; use super::create_mv::get_column_names; use super::create_source::UPSTREAM_SOURCE_KEY; +use super::util::gen_query_from_table_name; use super::RwPgResponse; use crate::binder::Binder; use 
crate::catalog::catalog_service::CatalogReadGuard; @@ -66,32 +66,6 @@ use crate::stream_fragmenter::build_graph; use crate::utils::resolve_privatelink_in_with_option; use crate::{Explain, Planner, TableCatalog, WithOptions}; -pub fn gen_sink_subscription_query_from_name(from_name: ObjectName) -> Result { - let table_factor = TableFactor::Table { - name: from_name, - alias: None, - as_of: None, - }; - let from = vec![TableWithJoins { - relation: table_factor, - joins: vec![], - }]; - let select = Select { - from, - projection: vec![SelectItem::Wildcard(None)], - ..Default::default() - }; - let body = SetExpr::Select(Box::new(select)); - Ok(Query { - with: None, - body, - order_by: vec![], - limit: None, - offset: None, - fetch: None, - }) -} - // used to store result of `gen_sink_plan` pub struct SinkPlanContext { pub query: Box, @@ -119,7 +93,7 @@ pub fn gen_sink_plan( CreateSink::From(from_name) => { sink_from_table_name = from_name.0.last().unwrap().real_value(); direct_sink = true; - Box::new(gen_sink_subscription_query_from_name(from_name)?) 
+            Box::new(gen_query_from_table_name(from_name))
         }
         CreateSink::AsQuery(query) => {
             sink_from_table_name = sink_table_name.clone();
diff --git a/src/frontend/src/handler/create_subscription.rs b/src/frontend/src/handler/create_subscription.rs
index f44abbaa42f02..371806cc93a48 100644
--- a/src/frontend/src/handler/create_subscription.rs
+++ b/src/frontend/src/handler/create_subscription.rs
@@ -20,8 +20,8 @@ use risingwave_common::catalog::UserId;
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
 use risingwave_sqlparser::ast::{CreateSubscriptionStatement, Query};
 
-use super::create_sink::gen_sink_subscription_query_from_name;
 use super::privilege::resolve_query_privileges;
+use super::util::gen_query_from_table_name;
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::subscription_catalog::SubscriptionCatalog;
 use crate::error::Result;
@@ -54,9 +54,7 @@ pub fn gen_subscription_plan(
         .unwrap()
         .real_value()
         .clone();
-    let query = Box::new(gen_sink_subscription_query_from_name(
-        stmt.subscription_from,
-    )?);
+    let query = Box::new(gen_query_from_table_name(stmt.subscription_from));
 
     let (database_id, schema_id) =
         session.get_database_and_schema_id_for_create(schema_name.clone())?;
diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs
index dda676998edd0..27f029bf3e4a5 100644
--- a/src/frontend/src/handler/declare_cursor.rs
+++ b/src/frontend/src/handler/declare_cursor.rs
@@ -12,38 +12,133 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use pgwire::pg_field_descriptor::PgFieldDescriptor;
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_sqlparser::ast::{ObjectName, Query, Statement};
+use risingwave_common::util::epoch::Epoch;
+use risingwave_sqlparser::ast::{DeclareCursorStatement, ObjectName, Query, Since, Statement};
 
 use super::query::{gen_batch_plan_by_statement, gen_batch_plan_fragmenter};
+use super::util::{convert_epoch_to_logstore_i64, convert_unix_millis_to_logstore_i64};
 use super::RwPgResponse;
-use crate::error::Result;
+use crate::error::{ErrorCode, Result};
+use crate::handler::query::create_stream;
 use crate::handler::HandlerArgs;
-use crate::session::cursor::Cursor;
-use crate::OptimizerContext;
+use crate::{Binder, OptimizerContext, PgResponseStream};
 
+/// Handle `DECLARE CURSOR`: dispatch to the query-cursor or subscription-cursor handler.
 pub async fn handle_declare_cursor(
-    handler_args: HandlerArgs,
+    handle_args: HandlerArgs,
+    stmt: DeclareCursorStatement,
+) -> Result<RwPgResponse> {
+    match stmt.declare_cursor {
+        risingwave_sqlparser::ast::DeclareCursor::Query(query) => {
+            handle_declare_query_cursor(handle_args, stmt.cursor_name, query).await
+        }
+        risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
+            handle_declare_subscription_cursor(
+                handle_args,
+                sub_name,
+                stmt.cursor_name,
+                rw_timestamp,
+            )
+            .await
+        }
+    }
+}
+
+/// Look up the subscription named by `sub_name`, turn the optional `since` clause into a
+/// log store timestamp, and register a subscription cursor in the session's cursor manager.
+async fn handle_declare_subscription_cursor(
+    handle_args: HandlerArgs,
+    sub_name: ObjectName,
     cursor_name: ObjectName,
-    query: Query,
+    rw_timestamp: Option<Since>,
 ) -> Result<RwPgResponse> {
-    let session = handler_args.session.clone();
+    let session = handle_args.session.clone();
+    let db_name = session.database();
+    let (schema_name, cursor_name) =
+        Binder::resolve_schema_qualified_name(db_name, cursor_name.clone())?;
 
-    let plan_fragmenter_result = {
-        let context = OptimizerContext::from_handler_args(handler_args);
-        let plan_result = gen_batch_plan_by_statement(
-            &session,
-            context.into(),
-            Statement::Query(Box::new(query.clone())),
-        )?;
-        gen_batch_plan_fragmenter(&session, plan_result)?
+    let cursor_from_subscription_name = sub_name.0.last().unwrap().real_value().clone();
+    let subscription =
+        session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?;
+    // Start the first query of cursor, which includes querying the table and querying the subscription's logstore
+    let start_rw_timestamp = match rw_timestamp {
+        Some(risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp)) => {
+            check_cursor_unix_millis(start_rw_timestamp, subscription.get_retention_seconds()?)?;
+            Some(convert_unix_millis_to_logstore_i64(start_rw_timestamp))
+        }
+        Some(risingwave_sqlparser::ast::Since::ProcessTime) => {
+            Some(convert_epoch_to_logstore_i64(Epoch::now().0))
+        }
+        Some(risingwave_sqlparser::ast::Since::Begin) => {
+            // Saturate so a retention longer than the current unix time cannot underflow.
+            // NOTE(review): confirm `convert_unix_millis_to_logstore_i64` accepts values near 0.
+            let retention_millis = subscription.get_retention_seconds()?.saturating_mul(1000);
+            let min_unix_millis = Epoch::now().as_unix_millis().saturating_sub(retention_millis);
+            Some(convert_unix_millis_to_logstore_i64(min_unix_millis))
+        }
+        None => None,
     };
-
+    // Create cursor based on the response
     session
-        .add_cursor(
-            cursor_name,
-            Cursor::new(plan_fragmenter_result, session.clone()).await?,
+        .get_cursor_manager()
+        .add_subscription_cursor(
+            cursor_name.clone(),
+            start_rw_timestamp,
+            subscription,
+            &handle_args,
+        )
+        .await?;
+
+    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
+}
+
+/// Check that `unix_millis` lies within `[now - retention, now]`, returning a `CatalogError`
+/// otherwise. The lower bound saturates at 0 instead of underflowing.
+fn check_cursor_unix_millis(unix_millis: u64, retention_seconds: u64) -> Result<()> {
+    let now = Epoch::now().as_unix_millis();
+    let min_unix_millis = now.saturating_sub(retention_seconds.saturating_mul(1000));
+    if unix_millis > now {
+        return Err(ErrorCode::CatalogError(
+            "rw_timestamp is too large, need to be less than the current unix_millis"
+                .to_string()
+                .into(),
         )
+        .into());
+    }
+    if unix_millis < min_unix_millis {
+        return Err(ErrorCode::CatalogError("rw_timestamp is too small, need to be large than the current unix_millis - subscription's retention time".to_string().into()).into());
+    }
+    Ok(())
+}
+
+/// Run `query` as a batch query and register its row stream as a query cursor.
+async fn handle_declare_query_cursor(
+    handle_args: HandlerArgs,
+    cursor_name: ObjectName,
+    query: Box<Query>,
+) -> Result<RwPgResponse> {
+    let (row_stream, pg_descs) =
+        create_stream_for_cursor(handle_args.clone(), Statement::Query(query)).await?;
+    handle_args
+        .session
+        .get_cursor_manager()
+        .add_query_cursor(cursor_name, row_stream, pg_descs)
         .await?;
     Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
 }
+
+/// Plan `stmt` as a batch query and return its row stream together with the field descriptors.
+pub async fn create_stream_for_cursor(
+    handle_args: HandlerArgs,
+    stmt: Statement,
+) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
+    let session = handle_args.session.clone();
+    let plan_fragmenter_result = {
+        let context = OptimizerContext::from_handler_args(handle_args);
+        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
+        gen_batch_plan_fragmenter(&session, plan_result)?
+    };
+    create_stream(session, plan_fragmenter_result, vec![]).await
+}
diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs
index ac1e929d5187f..05305a9657b1a 100644
--- a/src/frontend/src/handler/fetch_cursor.rs
+++ b/src/frontend/src/handler/fetch_cursor.rs
@@ -12,21 +12,36 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_sqlparser::ast::ObjectName; +use pgwire::types::Row; +use risingwave_sqlparser::ast::FetchCursorStatement; use super::RwPgResponse; use crate::error::Result; use crate::handler::HandlerArgs; +use crate::{Binder, PgResponseStream}; pub async fn handle_fetch_cursor( - handler_args: HandlerArgs, - cursor_name: ObjectName, - count: Option, + handle_args: HandlerArgs, + stmt: FetchCursorStatement, ) -> Result { - let session = handler_args.session; - let (rows, pg_descs) = session.cursor_next(&cursor_name, count).await?; - Ok(PgResponse::builder(StatementType::FETCH_CURSOR) - .values(rows.into(), pg_descs) - .into()) + let session = handle_args.session.clone(); + let db_name = session.database(); + let (_, cursor_name) = + Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?; + + let cursor_manager = session.get_cursor_manager(); + + let (rows, pg_descs) = cursor_manager + .get_rows_with_cursor(cursor_name, stmt.count, handle_args) + .await?; + Ok(build_fetch_cursor_response(rows, pg_descs)) +} + +fn build_fetch_cursor_response(rows: Vec, pg_descs: Vec) -> RwPgResponse { + PgResponse::builder(StatementType::FETCH_CURSOR) + .row_cnt_opt(Some(rows.len() as i32)) + .values(PgResponseStream::from(rows), pg_descs) + .into() } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 8b60eeeeef2bc..52ad26cc83732 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -362,6 +362,15 @@ pub async fn handle( if_not_exists, } => create_schema::handle_create_schema(handler_args, schema_name, if_not_exists).await, Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await, + Statement::DeclareCursor { stmt } => { + declare_cursor::handle_declare_cursor(handler_args, stmt).await + } + Statement::FetchCursor { stmt } => { + 
fetch_cursor::handle_fetch_cursor(handler_args, stmt).await + } + Statement::CloseCursor { stmt } => { + close_cursor::handle_close_cursor(handler_args, stmt).await + } Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await, Statement::Grant { .. } => { handle_privilege::handle_grant_privilege(handler_args, stmt).await @@ -929,15 +938,6 @@ pub async fn handle( object_name, comment, } => comment::handle_comment(handler_args, object_type, object_name, comment).await, - Statement::DeclareCursor { cursor_name, query } => { - declare_cursor::handle_declare_cursor(handler_args, cursor_name, *query).await - } - Statement::FetchCursor { cursor_name, count } => { - fetch_cursor::handle_fetch_cursor(handler_args, cursor_name, count).await - } - Statement::CloseCursor { cursor_name } => { - close_cursor::handle_close_cursor(handler_args, cursor_name).await - } _ => bail_not_implemented!("Unhandled statement: {}", stmt), } } diff --git a/src/frontend/src/handler/transaction.rs b/src/frontend/src/handler/transaction.rs index aba0020bdf46d..a9eb08b483077 100644 --- a/src/frontend/src/handler/transaction.rs +++ b/src/frontend/src/handler/transaction.rs @@ -87,7 +87,7 @@ pub async fn handle_commit( } session.txn_commit_explicit(); - session.drop_all_cursors().await; + session.get_cursor_manager().remove_all_query_cursor().await; Ok(RwPgResponse::empty_result(stmt_type)) } @@ -104,7 +104,7 @@ pub async fn handle_rollback( } session.txn_rollback_explicit(); - session.drop_all_cursors().await; + session.get_cursor_manager().remove_all_query_cursor().await; Ok(RwPgResponse::empty_result(stmt_type)) } diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index d3ccb55e6a6ab..011b078958946 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -29,10 +29,15 @@ use risingwave_common::array::DataChunk; use risingwave_common::catalog::Field; use risingwave_common::row::Row as _; use 
risingwave_common::types::{write_date_time_tz, DataType, ScalarRefImpl, Timestamptz}; +use risingwave_common::util::epoch::Epoch; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_sqlparser::ast::{CompatibleSourceSchema, ConnectorSchema}; +use risingwave_sqlparser::ast::{ + BinaryOperator, CompatibleSourceSchema, ConnectorSchema, Expr, ObjectName, OrderByExpr, Query, + Select, SelectItem, SetExpr, TableFactor, TableWithJoins, Value, +}; use crate::error::{ErrorCode, Result as RwResult}; +use crate::session::cursor_manager::{KV_LOG_STORE_EPOCH, KV_LOG_STORE_SEQ_ID, KV_LOG_STORE_VNODE}; use crate::session::{current, SessionImpl}; pin_project! { @@ -191,6 +196,87 @@ impl CompatibleSourceSchema { } } +pub fn gen_query_from_table_name(from_name: ObjectName) -> Query { + let table_factor = TableFactor::Table { + name: from_name, + alias: None, + as_of: None, + }; + let from = vec![TableWithJoins { + relation: table_factor, + joins: vec![], + }]; + let select = Select { + from, + projection: vec![SelectItem::Wildcard(None)], + ..Default::default() + }; + let body = SetExpr::Select(Box::new(select)); + Query { + with: None, + body, + order_by: vec![], + limit: None, + offset: None, + fetch: None, + } +} + +pub fn gen_query_from_logstore_ge_rw_timestamp(logstore_name: &str, rw_timestamp: i64) -> Query { + let table_factor = TableFactor::Table { + name: ObjectName(vec![logstore_name.into()]), + alias: None, + as_of: None, + }; + let from = vec![TableWithJoins { + relation: table_factor, + joins: vec![], + }]; + let selection = Some(Expr::BinaryOp { + left: Box::new(Expr::Identifier(KV_LOG_STORE_EPOCH.into())), + op: BinaryOperator::GtEq, + right: Box::new(Expr::Value(Value::Number(rw_timestamp.to_string()))), + }); + let except_columns = vec![ + Expr::Identifier(KV_LOG_STORE_SEQ_ID.into()), + Expr::Identifier(KV_LOG_STORE_VNODE.into()), + ]; + let select = Select { + from, + projection: vec![SelectItem::Wildcard(Some(except_columns))], + selection, + 
..Default::default() + }; + let order_by = vec![OrderByExpr { + expr: Expr::Identifier(KV_LOG_STORE_EPOCH.into()), + asc: None, + nulls_first: None, + }]; + let body = SetExpr::Select(Box::new(select)); + Query { + with: None, + body, + order_by, + limit: None, + offset: None, + fetch: None, + } +} + +pub fn convert_unix_millis_to_logstore_i64(unix_millis: u64) -> i64 { + let epoch = Epoch::from_unix_millis(unix_millis); + convert_epoch_to_logstore_i64(epoch.0) +} + +pub fn convert_epoch_to_logstore_i64(epoch: u64) -> i64 { + epoch as i64 ^ (1i64 << 63) +} + +pub fn convert_logstore_i64_to_unix_millis(logstore_i64: i64) -> u64 { + let epoch = Epoch::from(logstore_i64 as u64 ^ (1u64 << 63)); + epoch.as_unix_millis() +} + #[cfg(test)] mod tests { use bytes::BytesMut; diff --git a/src/frontend/src/optimizer/plan_node/stream_subscription.rs b/src/frontend/src/optimizer/plan_node/stream_subscription.rs index e9d8936961586..8b165d5bbacbc 100644 --- a/src/frontend/src/optimizer/plan_node/stream_subscription.rs +++ b/src/frontend/src/optimizer/plan_node/stream_subscription.rs @@ -137,6 +137,7 @@ impl StreamSubscription { created_at_epoch: None, created_at_cluster_version: None, initialized_at_cluster_version: None, + subscription_internal_table_name: None, }; Ok((input, subscription_desc)) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 2059a40acab31..dc957735e676e 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -355,17 +355,7 @@ pub fn infer_kv_log_store_table_catalog_inner( let read_prefix_len_hint = table_catalog_builder.get_current_pk_len(); - let payload_indices = table_catalog_builder.extend_columns( - &columns - .iter() - .map(|column| { - // make payload hidden column visible in kv log store batch query - let mut column = column.clone(); - column.is_hidden = false; - column - }) - .collect_vec(), - ); + let payload_indices = 
table_catalog_builder.extend_columns(columns); value_indices.extend(payload_indices); table_catalog_builder.set_value_indices(value_indices); diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 132108636f013..36285b59b5e36 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -60,6 +60,15 @@ impl ReadSnapshot { } } + /// Get the [`Option`] value for this snapshot, only `FrontendPinned`. + pub fn epoch_with_frontend_pinned(&self) -> Option { + match self.batch_query_epoch().epoch.unwrap() { + batch_query_epoch::Epoch::Committed(epoch) + | batch_query_epoch::Epoch::Current(epoch) => Some(epoch.into()), + batch_query_epoch::Epoch::Backup(_) => None, + } + } + /// Get the [`Epoch`] value for this snapshot. pub fn epoch(&self) -> Epoch { match self.batch_query_epoch().epoch.unwrap() { diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 9ecff36106e30..7ff790748a761 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -78,10 +78,12 @@ use tokio::sync::watch; use tokio::task::JoinHandle; use tracing::info; +use self::cursor_manager::CursorManager; use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError}; use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl}; use crate::catalog::connection_catalog::ConnectionCatalog; use crate::catalog::root_catalog::Catalog; +use crate::catalog::subscription_catalog::SubscriptionCatalog; use crate::catalog::{ check_schema_writable, CatalogError, DatabaseId, OwnedByUserCatalog, SchemaId, }; @@ -112,7 +114,7 @@ use crate::user::UserId; use crate::{FrontendOpts, PgResponseStream}; pub(crate) mod current; -pub(crate) mod cursor; +pub(crate) mod cursor_manager; pub(crate) mod transaction; /// The global environment for the frontend server. 
@@ -608,8 +610,7 @@ pub struct SessionImpl { /// Last idle instant last_idle_instant: Arc>>, - /// The cursors declared in the transaction. - cursors: tokio::sync::Mutex>, + cursor_manager: Arc, } #[derive(Error, Debug)] @@ -650,7 +651,7 @@ impl SessionImpl { notices: Default::default(), exec_context: Mutex::new(None), last_idle_instant: Default::default(), - cursors: Default::default(), + cursor_manager: Arc::new(CursorManager::default()), } } @@ -677,7 +678,7 @@ impl SessionImpl { )) .into(), last_idle_instant: Default::default(), - cursors: Default::default(), + cursor_manager: Arc::new(CursorManager::default()), } } @@ -747,6 +748,10 @@ impl SessionImpl { .map(|context| context.running_sql.clone()) } + pub fn get_cursor_manager(&self) -> Arc { + self.cursor_manager.clone() + } + pub fn peer_addr(&self) -> &Address { &self.peer_addr } @@ -874,6 +879,32 @@ impl SessionImpl { Ok(connection.clone()) } + pub fn get_subscription_by_name( + &self, + schema_name: Option, + subscription_name: &str, + ) -> Result> { + let db_name = self.database(); + let search_path = self.config().search_path(); + let user_name = &self.auth_context().user_name; + + let catalog_reader = self.env().catalog_reader().read_guard(); + let schema = match schema_name { + Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?, + None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?, + }; + let schema = catalog_reader.get_schema_by_name(db_name, schema.name().as_str())?; + let subscription = schema + .get_subscription_by_name(subscription_name) + .ok_or_else(|| { + RwError::from(ErrorCode::ItemNotFound(format!( + "subscription {} not found", + subscription_name + ))) + })?; + Ok(subscription.clone()) + } + pub fn clear_cancel_query_flag(&self) { let mut flag = self.current_query_cancel_flag.lock(); *flag = None; diff --git a/src/frontend/src/session/cursor.rs b/src/frontend/src/session/cursor.rs deleted file mode 100644 index 
74a54b75123a5..0000000000000 --- a/src/frontend/src/session/cursor.rs +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::VecDeque; -use std::sync::Arc; - -use futures::StreamExt; -use pgwire::pg_field_descriptor::PgFieldDescriptor; -use pgwire::pg_response::StatementType; -use pgwire::types::Row; -use risingwave_sqlparser::ast::ObjectName; - -use super::PgResponseStream; -use crate::error::{ErrorCode, Result, RwError}; -use crate::handler::query::{create_stream, BatchPlanFragmenterResult}; -use crate::session::SessionImpl; - -pub struct Cursor { - row_stream: PgResponseStream, - pg_descs: Vec, - remaining_rows: VecDeque, -} - -impl Cursor { - pub async fn new( - plan_fragmenter_result: BatchPlanFragmenterResult, - session: Arc, - ) -> Result { - assert_eq!(plan_fragmenter_result.stmt_type, StatementType::SELECT); - let (row_stream, pg_descs) = create_stream(session, plan_fragmenter_result, vec![]).await?; - Ok(Self { - row_stream, - pg_descs, - remaining_rows: VecDeque::::new(), - }) - } - - pub async fn next_once(&mut self) -> Result> { - while self.remaining_rows.is_empty() { - let rows = self.row_stream.next().await; - let rows = match rows { - None => return Ok(None), - Some(row) => { - row.map_err(|err| RwError::from(ErrorCode::InternalError(format!("{}", err))))? 
- } - }; - self.remaining_rows = rows.into_iter().collect(); - } - let row = self.remaining_rows.pop_front().unwrap(); - Ok(Some(row)) - } - - pub async fn next(&mut self, count: Option) -> Result> { - // `FETCH NEXT` is equivalent to `FETCH 1`. - let fetch_count = count.unwrap_or(1); - if fetch_count <= 0 { - Err(crate::error::ErrorCode::InternalError( - "FETCH a non-positive count is not supported yet".to_string(), - ) - .into()) - } else { - // min with 100 to avoid allocating too many memory at once. - let mut ans = Vec::with_capacity(std::cmp::min(100, fetch_count) as usize); - let mut cur = 0; - while cur < fetch_count - && let Some(row) = self.next_once().await? - { - cur += 1; - ans.push(row); - } - Ok(ans) - } - } - - pub fn pg_descs(&self) -> Vec { - self.pg_descs.clone() - } -} - -impl SessionImpl { - pub async fn add_cursor(&self, cursor_name: ObjectName, cursor: Cursor) -> Result<()> { - if self - .cursors - .lock() - .await - .try_insert(cursor_name.clone(), cursor) - .is_err() - { - return Err(ErrorCode::CatalogError( - format!("cursor \"{cursor_name}\" already exists").into(), - ) - .into()); - } - Ok(()) - } - - pub async fn drop_all_cursors(&self) { - self.cursors.lock().await.clear(); - } - - pub async fn drop_cursor(&self, cursor_name: ObjectName) -> Result<()> { - match self.cursors.lock().await.remove(&cursor_name) { - Some(_) => Ok(()), - None => Err(ErrorCode::CatalogError( - format!("cursor \"{cursor_name}\" does not exist").into(), - ) - .into()), - } - } - - pub async fn cursor_next( - &self, - cursor_name: &ObjectName, - count: Option, - ) -> Result<(Vec, Vec)> { - if let Some(cursor) = self.cursors.lock().await.get_mut(cursor_name) { - Ok((cursor.next(count).await?, cursor.pg_descs())) - } else { - Err( - ErrorCode::CatalogError(format!("cursor \"{cursor_name}\" does not exist").into()) - .into(), - ) - } - } -} diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs new file mode 100644 
index 0000000000000..214dac1a2b5a2 --- /dev/null +++ b/src/frontend/src/session/cursor_manager.rs @@ -0,0 +1,544 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::ops::Index; +use core::time::Duration; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use std::time::Instant; + +use bytes::Bytes; +use futures::StreamExt; +use itertools::Itertools; +use pgwire::pg_field_descriptor::PgFieldDescriptor; +use pgwire::types::Row; +use risingwave_common::types::DataType; +use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; + +use crate::catalog::subscription_catalog::SubscriptionCatalog; +use crate::error::{ErrorCode, Result, RwError}; +use crate::handler::declare_cursor::create_stream_for_cursor; +use crate::handler::util::{ + convert_epoch_to_logstore_i64, convert_logstore_i64_to_unix_millis, + gen_query_from_logstore_ge_rw_timestamp, gen_query_from_table_name, +}; +use crate::handler::HandlerArgs; +use crate::PgResponseStream; + +pub const KV_LOG_STORE_EPOCH: &str = "kv_log_store_epoch"; +const KV_LOG_STORE_ROW_OP: &str = "kv_log_store_row_op"; +pub const KV_LOG_STORE_SEQ_ID: &str = "kv_log_store_seq_id"; +pub const KV_LOG_STORE_VNODE: &str = "kv_log_store_vnode"; + +pub enum Cursor { + Subscription(SubscriptionCursor), + Query(QueryCursor), +} +impl Cursor { + pub async fn next( + &mut self, + count: u32, + handle_args: HandlerArgs, + ) -> Result<(Vec, Vec)> { + match self { + 
Cursor::Subscription(cursor) => cursor.next(count, handle_args).await, + Cursor::Query(cursor) => cursor.next(count).await, + } + } +} + +pub struct QueryCursor { + row_stream: PgResponseStream, + pg_descs: Vec, + remaining_rows: VecDeque, +} + +impl QueryCursor { + pub fn new(row_stream: PgResponseStream, pg_descs: Vec) -> Result { + Ok(Self { + row_stream, + pg_descs, + remaining_rows: VecDeque::::new(), + }) + } + + pub async fn next_once(&mut self) -> Result> { + while self.remaining_rows.is_empty() { + let rows = self.row_stream.next().await; + let rows = match rows { + None => return Ok(None), + Some(row) => { + row.map_err(|err| RwError::from(ErrorCode::InternalError(format!("{}", err))))? + } + }; + self.remaining_rows = rows.into_iter().collect(); + } + let row = self.remaining_rows.pop_front().unwrap(); + Ok(Some(row)) + } + + pub async fn next(&mut self, count: u32) -> Result<(Vec, Vec)> { + // `FETCH NEXT` is equivalent to `FETCH 1`. + // min with 100 to avoid allocating too many memory at once. + let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); + let mut cur = 0; + while cur < count + && let Some(row) = self.next_once().await? + { + cur += 1; + ans.push(row); + } + Ok((ans, self.pg_descs.clone())) + } +} + +enum State { + InitLogStoreQuery { + // The rw_timestamp used to initiate the query to read from subscription logstore. + seek_timestamp: i64, + + // If specified, the expected_timestamp must be an exact match for the next rw_timestamp. + expected_timestamp: Option, + }, + Fetch { + // Whether the query is reading from snapshot + // true: read from the upstream table snapshot + // false: read from subscription logstore + from_snapshot: bool, + + // The rw_timestamp used to initiate the query to read from subscription logstore. + rw_timestamp: i64, + + // The row stream to from the batch query read. + // It is returned from the batch execution. + row_stream: PgResponseStream, + + // The pg descs to from the batch query read. 
+ // It is returned from the batch execution. + pg_descs: Vec, + + // A cache to store the remaining rows from the row stream. + remaining_rows: VecDeque, + }, + Invalid, +} + +pub struct SubscriptionCursor { + cursor_name: String, + subscription: Arc, + cursor_need_drop_time: Instant, + state: State, +} + +impl SubscriptionCursor { + pub async fn new( + cursor_name: String, + start_timestamp: Option, + subscription: Arc, + handle_args: &HandlerArgs, + ) -> Result { + let state = if let Some(start_timestamp) = start_timestamp { + State::InitLogStoreQuery { + seek_timestamp: start_timestamp, + expected_timestamp: None, + } + } else { + // The query stream needs to initiated on cursor creation to make sure + // future fetch on the cursor starts from the snapshot when the cursor is declared. + // + // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch? + let (row_stream, pg_descs) = + Self::initiate_query(None, &subscription, handle_args.clone()).await?; + let pinned_epoch = handle_args + .session + .get_pinned_snapshot() + .ok_or_else(|| { + ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_string()) + })? + .epoch_with_frontend_pinned() + .ok_or_else(|| { + ErrorCode::InternalError( + "Fetch Cursor can't support setting an epoch".to_string(), + ) + })? 
+ .0; + let start_timestamp = convert_epoch_to_logstore_i64(pinned_epoch); + + State::Fetch { + from_snapshot: true, + rw_timestamp: start_timestamp, + row_stream, + pg_descs, + remaining_rows: VecDeque::new(), + } + }; + + let cursor_need_drop_time = + Instant::now() + Duration::from_secs(subscription.get_retention_seconds()?); + Ok(Self { + cursor_name, + subscription, + cursor_need_drop_time, + state, + }) + } + + pub async fn next_row( + &mut self, + handle_args: HandlerArgs, + ) -> Result<(Option, Vec)> { + loop { + match &mut self.state { + State::InitLogStoreQuery { + seek_timestamp, + expected_timestamp, + } => { + let from_snapshot = false; + + // Initiate a new batch query to continue fetching + let (mut row_stream, pg_descs) = Self::initiate_query( + Some(*seek_timestamp), + &self.subscription, + handle_args.clone(), + ) + .await?; + self.cursor_need_drop_time = Instant::now() + + Duration::from_secs(self.subscription.get_retention_seconds()?); + + // Try refill remaining rows + let mut remaining_rows = VecDeque::new(); + Self::try_refill_remaining_rows(&mut row_stream, &mut remaining_rows).await?; + + // Get the rw_timestamp in the first row returned by the query if any. + // new_row_rw_timestamp == None means the query returns empty result. + let new_row_rw_timestamp: Option = remaining_rows.front().map(|row| { + std::str::from_utf8(row.index(0).as_ref().unwrap()) + .unwrap() + .parse() + .unwrap() + }); + + // Check expected_timestamp against the rw_timestamp of the first row. + // Return an error if there is no new row or there is a mismatch. 
+ if let Some(expected_timestamp) = expected_timestamp { + let expected_timestamp = *expected_timestamp; + if new_row_rw_timestamp.is_none() + || new_row_rw_timestamp.unwrap() != expected_timestamp + { + // Transition to Invalid state and return and error + self.state = State::Invalid; + return Err(ErrorCode::CatalogError( + format!( + " No data found for rw_timestamp {:?}, data may have been recycled, please recreate cursor", + convert_logstore_i64_to_unix_millis(expected_timestamp) + ) + .into(), + ) + .into()); + } + } + + // Return None if no data is found for the rw_timestamp in logstore. + // This happens when reaching EOF of logstore. This check cannot be moved before the + // expected_timestamp check to ensure that an error is returned on empty result when + // expected_timstamp is set. + if new_row_rw_timestamp.is_none() { + return Ok((None, pg_descs)); + } + + // Transition to the Fetch state + self.state = State::Fetch { + from_snapshot, + rw_timestamp: new_row_rw_timestamp.unwrap(), + row_stream, + pg_descs, + remaining_rows, + }; + } + State::Fetch { + from_snapshot, + rw_timestamp, + row_stream, + pg_descs, + remaining_rows, + } => { + let from_snapshot = *from_snapshot; + let rw_timestamp = *rw_timestamp; + + // Try refill remaining rows + Self::try_refill_remaining_rows(row_stream, remaining_rows).await?; + + if let Some(row) = remaining_rows.pop_front() { + // 1. Fetch the next row + let new_row = row.take(); + if from_snapshot { + // 1a. The rw_timestamp in the table is all the same, so don't need to check. + return Ok(( + Some(Row::new(Self::build_row_with_snapshot(new_row))), + pg_descs.clone(), + )); + } + + let new_row_rw_timestamp: i64 = new_row + .get(0) + .unwrap() + .as_ref() + .map(|bytes| std::str::from_utf8(bytes).unwrap().parse().unwrap()) + .unwrap(); + + if new_row_rw_timestamp != rw_timestamp { + // 1b. Find the next rw_timestamp. + // Initiate a new batch query to avoid query timeout and pinning version for too long. 
+ // expected_timestamp shouold be set to ensure there is no data missing in the next query. + self.state = State::InitLogStoreQuery { + seek_timestamp: new_row_rw_timestamp, + expected_timestamp: Some(new_row_rw_timestamp), + }; + } else { + // 1c. The rw_timestamp of this row is equal to self.rw_timestamp, return row + return Ok(( + Some(Row::new(Self::build_row_with_logstore( + new_row, + rw_timestamp, + )?)), + pg_descs.clone(), + )); + } + } else { + // 2. Reach EOF for the current query. + // Initiate a new batch query using the rw_timestamp + 1. + // expected_timestamp don't need to be set as the next rw_timestamp is unknown. + self.state = State::InitLogStoreQuery { + seek_timestamp: rw_timestamp + 1, + expected_timestamp: None, + }; + } + } + State::Invalid => { + // TODO: auto close invalid cursor? + return Err(ErrorCode::InternalError( + "Cursor is in invalid state. Please close and re-create the cursor." + .to_string(), + ) + .into()); + } + } + } + } + + pub async fn next( + &mut self, + count: u32, + handle_args: HandlerArgs, + ) -> Result<(Vec, Vec)> { + if Instant::now() > self.cursor_need_drop_time { + return Err(ErrorCode::InternalError( + "The cursor has exceeded its maximum lifetime, please recreate it (close then declare cursor).".to_string(), + ) + .into()); + } + // `FETCH NEXT` is equivalent to `FETCH 1`. 
+ if count != 1 { + Err(crate::error::ErrorCode::InternalError( + "FETCH count with subscription is not supported".to_string(), + ) + .into()) + } else { + let (row, pg_descs) = self.next_row(handle_args).await?; + if let Some(row) = row { + Ok((vec![row], pg_descs)) + } else { + Ok((vec![], pg_descs)) + } + } + } + + async fn initiate_query( + rw_timestamp: Option, + subscription: &SubscriptionCatalog, + handle_args: HandlerArgs, + ) -> Result<(PgResponseStream, Vec)> { + let query_stmt = if let Some(rw_timestamp) = rw_timestamp { + Statement::Query(Box::new(gen_query_from_logstore_ge_rw_timestamp( + &subscription.get_log_store_name(), + rw_timestamp, + ))) + } else { + let subscription_from_table_name = ObjectName(vec![Ident::from( + subscription.subscription_from_name.as_ref(), + )]); + Statement::Query(Box::new(gen_query_from_table_name( + subscription_from_table_name, + ))) + }; + let (row_stream, pg_descs) = create_stream_for_cursor(handle_args, query_stmt).await?; + Ok(( + row_stream, + Self::build_desc(pg_descs, rw_timestamp.is_none()), + )) + } + + async fn try_refill_remaining_rows( + row_stream: &mut PgResponseStream, + remaining_rows: &mut VecDeque, + ) -> Result<()> { + if remaining_rows.is_empty() + && let Some(row_set) = row_stream.next().await + { + remaining_rows.extend(row_set.map_err(|e| { + ErrorCode::InternalError(format!("Cursor get next chunk error {:?}", e.to_string())) + })?); + } + Ok(()) + } + + pub fn build_row_with_snapshot(row: Vec>) -> Vec> { + let mut new_row = vec![None, Some(Bytes::from(1i16.to_string()))]; + new_row.extend(row); + new_row + } + + pub fn build_row_with_logstore( + mut row: Vec>, + rw_timestamp: i64, + ) -> Result>> { + let mut new_row = vec![Some(Bytes::from( + convert_logstore_i64_to_unix_millis(rw_timestamp).to_string(), + ))]; + // need remove kv_log_store_epoch + new_row.extend(row.drain(1..row.len()).collect_vec()); + Ok(new_row) + } + + pub fn build_desc( + mut descs: Vec, + from_snapshot: bool, + ) -> Vec { 
+ let mut new_descs = vec![ + PgFieldDescriptor::new( + "rw_timestamp".to_owned(), + DataType::Int64.to_oid(), + DataType::Int64.type_len(), + ), + PgFieldDescriptor::new( + "op".to_owned(), + DataType::Int16.to_oid(), + DataType::Int16.type_len(), + ), + ]; + // need remove kv_log_store_epoch and kv_log_store_row_op + if from_snapshot { + new_descs.extend(descs) + } else { + assert_eq!( + descs.get(0).unwrap().get_name(), + KV_LOG_STORE_EPOCH, + "Cursor query logstore: first column must be {}", + KV_LOG_STORE_EPOCH + ); + assert_eq!( + descs.get(1).unwrap().get_name(), + KV_LOG_STORE_ROW_OP, + "Cursor query logstore: first column must be {}", + KV_LOG_STORE_ROW_OP + ); + new_descs.extend(descs.drain(2..descs.len())); + } + new_descs + } +} + +#[derive(Default)] +pub struct CursorManager { + cursor_map: tokio::sync::Mutex>, +} + +impl CursorManager { + pub async fn add_subscription_cursor( + &self, + cursor_name: String, + start_timestamp: Option, + subscription: Arc, + handle_args: &HandlerArgs, + ) -> Result<()> { + let cursor = SubscriptionCursor::new( + cursor_name.clone(), + start_timestamp, + subscription, + handle_args, + ) + .await?; + self.cursor_map + .lock() + .await + .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor)) + .map_err(|_| { + ErrorCode::CatalogError(format!("cursor `{}` already exists", cursor_name).into()) + })?; + Ok(()) + } + + pub async fn add_query_cursor( + &self, + cursor_name: ObjectName, + row_stream: PgResponseStream, + pg_descs: Vec, + ) -> Result<()> { + let cursor = QueryCursor::new(row_stream, pg_descs)?; + self.cursor_map + .lock() + .await + .try_insert(cursor_name.to_string(), Cursor::Query(cursor)) + .map_err(|_| { + ErrorCode::CatalogError(format!("cursor `{}` already exists", cursor_name).into()) + })?; + + Ok(()) + } + + pub async fn remove_cursor(&self, cursor_name: String) -> Result<()> { + self.cursor_map + .lock() + .await + .remove(&cursor_name) + .ok_or_else(|| { + 
ErrorCode::CatalogError(format!("cursor `{}` don't exists", cursor_name).into()) + })?; + Ok(()) + } + + pub async fn remove_all_cursor(&self) { + self.cursor_map.lock().await.clear(); + } + + pub async fn remove_all_query_cursor(&self) { + self.cursor_map + .lock() + .await + .retain(|_, v| matches!(v, Cursor::Subscription(_))); + } + + pub async fn get_rows_with_cursor( + &self, + cursor_name: String, + count: u32, + handle_args: HandlerArgs, + ) -> Result<(Vec, Vec)> { + if let Some(cursor) = self.cursor_map.lock().await.get_mut(&cursor_name) { + cursor.next(count, handle_args).await + } else { + Err(ErrorCode::ItemNotFound(format!("Cannot find cursor `{}`", cursor_name)).into()) + } + } +} diff --git a/src/frontend/src/session/transaction.rs b/src/frontend/src/session/transaction.rs index 2d4a2484013a7..cdc2b0e7d692c 100644 --- a/src/frontend/src/session/transaction.rs +++ b/src/frontend/src/session/transaction.rs @@ -195,6 +195,10 @@ impl SessionImpl { }) } + pub fn get_pinned_snapshot(&self) -> Option { + self.txn_ctx().snapshot.clone() + } + /// Unpin snapshot by replacing the snapshot with None. 
pub fn unpin_snapshot(&self) { self.txn_ctx().snapshot = None; diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index c82a54cbc0e62..724355358b026 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -7,6 +7,7 @@ mod m20231008_020431_hummock; mod m20240304_074901_subscription; mod m20240410_082733_with_version_column_migration; mod m20240410_154406_session_params; +mod m20240417_062305_subscription_internal_table_name; pub struct Migrator; @@ -19,6 +20,7 @@ impl MigratorTrait for Migrator { Box::new(m20240304_074901_subscription::Migration), Box::new(m20240410_082733_with_version_column_migration::Migration), Box::new(m20240410_154406_session_params::Migration), + Box::new(m20240417_062305_subscription_internal_table_name::Migration), ] } } diff --git a/src/meta/model_v2/migration/src/m20240417_062305_subscription_internal_table_name.rs b/src/meta/model_v2/migration/src/m20240417_062305_subscription_internal_table_name.rs new file mode 100644 index 0000000000000..a4c6f60928c91 --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240417_062305_subscription_internal_table_name.rs @@ -0,0 +1,41 @@ +use sea_orm_migration::prelude::{Table as MigrationTable, *}; +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column( + ColumnDef::new(Subscription::SubscriptionInternalTableName).integer(), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new( + Subscription::SubscriptionInternalTableName.to_string(), + )) + .to_owned(), + ) + .await?; + Ok(()) + } +} + +#[derive(DeriveIden)] 
+enum Subscription { + Table, + SubscriptionInternalTableName, +} diff --git a/src/meta/model_v2/src/subscription.rs b/src/meta/model_v2/src/subscription.rs index 096c63078a2a4..8a695c2b4c659 100644 --- a/src/meta/model_v2/src/subscription.rs +++ b/src/meta/model_v2/src/subscription.rs @@ -30,6 +30,7 @@ pub struct Model { pub properties: Property, pub definition: String, pub subscription_from_name: String, + pub subscription_internal_table_name: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -63,6 +64,7 @@ impl From for ActiveModel { properties: Set(pb_subscription.properties.into()), definition: Set(pb_subscription.definition), subscription_from_name: Set(pb_subscription.subscription_from_name), + subscription_internal_table_name: Set(pb_subscription.subscription_internal_table_name), } } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index bddc82d372d6c..111e8e5ab9fe3 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -18,7 +18,9 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; -use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; +use risingwave_common::catalog::{ + is_subscription_internal_table, TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS, +}; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; @@ -854,10 +856,10 @@ impl CatalogController { .all(&txn) .await?; let mut relations = internal_table_objs - .into_iter() + .iter() .map(|(table, obj)| PbRelation { relation_info: Some(PbRelationInfo::Table( - ObjectModel(table, obj.unwrap()).into(), + ObjectModel(table.clone(), obj.clone().unwrap()).into(), )), }) .collect_vec(); @@ -900,11 +902,21 @@ impl CatalogController { }); } ObjectType::Subscription => { - let (subscription, obj) = Subscription::find_by_id(job_id) + let (mut 
subscription, obj) = Subscription::find_by_id(job_id) .find_also_related(Object) .one(&txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?; + let log_store_names: Vec<_> = internal_table_objs + .iter() + .filter(|a| is_subscription_internal_table(&subscription.name, &a.0.name)) + .map(|a| &a.0.name) + .collect(); + if log_store_names.len() != 1 { + bail!("A subscription can only have one log_store_name"); + } + subscription.subscription_internal_table_name = + log_store_names.get(0).cloned().cloned(); relations.push(PbRelation { relation_info: Some(PbRelationInfo::Subscription( ObjectModel(subscription, obj.unwrap()).into(), diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 645a580d84456..ff98459888399 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -240,6 +240,7 @@ impl From> for PbSubscription { subscription_from_name: value.0.subscription_from_name, initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, + subscription_internal_table_name: value.0.subscription_internal_table_name, } } } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 6eae315bac72c..5a316696e6195 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -27,8 +27,8 @@ pub use database::*; pub use fragment::*; use itertools::Itertools; use risingwave_common::catalog::{ - valid_table_name, TableId as StreamingJobId, TableOption, DEFAULT_DATABASE_NAME, - DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG, + is_subscription_internal_table, valid_table_name, TableId as StreamingJobId, TableOption, + DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG, DEFAULT_SUPER_USER_FOR_PG_ID, DEFAULT_SUPER_USER_ID, SYSTEM_SCHEMAS, }; use risingwave_common::{bail, ensure}; @@ -3231,6 +3231,15 @@ impl 
CatalogManager { subscription.schema_id, subscription.name.clone(), ); + let log_store_names: Vec<_> = internal_tables + .iter() + .filter(|a| is_subscription_internal_table(&subscription.name, a.get_name())) + .map(|a| a.get_name()) + .collect(); + if log_store_names.len() != 1 { + bail!("A subscription can only have one log_store_name"); + } + subscription.subscription_internal_table_name = log_store_names.get(0).cloned().cloned(); let mut tables = BTreeMapTransaction::new(&mut database_core.tables); let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); assert!( diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 82b59d1bf462a..79c12c1b6cc6f 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1286,6 +1286,22 @@ pub enum Statement { append_only: bool, params: CreateFunctionBody, }, + + /// DECLARE CURSOR + DeclareCursor { + stmt: DeclareCursorStatement, + }, + + // FETCH CURSOR + FetchCursor { + stmt: FetchCursorStatement, + }, + + // CLOSE CURSOR + CloseCursor { + stmt: CloseCursorStatement, + }, + /// ALTER DATABASE AlterDatabase { name: ObjectName, @@ -1497,21 +1513,6 @@ pub enum Statement { param: Ident, value: SetVariableValue, }, - /// DECLARE CURSOR - DeclareCursor { - cursor_name: ObjectName, - query: Box, - }, - /// FETCH FROM CURSOR - FetchCursor { - cursor_name: ObjectName, - /// Number of rows to fetch. `None` means `FETCH ALL`. - count: Option, - }, - /// CLOSE CURSOR - CloseCursor { - cursor_name: Option, - }, /// FLUSH the current barrier. /// /// Note: RisingWave specific statement. 
@@ -1854,6 +1855,9 @@ impl fmt::Display for Statement { Statement::CreateSink { stmt } => write!(f, "CREATE SINK {}", stmt,), Statement::CreateSubscription { stmt } => write!(f, "CREATE SUBSCRIPTION {}", stmt,), Statement::CreateConnection { stmt } => write!(f, "CREATE CONNECTION {}", stmt,), + Statement::DeclareCursor { stmt } => write!(f, "DECLARE {}", stmt,), + Statement::FetchCursor { stmt } => write!(f, "FETCH {}", stmt), + Statement::CloseCursor { stmt } => write!(f, "CLOSE {}", stmt), Statement::AlterDatabase { name, operation } => { write!(f, "ALTER DATABASE {} {}", name, operation) } @@ -2072,23 +2076,6 @@ impl fmt::Display for Statement { "{param} = {value}", ) } - Statement::DeclareCursor { cursor_name, query } => { - write!(f, "DECLARE {} CURSOR FOR {}", cursor_name, query) - }, - Statement::FetchCursor { cursor_name , count} => { - if let Some(count) = count { - write!(f, "FETCH {} FROM {}", count, cursor_name) - } else { - write!(f, "FETCH NEXT FROM {}", cursor_name) - } - }, - Statement::CloseCursor { cursor_name } => { - if let Some(name) = cursor_name { - write!(f, "CLOSE {}", name) - } else { - write!(f, "CLOSE ALL") - } - } Statement::Flush => { write!(f, "FLUSH") } diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index ff50c31a29d48..a821cd77e70e3 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -634,6 +634,147 @@ impl fmt::Display for CreateSubscriptionStatement { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum DeclareCursor { + Query(Box), + Subscription(ObjectName, Option), +} + +impl fmt::Display for DeclareCursor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut v: Vec = vec![]; + match self { + DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())), + DeclareCursor::Subscription(name, since) => { + v.push(format!("{}", name)); + v.push(format!("{:?}", 
since)); + } + } + v.iter().join(" ").fmt(f) + } +} +// sql_grammar!(DeclareCursorStatement { +// cursor_name: Ident, +// [Keyword::SUBSCRIPTION] +// [Keyword::CURSOR], +// [Keyword::FOR], +// subscription: Ident or query: Query, +// [Keyword::SINCE], +// rw_timestamp: Ident, +// }); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct DeclareCursorStatement { + pub cursor_name: ObjectName, + pub declare_cursor: DeclareCursor, +} + +impl ParseTo for DeclareCursorStatement { + fn parse_to(p: &mut Parser) -> Result { + impl_parse_to!(cursor_name: ObjectName, p); + + let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) { + p.expect_keyword(Keyword::CURSOR)?; + p.expect_keyword(Keyword::FOR)?; + DeclareCursor::Query(Box::new(p.parse_query()?)) + } else { + p.expect_keyword(Keyword::CURSOR)?; + p.expect_keyword(Keyword::FOR)?; + let cursor_for_name = p.parse_object_name()?; + let rw_timestamp = p.parse_since()?; + DeclareCursor::Subscription(cursor_for_name, rw_timestamp) + }; + + Ok(Self { + cursor_name, + declare_cursor, + }) + } +} +impl fmt::Display for DeclareCursorStatement { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut v: Vec = vec![]; + impl_fmt_display!(cursor_name, v, self); + v.push("CURSOR FOR ".to_string()); + impl_fmt_display!(declare_cursor, v, self); + v.iter().join(" ").fmt(f) + } +} + +// sql_grammar!(FetchCursorStatement { +// cursor_name: Ident, +// }); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct FetchCursorStatement { + pub cursor_name: ObjectName, + pub count: u32, +} + +impl ParseTo for FetchCursorStatement { + fn parse_to(p: &mut Parser) -> Result { + let count = if p.parse_keyword(Keyword::NEXT) { + 1 + } else { + let count_str = p.parse_number_value()?; + count_str.parse::().map_err(|e| { + ParserError::ParserError(format!("Could not parse '{}' as i32: {}", 
count_str, e)) + })? + }; + p.expect_keyword(Keyword::FROM)?; + impl_parse_to!(cursor_name: ObjectName, p); + + Ok(Self { cursor_name, count }) + } +} + +impl fmt::Display for FetchCursorStatement { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut v: Vec = vec![]; + if self.count == 1 { + v.push("NEXT ".to_string()); + } else { + impl_fmt_display!(count, v, self); + } + v.push("FROM ".to_string()); + impl_fmt_display!(cursor_name, v, self); + v.iter().join(" ").fmt(f) + } +} + +// sql_grammar!(CloseCursorStatement { +// cursor_name: Ident, +// }); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct CloseCursorStatement { + pub cursor_name: Option, +} + +impl ParseTo for CloseCursorStatement { + fn parse_to(p: &mut Parser) -> Result { + let cursor_name = if p.parse_keyword(Keyword::ALL) { + None + } else { + Some(p.parse_object_name()?) + }; + + Ok(Self { cursor_name }) + } +} +impl fmt::Display for CloseCursorStatement { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut v: Vec = vec![]; + if let Some(cursor_name) = &self.cursor_name { + v.push(format!("{}", cursor_name)); + } else { + v.push("ALL".to_string()); + } + v.iter().join(" ").fmt(f) + } +} + // sql_grammar!(CreateConnectionStatement { // if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], // connection_name: Ident, @@ -708,6 +849,25 @@ impl fmt::Display for WithProperties { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum Since { + TimestampMsNum(u64), + ProcessTime, + Begin, +} + +impl fmt::Display for Since { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use Since::*; + match self { + TimestampMsNum(ts) => write!(f, " SINCE {}", ts), + ProcessTime => write!(f, " SINCE PROCTIME()"), + Begin => write!(f, " SINCE BEGIN()"), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] 
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct RowSchemaLocation { diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 208a642eb484b..5d282427d4537 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -459,6 +459,7 @@ define_keywords!( SETS, SHOW, SIMILAR, + SINCE, SINK, SINKS, SMALLINT, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index aaf400b43879e..92009ecb016ea 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -241,6 +241,9 @@ impl Parser { self.prev_token(); Ok(Statement::Query(Box::new(self.parse_query()?))) } + Keyword::DECLARE => Ok(self.parse_declare()?), + Keyword::FETCH => Ok(self.parse_fetch_cursor()?), + Keyword::CLOSE => Ok(self.parse_close_cursor()?), Keyword::TRUNCATE => Ok(self.parse_truncate()?), Keyword::CREATE => Ok(self.parse_create()?), Keyword::DROP => Ok(self.parse_drop()?), @@ -278,9 +281,6 @@ impl Parser { Keyword::EXECUTE => Ok(self.parse_execute()?), Keyword::PREPARE => Ok(self.parse_prepare()?), Keyword::COMMENT => Ok(self.parse_comment()?), - Keyword::DECLARE => Ok(self.parse_declare_cursor()?), - Keyword::FETCH => Ok(self.parse_fetch_cursor()?), - Keyword::CLOSE => Ok(self.parse_close_cursor()?), Keyword::FLUSH => Ok(Statement::Flush), Keyword::WAIT => Ok(Statement::Wait), Keyword::RECOVER => Ok(Statement::Recover), @@ -2362,6 +2362,24 @@ impl Parser { }) } + pub fn parse_declare(&mut self) -> Result { + Ok(Statement::DeclareCursor { + stmt: DeclareCursorStatement::parse_to(self)?, + }) + } + + pub fn parse_fetch_cursor(&mut self) -> Result { + Ok(Statement::FetchCursor { + stmt: FetchCursorStatement::parse_to(self)?, + }) + } + + pub fn parse_close_cursor(&mut self) -> Result { + Ok(Statement::CloseCursor { + stmt: CloseCursorStatement::parse_to(self)?, + }) + } + fn parse_table_column_def(&mut self) -> Result { Ok(TableColumnDef { name: self.parse_identifier_non_reserved()?, @@ -2988,6 +3006,44 
@@ impl Parser { Ok(SqlOption { name, value }) } + pub fn parse_since(&mut self) -> Result, ParserError> { + if self.parse_keyword(Keyword::SINCE) { + let token = self.next_token(); + match token.token { + Token::Word(w) => { + let ident = w.to_ident()?; + // Backward compatibility for now. + if ident.real_value() == "proctime" || ident.real_value() == "now" { + self.expect_token(&Token::LParen)?; + self.expect_token(&Token::RParen)?; + Ok(Some(Since::ProcessTime)) + } else if ident.real_value() == "begin" { + self.expect_token(&Token::LParen)?; + self.expect_token(&Token::RParen)?; + Ok(Some(Since::Begin)) + } else { + parser_err!(format!( + "Expected proctime(), begin() or now(), found: {}", + ident.real_value() + )) + } + } + Token::Number(s) => { + let num = s.parse::().map_err(|e| { + ParserError::ParserError(format!("Could not parse '{}' as u64: {}", s, e)) + }); + Ok(Some(Since::TimestampMsNum(num?))) + } + unexpected => self.expected( + "proctime(), begin() , now(), Number", + unexpected.with_location(token.location), + ), + } + } else { + Ok(None) + } + } + pub fn parse_emit_mode(&mut self) -> Result, ParserError> { if self.parse_keyword(Keyword::EMIT) { match self.parse_one_of_keywords(&[Keyword::IMMEDIATELY, Keyword::ON]) { @@ -3394,7 +3450,6 @@ impl Parser { self.peek_token(), ); } - let value = self.parse_set_variable()?; let deferred = self.parse_keyword(Keyword::DEFERRED); @@ -5427,40 +5482,6 @@ impl Parser { comment, }) } - - /// Parse a SQL DECLARE statement - pub fn parse_declare_cursor(&mut self) -> Result { - let cursor_name = self.parse_object_name()?; - self.expect_keyword(Keyword::CURSOR)?; - self.expect_keyword(Keyword::FOR)?; - let query = Box::new(self.parse_query()?); - Ok(Statement::DeclareCursor { cursor_name, query }) - } - - /// Parse a SQL FETCH statement - pub fn parse_fetch_cursor(&mut self) -> Result { - let count = if self.parse_keyword(Keyword::NEXT) { - None - } else { - let count_str = self.parse_number_value()?; - 
Some(count_str.parse::().map_err(|e| { - ParserError::ParserError(format!("Could not parse '{}' as i32: {}", count_str, e)) - })?) - }; - self.expect_keyword(Keyword::FROM)?; - let cursor_name = self.parse_object_name()?; - Ok(Statement::FetchCursor { cursor_name, count }) - } - - /// Parse a SQL CLOSE statement - pub fn parse_close_cursor(&mut self) -> Result { - let cursor_name = if self.parse_keyword(Keyword::ALL) { - None - } else { - Some(self.parse_object_name()?) - }; - Ok(Statement::CloseCursor { cursor_name }) - } } impl Word { diff --git a/src/stream/src/common/log_store_impl/subscription_log_store.rs b/src/stream/src/common/log_store_impl/subscription_log_store.rs index 39ada926826d7..c7de7073f85ed 100644 --- a/src/stream/src/common/log_store_impl/subscription_log_store.rs +++ b/src/stream/src/common/log_store_impl/subscription_log_store.rs @@ -18,7 +18,6 @@ use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_common::hash::VnodeBitmapExt; use risingwave_connector::sink::log_store::LogStoreResult; use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; use risingwave_storage::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions}; @@ -83,15 +82,9 @@ impl SubscriptionLogStoreWriter { pub async fn flush_current_epoch( &mut self, next_epoch: u64, - is_checkpoint: bool, truncate_offset: Option, ) -> LogStoreResult<()> { - let epoch = self.state_store.epoch(); - for vnode in self.serde.vnodes().iter_vnodes() { - let (key, value) = self.serde.serialize_barrier(epoch, vnode, is_checkpoint); - self.state_store.insert(key, value, None)?; - } - + // Because barrier has no effect on subscription, barrier will not be inserted here let watermark = truncate_offset.map(|truncate_offset| { VnodeWatermark::new( self.serde.vnodes().clone(), diff --git a/src/stream/src/executor/subscription.rs 
b/src/stream/src/executor/subscription.rs index 8eed385a2b599..cd752dfdf28cd 100644 --- a/src/stream/src/executor/subscription.rs +++ b/src/stream/src/executor/subscription.rs @@ -114,11 +114,7 @@ impl SubscriptionExecutor { None }; self.log_store - .flush_current_epoch( - barrier.epoch.curr, - barrier.kind.is_checkpoint(), - truncate_offset, - ) + .flush_current_epoch(barrier.epoch.curr, truncate_offset) .await?; if let Some(vnode_bitmap) = diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 1431909f5366f..aeb08b5f0842b 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -42,6 +42,7 @@ pub enum StatementType { FETCH, COPY, EXPLAIN, + CLOSE_CURSOR, CREATE_TABLE, CREATE_MATERIALIZED_VIEW, CREATE_VIEW, @@ -55,6 +56,7 @@ pub enum StatementType { CREATE_FUNCTION, CREATE_CONNECTION, COMMENT, + DECLARE_CURSOR, DESCRIBE, GRANT_PRIVILEGE, DROP_TABLE, @@ -100,9 +102,7 @@ pub enum StatementType { ROLLBACK, SET_TRANSACTION, CANCEL_COMMAND, - DECLARE_CURSOR, FETCH_CURSOR, - CLOSE_CURSOR, WAIT, KILL, RECOVER, diff --git a/src/utils/pgwire/src/types.rs b/src/utils/pgwire/src/types.rs index 45f289b053e65..d4d37e1168ea1 100644 --- a/src/utils/pgwire/src/types.rs +++ b/src/utils/pgwire/src/types.rs @@ -45,6 +45,10 @@ impl Row { pub fn values(&self) -> &[Option] { &self.0 } + + pub fn take(self) -> Vec> { + self.0 + } } impl Index for Row {