From 997cc3e04f494c5d174aee01a6664e882e632644 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 26 Jun 2024 14:20:39 -0500 Subject: [PATCH 1/7] init --- proto/batch_plan.proto | 2 + proto/catalog.proto | 8 ++- proto/plan_common.proto | 2 + proto/stream_plan.proto | 5 ++ src/common/src/catalog/external_table.rs | 1 + src/connector/src/sink/catalog/desc.rs | 5 +- src/connector/src/sink/catalog/mod.rs | 7 +-- src/ctl/src/cmd_impl/meta/migration.rs | 38 +++++++++++--- src/frontend/src/catalog/source_catalog.rs | 1 + .../optimizer/plan_node/batch_iceberg_scan.rs | 1 + .../optimizer/plan_node/batch_kafka_scan.rs | 1 + .../src/optimizer/plan_node/batch_source.rs | 1 + .../optimizer/plan_node/stream_fs_fetch.rs | 1 + .../src/optimizer/plan_node/stream_source.rs | 1 + .../optimizer/plan_node/stream_source_scan.rs | 1 + src/meta/model_v2/src/sink.rs | 2 +- src/meta/model_v2/src/source.rs | 2 +- src/meta/src/controller/mod.rs | 7 ++- src/meta/src/controller/streaming_job.rs | 13 +++-- src/meta/src/manager/catalog/database.rs | 24 ++++++++- src/meta/src/manager/catalog/mod.rs | 19 ++++++- src/meta/src/manager/catalog/utils.rs | 50 +++++++++++++++++++ src/meta/src/manager/streaming_job.rs | 39 +++++++++++++++ src/meta/src/rpc/ddl_controller.rs | 2 +- 24 files changed, 210 insertions(+), 23 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 8a196eecf5021..592b60a5ab0ac 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -7,6 +7,7 @@ import "common.proto"; import "data.proto"; import "expr.proto"; import "plan_common.proto"; +import "secret.proto"; option java_package = "com.risingwave.proto"; option optimize_for = SPEED; @@ -61,6 +62,7 @@ message SourceNode { map with_properties = 3; repeated bytes split = 4; catalog.StreamSourceInfo info = 5; + map secret_refs = 6; } message ProjectNode { diff --git a/proto/catalog.proto b/proto/catalog.proto index 86881c9bc547b..b7f905af25fcc 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -85,7 +85,8 @@ message StreamSourceInfo { map format_encode_options = 14; // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id and type. - map secret_ref = 16; + // For format and encode options. + map format_encode_secret_refs = 16; } message Source { @@ -123,6 +124,8 @@ message Source { // Cluster version (tracked by git commit) when initialized/created optional string initialized_at_cluster_version = 17; optional string created_at_cluster_version = 18; + // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id. + map secret_refs = 19; // Per-source catalog version, used by schema change. uint64 version = 100; @@ -141,6 +144,7 @@ message SinkFormatDesc { plan_common.EncodeType encode = 2; map options = 3; optional plan_common.EncodeType key_encode = 4; + map secret_refs = 5; } // the catalog of the sink. There are two kind of schema here. The full schema is all columns @@ -182,7 +186,7 @@ message Sink { CreateType create_type = 24; // Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type. - map secret_ref = 25; + map secret_refs = 25; } message Subscription { diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 31718ed9ac5cc..7b763c24a44a8 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -5,6 +5,7 @@ package plan_common; import "common.proto"; import "data.proto"; import "expr.proto"; +import "secret.proto"; option java_package = "com.risingwave.proto"; option optimize_for = SPEED; @@ -108,6 +109,7 @@ message ExternalTableDesc { map connect_properties = 6; // upstream cdc source job id uint32 source_id = 7; + map secret_refs = 8; } enum JoinType { diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 75b3d6f177dd0..35fd2810b62a7 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -8,6 +8,7 @@ import "data.proto"; import "expr.proto"; import "plan_common.proto"; import "source.proto"; +import "secret.proto"; option java_package = "com.risingwave.proto"; option optimize_for = SPEED; @@ -188,6 +189,7 @@ message StreamSource { string source_name = 8; // Streaming rate limit optional uint32 rate_limit = 9; + map secret_refs = 10; } // copy contents from StreamSource to prevent compatibility issues in the future @@ -203,6 +205,7 @@ message StreamFsFetch { string source_name = 8; // Streaming rate limit optional uint32 rate_limit = 9; + map secret_refs = 10; } // The executor only for receiving barrier from the meta service. It always resides in the leaves @@ -233,6 +236,7 @@ message SourceBackfillNode { // `| partition_id | backfill_progress |` catalog.Table state_table = 8; + map secret_refs = 9; } message SinkDesc { @@ -254,6 +258,7 @@ message SinkDesc { catalog.SinkFormatDesc format_desc = 13; optional uint32 target_table = 14; optional uint64 extra_partition_col_idx = 15; + map secret_refs = 16; } enum SinkLogStoreType { diff --git a/src/common/src/catalog/external_table.rs b/src/common/src/catalog/external_table.rs index f9c6448358066..797d789b85fc0 100644 --- a/src/common/src/catalog/external_table.rs +++ b/src/common/src/catalog/external_table.rs @@ -65,6 +65,7 @@ impl CdcTableDesc { table_name: self.external_table_name.clone(), stream_key: self.stream_key.iter().map(|k| *k as _).collect(), connect_properties: self.connect_properties.clone(), + secret_refs: Default::default(), } } diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 62ea42b4727b0..64e618bdd933e 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -98,7 +98,8 @@ impl SinkDesc { distribution_key: self.distribution_key, owner, dependent_relations, - properties: self.properties.into_iter().collect(), + properties: self.properties, + secret_refs: secret_ref, sink_type: self.sink_type, format_desc: self.format_desc, connection_id, @@ -110,7 +111,6 @@ impl SinkDesc { created_at_cluster_version: None, initialized_at_cluster_version: None, create_type: self.create_type, - secret_ref, } } @@ -134,6 +134,7 @@ impl SinkDesc { sink_from_name: self.sink_from_name.clone(), target_table: self.target_table.map(|table_id| table_id.table_id()), extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64), + secret_refs: Default::default(), } } } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 5adda694703c6..cb814154c8222 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -203,6 +203,7 @@ impl SinkFormatDesc { encode: encode.into(), options, key_encode, + secret_refs: Default::default(), } } } @@ -340,7 +341,7 @@ pub struct SinkCatalog { pub create_type: CreateType, /// The secret reference for the sink, mapping from property name to secret id. - pub secret_ref: BTreeMap, + pub secret_refs: BTreeMap, } impl SinkCatalog { @@ -382,7 +383,7 @@ impl SinkCatalog { created_at_cluster_version: self.created_at_cluster_version.clone(), initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), create_type: self.create_type.to_proto() as i32, - secret_ref: self.secret_ref.clone(), + secret_refs: self.secret_refs.clone(), } } @@ -476,7 +477,7 @@ impl From for SinkCatalog { initialized_at_cluster_version: pb.initialized_at_cluster_version, created_at_cluster_version: pb.created_at_cluster_version, create_type: CreateType::from_proto(create_type), - secret_ref: pb.secret_ref, + secret_refs: pb.secret_refs, } } } diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 93be066d4e727..2d2af6d006375 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -489,15 +489,35 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an } println!("table fragments migrated"); + let mut object_dependencies = vec![]; + // catalogs. // source if !sources.is_empty() { let source_models: Vec = sources .into_iter() .map(|mut src| { + let mut dependent_secret_refs = vec![]; if let Some(id) = src.connection_id.as_mut() { *id = *connection_rewrite.get(id).unwrap(); } + for secret_ref in src.secret_refs.values_mut() { + secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); + dependent_secret_refs.push(secret_ref.secret_id); + } + if let Some(info) = &mut src.info { + for secret_ref in info.format_encode_secret_refs.values_mut() { + secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); + dependent_secret_refs.push(secret_ref.secret_id); + } + } + object_dependencies.extend(dependent_secret_refs.into_iter().map(|secret_id| { + object_dependency::ActiveModel { + id: NotSet, + oid: Set(secret_id as _), + used_by: Set(src.id as _), + } + })); src.into() }) .collect(); @@ -507,8 +527,6 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an } println!("sources migrated"); - let mut object_dependencies = vec![]; - // table for table in tables { let job_id = if table.table_type() == PbTableType::Internal { @@ -554,13 +572,21 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an if let Some(id) = s.connection_id.as_mut() { *id = *connection_rewrite.get(id).unwrap(); } - for secret_id in s.secret_ref.values_mut() { - secret_id.secret_id = *secret_rewrite.get(&secret_id.secret_id).unwrap(); + let mut dependent_secret_refs = vec![]; + for secret_ref in s.secret_refs.values_mut() { + secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); + dependent_secret_refs.push(secret_ref.secret_id); + } + if let Some(desc) = &mut s.format_desc { + for secret_ref in desc.secret_refs.values_mut() { + secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); + dependent_secret_refs.push(secret_ref.secret_id); + } } - object_dependencies.extend(s.secret_ref.values().map(|id| { + object_dependencies.extend(dependent_secret_refs.into_iter().map(|secret_id| { object_dependency::ActiveModel { id: NotSet, - oid: Set(id.secret_id as _), + oid: Set(secret_id as _), used_by: Set(s.id as _), } })); diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index 17292b1324ed2..1ee095a918e5d 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -77,6 +77,7 @@ impl SourceCatalog { version: self.version, created_at_cluster_version: self.created_at_cluster_version.clone(), initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), + secret_refs: Default::default(), } } diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs index 699d32c5a2bf5..0ac2451759543 100644 --- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs @@ -109,6 +109,7 @@ impl ToBatchPb for BatchIcebergScan { .collect(), with_properties: source_catalog.with_properties.clone().into_iter().collect(), split: vec![], + secret_refs: Default::default(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs index 423fe2a6771ee..8dca305121399 100644 --- a/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs @@ -131,6 +131,7 @@ impl ToBatchPb for BatchKafkaScan { .collect(), with_properties: source_catalog.with_properties.clone().into_iter().collect(), split: vec![], + secret_refs: Default::default(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs index 3836940be745e..ac5e25d703636 100644 --- a/src/frontend/src/optimizer/plan_node/batch_source.rs +++ b/src/frontend/src/optimizer/plan_node/batch_source.rs @@ -113,6 +113,7 @@ impl ToBatchPb for BatchSource { .collect(), with_properties: source_catalog.with_properties.clone().into_iter().collect(), split: vec![], + secret_refs: Default::default(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 931fbe045b78d..43c14686e2a22 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -115,6 +115,7 @@ impl StreamNode for StreamFsFetch { .collect_vec(), with_properties: source_catalog.with_properties.clone().into_iter().collect(), rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + secret_refs: Default::default(), }); NodeBody::StreamFsFetch(StreamFsFetchNode { node_inner: source_inner, diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 40dbbc469f52e..83ad14886872a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -109,6 +109,7 @@ impl StreamNode for StreamSource { .collect_vec(), with_properties: source_catalog.with_properties.clone().into_iter().collect(), rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + secret_refs: Default::default(), }); PbNodeBody::Source(SourceNode { source_inner }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index 6d1d75d8e46aa..0449648798ea7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -157,6 +157,7 @@ impl StreamSourceScan { .collect_vec(), with_properties: source_catalog.with_properties.clone().into_iter().collect(), rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + secret_refs: Default::default(), }; let fields = self.schema().to_prost(); diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs index 25d6293b0b120..ad72aa3df981c 100644 --- a/src/meta/model_v2/src/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -129,7 +129,7 @@ impl From for ActiveModel { sink_from_name: Set(pb_sink.sink_from_name), sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())), target_table: Set(pb_sink.target_table.map(|x| x as _)), - secret_ref: Set(Some(SecretRef::from(pb_sink.secret_ref))), + secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))), } } } diff --git a/src/meta/model_v2/src/source.rs b/src/meta/model_v2/src/source.rs index a90f399e4b8cc..9dd4bb5b1a11a 100644 --- a/src/meta/model_v2/src/source.rs +++ b/src/meta/model_v2/src/source.rs @@ -103,7 +103,7 @@ impl From for ActiveModel { optional_associated_table_id: Set(optional_associated_table_id), connection_id: Set(source.connection_id.map(|id| id as _)), version: Set(source.version as _), - secret_ref: Set(None), + secret_ref: Set(Some(SecretRef::from(source.secret_refs))), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index b6997240f7723..5ace222739d52 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -167,6 +167,10 @@ impl From> for PbTable { impl From> for PbSource { fn from(value: ObjectModel) -> Self { + let mut secret_ref_map = BTreeMap::new(); + if let Some(secret_ref) = value.0.secret_ref { + secret_ref_map = secret_ref.to_protobuf(); + } Self { id: value.0.source_id as _, schema_id: value.1.schema_id.unwrap() as _, @@ -195,6 +199,7 @@ impl From> for PbSource { .map(|id| PbOptionalAssociatedTableId::AssociatedTableId(id as _)), initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, + secret_refs: secret_ref_map, } } } @@ -234,7 +239,7 @@ impl From> for PbSink { initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, create_type: PbCreateType::Foreground as _, - secret_ref: secret_ref_map, + secret_refs: secret_ref_map, } } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 3e2c5db4776db..91f75d1b2ef84 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -288,11 +288,18 @@ impl CatalogController { } } + // get dependent secret ref. + // XXX: A relation can ref a secret more than 1 time. + let dependent_secret_refs = streaming_job.dependent_secret_refs(); + + let dependent_objs = dependent_relations + .iter() + .chain(dependent_secret_refs.iter()); // record object dependency. - if !dependent_relations.is_empty() { - ObjectDependency::insert_many(dependent_relations.into_iter().map(|id| { + if !dependent_secret_refs.is_empty() || !dependent_relations.is_empty() { + ObjectDependency::insert_many(dependent_objs.map(|id| { object_dependency::ActiveModel { - oid: Set(id as _), + oid: Set(*id as _), used_by: Set(streaming_job.id() as _), ..Default::default() } diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index 5911188e2bfb5..f0bb42e19aeb9 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -112,7 +112,7 @@ impl DatabaseManager { let mut relation_ref_count = HashMap::new(); let mut connection_ref_count = HashMap::new(); - let mut _secret_ref_count = HashMap::new(); + let mut secret_ref_count = HashMap::new(); let databases = BTreeMap::from_iter( databases @@ -126,6 +126,14 @@ impl DatabaseManager { } (source.id, source) })); + for source in sources.values() { + for secret_ref in source.get_secret_refs().values() { + *secret_ref_count.entry(secret_ref.secret_id).or_default() += 1; + } + for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() { + *secret_ref_count.entry(secret_ref.secret_id).or_default() += 1; + } + } let sinks = BTreeMap::from_iter(sinks.into_iter().map(|sink| { for depend_relation_id in &sink.dependent_relations { *relation_ref_count.entry(*depend_relation_id).or_default() += 1; @@ -133,6 +141,14 @@ impl DatabaseManager { if let Some(connection_id) = sink.connection_id { *connection_ref_count.entry(connection_id).or_default() += 1; } + for secret_ref in sink.get_secret_refs().values() { + *secret_ref_count.entry(secret_ref.secret_id).or_default() += 1; + } + if let Some(format_desc) = &sink.format_desc { + for secret_ref in format_desc.get_secret_refs().values() { + *secret_ref_count.entry(secret_ref.secret_id).or_default() += 1; + } + } (sink.id, sink) })); let subscriptions = BTreeMap::from_iter(subscriptions.into_iter().map(|subscription| { @@ -174,7 +190,7 @@ impl DatabaseManager { relation_ref_count, connection_ref_count, secrets, - secret_ref_count: _secret_ref_count, + secret_ref_count, in_progress_creation_tracker: HashSet::default(), in_progress_creation_streaming_job: HashMap::default(), in_progress_creating_tables: HashMap::default(), @@ -353,6 +369,10 @@ impl DatabaseManager { self.tables.values().cloned().collect_vec() } + pub fn list_secrets(&self) -> Vec { + self.secrets.values().cloned().collect_vec() + } + pub fn get_table(&self, table_id: TableId) -> Option<&Table> { self.tables.get(&table_id) } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 9ba9333cee8e1..ede32fb1879f7 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -128,6 +128,10 @@ use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::{Relation, RelationGroup}; pub(crate) use {commit_meta, commit_meta_with_trx}; +use self::utils::{ + refcnt_dec_sink_secret_ref, refcnt_dec_source_secret_ref, refcnt_inc_sink_secret_ref, + refcnt_inc_source_secret_ref, +}; use crate::controller::rename::{ alter_relation_rename, alter_relation_rename_refs, ReplaceTableExprRewriter, }; @@ -1800,6 +1804,7 @@ impl CatalogManager { for source in &sources_removed { user_core.decrease_ref(source.owner); refcnt_dec_connection(database_core, source.connection_id); + refcnt_dec_source_secret_ref(database_core, source)?; } for view in &views_removed { @@ -1837,6 +1842,7 @@ impl CatalogManager { for dependent_relation_id in &sink.dependent_relations { database_core.decrease_relation_ref_count(*dependent_relation_id); } + refcnt_dec_sink_secret_ref(database_core, sink); } for subscription in &subscriptions_removed { @@ -2840,6 +2846,7 @@ impl CatalogManager { } else { database_core.mark_creating(&key); user_core.increase_ref(source.owner); + refcnt_inc_source_secret_ref(database_core, source)?; // We have validate the status of connection before starting the procedure. refcnt_inc_connection(database_core, source.connection_id)?; Ok(()) @@ -2915,6 +2922,7 @@ impl CatalogManager { database_core.unmark_creating(&key); user_core.decrease_ref(source.owner); refcnt_dec_connection(database_core, source.connection_id); + refcnt_dec_source_secret_ref(database_core, source)?; Ok(()) } @@ -2943,6 +2951,7 @@ impl CatalogManager { database_core.mark_creating(&source_key); database_core.mark_creating(&mview_key); database_core.mark_creating_streaming_job(table.id, mview_key); + refcnt_inc_source_secret_ref(database_core, source)?; ensure!(table.dependent_relations.is_empty()); // source and table user_core.increase_ref_count(source.owner, 2); @@ -3020,7 +3029,11 @@ impl CatalogManager { Ok(version) } - pub async fn cancel_create_table_procedure_with_source(&self, source: &Source, table: &Table) { + pub async fn cancel_create_table_procedure_with_source( + &self, + source: &Source, + table: &Table, + ) -> MetaResult<()> { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let user_core = &mut core.user; @@ -3037,6 +3050,8 @@ impl CatalogManager { database_core.unmark_creating_streaming_job(table.id); user_core.decrease_ref_count(source.owner, 2); // source and table refcnt_dec_connection(database_core, source.connection_id); + refcnt_dec_source_secret_ref(database_core, source)?; + Ok(()) } pub async fn start_create_index_procedure( @@ -3173,6 +3188,7 @@ impl CatalogManager { for &dependent_relation_id in &sink.dependent_relations { database_core.increase_relation_ref_count(dependent_relation_id); } + refcnt_inc_sink_secret_ref(database_core, sink); user_core.increase_ref(sink.owner); // We have validate the status of connection before starting the procedure. refcnt_inc_connection(database_core, sink.connection_id)?; @@ -3249,6 +3265,7 @@ impl CatalogManager { } user_core.decrease_ref(sink.owner); refcnt_dec_connection(database_core, sink.connection_id); + refcnt_dec_sink_secret_ref(database_core, sink); if let Some((table, source)) = target_table { Self::cancel_replace_table_procedure_inner(source, table, core); diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs index 4d4257ec5a7e5..9acba0c637e69 100644 --- a/src/meta/src/manager/catalog/utils.rs +++ b/src/meta/src/manager/catalog/utils.rs @@ -13,8 +13,10 @@ // limitations under the License. use risingwave_common::bail; +use risingwave_pb::catalog::{Sink, Source}; use crate::manager::{ConnectionId, DatabaseManager}; +use crate::MetaResult; pub fn refcnt_inc_connection( database_mgr: &mut DatabaseManager, @@ -38,3 +40,51 @@ pub fn refcnt_dec_connection( database_mgr.decrease_connection_ref_count(connection_id); } } + +pub fn refcnt_inc_source_secret_ref( + database_mgr: &mut DatabaseManager, + source: &Source, +) -> MetaResult<()> { + for secret_ref in source.get_secret_refs().values() { + database_mgr.increase_secret_ref_count(secret_ref.secret_id) + } + for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() { + database_mgr.increase_secret_ref_count(secret_ref.secret_id) + } + Ok(()) +} + +pub fn refcnt_dec_source_secret_ref( + database_mgr: &mut DatabaseManager, + source: &Source, +) -> MetaResult<()> { + for secret_ref in source.get_secret_refs().values() { + database_mgr.decrease_secret_ref_count(secret_ref.secret_id) + } + for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() { + database_mgr.decrease_secret_ref_count(secret_ref.secret_id) + } + Ok(()) +} + +pub fn refcnt_inc_sink_secret_ref(database_mgr: &mut DatabaseManager, sink: &Sink) { + for secret_ref in sink.get_secret_refs().values() { + database_mgr.increase_secret_ref_count(secret_ref.secret_id) + } + if let Some(format_desc) = &sink.format_desc { + for secret_ref in format_desc.get_secret_refs().values() { + database_mgr.increase_secret_ref_count(secret_ref.secret_id) + } + } +} + +pub fn refcnt_dec_sink_secret_ref(database_mgr: &mut DatabaseManager, sink: &Sink) { + for secret_ref in sink.get_secret_refs().values() { + database_mgr.decrease_secret_ref_count(secret_ref.secret_id) + } + if let Some(format_desc) = &sink.format_desc { + for secret_ref in format_desc.get_secret_refs().values() { + database_mgr.decrease_secret_ref_count(secret_ref.secret_id) + } + } +} diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index a9a159e6a4535..ddd34bc896211 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -285,6 +285,45 @@ impl StreamingJob { } } + pub fn dependent_secret_refs(&self) -> Vec { + match self { + StreamingJob::Sink(sink, _) => { + let mut dependents = vec![]; + for secret_ref in sink.secret_refs.values() { + dependents.push(secret_ref.secret_id); + } + if let Some(format_desc) = &sink.format_desc { + for secret_ref in format_desc.secret_refs.values() { + dependents.push(secret_ref.secret_id); + } + } + dependents + } + StreamingJob::Table(source, _, _) => { + let mut dependents = vec![]; + if let Some(source) = source { + for secret_ref in source.secret_refs.values() { + dependents.push(secret_ref.secret_id); + } + if let Some(info) = source.info.as_ref() { + for secret_ref in info.format_encode_secret_refs.values() { + dependents.push(secret_ref.secret_id); + } + } + } + dependents + } + StreamingJob::Source(source) => source + .secret_refs + .values() + .map(|secret_ref| secret_ref.secret_id) + .collect(), + StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => { + vec![] + } + } + } + pub fn is_source_job(&self) -> bool { matches!(self, StreamingJob::Source(_)) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index cc5d7dd8971ad..2ec48306c86f7 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1721,7 +1721,7 @@ impl DdlController { if let Some(source) = source { mgr.catalog_manager .cancel_create_table_procedure_with_source(source, table) - .await; + .await?; } else { let result = mgr .catalog_manager From 6ad73a1776ab9da869c87ed1394ac22414c561e0 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 26 Jun 2024 14:38:05 -0500 Subject: [PATCH 2/7] fix --- proto/stream_plan.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 35fd2810b62a7..a06c01075e76a 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -7,8 +7,8 @@ import "common.proto"; import "data.proto"; import "expr.proto"; import "plan_common.proto"; -import "source.proto"; import "secret.proto"; +import "source.proto"; option java_package = "com.risingwave.proto"; option optimize_for = SPEED; From 2de9d553e667f377fed1bc36b8294bf3959efe82 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 27 Jun 2024 12:07:29 -0500 Subject: [PATCH 3/7] comments --- proto/catalog.proto | 3 +++ 1 file changed, 3 insertions(+) diff --git a/proto/catalog.proto b/proto/catalog.proto index b7f905af25fcc..8a360418159b3 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -125,6 +125,7 @@ message Source { optional string initialized_at_cluster_version = 17; optional string created_at_cluster_version = 18; // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id. + // Used for secret connect options. map secret_refs = 19; // Per-source catalog version, used by schema change. @@ -144,6 +145,7 @@ message SinkFormatDesc { plan_common.EncodeType encode = 2; map options = 3; optional plan_common.EncodeType key_encode = 4; + // Secret used for format encode options. map secret_refs = 5; } @@ -186,6 +188,7 @@ message Sink { CreateType create_type = 24; // Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type. + // Used for connect options. map secret_refs = 25; } From 3b16f7d92c3b3b0bff38388fdf15b3f4e638491e Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Fri, 28 Jun 2024 15:16:01 -0500 Subject: [PATCH 4/7] sql backend ref count once --- src/ctl/src/cmd_impl/meta/migration.rs | 2 + src/meta/src/controller/streaming_job.rs | 3 +- src/meta/src/manager/catalog/database.rs | 17 ++----- src/meta/src/manager/catalog/mod.rs | 1 + src/meta/src/manager/catalog/utils.rs | 60 ++++++++++++++---------- src/meta/src/manager/streaming_job.rs | 40 ++++------------ 6 files changed, 54 insertions(+), 69 deletions(-) diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 2d2af6d006375..b061df706689b 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -511,6 +511,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an dependent_secret_refs.push(secret_ref.secret_id); } } + dependent_secret_refs.dedup(); object_dependencies.extend(dependent_secret_refs.into_iter().map(|secret_id| { object_dependency::ActiveModel { id: NotSet, @@ -583,6 +584,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an dependent_secret_refs.push(secret_ref.secret_id); } } + dependent_secret_refs.dedup(); object_dependencies.extend(dependent_secret_refs.into_iter().map(|secret_id| { object_dependency::ActiveModel { id: NotSet, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 91f75d1b2ef84..42cf44f184cf7 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -289,8 +289,7 @@ impl CatalogController { } // get dependent secret ref. - // XXX: A relation can ref a secret more than 1 time. - let dependent_secret_refs = streaming_job.dependent_secret_refs(); + let dependent_secret_refs = streaming_job.dependent_secret_refs()?; let dependent_objs = dependent_relations .iter() diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index f0bb42e19aeb9..a5ca2604b5309 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -27,6 +27,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::data::DataType; use risingwave_pb::user::grant_privilege::PbObject; +use super::utils::{get_refed_secret_ids_from_sink, get_refed_secret_ids_from_source}; use super::{ ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SecretId, SinkId, SourceId, SubscriptionId, ViewId, @@ -127,11 +128,8 @@ impl DatabaseManager { (source.id, source) })); for source in sources.values() { - for secret_ref in source.get_secret_refs().values() { - *secret_ref_count.entry(secret_ref.secret_id).or_default() += 1; - } - for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() { - *secret_ref_count.entry(secret_ref.secret_id).or_default() += 1; + for secret_id in get_refed_secret_ids_from_source(source)? { + *secret_ref_count.entry(secret_id).or_default() += 1; } } let sinks = BTreeMap::from_iter(sinks.into_iter().map(|sink| { @@ -141,13 +139,8 @@ impl DatabaseManager { if let Some(connection_id) = sink.connection_id { *connection_ref_count.entry(connection_id).or_default() += 1; } - for secret_ref in sink.get_secret_refs().values() { - *secret_ref_count.entry(secret_ref.secret_id).or_default() += 1; - } - if let Some(format_desc) = &sink.format_desc { - for secret_ref in format_desc.get_secret_refs().values() { - *secret_ref_count.entry(secret_ref.secret_id).or_default() += 1; - } + for secret_id in get_refed_secret_ids_from_sink(&sink) { + *secret_ref_count.entry(secret_id).or_default() += 1; } (sink.id, sink) })); diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index ede32fb1879f7..9eee579d270a4 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -46,6 +46,7 @@ use risingwave_pb::user::{GrantPrivilege, UserInfo}; use tokio::sync::{Mutex, MutexGuard}; use user::*; +pub use self::utils::{get_refed_secret_ids_from_sink, get_refed_secret_ids_from_source}; use crate::manager::{ IdCategory, MetaSrvEnv, NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION, }; diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs index 9acba0c637e69..8bcd3aa66bdd0 100644 --- a/src/meta/src/manager/catalog/utils.rs +++ b/src/meta/src/manager/catalog/utils.rs @@ -41,15 +41,40 @@ pub fn refcnt_dec_connection( } } +pub fn get_refed_secret_ids_from_source(source: &Source) -> MetaResult> { + let mut secret_ids = Vec::new(); + for secret_ref in source.get_secret_refs().values() { + secret_ids.push(secret_ref.secret_id); + } + // `info` must exist in `Source` + for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() { + secret_ids.push(secret_ref.secret_id); + } + secret_ids.dedup(); + Ok(secret_ids) +} + +pub fn get_refed_secret_ids_from_sink(sink: &Sink) -> Vec { + let mut secret_ids = Vec::new(); + for secret_ref in sink.get_secret_refs().values() { + secret_ids.push(secret_ref.secret_id); + } + // `format_desc` may not exist in `Sink` + if let Some(format_desc) = &sink.format_desc { + for secret_ref in format_desc.get_secret_refs().values() { + secret_ids.push(secret_ref.secret_id); + } + } + secret_ids.dedup(); + secret_ids +} + pub fn refcnt_inc_source_secret_ref( database_mgr: &mut DatabaseManager, source: &Source, ) -> MetaResult<()> { - for secret_ref in source.get_secret_refs().values() { - database_mgr.increase_secret_ref_count(secret_ref.secret_id) - } - for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() { - database_mgr.increase_secret_ref_count(secret_ref.secret_id) + for secret_id in get_refed_secret_ids_from_source(source)? { + database_mgr.increase_secret_ref_count(secret_id); } Ok(()) } @@ -58,33 +83,20 @@ pub fn refcnt_dec_source_secret_ref( database_mgr: &mut DatabaseManager, source: &Source, ) -> MetaResult<()> { - for secret_ref in source.get_secret_refs().values() { - database_mgr.decrease_secret_ref_count(secret_ref.secret_id) - } - for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() { - database_mgr.decrease_secret_ref_count(secret_ref.secret_id) + for secret_id in get_refed_secret_ids_from_source(source)? { + database_mgr.decrease_secret_ref_count(secret_id); } Ok(()) } pub fn refcnt_inc_sink_secret_ref(database_mgr: &mut DatabaseManager, sink: &Sink) { - for secret_ref in sink.get_secret_refs().values() { - database_mgr.increase_secret_ref_count(secret_ref.secret_id) - } - if let Some(format_desc) = &sink.format_desc { - for secret_ref in format_desc.get_secret_refs().values() { - database_mgr.increase_secret_ref_count(secret_ref.secret_id) - } + for secret_id in get_refed_secret_ids_from_sink(sink) { + database_mgr.increase_secret_ref_count(secret_id); } } pub fn refcnt_dec_sink_secret_ref(database_mgr: &mut DatabaseManager, sink: &Sink) { - for secret_ref in sink.get_secret_refs().values() { - database_mgr.decrease_secret_ref_count(secret_ref.secret_id) - } - if let Some(format_desc) = &sink.format_desc { - for secret_ref in format_desc.get_secret_refs().values() { - database_mgr.decrease_secret_ref_count(secret_ref.secret_id) - } + for secret_id in get_refed_secret_ids_from_sink(sink) { + database_mgr.decrease_secret_ref_count(secret_id); } } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index ddd34bc896211..2a0b4848344af 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -19,7 +19,9 @@ use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table}; use risingwave_pb::ddl_service::TableJobType; use strum::EnumDiscriminants; +use super::{get_refed_secret_ids_from_sink, get_refed_secret_ids_from_source}; use crate::model::FragmentId; +use crate::MetaResult; // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and // Sink. @@ -285,42 +287,18 @@ impl StreamingJob { } } - pub fn dependent_secret_refs(&self) -> Vec { + pub fn dependent_secret_refs(&self) -> MetaResult> { match self { - StreamingJob::Sink(sink, _) => { - let mut dependents = vec![]; - for secret_ref in sink.secret_refs.values() { - dependents.push(secret_ref.secret_id); - } - if let Some(format_desc) = &sink.format_desc { - for secret_ref in format_desc.secret_refs.values() { - dependents.push(secret_ref.secret_id); - } - } - dependents - } + StreamingJob::Sink(sink, _) => Ok(get_refed_secret_ids_from_sink(sink)), StreamingJob::Table(source, _, _) => { - let mut dependents = vec![]; if let Some(source) = source { - for secret_ref in source.secret_refs.values() { - dependents.push(secret_ref.secret_id); - } - if let Some(info) = source.info.as_ref() { - for secret_ref in info.format_encode_secret_refs.values() { - dependents.push(secret_ref.secret_id); - } - } + get_refed_secret_ids_from_source(source) + } else { + Ok(vec![]) } - dependents - } - StreamingJob::Source(source) => source - .secret_refs - .values() - .map(|secret_ref| secret_ref.secret_id) - .collect(), - StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => { - vec![] } + StreamingJob::Source(source) => get_refed_secret_ids_from_source(source), + StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(vec![]), } } From c21b125fded5c3caf0dc90230f79e2df0b8c6b84 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Fri, 28 Jun 2024 15:33:57 -0500 Subject: [PATCH 5/7] minor --- src/meta/src/manager/catalog/mod.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 9eee579d270a4..44481b0728e90 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1797,15 +1797,13 @@ impl CatalogManager { user_core.decrease_ref(index.owner); } - // `tables_removed` contains both index table and mv. + // `tables_removed` contains both index, table and mv. for table in &tables_removed { user_core.decrease_ref(table.owner); } for source in &sources_removed { user_core.decrease_ref(source.owner); - refcnt_dec_connection(database_core, source.connection_id); - refcnt_dec_source_secret_ref(database_core, source)?; } for view in &views_removed { @@ -1832,6 +1830,11 @@ impl CatalogManager { } } + for source in &sources_removed { + refcnt_dec_connection(database_core, source.connection_id); + refcnt_dec_source_secret_ref(database_core, source)?; + } + for view in &views_removed { for dependent_relation_id in &view.dependent_relations { database_core.decrease_relation_ref_count(*dependent_relation_id); @@ -2952,11 +2955,11 @@ impl CatalogManager { database_core.mark_creating(&source_key); database_core.mark_creating(&mview_key); database_core.mark_creating_streaming_job(table.id, mview_key); - refcnt_inc_source_secret_ref(database_core, source)?; ensure!(table.dependent_relations.is_empty()); // source and table user_core.increase_ref_count(source.owner, 2); + refcnt_inc_source_secret_ref(database_core, source)?; // We have validate the status of connection before starting the procedure. refcnt_inc_connection(database_core, source.connection_id)?; Ok(()) @@ -3189,8 +3192,8 @@ impl CatalogManager { for &dependent_relation_id in &sink.dependent_relations { database_core.increase_relation_ref_count(dependent_relation_id); } - refcnt_inc_sink_secret_ref(database_core, sink); user_core.increase_ref(sink.owner); + refcnt_inc_sink_secret_ref(database_core, sink); // We have validate the status of connection before starting the procedure. refcnt_inc_connection(database_core, sink.connection_id)?; Ok(()) From c827427d510c3da83a0ca8f9105740e26784fc74 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Mon, 1 Jul 2024 14:40:32 -0500 Subject: [PATCH 6/7] fix --- src/ctl/src/cmd_impl/meta/migration.rs | 20 +++++++++----------- src/meta/src/manager/catalog/utils.rs | 20 ++++++++++---------- src/meta/src/manager/streaming_job.rs | 8 +++++--- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index b061df706689b..614fcd6be225c 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::time::Duration; use anyhow::Context; @@ -497,22 +497,21 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an let source_models: Vec = sources .into_iter() .map(|mut src| { - let mut dependent_secret_refs = vec![]; + let mut dependent_secret_ids = HashSet::new(); if let Some(id) = src.connection_id.as_mut() { *id = *connection_rewrite.get(id).unwrap(); } for secret_ref in src.secret_refs.values_mut() { secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); - dependent_secret_refs.push(secret_ref.secret_id); + dependent_secret_ids.insert(secret_ref.secret_id); } if let Some(info) = &mut src.info { for secret_ref in info.format_encode_secret_refs.values_mut() { secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); - dependent_secret_refs.push(secret_ref.secret_id); + dependent_secret_ids.insert(secret_ref.secret_id); } } - dependent_secret_refs.dedup(); - object_dependencies.extend(dependent_secret_refs.into_iter().map(|secret_id| { + object_dependencies.extend(dependent_secret_ids.into_iter().map(|secret_id| { object_dependency::ActiveModel { id: NotSet, oid: Set(secret_id as _), @@ -573,19 +572,18 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an if let Some(id) = s.connection_id.as_mut() { *id = *connection_rewrite.get(id).unwrap(); } - let mut dependent_secret_refs = vec![]; + let mut dependent_secret_ids = HashSet::new(); for secret_ref in s.secret_refs.values_mut() { secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); - dependent_secret_refs.push(secret_ref.secret_id); + dependent_secret_ids.insert(secret_ref.secret_id); } if let Some(desc) = &mut s.format_desc { for secret_ref in desc.secret_refs.values_mut() { secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap(); - dependent_secret_refs.push(secret_ref.secret_id); + dependent_secret_ids.insert(secret_ref.secret_id); } } - dependent_secret_refs.dedup(); - object_dependencies.extend(dependent_secret_refs.into_iter().map(|secret_id| { + object_dependencies.extend(dependent_secret_ids.into_iter().map(|secret_id| { object_dependency::ActiveModel { id: NotSet, oid: Set(secret_id as _), diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs index 8bcd3aa66bdd0..49d286230cb7d 100644 --- a/src/meta/src/manager/catalog/utils.rs +++ b/src/meta/src/manager/catalog/utils.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use risingwave_common::bail; use risingwave_pb::catalog::{Sink, Source}; @@ -41,31 +43,29 @@ pub fn refcnt_dec_connection( } } -pub fn get_refed_secret_ids_from_source(source: &Source) -> MetaResult> { - let mut secret_ids = Vec::new(); +pub fn get_refed_secret_ids_from_source(source: &Source) -> MetaResult> { + let mut secret_ids = HashSet::new(); for secret_ref in source.get_secret_refs().values() { - secret_ids.push(secret_ref.secret_id); + secret_ids.insert(secret_ref.secret_id); } // `info` must exist in `Source` for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() { - secret_ids.push(secret_ref.secret_id); + secret_ids.insert(secret_ref.secret_id); } - secret_ids.dedup(); Ok(secret_ids) } -pub fn get_refed_secret_ids_from_sink(sink: &Sink) -> Vec { - let mut secret_ids = Vec::new(); +pub fn get_refed_secret_ids_from_sink(sink: &Sink) -> HashSet { + let mut secret_ids = HashSet::new(); for secret_ref in sink.get_secret_refs().values() { - secret_ids.push(secret_ref.secret_id); + secret_ids.insert(secret_ref.secret_id); } // `format_desc` may not exist in `Sink` if let Some(format_desc) = &sink.format_desc { for secret_ref in format_desc.get_secret_refs().values() { - secret_ids.push(secret_ref.secret_id); + secret_ids.insert(secret_ref.secret_id); } } - secret_ids.dedup(); secret_ids } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 2a0b4848344af..90d0781174b5d 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use risingwave_common::catalog::TableVersionId; use risingwave_common::current_cluster_version; use risingwave_common::util::epoch::Epoch; @@ -287,18 +289,18 @@ impl StreamingJob { } } - pub fn dependent_secret_refs(&self) -> MetaResult> { + pub fn dependent_secret_refs(&self) -> MetaResult> { match self { StreamingJob::Sink(sink, _) => Ok(get_refed_secret_ids_from_sink(sink)), StreamingJob::Table(source, _, _) => { if let Some(source) = source { get_refed_secret_ids_from_source(source) } else { - Ok(vec![]) + Ok(HashSet::new()) } } StreamingJob::Source(source) => get_refed_secret_ids_from_source(source), - StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(vec![]), + StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()), } } From 5ecadd4f36b353e7b6592a47587e466164790a0e Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Tue, 2 Jul 2024 00:24:45 -0500 Subject: [PATCH 7/7] try RUST_MIN_STACK for ci --- ci/scripts/deterministic-e2e-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/scripts/deterministic-e2e-test.sh b/ci/scripts/deterministic-e2e-test.sh index cb23f1cd7f247..c561978e428aa 100755 --- a/ci/scripts/deterministic-e2e-test.sh +++ b/ci/scripts/deterministic-e2e-test.sh @@ -49,7 +49,7 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, batch" seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/parallel-batch-{}.log && rm $LOGDIR/parallel-batch-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe, fuzzing (pre-generated-queries)" -timeout 10m seq 64 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --run-sqlsmith-queries ./src/tests/sqlsmith/tests/sqlsmith-query-snapshots/{} 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log' +timeout 10m seq 64 | parallel MADSIM_TEST_SEED={} RUST_MIN_STACK=4194304 './risingwave_simulation --run-sqlsmith-queries ./src/tests/sqlsmith/tests/sqlsmith-query-snapshots/{} 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe, e2e extended mode test" seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -e 2> $LOGDIR/extended-{}.log && rm $LOGDIR/extended-{}.log'