diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 7e5a386d498a8..ba3a047058256 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -132,7 +132,7 @@ steps: files: "*-junit.xml" format: "junit" - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 10 + timeout_in_minutes: 11 retry: *auto-retry - label: "end-to-end test (parallel, in-memory) (release)" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index c19b9a774fc12..10e57f6fee825 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -115,7 +115,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 15 + timeout_in_minutes: 16 retry: *auto-retry - label: "end-to-end test for opendal (parallel)" diff --git a/e2e_test/ddl/alter_parallelism.slt b/e2e_test/ddl/alter_parallelism.slt index 4d774321f1d64..025496ca1c571 100644 --- a/e2e_test/ddl/alter_parallelism.slt +++ b/e2e_test/ddl/alter_parallelism.slt @@ -13,9 +13,6 @@ create view mview_parallelism as select m.name, tf.parallelism from rw_materiali statement ok create view sink_parallelism as select s.name, tf.parallelism from rw_sinks s, rw_table_fragments tf where s.id = tf.table_id; -statement ok -create view subscription_parallelism as select s.name, tf.parallelism from rw_subscriptions s, rw_table_fragments tf where s.id = tf.table_id; - statement ok create view fragment_parallelism as select t.name as table_name, f.fragment_id, f.parallelism from rw_fragments f, rw_tables t where f.table_id = t.id; @@ -97,28 +94,9 @@ select parallelism from sink_parallelism where name = 's'; ---- FIXED(4) -statement ok -create subscription subscription1 from t with (retention = '1D'); - -query T -select parallelism from subscription_parallelism where name = 'subscription1'; ----- -ADAPTIVE - -statement ok -alter subscription subscription1 set parallelism = 4; - -query T -select parallelism from subscription_parallelism where name = 'subscription1'; ----- -FIXED(4) - statement ok drop sink s; -statement ok -drop subscription subscription1; - statement ok drop materialized view m_join; @@ -179,8 +157,5 @@ drop view mview_parallelism; statement ok drop view sink_parallelism; -statement ok -drop view subscription_parallelism; - statement ok drop view fragment_parallelism; \ No newline at end of file diff --git a/e2e_test/subscription/create_table_and_subscription.slt b/e2e_test/subscription/create_table_and_subscription.slt index 94039f98b11cc..fd43567bc52de 100644 --- a/e2e_test/subscription/create_table_and_subscription.slt +++ b/e2e_test/subscription/create_table_and_subscription.slt @@ -4,5 +4,8 @@ create table t1 (v1 int, v2 int); statement ok insert into t1 values (1,2); +statement ok +flush; + statement ok create subscription sub from t1 with(retention = '1D'); \ No newline at end of file diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index f8e78813801f2..c7fcc56a35ac5 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -35,13 +35,14 @@ def execute_insert(sql,conn): def check_rows_data(expect_vec,rows,status): row = rows[0] + value_len = len(row) for index, value in enumerate(row): - if index == 0: + if index == value_len - 1: continue - if index == 1: + if index == value_len - 2: assert value == status,f"expect {value} but got {status}" continue - assert value == expect_vec[index-2],f"expect {expect_vec[index-2]} but got {value}" + assert value == expect_vec[index],f"expect {expect_vec[index]} but got {value}" def test_cursor_snapshot(): print(f"test_cursor_snapshot") @@ -161,13 +162,16 @@ def test_cursor_since_rw_timestamp(): execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - rw_timestamp_1 = row[0][0] + valuelen = len(row[0]) + rw_timestamp_1 = row[0][valuelen - 1] check_rows_data([4,4],row,1) row = execute_query("fetch next from cur",conn) - rw_timestamp_2 = row[0][0] - 1 + valuelen = len(row[0]) + rw_timestamp_2 = row[0][valuelen - 1] - 1 check_rows_data([5,5],row,1) row = execute_query("fetch next from cur",conn) - rw_timestamp_3 = row[0][0] + 1 + valuelen = len(row[0]) + rw_timestamp_3 = row[0][valuelen - 1] + 1 check_rows_data([6,6],row,1) row = execute_query("fetch next from cur",conn) assert row == [] diff --git a/proto/catalog.proto b/proto/catalog.proto index fd7d77116c1a3..47b5d719d15cb 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -176,26 +176,25 @@ message Sink { } message Subscription { + enum SubscriptionState { + UNSPECIFIED = 0; + INIT = 1; + CREATED = 2; + } uint32 id = 1; string name = 2; string definition = 3; - repeated common.ColumnOrder plan_pk = 4; - repeated int32 distribution_key = 5; - map properties = 6; - repeated plan_common.ColumnCatalog column_catalogs = 7; + uint64 retention_seconds = 6; uint32 database_id = 8; uint32 schema_id = 9; - repeated uint32 dependent_relations = 10; + uint32 dependent_table_id = 10; optional uint64 initialized_at_epoch = 11; optional uint64 created_at_epoch = 12; uint32 owner = 13; - StreamJobStatus stream_job_status = 14; - optional string initialized_at_cluster_version = 15; optional string created_at_cluster_version = 16; - string subscription_from_name = 17; - optional string subscription_internal_table_name = 18; + SubscriptionState subscription_state = 19; } message Connection { diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 80c46dd676a13..58fb645e056eb 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -100,7 +100,6 @@ message DropSinkResponse { message CreateSubscriptionRequest { catalog.Subscription subscription = 1; - stream_plan.StreamFragmentGraph fragment_graph = 2; } message CreateSubscriptionResponse { diff --git a/proto/hummock.proto b/proto/hummock.proto index 6eb4b7887e0e6..faf9ee4f375e0 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -833,6 +833,7 @@ service HummockManagerService { rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse); rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse); rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse); + rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse); } message CompactionConfig { @@ -892,3 +893,13 @@ message BranchedObject { // Compaction group id the SST belongs to. uint64 compaction_group_id = 3; } + +message ListChangeLogEpochsRequest { + uint32 table_id = 1; + uint64 min_epoch = 2; + uint32 max_count = 3; +} + +message ListChangeLogEpochsResponse { + repeated uint64 epochs = 1; +} diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index c3c3d82863f8b..c4b3e949fe4f5 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -765,12 +765,6 @@ message OverWindowNode { OverWindowCachePolicy cache_policy = 5; } -message SubscriptionNode { - catalog.Subscription subscription_catalog = 1; - // log store should have a table. - catalog.Table log_store_table = 2; -} - message StreamNode { oneof node_body { SourceNode source = 100; @@ -814,7 +808,6 @@ message StreamNode { StreamFsFetchNode stream_fs_fetch = 138; StreamCdcScanNode stream_cdc_scan = 139; CdcFilterNode cdc_filter = 140; - SubscriptionNode subscription = 141; SourceBackfillNode source_backfill = 142; } // The id for the operator. This is local per mview. @@ -911,7 +904,6 @@ enum FragmentTypeFlag { FRAGMENT_TYPE_FLAG_VALUES = 64; FRAGMENT_TYPE_FLAG_DML = 128; FRAGMENT_TYPE_FLAG_CDC_FILTER = 256; - FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512; FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024; } diff --git a/src/common/src/catalog/internal_table.rs b/src/common/src/catalog/internal_table.rs index bac22f3e7ae95..ba6370cdfc634 100644 --- a/src/common/src/catalog/internal_table.rs +++ b/src/common/src/catalog/internal_table.rs @@ -43,13 +43,6 @@ pub fn valid_table_name(table_name: &str) -> bool { !INTERNAL_TABLE_NAME.is_match(table_name) } -pub fn is_subscription_internal_table(subscription_name: &str, table_name: &str) -> bool { - let regex = - Regex::new(format!(r"__internal_{}_(\d+)_subscription_(\d+)", subscription_name).as_str()) - .unwrap(); - regex.is_match(table_name) -} - pub fn get_dist_key_in_pk_indices>( dist_key_indices: &[I], pk_indices: &[I], diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index 81f1189693c0d..d7975357e09c7 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -200,12 +200,6 @@ pub fn visit_stream_node_tables_inner( optional!(node.table, "Sink") } - // Subscription - NodeBody::Subscription(node) => { - // A Subscription should have a log store - optional!(node.log_store_table, "Subscription") - } - // Now NodeBody::Now(node) => { always!(node.state_table, "Now"); diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 8a77ca5650f9d..391d6c41a2985 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -560,13 +560,11 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an let subscription_models: Vec = subscriptions .into_iter() .map(|s| { - object_dependencies.extend(s.dependent_relations.iter().map(|id| { - object_dependency::ActiveModel { - id: NotSet, - oid: Set(*id as _), - used_by: Set(s.id as _), - } - })); + object_dependencies.push(object_dependency::ActiveModel { + id: NotSet, + oid: Set(s.dependent_table_id as _), + used_by: Set(s.id as _), + }); s.into() }) .collect(); diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 7994fca811a35..b97c551f2ef78 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -117,11 +117,7 @@ pub trait CatalogWriter: Send + Sync { affected_table_change: Option, ) -> Result<()>; - async fn create_subscription( - &self, - subscription: PbSubscription, - graph: StreamFragmentGraph, - ) -> Result<()>; + async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>; async fn create_function(&self, function: PbFunction) -> Result<()>; @@ -204,6 +200,13 @@ pub trait CatalogWriter: Send + Sync { object: alter_set_schema_request::Object, new_schema_id: u32, ) -> Result<()>; + + async fn list_change_log_epochs( + &self, + table_id: u32, + min_epoch: u64, + max_count: u32, + ) -> Result>; } #[derive(Clone)] @@ -339,15 +342,8 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } - async fn create_subscription( - &self, - subscription: PbSubscription, - graph: StreamFragmentGraph, - ) -> Result<()> { - let version = self - .meta_client - .create_subscription(subscription, graph) - .await?; + async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> { + let version = self.meta_client.create_subscription(subscription).await?; self.wait_version(version).await } @@ -568,6 +564,18 @@ impl CatalogWriter for CatalogWriterImpl { Ok(()) } + + async fn list_change_log_epochs( + &self, + table_id: u32, + min_epoch: u64, + max_count: u32, + ) -> Result> { + Ok(self + .meta_client + .list_change_log_epochs(table_id, min_epoch, max_count) + .await?) + } } impl CatalogWriterImpl { diff --git a/src/frontend/src/catalog/subscription_catalog.rs b/src/frontend/src/catalog/subscription_catalog.rs index 1409948e07bd9..6efb31614a922 100644 --- a/src/frontend/src/catalog/subscription_catalog.rs +++ b/src/frontend/src/catalog/subscription_catalog.rs @@ -13,14 +13,13 @@ // limitations under the License. use core::str::FromStr; -use std::collections::{BTreeMap, HashSet}; +use std::collections::BTreeMap; -use itertools::Itertools; -use risingwave_common::catalog::{ColumnCatalog, TableId, UserId, OBJECT_ID_PLACEHOLDER}; +use risingwave_common::catalog::{TableId, UserId, OBJECT_ID_PLACEHOLDER}; use risingwave_common::types::Interval; use risingwave_common::util::epoch::Epoch; -use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_pb::catalog::{PbStreamJobStatus, PbSubscription}; +use risingwave_pb::catalog::subscription::PbSubscriptionState; +use risingwave_pb::catalog::PbSubscription; use thiserror_ext::AsReport; use super::OwnedByUserCatalog; @@ -38,21 +37,8 @@ pub struct SubscriptionCatalog { /// Full SQL definition of the subscription. For debug now. pub definition: String, - /// All columns of the subscription. Note that this is NOT sorted by columnId in the vector. - pub columns: Vec, - - /// Primiary keys of the subscription. Derived by the frontend. - pub plan_pk: Vec, - - /// Distribution key indices of the subscription. For example, if `distribution_key = [1, 2]`, then the - /// distribution keys will be `columns[1]` and `columns[2]`. - pub distribution_key: Vec, - - /// The properties of the subscription, only `retention`. - pub properties: BTreeMap, - - /// The upstream table name on which the subscription depends - pub subscription_from_name: String, + /// The retention seconds of the subscription. + pub retention_seconds: u64, /// The database id pub database_id: u32, @@ -61,7 +47,7 @@ pub struct SubscriptionCatalog { pub schema_id: u32, /// The subscription depends on the upstream list - pub dependent_relations: Vec, + pub dependent_table_id: TableId, /// The user id pub owner: UserId, @@ -71,7 +57,6 @@ pub struct SubscriptionCatalog { pub created_at_cluster_version: Option, pub initialized_at_cluster_version: Option, - pub subscription_internal_table_name: Option, } #[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq, Ord)] @@ -97,14 +82,8 @@ impl SubscriptionId { } impl SubscriptionCatalog { - pub fn add_dependent_relations(mut self, mut dependent_relations: HashSet) -> Self { - dependent_relations.extend(self.dependent_relations); - self.dependent_relations = dependent_relations.into_iter().collect(); - self - } - - pub fn get_retention_seconds(&self) -> Result { - let retention_seconds_str = self.properties.get("retention").ok_or_else(|| { + pub fn set_retention_seconds(&mut self, properties: BTreeMap) -> Result<()> { + let retention_seconds_str = properties.get("retention").ok_or_else(|| { ErrorCode::InternalError("Subscription retention time not set.".to_string()) })?; let retention_seconds = (Interval::from_str(retention_seconds_str) @@ -116,47 +95,29 @@ impl SubscriptionCatalog { })? .epoch_in_micros() / 1000000) as u64; - - Ok(retention_seconds) + self.retention_seconds = retention_seconds; + Ok(()) } pub fn create_sql(&self) -> String { self.definition.clone() } - pub fn get_log_store_name(&self) -> String { - self.subscription_internal_table_name.clone().unwrap() - } - pub fn to_proto(&self) -> PbSubscription { - assert!(!self.dependent_relations.is_empty()); PbSubscription { id: self.id.subscription_id, name: self.name.clone(), definition: self.definition.clone(), - column_catalogs: self - .columns - .iter() - .map(|column| column.to_protobuf()) - .collect_vec(), - plan_pk: self.plan_pk.iter().map(|k| k.to_protobuf()).collect_vec(), - distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(), - subscription_from_name: self.subscription_from_name.clone(), - properties: self.properties.clone().into_iter().collect(), + retention_seconds: self.retention_seconds, database_id: self.database_id, schema_id: self.schema_id, - dependent_relations: self - .dependent_relations - .iter() - .map(|k| k.table_id) - .collect_vec(), initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0), created_at_epoch: self.created_at_epoch.map(|e| e.0), owner: self.owner.into(), - stream_job_status: PbStreamJobStatus::Creating.into(), initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), created_at_cluster_version: self.created_at_cluster_version.clone(), - subscription_internal_table_name: self.subscription_internal_table_name.clone(), + dependent_table_id: self.dependent_table_id.table_id, + subscription_state: PbSubscriptionState::Init.into(), } } } @@ -167,32 +128,15 @@ impl From<&PbSubscription> for SubscriptionCatalog { id: SubscriptionId::new(prost.id), name: prost.name.clone(), definition: prost.definition.clone(), - columns: prost - .column_catalogs - .iter() - .map(|c| ColumnCatalog::from(c.clone())) - .collect_vec(), - plan_pk: prost - .plan_pk - .iter() - .map(ColumnOrder::from_protobuf) - .collect_vec(), - distribution_key: prost.distribution_key.iter().map(|k| *k as _).collect_vec(), - subscription_from_name: prost.subscription_from_name.clone(), - properties: prost.properties.clone().into_iter().collect(), + retention_seconds: prost.retention_seconds, database_id: prost.database_id, schema_id: prost.schema_id, - dependent_relations: prost - .dependent_relations - .iter() - .map(|k| TableId::new(*k)) - .collect_vec(), + dependent_table_id: TableId::new(prost.dependent_table_id), owner: prost.owner.into(), created_at_epoch: prost.created_at_epoch.map(Epoch::from), initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from), created_at_cluster_version: prost.created_at_cluster_version.clone(), initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(), - subscription_internal_table_name: prost.subscription_internal_table_name.clone(), } } } diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index b21e36481e20e..5f0155e9dd46a 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -85,13 +85,6 @@ pub async fn handle_alter_parallelism( session.check_privilege_for_drop_alter(schema_name, &**sink)?; sink.id.sink_id() } - StatementType::ALTER_SUBSCRIPTION => { - let (subscription, schema_name) = - reader.get_subscription_by_name(db_name, schema_path, &real_table_name)?; - - session.check_privilege_for_drop_alter(schema_name, &**subscription)?; - subscription.id.subscription_id() - } _ => bail!( "invalid statement type for alter parallelism: {:?}", stmt_type diff --git a/src/frontend/src/handler/create_subscription.rs b/src/frontend/src/handler/create_subscription.rs index 3b21efd10203c..7da1a9d1683ed 100644 --- a/src/frontend/src/handler/create_subscription.rs +++ b/src/frontend/src/handler/create_subscription.rs @@ -17,94 +17,56 @@ use std::rc::Rc; use either::Either; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::UserId; -use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; -use risingwave_sqlparser::ast::{CreateSubscriptionStatement, Query}; +use risingwave_sqlparser::ast::CreateSubscriptionStatement; -use super::privilege::resolve_query_privileges; -use super::util::gen_query_from_table_name; use super::{HandlerArgs, RwPgResponse}; -use crate::catalog::subscription_catalog::SubscriptionCatalog; +use crate::catalog::subscription_catalog::{SubscriptionCatalog, SubscriptionId}; use crate::error::Result; -use crate::optimizer::RelationCollectorVisitor; use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; -use crate::{ - build_graph, Binder, Explain, OptimizerContext, OptimizerContextRef, PlanRef, Planner, -}; +use crate::{Binder, OptimizerContext, OptimizerContextRef}; -// used to store result of `gen_subscription_plan` -pub struct SubscriptionPlanContext { - pub query: Box, - pub subscription_plan: PlanRef, - pub subscription_catalog: SubscriptionCatalog, -} - -pub fn gen_subscription_plan( +pub fn create_subscription_catalog( session: &SessionImpl, context: OptimizerContextRef, stmt: CreateSubscriptionStatement, -) -> Result { +) -> Result { let db_name = session.database(); - let (schema_name, subscription_name) = + let (subscription_schema_name, subscription_name) = Binder::resolve_schema_qualified_name(db_name, stmt.subscription_name.clone())?; - let subscription_from_table_name = stmt - .subscription_from - .0 - .last() - .unwrap() - .real_value() - .clone(); - let query = Box::new(gen_query_from_table_name(stmt.subscription_from)); - - let (database_id, schema_id) = - session.get_database_and_schema_id_for_create(schema_name.clone())?; - + let (table_schema_name, subscription_from_table_name) = + Binder::resolve_schema_qualified_name(db_name, stmt.subscription_from.clone())?; + let (table_database_id, table_schema_id) = + session.get_database_and_schema_id_for_create(table_schema_name.clone())?; + let (subscription_database_id, subscription_schema_id) = + session.get_database_and_schema_id_for_create(subscription_schema_name.clone())?; let definition = context.normalized_sql().to_owned(); - - let (dependent_relations, bound) = { - let mut binder = Binder::new_for_stream(session); - let bound = binder.bind_query(*query.clone())?; - (binder.included_relations(), bound) - }; - - let check_items = resolve_query_privileges(&bound); - session.check_privileges(&check_items)?; - - let with_options = context.with_options().clone(); - let plan_root = Planner::new(context).plan_query(bound)?; - - let subscription_plan = plan_root.gen_subscription_plan( - database_id, - schema_id, - dependent_relations.clone(), - subscription_name, + let dependent_table_id = session + .get_table_by_name( + &subscription_from_table_name, + table_database_id, + table_schema_id, + )? + .id; + + let mut subscription_catalog = SubscriptionCatalog { + id: SubscriptionId::placeholder(), + name: subscription_name, definition, - with_options, - false, - subscription_from_table_name, - UserId::new(session.user_id()), - )?; - - let subscription_catalog = subscription_plan.subscription_catalog(); - - let subscription_plan: PlanRef = subscription_plan.into(); - - let ctx = subscription_plan.ctx(); - let explain_trace = ctx.is_explain_trace(); - if explain_trace { - ctx.trace("Create Subscription:"); - ctx.trace(subscription_plan.explain_to_string()); - } + retention_seconds: 0, + database_id: subscription_database_id, + schema_id: subscription_schema_id, + dependent_table_id, + owner: UserId::new(session.user_id()), + initialized_at_epoch: None, + created_at_epoch: None, + created_at_cluster_version: None, + initialized_at_cluster_version: None, + }; - let dependent_relations = - RelationCollectorVisitor::collect_with(dependent_relations, subscription_plan.clone()); + subscription_catalog.set_retention_seconds(context.with_options().clone().into_inner())?; - let subscription_catalog = subscription_catalog.add_dependent_relations(dependent_relations); - Ok(SubscriptionPlanContext { - query, - subscription_plan, - subscription_catalog, - }) + Ok(subscription_catalog) } pub async fn handle_create_subscription( @@ -120,27 +82,9 @@ pub async fn handle_create_subscription( )? { return Ok(resp); }; - - let (subscription_catalog, graph) = { + let subscription_catalog = { let context = Rc::new(OptimizerContext::from_handler_args(handle_args)); - - let SubscriptionPlanContext { - query: _, - subscription_plan, - subscription_catalog, - } = gen_subscription_plan(&session, context.clone(), stmt)?; - - let mut graph = build_graph(subscription_plan)?; - - graph.parallelism = - session - .config() - .streaming_parallelism() - .map(|parallelism| Parallelism { - parallelism: parallelism.get(), - }); - - (subscription_catalog, graph) + create_subscription_catalog(&session, context.clone(), stmt)? }; let _job_guard = @@ -156,7 +100,7 @@ pub async fn handle_create_subscription( let catalog_writer = session.catalog_writer()?; catalog_writer - .create_subscription(subscription_catalog.to_proto(), graph) + .create_subscription(subscription_catalog.to_proto()) .await?; Ok(PgResponse::empty_result(StatementType::CREATE_SUBSCRIPTION)) diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index c9e338c6074b2..6bd4e300ec0fa 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -12,25 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::rc::Rc; - -use fixedbitset::FixedBitSet; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::session_config::QueryMode; use risingwave_common::util::epoch::Epoch; use risingwave_sqlparser::ast::{DeclareCursorStatement, ObjectName, Query, Since, Statement}; -use super::query::{gen_batch_plan_by_statement, gen_batch_plan_fragmenter, BatchQueryPlanResult}; -use super::util::{convert_epoch_to_logstore_i64, convert_unix_millis_to_logstore_i64}; +use super::query::{gen_batch_plan_by_statement, gen_batch_plan_fragmenter}; +use super::util::convert_unix_millis_to_logstore_u64; use super::RwPgResponse; use crate::error::{ErrorCode, Result}; use crate::handler::query::create_stream; use crate::handler::HandlerArgs; -use crate::optimizer::plan_node::{generic, BatchLogSeqScan}; -use crate::optimizer::property::{Order, RequiredDist}; -use crate::optimizer::PlanRoot; -use crate::{Binder, OptimizerContext, PgResponseStream, PlanRef, TableCatalog}; +use crate::{Binder, OptimizerContext, PgResponseStream}; pub async fn handle_declare_cursor( handle_args: HandlerArgs, @@ -65,19 +58,20 @@ async fn handle_declare_subscription_cursor( let cursor_from_subscription_name = sub_name.0.last().unwrap().real_value().clone(); let subscription = session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?; + let table = session.get_table_by_id(&subscription.dependent_table_id)?; // Start the first query of cursor, which includes querying the table and querying the subscription's logstore let start_rw_timestamp = match rw_timestamp { Some(risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp)) => { - check_cursor_unix_millis(start_rw_timestamp, subscription.get_retention_seconds()?)?; - Some(convert_unix_millis_to_logstore_i64(start_rw_timestamp)) - } - Some(risingwave_sqlparser::ast::Since::ProcessTime) => { - Some(convert_epoch_to_logstore_i64(Epoch::now().0)) + check_cursor_unix_millis(start_rw_timestamp, subscription.retention_seconds)?; + Some(convert_unix_millis_to_logstore_u64(start_rw_timestamp)) } + Some(risingwave_sqlparser::ast::Since::ProcessTime) => Some(Epoch::now().0), Some(risingwave_sqlparser::ast::Since::Begin) => { let min_unix_millis = - Epoch::now().as_unix_millis() - subscription.get_retention_seconds()? * 1000; - Some(convert_unix_millis_to_logstore_i64(min_unix_millis)) + Epoch::now().as_unix_millis() - subscription.retention_seconds * 1000; + let subscription_build_millis = subscription.created_at_epoch.unwrap().as_unix_millis(); + let min_unix_millis = std::cmp::max(min_unix_millis, subscription_build_millis); + Some(convert_unix_millis_to_logstore_u64(min_unix_millis)) } None => None, }; @@ -88,6 +82,7 @@ async fn handle_declare_subscription_cursor( cursor_name.clone(), start_rw_timestamp, subscription, + table, &handle_args, ) .await?; @@ -118,7 +113,7 @@ async fn handle_declare_query_cursor( query: Box, ) -> Result { let (row_stream, pg_descs) = - create_stream_for_cursor(handle_args.clone(), Statement::Query(query)).await?; + create_stream_for_cursor_stmt(handle_args.clone(), Statement::Query(query)).await?; handle_args .session .get_cursor_manager() @@ -127,7 +122,7 @@ async fn handle_declare_query_cursor( Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR)) } -pub async fn create_stream_for_cursor( +pub async fn create_stream_for_cursor_stmt( handle_args: HandlerArgs, stmt: Statement, ) -> Result<(PgResponseStream, Vec)> { @@ -139,54 +134,3 @@ pub async fn create_stream_for_cursor( }; create_stream(session, plan_fragmenter_result, vec![]).await } - -pub fn create_batch_plan_for_cursor( - table_catalog: std::sync::Arc, - handle_args: HandlerArgs, - old_epoch: u64, - new_epoch: u64, -) -> Result { - let context = OptimizerContext::from_handler_args(handle_args.clone()); - let out_col_idx = table_catalog - .columns - .iter() - .enumerate() - .filter(|(_, v)| !v.is_hidden) - .map(|(i, _)| i) - .collect::>(); - let core = generic::LogScan::new( - table_catalog.name.clone(), - out_col_idx, - Rc::new(table_catalog.table_desc()), - Rc::new(context), - old_epoch, - new_epoch, - ); - let batch_log_seq_scan = BatchLogSeqScan::new(core); - let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema().len()); - let out_names = batch_log_seq_scan.core().column_names(); - // Here we just need a plan_root to call the method, only out_fields and out_names will be used - let plan_root = PlanRoot::new_with_batch_plan( - PlanRef::from(batch_log_seq_scan.clone()), - RequiredDist::single(), - Order::default(), - out_fields, - out_names, - ); - let schema = batch_log_seq_scan.core().schema().clone(); - let (batch_log_seq_scan, query_mode) = match handle_args.session.config().query_mode() { - QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local), - QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local), - QueryMode::Distributed => ( - plan_root.gen_batch_distributed_plan()?, - QueryMode::Distributed, - ), - }; - Ok(BatchQueryPlanResult { - plan: batch_log_seq_scan, - query_mode, - schema, - stmt_type: StatementType::SELECT, - dependent_relations: table_catalog.dependent_relations.clone(), - }) -} diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index cfc416f0d8b0d..9f46087c206e8 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -22,7 +22,6 @@ use thiserror_ext::AsReport; use super::create_index::{gen_create_index_plan, resolve_index_schema}; use super::create_mv::gen_create_mv_plan; use super::create_sink::{gen_sink_plan, get_partition_compute_info}; -use super::create_subscription::gen_subscription_plan; use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; @@ -135,9 +134,11 @@ async fn do_handle_explain( ).into()); } - Statement::CreateSubscription { stmt } => { - gen_subscription_plan(&session, context.clone(), stmt) - .map(|plan| plan.subscription_plan) + Statement::CreateSubscription { .. } => { + return Err(ErrorCode::NotSupported( + "EXPLAIN CREATE SUBSCRIPTION".into(), + "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into() + ).into()); } Statement::CreateIndex { name, diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 794a5a7125051..7405ce7460c97 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -853,23 +853,6 @@ pub async fn handle( ) .await } - Statement::AlterSubscription { - name, - operation: - AlterSubscriptionOperation::SetParallelism { - parallelism, - deferred, - }, - } => { - alter_parallelism::handle_alter_parallelism( - handler_args, - name, - parallelism, - StatementType::ALTER_SUBSCRIPTION, - deferred, - ) - .await - } Statement::AlterSource { name, operation: AlterSourceOperation::RenameSource { source_name }, diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 011b078958946..33392049fce3a 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -32,12 +32,11 @@ use risingwave_common::types::{write_date_time_tz, DataType, ScalarRefImpl, Time use risingwave_common::util::epoch::Epoch; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_sqlparser::ast::{ - BinaryOperator, CompatibleSourceSchema, ConnectorSchema, Expr, ObjectName, OrderByExpr, Query, - Select, SelectItem, SetExpr, TableFactor, TableWithJoins, Value, + CompatibleSourceSchema, ConnectorSchema, ObjectName, Query, Select, SelectItem, SetExpr, + TableFactor, TableWithJoins, }; use crate::error::{ErrorCode, Result as RwResult}; -use crate::session::cursor_manager::{KV_LOG_STORE_EPOCH, KV_LOG_STORE_SEQ_ID, KV_LOG_STORE_VNODE}; use crate::session::{current, SessionImpl}; pin_project! { @@ -222,59 +221,12 @@ pub fn gen_query_from_table_name(from_name: ObjectName) -> Query { } } -pub fn gen_query_from_logstore_ge_rw_timestamp(logstore_name: &str, rw_timestamp: i64) -> Query { - let table_factor = TableFactor::Table { - name: ObjectName(vec![logstore_name.into()]), - alias: None, - as_of: None, - }; - let from = vec![TableWithJoins { - relation: table_factor, - joins: vec![], - }]; - let selection = Some(Expr::BinaryOp { - left: Box::new(Expr::Identifier(KV_LOG_STORE_EPOCH.into())), - op: BinaryOperator::GtEq, - right: Box::new(Expr::Value(Value::Number(rw_timestamp.to_string()))), - }); - let except_columns = vec![ - Expr::Identifier(KV_LOG_STORE_SEQ_ID.into()), - Expr::Identifier(KV_LOG_STORE_VNODE.into()), - ]; - let select = Select { - from, - projection: vec![SelectItem::Wildcard(Some(except_columns))], - selection, - ..Default::default() - }; - let order_by = vec![OrderByExpr { - expr: Expr::Identifier(KV_LOG_STORE_EPOCH.into()), - asc: None, - nulls_first: None, - }]; - let body = SetExpr::Select(Box::new(select)); - Query { - with: None, - body, - order_by, - limit: None, - offset: None, - fetch: None, - } -} - -pub fn convert_unix_millis_to_logstore_i64(unix_millis: u64) -> i64 { - let epoch = Epoch::from_unix_millis(unix_millis); - convert_epoch_to_logstore_i64(epoch.0) -} - -pub fn convert_epoch_to_logstore_i64(epoch: u64) -> i64 { - epoch as i64 ^ (1i64 << 63) +pub fn convert_unix_millis_to_logstore_u64(unix_millis: u64) -> u64 { + Epoch::from_unix_millis(unix_millis).0 } -pub fn convert_logstore_i64_to_unix_millis(logstore_i64: i64) -> u64 { - let epoch = Epoch::from(logstore_i64 as u64 ^ (1u64 << 63)); - epoch.as_unix_millis() +pub fn convert_logstore_u64_to_unix_millis(logstore_u64: u64) -> u64 { + Epoch::from(logstore_u64).as_unix_millis() } #[cfg(test)] diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 3565a8715bb98..67b265e02d3a2 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -40,7 +40,7 @@ mod plan_expr_visitor; mod rule; use std::assert_matches::assert_matches; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use fixedbitset::FixedBitSet; use itertools::Itertools as _; @@ -50,7 +50,7 @@ use plan_expr_rewriter::ConstEvalRewriter; use property::Order; use risingwave_common::bail; use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId, UserId, + ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId, }; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -64,7 +64,7 @@ use self::plan_node::generic::{self, PhysicalPlanRef}; use self::plan_node::{ stream_enforce_eowc_requirement, BatchProject, Convention, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, - StreamSubscription, StreamWatermarkFilter, ToStreamContext, + StreamWatermarkFilter, ToStreamContext, }; #[cfg(debug_assertions)] use self::plan_visitor::InputRefValidator; @@ -966,45 +966,6 @@ impl PlanRoot { ) } - #[allow(clippy::too_many_arguments)] - /// Optimize and generate a create subscription plan. - pub fn gen_subscription_plan( - mut self, - database_id: u32, - schema_id: u32, - dependent_relations: HashSet, - subscription_name: String, - definition: String, - properties: WithOptions, - emit_on_window_close: bool, - subscription_from_table_name: String, - user_id: UserId, - ) -> Result { - let stream_scan_type = StreamScanType::UpstreamOnly; - assert_eq!(self.phase, PlanPhase::Logical); - assert_eq!(self.plan.convention(), Convention::Logical); - let stream_plan = - self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?; - assert_eq!(self.phase, PlanPhase::Stream); - assert_eq!(stream_plan.convention(), Convention::Stream); - - StreamSubscription::create( - database_id, - schema_id, - dependent_relations, - stream_plan, - subscription_name, - subscription_from_table_name, - self.required_dist.clone(), - self.required_order.clone(), - self.out_fields.clone(), - self.out_names.clone(), - definition, - properties, - user_id, - ) - } - pub fn should_use_arrangement_backfill(&self) -> bool { let ctx = self.plan.ctx(); let session_ctx = ctx.session_ctx(); diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 27e1c140b5983..b35b97a724bcb 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -899,7 +899,6 @@ mod stream_sort; mod stream_source; mod stream_source_scan; mod stream_stateless_simple_agg; -mod stream_subscription; mod stream_table_scan; mod stream_topn; mod stream_values; @@ -1002,7 +1001,6 @@ pub use stream_sort::StreamEowcSort; pub use stream_source::StreamSource; pub use stream_source_scan::StreamSourceScan; pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg; -pub use stream_subscription::StreamSubscription; pub use stream_table_scan::StreamTableScan; pub use stream_temporal_join::StreamTemporalJoin; pub use stream_topn::StreamTopN; @@ -1100,7 +1098,6 @@ macro_rules! for_all_plan_nodes { , { Stream, TableScan } , { Stream, CdcTableScan } , { Stream, Sink } - , { Stream, Subscription } , { Stream, Source } , { Stream, SourceScan } , { Stream, HashJoin } @@ -1223,7 +1220,6 @@ macro_rules! for_stream_plan_nodes { , { Stream, TableScan } , { Stream, CdcTableScan } , { Stream, Sink } - , { Stream, Subscription } , { Stream, Source } , { Stream, SourceScan } , { Stream, HashAgg } diff --git a/src/frontend/src/optimizer/plan_node/stream_subscription.rs b/src/frontend/src/optimizer/plan_node/stream_subscription.rs deleted file mode 100644 index 8b165d5bbacbc..0000000000000 --- a/src/frontend/src/optimizer/plan_node/stream_subscription.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::assert_matches::assert_matches; -use std::collections::HashSet; - -use fixedbitset::FixedBitSet; -use itertools::Itertools; -use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::{ColumnCatalog, TableId, UserId}; -use risingwave_pb::stream_plan::stream_node::PbNodeBody; - -use super::derive::{derive_columns, derive_pk}; -use super::expr_visitable::ExprVisitable; -use super::stream::prelude::{GenericPlanRef, PhysicalPlanRef}; -use super::utils::{ - childless_record, infer_kv_log_store_table_catalog_inner, Distill, IndicesDisplay, -}; -use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode}; -use crate::catalog::subscription_catalog::{SubscriptionCatalog, SubscriptionId}; -use crate::error::Result; -use crate::optimizer::property::{Distribution, Order, RequiredDist}; -use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::{PlanRef, TableCatalog, WithOptions}; - -/// [`StreamSubscription`] represents a subscription at the very end of the graph. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct StreamSubscription { - pub base: PlanBase, - input: PlanRef, - subscription_catalog: SubscriptionCatalog, -} - -impl StreamSubscription { - #[must_use] - pub fn new(input: PlanRef, subscription_catalog: SubscriptionCatalog) -> Self { - let base = input - .plan_base() - .into_stream() - .expect("input should be stream plan") - .clone_with_new_plan_id(); - Self { - base, - input, - subscription_catalog, - } - } - - pub fn subscription_catalog(&self) -> SubscriptionCatalog { - self.subscription_catalog.clone() - } - - #[allow(clippy::too_many_arguments)] - pub fn create( - database_id: u32, - schema_id: u32, - dependent_relations: HashSet, - input: PlanRef, - name: String, - subscription_from_name: String, - user_distributed_by: RequiredDist, - user_order_by: Order, - user_cols: FixedBitSet, - out_names: Vec, - definition: String, - properties: WithOptions, - user_id: UserId, - ) -> Result { - let columns = derive_columns(input.schema(), out_names, &user_cols)?; - let (input, subscription) = Self::derive_subscription_catalog( - database_id, - schema_id, - dependent_relations, - input, - user_distributed_by, - name, - subscription_from_name, - user_order_by, - columns, - definition, - properties, - user_id, - )?; - Ok(Self::new(input, subscription)) - } - - #[allow(clippy::too_many_arguments)] - fn derive_subscription_catalog( - database_id: u32, - schema_id: u32, - dependent_relations: HashSet, - input: PlanRef, - user_distributed_by: RequiredDist, - name: String, - subscription_from_name: String, - user_order_by: Order, - columns: Vec, - definition: String, - properties: WithOptions, - user_id: UserId, - ) -> Result<(PlanRef, SubscriptionCatalog)> { - let (pk, _) = derive_pk(input.clone(), user_order_by, &columns); - let required_dist = match input.distribution() { - Distribution::Single => RequiredDist::single(), - _ => { - assert_matches!(user_distributed_by, RequiredDist::Any); - RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()) - } - }; - let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?; - let distribution_key = input.distribution().dist_column_indices().to_vec(); - let subscription_desc = SubscriptionCatalog { - database_id, - schema_id, - dependent_relations: dependent_relations.into_iter().collect(), - id: SubscriptionId::placeholder(), - name, - subscription_from_name, - definition, - columns, - plan_pk: pk, - distribution_key, - properties: properties.into_inner(), - owner: user_id, - initialized_at_epoch: None, - created_at_epoch: None, - created_at_cluster_version: None, - initialized_at_cluster_version: None, - subscription_internal_table_name: None, - }; - Ok((input, subscription_desc)) - } - - /// The table schema is: | epoch | seq id | row op | subscription columns | - /// Pk is: | epoch | seq id | - fn infer_kv_log_store_table_catalog(&self) -> TableCatalog { - infer_kv_log_store_table_catalog_inner(&self.input, &self.subscription_catalog.columns) - } -} - -impl PlanTreeNodeUnary for StreamSubscription { - fn input(&self) -> PlanRef { - self.input.clone() - } - - fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(input, self.subscription_catalog.clone()) - // TODO(nanderstabel): Add assertions (assert_eq!) - } -} - -impl_plan_tree_node_for_unary! { StreamSubscription } - -impl Distill for StreamSubscription { - fn distill<'a>(&self) -> XmlNode<'a> { - let column_names = self - .subscription_catalog - .columns - .iter() - .map(|col| col.name_with_hidden().to_string()) - .map(Pretty::from) - .collect(); - let column_names = Pretty::Array(column_names); - let mut vec = Vec::with_capacity(2); - vec.push(("columns", column_names)); - let pk = IndicesDisplay { - indices: &self - .subscription_catalog - .plan_pk - .iter() - .map(|k| k.column_index) - .collect_vec(), - schema: self.base.schema(), - }; - vec.push(("pk", pk.distill())); - childless_record("Streamsubscription", vec) - } -} - -impl StreamNode for StreamSubscription { - fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody { - use risingwave_pb::stream_plan::*; - - // We need to create a table for subscription with a kv log store. - let table = self - .infer_kv_log_store_table_catalog() - .with_id(state.gen_table_id_wrapped()); - - PbNodeBody::Subscription(SubscriptionNode { - subscription_catalog: Some(self.subscription_catalog.to_proto()), - log_store_table: Some(table.to_internal_table_prost()), - }) - } -} - -impl ExprRewritable for StreamSubscription {} - -impl ExprVisitable for StreamSubscription {} diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 7ff790748a761..c0606b646a4a2 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -85,7 +85,7 @@ use crate::catalog::connection_catalog::ConnectionCatalog; use crate::catalog::root_catalog::Catalog; use crate::catalog::subscription_catalog::SubscriptionCatalog; use crate::catalog::{ - check_schema_writable, CatalogError, DatabaseId, OwnedByUserCatalog, SchemaId, + check_schema_writable, CatalogError, DatabaseId, OwnedByUserCatalog, SchemaId, TableId, }; use crate::error::{ErrorCode, Result, RwError}; use crate::handler::describe::infer_describe; @@ -111,7 +111,7 @@ use crate::user::user_authentication::md5_hash_with_salt; use crate::user::user_manager::UserInfoManager; use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl}; use crate::user::UserId; -use crate::{FrontendOpts, PgResponseStream}; +use crate::{FrontendOpts, PgResponseStream, TableCatalog}; pub(crate) mod current; pub(crate) mod cursor_manager; @@ -905,6 +905,42 @@ impl SessionImpl { Ok(subscription.clone()) } + pub fn get_table_by_id(&self, table_id: &TableId) -> Result> { + let catalog_reader = self.env().catalog_reader().read_guard(); + Ok(catalog_reader.get_table_by_id(table_id)?.clone()) + } + + pub fn get_table_by_name( + &self, + table_name: &str, + db_id: u32, + schema_id: u32, + ) -> Result> { + let catalog_reader = self.env().catalog_reader().read_guard(); + let table = catalog_reader + .get_schema_by_id(&DatabaseId::from(db_id), &SchemaId::from(schema_id))? + .get_table_by_name(table_name) + .ok_or_else(|| { + Error::new( + ErrorKind::InvalidInput, + format!("table \"{}\" does not exist", table_name), + ) + })?; + Ok(table.clone()) + } + + pub async fn list_change_log_epochs( + &self, + table_id: u32, + min_epoch: u64, + max_count: u32, + ) -> Result> { + self.env + .catalog_writer + .list_change_log_epochs(table_id, min_epoch, max_count) + .await + } + pub fn clear_cancel_query_flag(&self) { let mut flag = self.current_query_cancel_flag.lock(); *flag = None; diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 0ada5f6ccd5e4..13eaec03b1663 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -12,34 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::ops::Index; use core::time::Duration; use std::collections::{HashMap, VecDeque}; +use std::rc::Rc; use std::sync::Arc; use std::time::Instant; use bytes::Bytes; +use fixedbitset::FixedBitSet; use futures::StreamExt; -use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; +use pgwire::pg_response::StatementType; use pgwire::types::Row; +use risingwave_common::session_config::QueryMode; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; +use super::SessionImpl; use crate::catalog::subscription_catalog::SubscriptionCatalog; use crate::error::{ErrorCode, Result}; -use crate::handler::declare_cursor::create_stream_for_cursor; -use crate::handler::util::{ - convert_epoch_to_logstore_i64, convert_logstore_i64_to_unix_millis, - gen_query_from_logstore_ge_rw_timestamp, gen_query_from_table_name, -}; +use crate::handler::declare_cursor::create_stream_for_cursor_stmt; +use crate::handler::query::{create_stream, gen_batch_plan_fragmenter, BatchQueryPlanResult}; +use crate::handler::util::{convert_logstore_u64_to_unix_millis, gen_query_from_table_name}; use crate::handler::HandlerArgs; -use crate::PgResponseStream; - -pub const KV_LOG_STORE_EPOCH: &str = "kv_log_store_epoch"; -const KV_LOG_STORE_ROW_OP: &str = "kv_log_store_row_op"; -pub const KV_LOG_STORE_SEQ_ID: &str = "kv_log_store_seq_id"; -pub const KV_LOG_STORE_VNODE: &str = "kv_log_store_vnode"; +use crate::optimizer::plan_node::{generic, BatchLogSeqScan}; +use crate::optimizer::property::{Order, RequiredDist}; +use crate::optimizer::PlanRoot; +use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog}; pub enum Cursor { Subscription(SubscriptionCursor), @@ -104,10 +103,10 @@ impl QueryCursor { enum State { InitLogStoreQuery { // The rw_timestamp used to initiate the query to read from subscription logstore. - seek_timestamp: i64, + seek_timestamp: u64, // If specified, the expected_timestamp must be an exact match for the next rw_timestamp. - expected_timestamp: Option, + expected_timestamp: Option, }, Fetch { // Whether the query is reading from snapshot @@ -116,7 +115,7 @@ enum State { from_snapshot: bool, // The rw_timestamp used to initiate the query to read from subscription logstore. - rw_timestamp: i64, + rw_timestamp: u64, // The row stream to from the batch query read. // It is returned from the batch execution. @@ -128,6 +127,8 @@ enum State { // A cache to store the remaining rows from the row stream. remaining_rows: VecDeque, + + expected_timestamp: Option, }, Invalid, } @@ -135,6 +136,7 @@ enum State { pub struct SubscriptionCursor { cursor_name: String, subscription: Arc, + table: Arc, cursor_need_drop_time: Instant, state: State, } @@ -142,8 +144,9 @@ pub struct SubscriptionCursor { impl SubscriptionCursor { pub async fn new( cursor_name: String, - start_timestamp: Option, + start_timestamp: Option, subscription: Arc, + table: Arc, handle_args: &HandlerArgs, ) -> Result { let state = if let Some(start_timestamp) = start_timestamp { @@ -157,7 +160,7 @@ impl SubscriptionCursor { // // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch? let (row_stream, pg_descs) = - Self::initiate_query(None, &subscription, handle_args.clone()).await?; + Self::initiate_query(None, &table, handle_args.clone()).await?; let pinned_epoch = handle_args .session .get_pinned_snapshot() @@ -171,7 +174,7 @@ impl SubscriptionCursor { ) })? .0; - let start_timestamp = convert_epoch_to_logstore_i64(pinned_epoch); + let start_timestamp = pinned_epoch; State::Fetch { from_snapshot: true, @@ -179,14 +182,16 @@ impl SubscriptionCursor { row_stream, pg_descs, remaining_rows: VecDeque::new(), + expected_timestamp: None, } }; let cursor_need_drop_time = - Instant::now() + Duration::from_secs(subscription.get_retention_seconds()?); + Instant::now() + Duration::from_secs(subscription.retention_seconds); Ok(Self { cursor_name, subscription, + table, cursor_need_drop_time, state, }) @@ -205,64 +210,42 @@ impl SubscriptionCursor { let from_snapshot = false; // Initiate a new batch query to continue fetching - let (mut row_stream, pg_descs) = Self::initiate_query( - Some(*seek_timestamp), - &self.subscription, + match Self::get_next_rw_timestamp( + *seek_timestamp, + self.table.id.table_id, + *expected_timestamp, handle_args.clone(), ) - .await?; - self.cursor_need_drop_time = Instant::now() - + Duration::from_secs(self.subscription.get_retention_seconds()?); - - // Try refill remaining rows - let mut remaining_rows = VecDeque::new(); - Self::try_refill_remaining_rows(&mut row_stream, &mut remaining_rows).await?; - - // Get the rw_timestamp in the first row returned by the query if any. - // new_row_rw_timestamp == None means the query returns empty result. - let new_row_rw_timestamp: Option = remaining_rows.front().map(|row| { - std::str::from_utf8(row.index(0).as_ref().unwrap()) - .unwrap() - .parse() - .unwrap() - }); - - // Check expected_timestamp against the rw_timestamp of the first row. - // Return an error if there is no new row or there is a mismatch. - if let Some(expected_timestamp) = expected_timestamp { - let expected_timestamp = *expected_timestamp; - if new_row_rw_timestamp.is_none() - || new_row_rw_timestamp.unwrap() != expected_timestamp - { - // Transition to Invalid state and return and error - self.state = State::Invalid; - return Err(ErrorCode::CatalogError( - format!( - " No data found for rw_timestamp {:?}, data may have been recycled, please recreate cursor", - convert_logstore_i64_to_unix_millis(expected_timestamp) - ) - .into(), + .await + { + Ok((Some(rw_timestamp), expected_timestamp)) => { + let (mut row_stream, pg_descs) = Self::initiate_query( + Some(rw_timestamp), + &self.table, + handle_args.clone(), ) - .into()); + .await?; + self.cursor_need_drop_time = Instant::now() + + Duration::from_secs(self.subscription.retention_seconds); + let mut remaining_rows = VecDeque::new(); + Self::try_refill_remaining_rows(&mut row_stream, &mut remaining_rows) + .await?; + // Transition to the Fetch state + self.state = State::Fetch { + from_snapshot, + rw_timestamp, + row_stream, + pg_descs, + remaining_rows, + expected_timestamp, + }; + } + Ok((None, _)) => return Ok((None, vec![])), + Err(e) => { + self.state = State::Invalid; + return Err(e); } } - - // Return None if no data is found for the rw_timestamp in logstore. - // This happens when reaching EOF of logstore. This check cannot be moved before the - // expected_timestamp check to ensure that an error is returned on empty result when - // expected_timstamp is set. - if new_row_rw_timestamp.is_none() { - return Ok((None, pg_descs)); - } - - // Transition to the Fetch state - self.state = State::Fetch { - from_snapshot, - rw_timestamp: new_row_rw_timestamp.unwrap(), - row_stream, - pg_descs, - remaining_rows, - }; } State::Fetch { from_snapshot, @@ -270,6 +253,7 @@ impl SubscriptionCursor { row_stream, pg_descs, remaining_rows, + expected_timestamp, } => { let from_snapshot = *from_snapshot; let rw_timestamp = *rw_timestamp; @@ -281,46 +265,29 @@ impl SubscriptionCursor { // 1. Fetch the next row let new_row = row.take(); if from_snapshot { - // 1a. The rw_timestamp in the table is all the same, so don't need to check. return Ok(( - Some(Row::new(Self::build_row_with_snapshot(new_row))), + Some(Row::new(Self::build_row(new_row, None)?)), pg_descs.clone(), )); - } - - let new_row_rw_timestamp: i64 = new_row - .get(0) - .unwrap() - .as_ref() - .map(|bytes| std::str::from_utf8(bytes).unwrap().parse().unwrap()) - .unwrap(); - - if new_row_rw_timestamp != rw_timestamp { - // 1b. Find the next rw_timestamp. - // Initiate a new batch query to avoid query timeout and pinning version for too long. - // expected_timestamp shouold be set to ensure there is no data missing in the next query. - self.state = State::InitLogStoreQuery { - seek_timestamp: new_row_rw_timestamp, - expected_timestamp: Some(new_row_rw_timestamp), - }; } else { - // 1c. The rw_timestamp of this row is equal to self.rw_timestamp, return row return Ok(( - Some(Row::new(Self::build_row_with_logstore( - new_row, - rw_timestamp, - )?)), + Some(Row::new(Self::build_row(new_row, Some(rw_timestamp))?)), pg_descs.clone(), )); } } else { // 2. Reach EOF for the current query. - // Initiate a new batch query using the rw_timestamp + 1. - // expected_timestamp don't need to be set as the next rw_timestamp is unknown. - self.state = State::InitLogStoreQuery { - seek_timestamp: rw_timestamp + 1, - expected_timestamp: None, - }; + if let Some(expected_timestamp) = expected_timestamp { + self.state = State::InitLogStoreQuery { + seek_timestamp: *expected_timestamp, + expected_timestamp: Some(*expected_timestamp), + }; + } else { + self.state = State::InitLogStoreQuery { + seek_timestamp: rw_timestamp + 1, + expected_timestamp: None, + }; + } } } State::Invalid => { @@ -362,25 +329,60 @@ impl SubscriptionCursor { } } + async fn get_next_rw_timestamp( + seek_timestamp: u64, + table_id: u32, + expected_timestamp: Option, + handle_args: HandlerArgs, + ) -> Result<(Option, Option)> { + // The epoch here must be pulled every time, otherwise there will be cache consistency issues + let new_epochs = handle_args + .session + .catalog_writer()? + .list_change_log_epochs(table_id, seek_timestamp, 2) + .await?; + if let Some(expected_timestamp) = expected_timestamp + && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap()) + { + return Err(ErrorCode::CatalogError( + format!( + " No data found for rw_timestamp {:?}, data may have been recycled, please recreate cursor", + convert_logstore_u64_to_unix_millis(expected_timestamp) + ) + .into(), + ) + .into()); + } + Ok((new_epochs.get(0).cloned(), new_epochs.get(1).cloned())) + } + async fn initiate_query( - rw_timestamp: Option, - subscription: &SubscriptionCatalog, + rw_timestamp: Option, + table_catalog: &TableCatalog, handle_args: HandlerArgs, ) -> Result<(PgResponseStream, Vec)> { - let query_stmt = if let Some(rw_timestamp) = rw_timestamp { - Statement::Query(Box::new(gen_query_from_logstore_ge_rw_timestamp( - &subscription.get_log_store_name(), - rw_timestamp, - ))) + let (row_stream, pg_descs) = if let Some(rw_timestamp) = rw_timestamp { + let context = OptimizerContext::from_handler_args(handle_args.clone()); + let session = handle_args.session; + let plan_fragmenter_result = gen_batch_plan_fragmenter( + &session, + Self::create_batch_plan_for_cursor( + table_catalog, + &session, + context.into(), + rw_timestamp, + rw_timestamp, + )?, + )?; + create_stream(session, plan_fragmenter_result, vec![]).await? } else { - let subscription_from_table_name = ObjectName(vec![Ident::from( - subscription.subscription_from_name.as_ref(), - )]); - Statement::Query(Box::new(gen_query_from_table_name( + let subscription_from_table_name = + ObjectName(vec![Ident::from(table_catalog.name.as_ref())]); + let query_stmt = Statement::Query(Box::new(gen_query_from_table_name( subscription_from_table_name, - ))) + ))); + create_stream_for_cursor_stmt(handle_args, query_stmt).await? }; - let (row_stream, pg_descs) = create_stream_for_cursor(handle_args, query_stmt).await?; Ok(( row_stream, Self::build_desc(pg_descs, rw_timestamp.is_none()), @@ -399,59 +401,89 @@ impl SubscriptionCursor { Ok(()) } - pub fn build_row_with_snapshot(row: Vec>) -> Vec> { - let mut new_row = vec![None, Some(Bytes::from(1i16.to_string()))]; - new_row.extend(row); - new_row - } - - pub fn build_row_with_logstore( + pub fn build_row( mut row: Vec>, - rw_timestamp: i64, + rw_timestamp: Option, ) -> Result>> { - let mut new_row = vec![Some(Bytes::from( - convert_logstore_i64_to_unix_millis(rw_timestamp).to_string(), - ))]; - // need remove kv_log_store_epoch - new_row.extend(row.drain(1..row.len()).collect_vec()); - Ok(new_row) + let new_row = if let Some(rw_timestamp) = rw_timestamp { + vec![Some(Bytes::from( + convert_logstore_u64_to_unix_millis(rw_timestamp).to_string(), + ))] + } else { + vec![Some(Bytes::from(1i16.to_string())), None] + }; + row.extend(new_row); + Ok(row) } pub fn build_desc( mut descs: Vec, from_snapshot: bool, ) -> Vec { - let mut new_descs = vec![ - PgFieldDescriptor::new( - "rw_timestamp".to_owned(), - DataType::Int64.to_oid(), - DataType::Int64.type_len(), - ), - PgFieldDescriptor::new( + if from_snapshot { + descs.push(PgFieldDescriptor::new( "op".to_owned(), DataType::Int16.to_oid(), DataType::Int16.type_len(), - ), - ]; - // need remove kv_log_store_epoch and kv_log_store_row_op - if from_snapshot { - new_descs.extend(descs) - } else { - assert_eq!( - descs.get(0).unwrap().get_name(), - KV_LOG_STORE_EPOCH, - "Cursor query logstore: first column must be {}", - KV_LOG_STORE_EPOCH - ); - assert_eq!( - descs.get(1).unwrap().get_name(), - KV_LOG_STORE_ROW_OP, - "Cursor query logstore: first column must be {}", - KV_LOG_STORE_ROW_OP - ); - new_descs.extend(descs.drain(2..descs.len())); + )); } - new_descs + descs.push(PgFieldDescriptor::new( + "rw_timestamp".to_owned(), + DataType::Int64.to_oid(), + DataType::Int64.type_len(), + )); + descs + } + + pub fn create_batch_plan_for_cursor( + table_catalog: &TableCatalog, + session: &SessionImpl, + context: OptimizerContextRef, + old_epoch: u64, + new_epoch: u64, + ) -> Result { + let out_col_idx = table_catalog + .columns + .iter() + .enumerate() + .filter(|(_, v)| !v.is_hidden) + .map(|(i, _)| i) + .collect::>(); + let core = generic::LogScan::new( + table_catalog.name.clone(), + out_col_idx, + Rc::new(table_catalog.table_desc()), + context, + old_epoch, + new_epoch, + ); + let batch_log_seq_scan = BatchLogSeqScan::new(core); + let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema().len()); + let out_names = batch_log_seq_scan.core().column_names(); + // Here we just need a plan_root to call the method, only out_fields and out_names will be used + let plan_root = PlanRoot::new_with_batch_plan( + PlanRef::from(batch_log_seq_scan.clone()), + RequiredDist::single(), + Order::default(), + out_fields, + out_names, + ); + let schema = batch_log_seq_scan.core().schema().clone(); + let (batch_log_seq_scan, query_mode) = match session.config().query_mode() { + QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local), + QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local), + QueryMode::Distributed => ( + plan_root.gen_batch_distributed_plan()?, + QueryMode::Distributed, + ), + }; + Ok(BatchQueryPlanResult { + plan: batch_log_seq_scan, + query_mode, + schema, + stmt_type: StatementType::SELECT, + dependent_relations: table_catalog.dependent_relations.clone(), + }) } } @@ -464,14 +496,16 @@ impl CursorManager { pub async fn add_subscription_cursor( &self, cursor_name: String, - start_timestamp: Option, + start_timestamp: Option, subscription: Arc, + table: Arc, handle_args: &HandlerArgs, ) -> Result<()> { let cursor = SubscriptionCursor::new( cursor_name.clone(), start_timestamp, subscription, + table, handle_args, ) .await?; diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index e7548ce5fa176..f2d768cc076f4 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -285,10 +285,6 @@ fn build_fragment( current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32 } - NodeBody::Subscription(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Subscription as u32 - } - NodeBody::TopN(_) => current_fragment.requires_singleton = true, NodeBody::StreamScan(node) => { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 47a9900752bae..b3bf70f9c523f 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -242,6 +242,15 @@ impl CatalogWriter for MockCatalogWriter { Ok(()) } + async fn list_change_log_epochs( + &self, + _table_id: u32, + _min_epoch: u64, + _max_count: u32, + ) -> Result> { + unreachable!() + } + async fn create_schema( &self, db_id: DatabaseId, @@ -325,12 +334,8 @@ impl CatalogWriter for MockCatalogWriter { self.create_sink_inner(sink, graph) } - async fn create_subscription( - &self, - subscription: PbSubscription, - graph: StreamFragmentGraph, - ) -> Result<()> { - self.create_subscription_inner(subscription, graph) + async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> { + self.create_subscription_inner(subscription) } async fn create_index( @@ -773,11 +778,7 @@ impl MockCatalogWriter { Ok(()) } - fn create_subscription_inner( - &self, - mut subscription: PbSubscription, - _graph: StreamFragmentGraph, - ) -> Result<()> { + fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> { subscription.id = self.gen_id(); self.catalog.write().create_subscription(&subscription); self.add_table_or_subscription_id( diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index 7feaf88788693..74968d2e3a11c 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -9,6 +9,7 @@ mod m20240410_082733_with_version_column_migration; mod m20240410_154406_session_params; mod m20240417_062305_subscription_internal_table_name; mod m20240418_142249_function_runtime; +mod m20240506_112555_subscription_partial_ckpt; pub struct Migrator; @@ -23,6 +24,7 @@ impl MigratorTrait for Migrator { Box::new(m20240410_154406_session_params::Migration), Box::new(m20240417_062305_subscription_internal_table_name::Migration), Box::new(m20240418_142249_function_runtime::Migration), + Box::new(m20240506_112555_subscription_partial_ckpt::Migration), ] } } diff --git a/src/meta/model_v2/migration/src/m20240506_112555_subscription_partial_ckpt.rs b/src/meta/model_v2/migration/src/m20240506_112555_subscription_partial_ckpt.rs new file mode 100644 index 0000000000000..d799880f48511 --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240506_112555_subscription_partial_ckpt.rs @@ -0,0 +1,180 @@ +use sea_orm_migration::prelude::{Table as MigrationTable, *}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column(ColumnDef::new(Subscription::RetentionSeconds).big_integer()) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column(ColumnDef::new(Subscription::SubscriptionState).integer()) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column(ColumnDef::new(Subscription::DependentTableId).integer()) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new(Subscription::Columns.to_string())) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new(Subscription::PlanPk.to_string())) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new(Subscription::DistributionKey.to_string())) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new(Subscription::Properties.to_string())) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new(Subscription::SubscriptionFromName.to_string())) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new( + Subscription::SubscriptionInternalTableName.to_string(), + )) + .to_owned(), + ) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column(ColumnDef::new(Subscription::Columns).binary().not_null()) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column(ColumnDef::new(Subscription::PlanPk).binary().not_null()) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column(ColumnDef::new(Subscription::DistributionKey).json_binary()) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column(ColumnDef::new(Subscription::Properties).json_binary()) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column(ColumnDef::new(Subscription::SubscriptionFromName).string()) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .add_column( + ColumnDef::new(Subscription::SubscriptionInternalTableName).string(), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new(Subscription::RetentionSeconds.to_string())) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new(Subscription::SubscriptionState.to_string())) + .to_owned(), + ) + .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Subscription::Table) + .drop_column(Alias::new(Subscription::DependentTableId.to_string())) + .to_owned(), + ) + .await?; + Ok(()) + } +} + +#[derive(DeriveIden)] +enum Subscription { + Table, + // delete + Columns, + PlanPk, + DistributionKey, + Properties, + SubscriptionFromName, + SubscriptionInternalTableName, + // add + RetentionSeconds, + SubscriptionState, + DependentTableId, +} diff --git a/src/meta/model_v2/src/subscription.rs b/src/meta/model_v2/src/subscription.rs index 1aa4702574498..47ebbc63a2dc1 100644 --- a/src/meta/model_v2/src/subscription.rs +++ b/src/meta/model_v2/src/subscription.rs @@ -17,7 +17,7 @@ use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; use serde::{Deserialize, Serialize}; -use crate::{ColumnCatalogArray, ColumnOrderArray, I32Array, Property, SubscriptionId}; +use crate::SubscriptionId; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "subscription")] @@ -25,13 +25,10 @@ pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub subscription_id: SubscriptionId, pub name: String, - pub columns: ColumnCatalogArray, - pub plan_pk: ColumnOrderArray, - pub distribution_key: I32Array, - pub properties: Property, + pub retention_seconds: i64, pub definition: String, - pub subscription_from_name: String, - pub subscription_internal_table_name: Option, + pub subscription_state: i32, + pub dependent_table_id: u32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -59,13 +56,10 @@ impl From for ActiveModel { Self { subscription_id: Set(pb_subscription.id as _), name: Set(pb_subscription.name), - columns: Set(pb_subscription.column_catalogs.into()), - plan_pk: Set(pb_subscription.plan_pk.into()), - distribution_key: Set(pb_subscription.distribution_key.into()), - properties: Set(pb_subscription.properties.into()), + retention_seconds: Set(pb_subscription.retention_seconds as _), definition: Set(pb_subscription.definition), - subscription_from_name: Set(pb_subscription.subscription_from_name), - subscription_internal_table_name: Set(pb_subscription.subscription_internal_table_name), + subscription_state: Set(pb_subscription.subscription_state), + dependent_table_id: Set(pb_subscription.dependent_table_id), } } } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index f5012e5796e69..7b7d4f5a7d092 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -320,16 +320,7 @@ impl DdlService for DdlServiceImpl { let req = request.into_inner(); let subscription = req.get_subscription()?.clone(); - let fragment_graph = req.get_fragment_graph()?.clone(); - - let stream_job = StreamingJob::Subscription(subscription); - - let command = DdlCommand::CreateStreamingJob( - stream_job, - fragment_graph, - CreateType::Foreground, - None, - ); + let command = DdlCommand::CreateSubscription(subscription); let version = self.ddl_controller.run_command(command).await?; @@ -347,11 +338,7 @@ impl DdlService for DdlServiceImpl { let subscription_id = request.subscription_id; let drop_mode = DropMode::from_request_setting(request.cascade); - let command = DdlCommand::DropStreamingJob( - StreamingJobId::Subscription(subscription_id), - drop_mode, - None, - ); + let command = DdlCommand::DropSubscription(subscription_id, drop_mode); let version = self.ddl_controller.run_command(command).await?; diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index d2991d4a005d6..1e46438cb8ddd 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -666,6 +666,22 @@ impl HummockManagerService for HummockServiceImpl { let response = Response::new(CancelCompactTaskResponse { ret }); return Ok(response); } + + async fn list_change_log_epochs( + &self, + request: Request, + ) -> Result, Status> { + let ListChangeLogEpochsRequest { + table_id, + min_epoch, + max_count, + } = request.into_inner(); + let epochs = self + .hummock_manager + .list_change_log_epochs(table_id, min_epoch, max_count) + .await; + Ok(Response::new(ListChangeLogEpochsResponse { epochs })) + } } #[cfg(test)] diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index fc26b5cb1606e..51175ca9475f2 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -1117,7 +1117,20 @@ impl CommandContext { .await; } - Command::CreateSubscription { .. } => {} + Command::CreateSubscription { + subscription_id, .. + } => match &self.barrier_manager_context.metadata_manager { + MetadataManager::V1(mgr) => { + mgr.catalog_manager + .finish_create_subscription_procedure(*subscription_id) + .await?; + } + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .finish_create_subscription_catalog(*subscription_id) + .await?; + } + }, Command::DropSubscription { .. } => {} } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index a3f19e555bf75..0f21ea956858e 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -67,6 +67,7 @@ impl GlobalBarrierManagerContext { async fn clean_dirty_streaming_jobs(&self) -> MetaResult<()> { match &self.metadata_manager { MetadataManager::V1(mgr) => { + mgr.catalog_manager.clean_dirty_subscription().await?; // Please look at `CatalogManager::clean_dirty_tables` for more details. mgr.catalog_manager .clean_dirty_tables(mgr.fragment_manager.clone()) @@ -97,6 +98,7 @@ impl GlobalBarrierManagerContext { .await; } MetadataManager::V2(mgr) => { + mgr.catalog_controller.clean_dirty_subscription().await?; let ReleaseContext { source_ids, .. } = mgr.catalog_controller.clean_dirty_creating_jobs().await?; diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 111e8e5ab9fe3..7607915bd13f7 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -18,9 +18,7 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; -use risingwave_common::catalog::{ - is_subscription_internal_table, TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS, -}; +use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; @@ -32,8 +30,10 @@ use risingwave_meta_model_v2::{ sink, source, streaming_job, subscription, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId, - SinkId, SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, TableId, UserId, + SinkId, SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, + UserId, }; +use risingwave_pb::catalog::subscription::SubscriptionState; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, @@ -58,6 +58,7 @@ use sea_orm::{ }; use tokio::sync::{RwLock, RwLockReadGuard}; +use super::utils::check_subscription_name_duplicate; use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs}; use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, @@ -402,6 +403,117 @@ impl CatalogController { Ok(version) } + pub async fn create_subscription_catalog( + &self, + pb_subscription: &mut PbSubscription, + ) -> MetaResult<()> { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + + ensure_user_id(pb_subscription.owner as _, &txn).await?; + ensure_object_id(ObjectType::Database, pb_subscription.database_id as _, &txn).await?; + ensure_object_id(ObjectType::Schema, pb_subscription.schema_id as _, &txn).await?; + check_subscription_name_duplicate(pb_subscription, &txn).await?; + + let obj = Self::create_object( + &txn, + ObjectType::Subscription, + pb_subscription.owner as _, + Some(pb_subscription.database_id as _), + Some(pb_subscription.schema_id as _), + ) + .await?; + pb_subscription.id = obj.oid as _; + let subscription: subscription::ActiveModel = pb_subscription.clone().into(); + Subscription::insert(subscription).exec(&txn).await?; + + // record object dependency. + ObjectDependency::insert(object_dependency::ActiveModel { + oid: Set(pb_subscription.dependent_table_id as _), + used_by: Set(pb_subscription.id as _), + ..Default::default() + }) + .exec(&txn) + .await?; + txn.commit().await?; + Ok(()) + } + + pub async fn finish_create_subscription_catalog(&self, subscription_id: u32) -> MetaResult<()> { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let job_id = subscription_id as i32; + + // update `created_at` as now() and `created_at_cluster_version` as current cluster version. + let res = Object::update_many() + .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into()) + .col_expr( + object::Column::CreatedAtClusterVersion, + current_cluster_version().into(), + ) + .filter(object::Column::Oid.eq(job_id)) + .exec(&txn) + .await?; + if res.rows_affected == 0 { + return Err(MetaError::catalog_id_not_found("subscription", job_id)); + } + + // mark the target subscription as `Create`. + let job = subscription::ActiveModel { + subscription_id: Set(job_id), + subscription_state: Set(SubscriptionState::Created.into()), + ..Default::default() + }; + job.update(&txn).await?; + txn.commit().await?; + + Ok(()) + } + + pub async fn notify_create_subscription( + &self, + subscription_id: u32, + ) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let job_id = subscription_id as i32; + let (subscription, obj) = Subscription::find_by_id(job_id) + .find_also_related(Object) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?; + txn.commit().await?; + + let version = self + .notify_frontend( + Operation::Add, + Info::RelationGroup(PbRelationGroup { + relations: vec![PbRelation { + relation_info: PbRelationInfo::Subscription( + ObjectModel(subscription, obj.unwrap()).into(), + ) + .into(), + }], + }), + ) + .await; + Ok(version) + } + + pub async fn clean_dirty_subscription(&self) -> MetaResult<()> { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let _res = Subscription::delete_many() + .filter( + subscription::Column::SubscriptionState + .eq(Into::::into(SubscriptionState::Init)), + ) + .exec(&txn) + .await?; + txn.commit().await?; + Ok(()) + } + pub async fn list_background_creating_mviews(&self) -> MetaResult> { let inner = self.inner.read().await; let tables = Table::find() @@ -901,28 +1013,6 @@ impl CatalogController { )), }); } - ObjectType::Subscription => { - let (mut subscription, obj) = Subscription::find_by_id(job_id) - .find_also_related(Object) - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?; - let log_store_names: Vec<_> = internal_table_objs - .iter() - .filter(|a| is_subscription_internal_table(&subscription.name, &a.0.name)) - .map(|a| &a.0.name) - .collect(); - if log_store_names.len() != 1 { - bail!("A subscription can only have one log_store_name"); - } - subscription.subscription_internal_table_name = - log_store_names.get(0).cloned().cloned(); - relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Subscription( - ObjectModel(subscription, obj.unwrap()).into(), - )), - }); - } ObjectType::Index => { let (index, obj) = Index::find_by_id(job_id) .find_also_related(Object) @@ -1446,35 +1536,6 @@ impl CatalogController { relations.push(PbRelationInfo::Subscription( ObjectModel(subscription, obj).into(), )); - - // internal tables. - let internal_tables: Vec = Table::find() - .select_only() - .column(table::Column::TableId) - .filter(table::Column::BelongsToJobId.eq(object_id)) - .into_tuple() - .all(&txn) - .await?; - - Object::update_many() - .col_expr( - object::Column::OwnerId, - SimpleExpr::Value(Value::Int(Some(new_owner))), - ) - .filter(object::Column::Oid.is_in(internal_tables.clone())) - .exec(&txn) - .await?; - - let table_objs = Table::find() - .find_also_related(Object) - .filter(table::Column::TableId.is_in(internal_tables)) - .all(&txn) - .await?; - for (table, table_obj) in table_objs { - relations.push(PbRelationInfo::Table( - ObjectModel(table, table_obj.unwrap()).into(), - )); - } } ObjectType::View => { let view = View::find_by_id(object_id) @@ -1706,37 +1767,6 @@ impl CatalogController { relations.push(PbRelationInfo::Subscription( ObjectModel(subscription, obj).into(), )); - - // internal tables. - let internal_tables: Vec = Table::find() - .select_only() - .column(table::Column::TableId) - .filter(table::Column::BelongsToJobId.eq(object_id)) - .into_tuple() - .all(&txn) - .await?; - - if !internal_tables.is_empty() { - Object::update_many() - .col_expr( - object::Column::SchemaId, - SimpleExpr::Value(Value::Int(Some(new_schema))), - ) - .filter(object::Column::Oid.is_in(internal_tables.clone())) - .exec(&txn) - .await?; - - let table_objs = Table::find() - .find_also_related(Object) - .filter(table::Column::TableId.is_in(internal_tables)) - .all(&txn) - .await?; - for (table, table_obj) in table_objs { - relations.push(PbRelationInfo::Table( - ObjectModel(table, table_obj.unwrap()).into(), - )); - } - } } ObjectType::View => { let view = View::find_by_id(object_id) @@ -2614,6 +2644,25 @@ impl CatalogController { .collect()) } + pub async fn get_subscription_by_id( + &self, + subscription_id: SubscriptionId, + ) -> MetaResult { + let inner = self.inner.read().await; + let subscription_objs = Subscription::find() + .find_also_related(Object) + .filter(subscription::Column::SubscriptionId.eq(subscription_id)) + .all(&inner.db) + .await?; + let subscription: PbSubscription = subscription_objs + .into_iter() + .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into()) + .find_or_first(|_| true) + .ok_or_else(|| anyhow!("cant find subscription with id {}", subscription_id))?; + + Ok(subscription) + } + pub async fn find_creating_streaming_job_ids( &self, infos: Vec, diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index a271ab85bb928..ff90dd33297d2 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -19,6 +19,7 @@ use risingwave_meta_model_v2::{ }; use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; +use risingwave_pb::catalog::subscription::PbSubscriptionState; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableType}; use risingwave_pb::catalog::{ PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex, @@ -223,11 +224,8 @@ impl From> for PbSubscription { schema_id: value.1.schema_id.unwrap() as _, database_id: value.1.database_id.unwrap() as _, name: value.0.name, - plan_pk: value.0.plan_pk.to_protobuf(), - dependent_relations: vec![], // todo: deprecate it. - distribution_key: value.0.distribution_key.0, owner: value.1.owner_id as _, - properties: value.0.properties.0, + retention_seconds: value.0.retention_seconds as _, definition: value.0.definition, initialized_at_epoch: Some( Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, @@ -235,12 +233,10 @@ impl From> for PbSubscription { created_at_epoch: Some( Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, ), - stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. - column_catalogs: value.0.columns.to_protobuf(), - subscription_from_name: value.0.subscription_from_name, initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, - subscription_internal_table_name: value.0.subscription_internal_table_name, + dependent_table_id: value.0.dependent_table_id, + subscription_state: PbSubscriptionState::Init as _, } } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 271860ee49561..8b7c5281a9268 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -30,9 +30,9 @@ use risingwave_meta_model_v2::prelude::{ }; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, - streaming_job, subscription, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, - ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, - StreamNode, StreamingParallelism, TableId, TableVersion, UserId, + streaming_job, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, + FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, + StreamingParallelism, TableId, TableVersion, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; @@ -171,22 +171,6 @@ impl CatalogController { let sink: sink::ActiveModel = sink.clone().into(); Sink::insert(sink).exec(&txn).await?; } - StreamingJob::Subscription(subscription) => { - let job_id = Self::create_streaming_job_obj( - &txn, - ObjectType::Subscription, - subscription.owner as _, - Some(subscription.database_id as _), - Some(subscription.schema_id as _), - create_type, - ctx, - streaming_parallelism, - ) - .await?; - subscription.id = job_id as _; - let subscription: subscription::ActiveModel = subscription.clone().into(); - subscription.insert(&txn).await?; - } StreamingJob::Table(src, table, _) => { let job_id = Self::create_streaming_job_obj( &txn, diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 173634ab03cc7..42ccc97e6637f 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -23,11 +23,11 @@ use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, - object_dependency, schema, sink, source, table, user, user_privilege, view, ActorId, - DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId, - SchemaId, SourceId, StreamNode, UserId, + object_dependency, schema, sink, source, subscription, table, user, user_privilege, view, + ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, + PrivilegeId, SchemaId, SourceId, StreamNode, UserId, }; -use risingwave_pb::catalog::{PbConnection, PbFunction}; +use risingwave_pb::catalog::{PbConnection, PbFunction, PbSubscription}; use risingwave_pb::meta::PbFragmentParallelUnitMapping; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource}; @@ -410,6 +410,33 @@ where Ok(()) } +pub async fn check_subscription_name_duplicate( + pb_subscription: &PbSubscription, + db: &C, +) -> MetaResult<()> +where + C: ConnectionTrait, +{ + let count = Subscription::find() + .inner_join(Object) + .filter( + object::Column::DatabaseId + .eq(pb_subscription.database_id as DatabaseId) + .and(object::Column::SchemaId.eq(pb_subscription.schema_id as SchemaId)) + .and(subscription::Column::Name.eq(&pb_subscription.name)), + ) + .count(db) + .await?; + if count > 0 { + assert_eq!(count, 1); + return Err(MetaError::catalog_duplicated( + "subscription", + &pb_subscription.name, + )); + } + Ok(()) +} + /// `check_user_name_duplicate` checks whether the user is already existed in the cluster. pub async fn check_user_name_duplicate(name: &str, db: &C) -> MetaResult<()> where diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index 773ca9e6ba536..ddbe89b2511b1 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -176,13 +176,11 @@ impl HummockManager { ), ); } - versions_object_ids.extend(version_delta.newly_added_object_ids()); } // Object ids that once exist in any hummock version but not exist in the latest hummock version let removed_object_ids = &versions_object_ids - ¤t_version.get_object_ids(); - let total_file_size = removed_object_ids .iter() .map(|t| { diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index ff3965d974819..c7834b144b69f 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -3301,6 +3301,26 @@ impl HummockManager { Ok(()) } + + #[named] + pub async fn list_change_log_epochs( + &self, + table_id: u32, + min_epoch: u64, + max_count: u32, + ) -> Vec { + let versioning = read_lock!(self, versioning).await; + if let Some(table_change_log) = versioning + .current_version + .table_change_log + .get(&TableId::new(table_id)) + { + let table_change_log = table_change_log.clone(); + table_change_log.get_epochs(min_epoch, max_count as usize) + } else { + vec![] + } + } } // This structure describes how hummock handles sst switching in a compaction group. A better sst cut will result in better data alignment, which in turn will improve the efficiency of the compaction. diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index aa4317add003b..ed128ee3e1ecb 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -18,6 +18,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::catalog::TableOption; +use risingwave_pb::catalog::subscription::PbSubscriptionState; use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::{ Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, @@ -123,9 +124,9 @@ impl DatabaseManager { (sink.id, sink) })); let subscriptions = BTreeMap::from_iter(subscriptions.into_iter().map(|subscription| { - for depend_relation_id in &subscription.dependent_relations { - *relation_ref_count.entry(*depend_relation_id).or_default() += 1; - } + *relation_ref_count + .entry(subscription.dependent_table_id) + .or_default() += 1; (subscription.id, subscription) })); let indexes = BTreeMap::from_iter(indexes.into_iter().map(|index| (index.id, index))); @@ -185,10 +186,7 @@ impl DatabaseManager { .collect_vec(), self.subscriptions .values() - .filter(|t| { - t.stream_job_status == PbStreamJobStatus::Unspecified as i32 - || t.stream_job_status == PbStreamJobStatus::Created as i32 - }) + .filter(|t| t.subscription_state == PbSubscriptionState::Created as i32) .cloned() .collect_vec(), self.indexes @@ -398,7 +396,6 @@ impl DatabaseManager { .keys() .copied() .chain(self.sinks.keys().copied()) - .chain(self.subscriptions.keys().copied()) .chain(self.indexes.keys().copied()) .chain(self.sources.keys().copied()) .chain( diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 5a316696e6195..665eb45fc3c37 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -27,12 +27,13 @@ pub use database::*; pub use fragment::*; use itertools::Itertools; use risingwave_common::catalog::{ - is_subscription_internal_table, valid_table_name, TableId as StreamingJobId, TableOption, - DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG, + valid_table_name, TableId as StreamingJobId, TableOption, DEFAULT_DATABASE_NAME, + DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG, DEFAULT_SUPER_USER_FOR_PG_ID, DEFAULT_SUPER_USER_ID, SYSTEM_SCHEMAS, }; -use risingwave_common::{bail, ensure}; +use risingwave_common::{bail, current_cluster_version, ensure}; use risingwave_connector::source::{should_copy_to_format_encode_options, UPSTREAM_SOURCE_KEY}; +use risingwave_pb::catalog::subscription::PbSubscriptionState; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, TableType}; use risingwave_pb::catalog::{ Comment, Connection, CreateType, Database, Function, Index, PbSource, PbStreamJobStatus, @@ -715,9 +716,6 @@ impl CatalogManager { .await } StreamingJob::Sink(sink, _) => self.start_create_sink_procedure(sink).await, - StreamingJob::Subscription(subscription) => { - self.start_create_subscription_procedure(subscription).await - } StreamingJob::Index(index, index_table) => { self.start_create_index_procedure(index, index_table).await } @@ -1202,7 +1200,7 @@ impl CatalogManager { .tree_ref() .iter() .filter_map(|(_, subscription)| { - if subscription.dependent_relations.contains(&relation_id) { + if subscription.dependent_table_id == relation_id { Some(RelationInfo::Subscription(subscription.clone())) } else { None @@ -1549,11 +1547,6 @@ impl CatalogManager { if !all_subscription_ids.insert(subscription.id) { continue; } - let table_fragments = fragment_manager - .select_table_fragments_by_table_id(&subscription.id.into()) - .await?; - - all_internal_table_ids.extend(table_fragments.internal_table_ids()); if let Some(ref_count) = database_core .relation_ref_count @@ -1735,9 +1728,7 @@ impl CatalogManager { } for subscription in &subscriptions_removed { - for dependent_relation_id in &subscription.dependent_relations { - database_core.decrease_ref_count(*dependent_relation_id); - } + database_core.decrease_ref_count(subscription.dependent_table_id); } let version = self @@ -1780,7 +1771,6 @@ impl CatalogManager { .into_iter() .map(|id| id.into()) .chain(all_sink_ids.into_iter().map(|id| id.into())) - .chain(all_subscription_ids.into_iter().map(|id| id.into())) .chain(all_streaming_job_source_ids.into_iter().map(|id| id.into())) .collect_vec(); @@ -1873,7 +1863,7 @@ impl CatalogManager { } for subscription in database_mgr.subscriptions.values() { - if subscription.dependent_relations.contains(&relation_id) { + if subscription.dependent_table_id == relation_id { let mut subscription = subscription.clone(); subscription.definition = alter_relation_rename_refs(&subscription.definition, from, to); @@ -2395,7 +2385,6 @@ impl CatalogManager { alter_owner_request::Object::SubscriptionId(subscription_id) => { database_core.ensure_subscription_id(subscription_id)?; let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); - let mut tables = BTreeMapTransaction::new(&mut database_core.tables); let mut subscription = subscriptions.get_mut(subscription_id).unwrap(); let old_owner_id = subscription.owner; if old_owner_id == owner_id { @@ -2403,26 +2392,12 @@ impl CatalogManager { } subscription.owner = owner_id; - let mut relations = vec![Relation { + let relations = vec![Relation { relation_info: Some(RelationInfo::Subscription(subscription.clone())), }]; - // internal tables - let internal_table_ids = fragment_manager - .select_table_fragments_by_table_id(&(subscription_id.into())) - .await? - .internal_table_ids(); - for id in internal_table_ids { - let mut table = tables.get_mut(id).unwrap(); - assert_eq!(old_owner_id, table.owner); - table.owner = owner_id; - relations.push(Relation { - relation_info: Some(RelationInfo::Table(table.clone())), - }); - } - relation_info = Info::RelationGroup(RelationGroup { relations }); - commit_meta!(self, subscriptions, tables)?; + commit_meta!(self, subscriptions)?; user_core.increase_ref(owner_id); user_core.decrease_ref(old_owner_id); } @@ -2659,14 +2634,6 @@ impl CatalogManager { return Ok(IGNORED_NOTIFICATION_VERSION); } - // internal tables. - let to_update_internal_table_ids = Vec::from_iter( - fragment_manager - .select_table_fragments_by_table_id(&(subscription_id.into())) - .await? - .internal_table_ids(), - ); - database_core.check_relation_name_duplicated(&( database_id, new_schema_id, @@ -2676,15 +2643,7 @@ impl CatalogManager { let mut subscription = subscriptions.get_mut(subscription_id).unwrap(); subscription.schema_id = new_schema_id; relation_infos.push(Some(RelationInfo::Subscription(subscription.clone()))); - - let mut tables = BTreeMapTransaction::new(&mut database_core.tables); - for table_id in to_update_internal_table_ids { - let mut table = tables.get_mut(table_id).unwrap(); - table.schema_id = new_schema_id; - relation_infos.push(Some(RelationInfo::Table(table.clone()))); - } - - commit_meta!(self, subscriptions, tables)?; + commit_meta!(self, subscriptions)?; } } @@ -3194,9 +3153,8 @@ impl CatalogManager { let user_core = &mut core.user; database_core.ensure_database_id(subscription.database_id)?; database_core.ensure_schema_id(subscription.schema_id)?; - for dependent_id in &subscription.dependent_relations { - database_core.ensure_table_view_or_source_id(dependent_id)?; - } + database_core + .ensure_table_view_or_source_id(&TableId::from(subscription.dependent_table_id))?; let key = ( subscription.database_id, subscription.schema_id, @@ -3211,39 +3169,36 @@ impl CatalogManager { } else { database_core.mark_creating(&key); database_core.mark_creating_streaming_job(subscription.id, key); - for &dependent_relation_id in &subscription.dependent_relations { - database_core.increase_ref_count(dependent_relation_id); - } + database_core.increase_ref_count(subscription.dependent_table_id); user_core.increase_ref(subscription.owner); + let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); + subscriptions.insert(subscription.id, subscription.clone()); + commit_meta!(self, subscriptions)?; Ok(()) } } pub async fn finish_create_subscription_procedure( &self, - mut internal_tables: Vec, - mut subscription: Subscription, - ) -> MetaResult { + subscription_id: SubscriptionId, + ) -> MetaResult<()> { let core = &mut *self.core.lock().await; let database_core = &mut core.database; + let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); + let mut subscription = subscriptions + .get(&subscription_id) + .ok_or_else(|| anyhow!("subscription not found"))? + .clone(); + subscription.created_at_cluster_version = Some(current_cluster_version()); + subscription.created_at_epoch = Some(Epoch::now().0); let key = ( subscription.database_id, subscription.schema_id, subscription.name.clone(), ); - let log_store_names: Vec<_> = internal_tables - .iter() - .filter(|a| is_subscription_internal_table(&subscription.name, a.get_name())) - .map(|a| a.get_name()) - .collect(); - if log_store_names.len() != 1 { - bail!("A subscription can only have one log_store_name"); - } - subscription.subscription_internal_table_name = log_store_names.get(0).cloned().cloned(); - let mut tables = BTreeMapTransaction::new(&mut database_core.tables); - let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); + assert!( - !subscriptions.contains_key(&subscription.id) + subscription.subscription_state == Into::::into(PbSubscriptionState::Init) && database_core.in_progress_creation_tracker.contains(&key), "subscription must be in creating procedure" ); @@ -3253,13 +3208,28 @@ impl CatalogManager { .in_progress_creation_streaming_job .remove(&subscription.id); - subscription.stream_job_status = PbStreamJobStatus::Created.into(); + subscription.subscription_state = PbSubscriptionState::Created.into(); subscriptions.insert(subscription.id, subscription.clone()); - for table in &mut internal_tables { - table.stream_job_status = PbStreamJobStatus::Created.into(); - tables.insert(table.id, table.clone()); - } - commit_meta!(self, subscriptions, tables)?; + commit_meta!(self, subscriptions)?; + Ok(()) + } + + pub async fn notify_create_subscription( + &self, + subscription_id: SubscriptionId, + ) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); + let subscription = subscriptions + .get(&subscription_id) + .ok_or_else(|| anyhow!("subscription not found"))? + .clone(); + assert_eq!( + subscription.subscription_state, + Into::::into(PbSubscriptionState::Created) + ); + commit_meta!(self, subscriptions)?; let version = self .notify_frontend( @@ -3267,39 +3237,45 @@ impl CatalogManager { Info::RelationGroup(RelationGroup { relations: vec![Relation { relation_info: RelationInfo::Subscription(subscription.to_owned()).into(), - }] - .into_iter() - .chain(internal_tables.into_iter().map(|internal_table| Relation { - relation_info: RelationInfo::Table(internal_table).into(), - })) - .collect_vec(), + }], }), ) .await; - Ok(version) } - pub async fn cancel_create_subscription_procedure(&self, subscription: &Subscription) { + pub async fn clean_dirty_subscription(&self) -> MetaResult<()> { let core = &mut *self.core.lock().await; let database_core = &mut core.database; + let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); + let remove_subscriptions = subscriptions + .tree_ref() + .iter() + .filter(|(_, s)| s.subscription_state == Into::::into(PbSubscriptionState::Init)) + .map(|(_, s)| s.clone()) + .collect_vec(); let user_core = &mut core.user; - let key = ( - subscription.database_id, - subscription.schema_id, - subscription.name.clone(), - ); - assert!( - !database_core.subscriptions.contains_key(&subscription.id), - "subscription must be in creating procedure" - ); + for s in &remove_subscriptions { + subscriptions.remove(s.id); + } + commit_meta!(self, subscriptions)?; + for subscription in remove_subscriptions { + let key = ( + subscription.database_id, + subscription.schema_id, + subscription.name.clone(), + ); + assert!( + !database_core.subscriptions.contains_key(&subscription.id), + "subscription must be in creating procedure" + ); - database_core.unmark_creating(&key); - database_core.unmark_creating_streaming_job(subscription.id); - for &dependent_relation_id in &subscription.dependent_relations { - database_core.decrease_ref_count(dependent_relation_id); + database_core.unmark_creating(&key); + database_core.unmark_creating_streaming_job(subscription.id); + database_core.decrease_ref_count(subscription.dependent_table_id); + user_core.decrease_ref(subscription.owner); } - user_core.decrease_ref(subscription.owner); + Ok(()) } /// This is used for `ALTER TABLE ADD/DROP COLUMN`. @@ -3761,6 +3737,19 @@ impl CatalogManager { tables } + pub async fn get_subscription_by_id( + &self, + subscription_id: SubscriptionId, + ) -> MetaResult { + let guard = self.core.lock().await; + let subscription = guard + .database + .subscriptions + .get(&subscription_id) + .ok_or_else(|| anyhow!("cant find subscription with id {}", subscription_id))?; + Ok(subscription.clone()) + } + pub async fn get_created_table_ids(&self) -> Vec { let guard = self.core.lock().await; guard diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index 0ce47608cdfd2..dcf537ad6c7e7 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -101,7 +101,6 @@ impl NotificationManager { info: Some(task.info), version: task.version.unwrap_or_default(), }; - core.lock().await.notify(task.target, response); } }); diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index a29e6e923de2b..a9a159e6a4535 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -15,7 +15,7 @@ use risingwave_common::catalog::TableVersionId; use risingwave_common::current_cluster_version; use risingwave_common::util::epoch::Epoch; -use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Subscription, Table}; +use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table}; use risingwave_pb::ddl_service::TableJobType; use strum::EnumDiscriminants; @@ -26,7 +26,6 @@ use crate::model::FragmentId; #[derive(Debug, Clone, EnumDiscriminants)] pub enum StreamingJob { MaterializedView(Table), - Subscription(Subscription), Sink(Sink, Option<(Table, Option)>), Table(Option, Table, TableJobType), Index(Index, Table), @@ -37,7 +36,6 @@ pub enum StreamingJob { pub enum DdlType { MaterializedView, Sink, - Subscription, Table(TableJobType), Index, Source, @@ -51,7 +49,6 @@ impl From<&StreamingJob> for DdlType { StreamingJob::Table(_, _, ty) => DdlType::Table(*ty), StreamingJob::Index(_, _) => DdlType::Index, StreamingJob::Source(_) => DdlType::Source, - StreamingJob::Subscription(_) => DdlType::Subscription, } } } @@ -94,10 +91,6 @@ impl StreamingJob { source.created_at_epoch = created_at_epoch; source.created_at_cluster_version = created_at_cluster_version; } - StreamingJob::Subscription(subscription) => { - subscription.created_at_epoch = created_at_epoch; - subscription.created_at_cluster_version = created_at_cluster_version; - } } } @@ -132,10 +125,6 @@ impl StreamingJob { source.initialized_at_epoch = initialized_at_epoch; source.initialized_at_cluster_version = initialized_at_cluster_version; } - StreamingJob::Subscription(subscription) => { - subscription.initialized_at_epoch = initialized_at_epoch; - subscription.initialized_at_cluster_version = initialized_at_cluster_version; - } } } } @@ -154,9 +143,6 @@ impl StreamingJob { StreamingJob::Source(src) => { src.id = id; } - StreamingJob::Subscription(subscription) => { - subscription.id = id; - } } } @@ -166,7 +152,7 @@ impl StreamingJob { Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => { table.fragment_id = id; } - Self::Sink(_, _) | Self::Source(_) | Self::Subscription(_) => {} + Self::Sink(_, _) | Self::Source(_) => {} } } @@ -176,10 +162,7 @@ impl StreamingJob { Self::Table(_, table, ..) => { table.dml_fragment_id = id; } - Self::MaterializedView(_) - | Self::Index(_, _) - | Self::Sink(_, _) - | Self::Subscription(_) => {} + Self::MaterializedView(_) | Self::Index(_, _) | Self::Sink(_, _) => {} Self::Source(_) => {} } } @@ -191,7 +174,6 @@ impl StreamingJob { Self::Table(_, table, ..) => table.id, Self::Index(index, _) => index.id, Self::Source(source) => source.id, - Self::Subscription(subscription) => subscription.id, } } @@ -202,7 +184,6 @@ impl StreamingJob { Self::Table(_, table, ..) => Some(table.id), Self::Index(_, table) => Some(table.id), Self::Source(_) => None, - Self::Subscription(_) => None, } } @@ -212,7 +193,7 @@ impl StreamingJob { Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => { Some(table) } - Self::Sink(_, _) | Self::Source(_) | Self::Subscription(_) => None, + Self::Sink(_, _) | Self::Source(_) => None, } } @@ -223,7 +204,6 @@ impl StreamingJob { Self::Table(_, table, ..) => table.schema_id, Self::Index(index, _) => index.schema_id, Self::Source(source) => source.schema_id, - Self::Subscription(subscription) => subscription.schema_id, } } @@ -234,7 +214,6 @@ impl StreamingJob { Self::Table(_, table, ..) => table.database_id, Self::Index(index, _) => index.database_id, Self::Source(source) => source.database_id, - Self::Subscription(subscription) => subscription.database_id, } } @@ -245,7 +224,6 @@ impl StreamingJob { Self::Table(_, table, ..) => table.name.clone(), Self::Index(index, _) => index.name.clone(), Self::Source(source) => source.name.clone(), - Self::Subscription(subscription) => subscription.name.clone(), } } @@ -256,7 +234,6 @@ impl StreamingJob { StreamingJob::Table(_, table, ..) => table.owner, StreamingJob::Index(index, _) => index.owner, StreamingJob::Source(source) => source.owner, - StreamingJob::Subscription(subscription) => subscription.owner, } } @@ -267,7 +244,6 @@ impl StreamingJob { Self::Index(_, table) => table.definition.clone(), Self::Sink(sink, _) => sink.definition.clone(), Self::Source(source) => source.definition.clone(), - Self::Subscription(subscription) => subscription.definition.clone(), } } @@ -306,7 +282,6 @@ impl StreamingJob { vec![] } StreamingJob::Source(_) => vec![], - Self::Subscription(subscription) => subscription.dependent_relations.clone(), } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index c0aa62750facb..c8080498e1761 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -45,7 +45,7 @@ use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ connection, Comment, Connection, CreateType, Database, Function, PbSource, PbTable, Schema, - Sink, Source, Table, View, + Sink, Source, Subscription, Table, View, }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ @@ -101,7 +101,6 @@ pub enum StreamingJobId { Sink(SinkId), Table(Option, TableId), Index(IndexId), - Subscription(SubscriptionId), } impl StreamingJobId { @@ -110,7 +109,6 @@ impl StreamingJobId { match self { StreamingJobId::MaterializedView(id) | StreamingJobId::Sink(id) - | StreamingJobId::Subscription(id) | StreamingJobId::Table(_, id) | StreamingJobId::Index(id) => *id, } @@ -150,6 +148,8 @@ pub enum DdlCommand { CreateConnection(Connection), DropConnection(ConnectionId), CommentOn(Comment), + CreateSubscription(Subscription), + DropSubscription(SubscriptionId, DropMode), } impl DdlCommand { @@ -332,6 +332,12 @@ impl DdlController { } DdlCommand::AlterSourceColumn(source) => ctrl.alter_source_column(source).await, DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await, + DdlCommand::CreateSubscription(subscription) => { + ctrl.create_subscription(subscription).await + } + DdlCommand::DropSubscription(subscription_id, drop_mode) => { + ctrl.drop_subscription(subscription_id, drop_mode).await + } } } .in_current_span(); @@ -634,6 +640,114 @@ impl DdlController { Ok(()) } + async fn create_subscription( + &self, + mut subscription: Subscription, + ) -> MetaResult { + tracing::debug!("create subscription"); + let _permit = self + .creating_streaming_job_permits + .semaphore + .acquire() + .await + .unwrap(); + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; + let initialized_at_epoch = Some(Epoch::now().0); + let initialized_at_cluster_version = Some(current_cluster_version()); + subscription.initialized_at_epoch = initialized_at_epoch; + subscription.initialized_at_cluster_version = initialized_at_cluster_version; + subscription.id = id; + + mgr.catalog_manager + .start_create_subscription_procedure(&subscription) + .await?; + match self.stream_manager.create_subscription(&subscription).await { + Ok(_) => { + let version = mgr + .catalog_manager + .notify_create_subscription(subscription.id) + .await?; + tracing::debug!("finish create subscription"); + Ok(version) + } + Err(e) => { + tracing::debug!("cancel create subscription"); + Err(e) + } + } + } + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .create_subscription_catalog(&mut subscription) + .await?; + match self.stream_manager.create_subscription(&subscription).await { + Ok(_) => { + let version = mgr + .catalog_controller + .notify_create_subscription(subscription.id) + .await?; + tracing::debug!("finish create subscription"); + Ok(version) + } + Err(e) => { + tracing::debug!("cancel create subscription"); + Err(e) + } + } + } + } + } + + async fn drop_subscription( + &self, + subscription_id: SubscriptionId, + drop_mode: DropMode, + ) -> MetaResult { + tracing::debug!("preparing drop subscription"); + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + let table_id = mgr + .catalog_manager + .get_subscription_by_id(subscription_id) + .await? + .dependent_table_id; + let (version, _) = mgr + .catalog_manager + .drop_relation( + RelationIdEnum::Subscription(subscription_id), + mgr.fragment_manager.clone(), + drop_mode, + ) + .await?; + self.stream_manager + .drop_subscription(subscription_id, table_id) + .await; + tracing::debug!("finish drop subscription"); + Ok(version) + } + MetadataManager::V2(mgr) => { + let table_id = mgr + .catalog_controller + .get_subscription_by_id(subscription_id as i32) + .await? + .dependent_table_id; + let (_, version) = mgr + .catalog_controller + .drop_relation(ObjectType::Subscription, subscription_id as _, drop_mode) + .await?; + self.stream_manager + .drop_subscription(subscription_id, table_id) + .await; + tracing::debug!("finish drop subscription"); + Ok(version) + } + } + } + async fn create_streaming_job( &self, mut stream_job: StreamingJob, @@ -1159,7 +1273,6 @@ impl DdlController { StreamingJobId::Sink(id) => (id as _, ObjectType::Sink), StreamingJobId::Table(_, id) => (id as _, ObjectType::Table), StreamingJobId::Index(idx) => (idx as _, ObjectType::Index), - StreamingJobId::Subscription(id) => (id as _, ObjectType::Subscription), }; let version = self @@ -1217,15 +1330,6 @@ impl DdlController { ) .await? } - StreamingJobId::Subscription(subscription_id) => { - mgr.catalog_manager - .drop_relation( - RelationIdEnum::Subscription(subscription_id), - mgr.fragment_manager.clone(), - drop_mode, - ) - .await? - } }; if let Some(replace_table_info) = target_replace_info { @@ -1543,11 +1647,6 @@ impl DdlController { .cancel_create_sink_procedure(sink, target_table) .await; } - StreamingJob::Subscription(subscription) => { - mgr.catalog_manager - .cancel_create_subscription_procedure(subscription) - .await; - } StreamingJob::Table(source, table, ..) => { if let Some(source) = source { mgr.catalog_manager @@ -1634,11 +1733,6 @@ impl DdlController { version } - StreamingJob::Subscription(subscription) => { - mgr.catalog_manager - .finish_create_subscription_procedure(internal_tables, subscription) - .await? - } StreamingJob::Table(source, table, ..) => { creating_internal_table_ids.push(table.id); if let Some(source) = source { diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index af9e233fb2984..e347dd0287f36 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -148,11 +148,6 @@ impl BuildingFragment { has_table = true; } - NodeBody::Subscription(subscription_node) => { - subscription_node.subscription_catalog.as_mut().unwrap().id = table_id; - - has_table = true; - } NodeBody::Dml(dml_node) => { dml_node.table_id = table_id; dml_node.table_version_id = job.table_version_id().unwrap(); @@ -666,10 +661,7 @@ impl CompleteStreamFragmentGraph { (source_job_id, edge) } - DdlType::MaterializedView - | DdlType::Sink - | DdlType::Index - | DdlType::Subscription => { + DdlType::MaterializedView | DdlType::Sink | DdlType::Index => { // handle MV on MV/Source // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 9238d9652fe55..58c3b9add64c5 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -19,7 +19,7 @@ use futures::future::join_all; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_meta_model_v2::ObjectId; -use risingwave_pb::catalog::{CreateType, Table}; +use risingwave_pb::catalog::{CreateType, Subscription, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::Dispatcher; use thiserror_ext::AsReport; @@ -741,6 +741,39 @@ impl GlobalStreamManager { Ok(()) } + + // Dont need add actor, just send a command + pub async fn create_subscription( + self: &Arc, + subscription: &Subscription, + ) -> MetaResult<()> { + let command = Command::CreateSubscription { + subscription_id: subscription.id, + upstream_mv_table_id: TableId::new(subscription.dependent_table_id), + retention_second: subscription.retention_seconds, + }; + + tracing::debug!("sending Command::CreateSubscription"); + self.barrier_scheduler.run_command(command).await?; + Ok(()) + } + + // Dont need add actor, just send a command + pub async fn drop_subscription(self: &Arc, subscription_id: u32, table_id: u32) { + let command = Command::DropSubscription { + subscription_id, + upstream_mv_table_id: TableId::new(table_id), + }; + + tracing::debug!("sending Command::DropSubscription"); + let _ = self + .barrier_scheduler + .run_command(command) + .await + .inspect_err(|err| { + tracing::error!(error = ?err.as_report(), "failed to run drop command"); + }); + } } #[cfg(test)] diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index fbb8dff1f5a98..7fd32f3b8bab0 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -384,11 +384,9 @@ impl MetaClient { pub async fn create_subscription( &self, subscription: PbSubscription, - graph: StreamFragmentGraph, ) -> Result { let request = CreateSubscriptionRequest { subscription: Some(subscription), - fragment_graph: Some(graph), }; let resp = self.inner.create_subscription(request).await?; @@ -596,6 +594,21 @@ impl MetaClient { Ok(resp.version) } + pub async fn list_change_log_epochs( + &self, + table_id: u32, + min_epoch: u64, + max_count: u32, + ) -> Result> { + let request = ListChangeLogEpochsRequest { + table_id, + min_epoch, + max_count, + }; + let resp = self.inner.list_change_log_epochs(request).await?; + Ok(resp.epochs) + } + pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result { let request = DropIndexRequest { index_id: index_id.index_id, @@ -1976,6 +1989,7 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse } ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse } ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse} + ,{ hummock_client, list_change_log_epochs, ListChangeLogEpochsRequest, ListChangeLogEpochsResponse } ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse } diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 3230ab99941b8..440d8f58d2802 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -172,20 +172,9 @@ pub enum AlterSinkOperation { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterSubscriptionOperation { - RenameSubscription { - subscription_name: ObjectName, - }, - ChangeOwner { - new_owner_name: Ident, - }, - SetSchema { - new_schema_name: ObjectName, - }, - /// `SET PARALLELISM TO [ DEFERRED ]` - SetParallelism { - parallelism: SetVariableValue, - deferred: bool, - }, + RenameSubscription { subscription_name: ObjectName }, + ChangeOwner { new_owner_name: Ident }, + SetSchema { new_schema_name: ObjectName }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -407,17 +396,6 @@ impl fmt::Display for AlterSubscriptionOperation { AlterSubscriptionOperation::SetSchema { new_schema_name } => { write!(f, "SET SCHEMA {}", new_schema_name) } - AlterSubscriptionOperation::SetParallelism { - parallelism, - deferred, - } => { - write!( - f, - "SET PARALLELISM TO {} {}", - parallelism, - if *deferred { " DEFERRED" } else { "" } - ) - } } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 1d820ce53ff7d..ea28a79c01c6e 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3527,24 +3527,8 @@ impl Parser { AlterSubscriptionOperation::SetSchema { new_schema_name: schema_name, } - } else if self.parse_keyword(Keyword::PARALLELISM) { - if self.expect_keyword(Keyword::TO).is_err() - && self.expect_token(&Token::Eq).is_err() - { - return self.expected( - "TO or = after ALTER TABLE SET PARALLELISM", - self.peek_token(), - ); - } - let value = self.parse_set_variable()?; - let deferred = self.parse_keyword(Keyword::DEFERRED); - - AlterSubscriptionOperation::SetParallelism { - parallelism: value, - deferred, - } } else { - return self.expected("SCHEMA/PARALLELISM after SET", self.peek_token()); + return self.expected("SCHEMA after SET", self.peek_token()); } } else { return self.expected( diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 1c68a05cd81a4..af694c3373e10 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::cmp::min; use std::collections::HashMap; use risingwave_common::catalog::TableId; @@ -33,6 +34,18 @@ impl TableChangeLog { &self.0[start..end] } + pub fn get_epochs(&self, min_epoch: u64, max_count: usize) -> Vec { + let epochs: Vec = self + .filter_epoch((min_epoch, u64::MAX)) + .iter() + .flat_map(|epoch_change_log| epoch_change_log.epochs.clone()) + .filter(|a| a >= &min_epoch) + .clone() + .collect(); + let end = min(max_count, epochs.len()); + epochs[..end].into() + } + pub fn truncate(&mut self, truncate_epoch: u64) { // TODO: may optimize by using VecDeque to maintain the log self.0 diff --git a/src/stream/src/common/log_store_impl/mod.rs b/src/stream/src/common/log_store_impl/mod.rs index bcd15ab12a7da..bd600fc80086f 100644 --- a/src/stream/src/common/log_store_impl/mod.rs +++ b/src/stream/src/common/log_store_impl/mod.rs @@ -14,4 +14,3 @@ pub mod in_mem; pub mod kv_log_store; -pub mod subscription_log_store; diff --git a/src/stream/src/common/log_store_impl/subscription_log_store.rs b/src/stream/src/common/log_store_impl/subscription_log_store.rs deleted file mode 100644 index c7de7073f85ed..0000000000000 --- a/src/stream/src/common/log_store_impl/subscription_log_store.rs +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use itertools::Itertools; -use risingwave_common::array::StreamChunk; -use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::TableId; -use risingwave_connector::sink::log_store::LogStoreResult; -use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; -use risingwave_storage::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions}; - -use super::kv_log_store::ReaderTruncationOffsetType; -use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde; -use crate::common::log_store_impl::kv_log_store::{SeqIdType, FIRST_SEQ_ID}; - -pub struct SubscriptionLogStoreWriter { - _table_id: TableId, - - seq_id: SeqIdType, - - state_store: LS, - - serde: LogStoreRowSerde, - - _identity: String, -} - -impl SubscriptionLogStoreWriter { - pub(crate) fn new( - table_id: TableId, - state_store: LS, - serde: LogStoreRowSerde, - identity: String, - ) -> Self { - Self { - _table_id: table_id, - seq_id: FIRST_SEQ_ID, - state_store, - serde, - _identity: identity, - } - } - - pub async fn init( - &mut self, - epoch: risingwave_common::util::epoch::EpochPair, - _pause_read_on_bootstrap: bool, - ) -> LogStoreResult<()> { - self.state_store.init(InitOptions::new(epoch)).await?; - self.seq_id = FIRST_SEQ_ID; - Ok(()) - } - - pub fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> { - if chunk.cardinality() == 0 { - return Ok(()); - } - let epoch = self.state_store.epoch(); - let start_seq_id = self.seq_id; - self.seq_id += chunk.cardinality() as SeqIdType; - for (i, (op, row)) in chunk.rows().enumerate() { - let seq_id = start_seq_id + (i as SeqIdType); - let (_vnode, key, value) = self.serde.serialize_data_row(epoch, seq_id, op, row); - self.state_store.insert(key, value, None)?; - } - Ok(()) - } - - pub async fn flush_current_epoch( - &mut self, - next_epoch: u64, - truncate_offset: Option, - ) -> LogStoreResult<()> { - // Because barrier has no effect on subscription, barrier will not be inserted here - let watermark = truncate_offset.map(|truncate_offset| { - VnodeWatermark::new( - self.serde.vnodes().clone(), - self.serde - .serialize_truncation_offset_watermark(truncate_offset), - ) - }); - self.state_store.flush().await?; - let watermark = watermark.into_iter().collect_vec(); - self.state_store.seal_current_epoch( - next_epoch, - SealCurrentEpochOptions { - table_watermarks: Some((WatermarkDirection::Ascending, watermark)), - switch_op_consistency_level: None, - }, - ); - self.seq_id = FIRST_SEQ_ID; - Ok(()) - } - - pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> LogStoreResult<()> { - self.serde.update_vnode_bitmap(new_vnodes.clone()); - self.state_store.update_vnode_bitmap(new_vnodes); - Ok(()) - } -} diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 015956bec8df2..6d5c5867060de 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -91,7 +91,6 @@ mod sort_buffer; pub mod source; mod stateless_simple_agg; mod stream_reader; -mod subscription; pub mod subtask; mod temporal_join; mod top_n; @@ -141,7 +140,6 @@ pub use simple_agg::SimpleAggExecutor; pub use sink::SinkExecutor; pub use sort::*; pub use stateless_simple_agg::StatelessSimpleAggExecutor; -pub use subscription::SubscriptionExecutor; pub use temporal_join::*; pub use top_n::{ AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor, diff --git a/src/stream/src/executor/subscription.rs b/src/stream/src/executor/subscription.rs deleted file mode 100644 index 4bacb2dc75f05..0000000000000 --- a/src/stream/src/executor/subscription.rs +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use core::str::FromStr; -use core::time::Duration; -use std::collections::HashMap; - -use risingwave_common::types::{Interval, Timestamptz}; -use risingwave_common::util::epoch::Epoch; -use risingwave_storage::store::LocalStateStore; -use tokio::time::Instant; - -use crate::common::log_store_impl::kv_log_store::ReaderTruncationOffsetType; -use crate::common::log_store_impl::subscription_log_store::SubscriptionLogStoreWriter; -use crate::executor::prelude::*; - -const EXECUTE_GC_INTERVAL: u64 = 3600; -const MAX_RETENTION_DAYS: i32 = 365; -const RETENTION_SECONDS_KEY: &str = "retention"; - -pub struct SubscriptionExecutor { - actor_context: ActorContextRef, - input: Executor, - log_store: SubscriptionLogStoreWriter, - retention_seconds: i64, -} - -impl SubscriptionExecutor { - #[allow(clippy::too_many_arguments)] - #[expect(clippy::unused_async)] - pub async fn new( - actor_context: ActorContextRef, - input: Executor, - log_store: SubscriptionLogStoreWriter, - properties: HashMap, - ) -> StreamExecutorResult { - let retention_seconds_str = properties.get(RETENTION_SECONDS_KEY).ok_or_else(|| { - StreamExecutorError::serde_error("Subscription retention time not set.".to_string()) - })?; - let retention_seconds_interval = - Interval::from_str(retention_seconds_str).map_err(|_| { - StreamExecutorError::serde_error( - "Retention needs to be set in Interval format".to_string(), - ) - })?; - if retention_seconds_interval.days() > MAX_RETENTION_DAYS { - return Err(StreamExecutorError::serde_error(format!( - "Retention time cannot exceed {} days", - MAX_RETENTION_DAYS - ))); - } - let retention_seconds = (retention_seconds_interval.epoch_in_micros() / 1000000) as i64; - - Ok(Self { - actor_context, - input, - log_store, - retention_seconds, - }) - } - - #[try_stream(ok = Message, error = StreamExecutorError)] - async fn execute_inner(mut self) { - let mut input = self.input.execute(); - - let barrier = expect_first_barrier(&mut input).await?; - self.log_store.init(barrier.epoch, false).await?; - - // The first barrier message should be propagated. - yield Message::Barrier(barrier); - - let mut next_truncate_time = Instant::now() + Duration::from_secs(EXECUTE_GC_INTERVAL); - - #[for_await] - for msg in input { - let msg = msg?; - yield match msg { - Message::Watermark(w) => Message::Watermark(w), - Message::Chunk(chunk) => { - if chunk.cardinality() == 0 { - // empty chunk - continue; - } - self.log_store.write_chunk(chunk.clone())?; - Message::Chunk(chunk) - } - Message::Barrier(barrier) => { - let truncate_offset: Option = if next_truncate_time - < Instant::now() - { - let truncate_timestamptz = Timestamptz::from_secs(barrier.get_curr_epoch().as_timestamptz().timestamp() - self.retention_seconds).ok_or_else(||{StreamExecutorError::from("Subscription retention time calculation error: timestamp is out of range.".to_string())})?; - let epoch = - Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64); - next_truncate_time = - Instant::now() + Duration::from_secs(EXECUTE_GC_INTERVAL); - Some((epoch.0, None)) - } else { - None - }; - self.log_store - .flush_current_epoch(barrier.epoch.curr, truncate_offset) - .await?; - - if let Some(vnode_bitmap) = - barrier.as_update_vnode_bitmap(self.actor_context.id) - { - self.log_store.update_vnode_bitmap(vnode_bitmap)?; - } - Message::Barrier(barrier) - } - } - } - } -} -impl Execute for SubscriptionExecutor { - fn execute(self: Box) -> BoxedMessageStream { - self.execute_inner().boxed() - } -} diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index 4c3323e864210..ba433595718a3 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -46,7 +46,6 @@ mod source_backfill; mod stateless_simple_agg; mod stream_cdc_scan; mod stream_scan; -mod subscription; mod temporal_join; mod top_n; mod union; @@ -96,7 +95,6 @@ use self::union::*; use self::watermark_filter::WatermarkFilterBuilder; use crate::error::StreamResult; use crate::executor::{Execute, Executor, ExecutorInfo}; -use crate::from_proto::subscription::SubscriptionExecutorBuilder; use crate::from_proto::values::ValuesExecutorBuilder; use crate::task::ExecutorParams; @@ -149,7 +147,6 @@ pub async fn create_executor( NodeBody::BatchPlan => BatchQueryExecutorBuilder, NodeBody::Merge => MergeExecutorBuilder, NodeBody::Materialize => MaterializeExecutorBuilder, - NodeBody::Subscription => SubscriptionExecutorBuilder, NodeBody::Filter => FilterExecutorBuilder, NodeBody::CdcFilter => CdcFilterExecutorBuilder, NodeBody::Arrange => ArrangeExecutorBuilder, diff --git a/src/stream/src/from_proto/subscription.rs b/src/stream/src/from_proto/subscription.rs deleted file mode 100644 index a7e8b79b16c10..0000000000000 --- a/src/stream/src/from_proto/subscription.rs +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use risingwave_common::catalog::{TableId, TableOption}; -use risingwave_pb::stream_plan::SubscriptionNode; -use risingwave_storage::store::{NewLocalOptions, OpConsistencyLevel}; - -use super::ExecutorBuilder; -use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde; -use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO; -use crate::common::log_store_impl::subscription_log_store::SubscriptionLogStoreWriter; -use crate::error::StreamResult; -use crate::executor::{Executor, SubscriptionExecutor}; - -pub struct SubscriptionExecutorBuilder; - -impl ExecutorBuilder for SubscriptionExecutorBuilder { - type Node = SubscriptionNode; - - async fn new_boxed_executor( - params: crate::task::ExecutorParams, - node: &Self::Node, - state_store: impl risingwave_storage::StateStore, - ) -> StreamResult { - let [input]: [_; 1] = params.input.try_into().unwrap(); - let table_id = TableId::new(node.log_store_table.as_ref().unwrap().id); - let vnodes = std::sync::Arc::new( - params - .vnode_bitmap - .expect("vnodes not set for subscription"), - ); - let serde = LogStoreRowSerde::new( - node.log_store_table.as_ref().unwrap(), - Some(vnodes), - &KV_LOG_STORE_V2_INFO, - ); - - let local_state_store = state_store - .new_local(NewLocalOptions { - table_id: TableId { - table_id: node.log_store_table.as_ref().unwrap().id, - }, - op_consistency_level: OpConsistencyLevel::Inconsistent, - table_option: TableOption { - retention_seconds: None, - }, - is_replicated: false, - vnodes: serde.vnodes().clone(), - }) - .await; - - let log_store_identity = format!( - "subscription[{}]-executor[{}]", - node.subscription_catalog.as_ref().unwrap().id, - params.executor_id - ); - let log_store = - SubscriptionLogStoreWriter::new(table_id, local_state_store, serde, log_store_identity); - let exec = SubscriptionExecutor::new( - params.actor_context, - input, - log_store, - node.subscription_catalog - .as_ref() - .unwrap() - .properties - .clone(), - ) - .await?; - Ok((params.info, exec).into()) - } -}