Commit 2dfad37

feat(secret): secret management (part 1) add secret ref protos and referent count in meta catalog (#17474)

yuhao-su authored Jul 2, 2024
1 parent 5bfa202 commit 2dfad37
Showing 25 changed files with 205 additions and 27 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/deterministic-e2e-test.sh
@@ -49,7 +49,7 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, batch"
seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/parallel-batch-{}.log && rm $LOGDIR/parallel-batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, fuzzing (pre-generated-queries)"
timeout 10m seq 64 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --run-sqlsmith-queries ./src/tests/sqlsmith/tests/sqlsmith-query-snapshots/{} 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log'
timeout 10m seq 64 | parallel MADSIM_TEST_SEED={} RUST_MIN_STACK=4194304 './risingwave_simulation --run-sqlsmith-queries ./src/tests/sqlsmith/tests/sqlsmith-query-snapshots/{} 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, e2e extended mode test"
seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -e 2> $LOGDIR/extended-{}.log && rm $LOGDIR/extended-{}.log'
2 changes: 2 additions & 0 deletions proto/batch_plan.proto
@@ -7,6 +7,7 @@ import "common.proto";
import "data.proto";
import "expr.proto";
import "plan_common.proto";
import "secret.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;
@@ -61,6 +62,7 @@ message SourceNode {
map<string, string> with_properties = 3;
repeated bytes split = 4;
catalog.StreamSourceInfo info = 5;
map<string, secret.SecretRef> secret_refs = 6;
}

message ProjectNode {
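The new `SourceNode.secret_refs` map lets a plan node reference a catalog-managed secret by id instead of carrying the credential inline in `with_properties`. A minimal Rust sketch of such a map (the `risingwave_pb::secret::PbSecretRef` import path and the property key are assumptions; only `secret_id` is exercised by this commit):

```rust
use std::collections::BTreeMap;

use risingwave_pb::secret::PbSecretRef; // import path assumed from the PB alias convention

// Hypothetical illustration: instead of a plaintext credential in
// `with_properties`, the plan node carries a reference to a secret stored
// in the meta catalog. Any `PbSecretRef` fields beyond `secret_id` stay at
// their defaults here.
fn demo_secret_refs() -> BTreeMap<String, PbSecretRef> {
    let mut refs = BTreeMap::new();
    refs.insert(
        "properties.sasl.password".to_owned(), // assumed property key
        PbSecretRef {
            secret_id: 42, // id of a secret in the meta catalog
            ..Default::default()
        },
    );
    refs
}
```
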
11 changes: 9 additions & 2 deletions proto/catalog.proto
@@ -85,7 +85,8 @@ message StreamSourceInfo {
map<string, string> format_encode_options = 14;

// Whether the source relies on any secret. The key is the property name and the value is the secret id and type.
map<string, secret.SecretRef> secret_ref = 16;
// For format and encode options.
map<string, secret.SecretRef> format_encode_secret_refs = 16;
}

message Source {
@@ -123,6 +124,9 @@ message Source {
// Cluster version (tracked by git commit) when initialized/created
optional string initialized_at_cluster_version = 17;
optional string created_at_cluster_version = 18;
// Whether the source relies on any secret. The key is the property name and the value is the secret id.
// Used for secret connect options.
map<string, secret.SecretRef> secret_refs = 19;

// Per-source catalog version, used by schema change.
uint64 version = 100;
@@ -141,6 +145,8 @@ message SinkFormatDesc {
plan_common.EncodeType encode = 2;
map<string, string> options = 3;
optional plan_common.EncodeType key_encode = 4;
// Secret used for format encode options.
map<string, secret.SecretRef> secret_refs = 5;
}

// The catalog of the sink. There are two kinds of schema here. The full schema is all columns
@@ -182,7 +188,8 @@ message Sink {
CreateType create_type = 24;

// Whether the sink relies on any secret. The key is the property name and the value is the secret id and type.
map<string, secret.SecretRef> secret_ref = 25;
// Used for connect options.
map<string, secret.SecretRef> secret_refs = 25;
}

message Subscription {
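Taken together, the catalog changes split secret references into two scopes: `secret_refs` on `Source`/`Sink` covers connector connect options, while `StreamSourceInfo.format_encode_secret_refs` and `SinkFormatDesc.secret_refs` cover FORMAT/ENCODE options such as schema-registry credentials. A hedged sketch of that split (both property keys are invented for illustration):

```rust
use std::collections::BTreeMap;

use risingwave_pb::secret::PbSecretRef; // import path assumed

fn main() {
    // Connect options: secrets used to reach the external system.
    let mut connect_refs: BTreeMap<String, PbSecretRef> = BTreeMap::new();
    connect_refs.insert(
        "properties.ssl.key.password".to_owned(), // invented key
        PbSecretRef { secret_id: 1, ..Default::default() },
    );

    // Format/encode options: secrets used by the payload codec,
    // e.g. schema registry credentials.
    let mut format_encode_refs: BTreeMap<String, PbSecretRef> = BTreeMap::new();
    format_encode_refs.insert(
        "schema.registry.password".to_owned(), // invented key
        PbSecretRef { secret_id: 2, ..Default::default() },
    );

    assert_ne!(connect_refs, format_encode_refs); // the two scopes stay separate
}
```

Keeping the two maps apart mirrors how connector `with_properties` and `format_encode_options` are already stored separately in the catalog.
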
2 changes: 2 additions & 0 deletions proto/plan_common.proto
@@ -5,6 +5,7 @@ package plan_common;
import "common.proto";
import "data.proto";
import "expr.proto";
import "secret.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;
@@ -108,6 +109,7 @@ message ExternalTableDesc {
map<string, string> connect_properties = 6;
// upstream cdc source job id
uint32 source_id = 7;
map<string, secret.SecretRef> secret_refs = 8;
}

enum JoinType {
5 changes: 5 additions & 0 deletions proto/stream_plan.proto
@@ -7,6 +7,7 @@ import "common.proto";
import "data.proto";
import "expr.proto";
import "plan_common.proto";
import "secret.proto";
import "source.proto";

option java_package = "com.risingwave.proto";
@@ -188,6 +189,7 @@ message StreamSource {
string source_name = 8;
// Streaming rate limit
optional uint32 rate_limit = 9;
map<string, secret.SecretRef> secret_refs = 10;
}

// copy contents from StreamSource to prevent compatibility issues in the future
@@ -203,6 +205,7 @@ message StreamFsFetch {
string source_name = 8;
// Streaming rate limit
optional uint32 rate_limit = 9;
map<string, secret.SecretRef> secret_refs = 10;
}

// The executor only for receiving barrier from the meta service. It always resides in the leaves
@@ -233,6 +236,7 @@ message SourceBackfillNode {

// `| partition_id | backfill_progress |`
catalog.Table state_table = 8;
map<string, secret.SecretRef> secret_refs = 9;
}

message SinkDesc {
@@ -254,6 +258,7 @@ message SinkDesc {
catalog.SinkFormatDesc format_desc = 13;
optional uint32 target_table = 14;
optional uint64 extra_partition_col_idx = 15;
map<string, secret.SecretRef> secret_refs = 16;
}

enum SinkLogStoreType {
1 change: 1 addition & 0 deletions src/common/src/catalog/external_table.rs
@@ -65,6 +65,7 @@ impl CdcTableDesc {
table_name: self.external_table_name.clone(),
stream_key: self.stream_key.iter().map(|k| *k as _).collect(),
connect_properties: self.connect_properties.clone(),
secret_refs: Default::default(),
}
}

5 changes: 3 additions & 2 deletions src/connector/src/sink/catalog/desc.rs
@@ -98,7 +98,8 @@ impl SinkDesc {
distribution_key: self.distribution_key,
owner,
dependent_relations,
properties: self.properties.into_iter().collect(),
properties: self.properties,
secret_refs: secret_ref,
sink_type: self.sink_type,
format_desc: self.format_desc,
connection_id,
@@ -110,7 +111,6 @@ impl SinkDesc {
created_at_cluster_version: None,
initialized_at_cluster_version: None,
create_type: self.create_type,
secret_ref,
}
}

@@ -134,6 +134,7 @@ impl SinkDesc {
sink_from_name: self.sink_from_name.clone(),
target_table: self.target_table.map(|table_id| table_id.table_id()),
extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64),
secret_refs: Default::default(),
}
}
}
7 changes: 4 additions & 3 deletions src/connector/src/sink/catalog/mod.rs
@@ -203,6 +203,7 @@ impl SinkFormatDesc {
encode: encode.into(),
options,
key_encode,
secret_refs: Default::default(),
}
}
}
@@ -340,7 +341,7 @@ pub struct SinkCatalog {
pub create_type: CreateType,

/// The secret references for the sink, mapping from property name to secret id.
pub secret_ref: BTreeMap<String, PbSecretRef>,
pub secret_refs: BTreeMap<String, PbSecretRef>,
}

impl SinkCatalog {
@@ -382,7 +383,7 @@ impl SinkCatalog {
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
create_type: self.create_type.to_proto() as i32,
secret_ref: self.secret_ref.clone(),
secret_refs: self.secret_refs.clone(),
}
}

@@ -476,7 +477,7 @@ impl From<PbSink> for SinkCatalog {
initialized_at_cluster_version: pb.initialized_at_cluster_version,
created_at_cluster_version: pb.created_at_cluster_version,
create_type: CreateType::from_proto(create_type),
secret_ref: pb.secret_ref,
secret_refs: pb.secret_refs,
}
}
}
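Since `SinkCatalog` now round-trips the renamed field through both `From<PbSink>` and the proto-building method edited above (assumed here to be named `to_proto`), a test-style sketch of the invariant the rename must preserve (import paths assumed):

```rust
use risingwave_connector::sink::catalog::SinkCatalog; // path assumed
use risingwave_pb::catalog::PbSink; // path assumed

// Test-style sketch (not part of the commit): the `secret_ref` -> `secret_refs`
// rename must stay consistent in both conversion directions shown above.
fn roundtrip_preserves_secret_refs(pb_sink: PbSink) {
    let catalog = SinkCatalog::from(pb_sink.clone());
    assert_eq!(catalog.to_proto().secret_refs, pb_sink.secret_refs);
}
```
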
40 changes: 33 additions & 7 deletions src/ctl/src/cmd_impl/meta/migration.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::Context;
@@ -489,15 +489,35 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
}
println!("table fragments migrated");

let mut object_dependencies = vec![];

// catalogs.
// source
if !sources.is_empty() {
let source_models: Vec<source::ActiveModel> = sources
.into_iter()
.map(|mut src| {
let mut dependent_secret_ids = HashSet::new();
if let Some(id) = src.connection_id.as_mut() {
*id = *connection_rewrite.get(id).unwrap();
}
for secret_ref in src.secret_refs.values_mut() {
secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap();
dependent_secret_ids.insert(secret_ref.secret_id);
}
if let Some(info) = &mut src.info {
for secret_ref in info.format_encode_secret_refs.values_mut() {
secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap();
dependent_secret_ids.insert(secret_ref.secret_id);
}
}
object_dependencies.extend(dependent_secret_ids.into_iter().map(|secret_id| {
object_dependency::ActiveModel {
id: NotSet,
oid: Set(secret_id as _),
used_by: Set(src.id as _),
}
}));
src.into()
})
.collect();
@@ -507,8 +527,6 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
}
println!("sources migrated");

let mut object_dependencies = vec![];

// table
for table in tables {
let job_id = if table.table_type() == PbTableType::Internal {
@@ -554,13 +572,21 @@
if let Some(id) = s.connection_id.as_mut() {
*id = *connection_rewrite.get(id).unwrap();
}
for secret_id in s.secret_ref.values_mut() {
secret_id.secret_id = *secret_rewrite.get(&secret_id.secret_id).unwrap();
let mut dependent_secret_ids = HashSet::new();
for secret_ref in s.secret_refs.values_mut() {
secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap();
dependent_secret_ids.insert(secret_ref.secret_id);
}
if let Some(desc) = &mut s.format_desc {
for secret_ref in desc.secret_refs.values_mut() {
secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap();
dependent_secret_ids.insert(secret_ref.secret_id);
}
}
object_dependencies.extend(s.secret_ref.values().map(|id| {
object_dependencies.extend(dependent_secret_ids.into_iter().map(|secret_id| {
object_dependency::ActiveModel {
id: NotSet,
oid: Set(id.secret_id as _),
oid: Set(secret_id as _),
used_by: Set(s.id as _),
}
}));
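The migration handles sources and sinks with the same pattern: remap every referenced secret id through the etcd-to-SQL `secret_rewrite` table, and collect the distinct ids so that each secret produces exactly one `object_dependency` row even when several properties reference it. A condensed sketch of that pattern (the map types are assumptions; the real code mutates the prost maps in place, as above):

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

use risingwave_pb::secret::PbSecretRef; // import path assumed

/// Remap secret ids in place and return the distinct set of ids referenced,
/// so the caller can emit one dependency row per (secret, user) pair.
fn rewrite_and_collect(
    refs: &mut BTreeMap<String, PbSecretRef>,
    secret_rewrite: &HashMap<u32, u32>, // old id -> new id, assumed shape
) -> HashSet<u32> {
    let mut dependent = HashSet::new();
    for secret_ref in refs.values_mut() {
        secret_ref.secret_id = *secret_rewrite
            .get(&secret_ref.secret_id)
            .expect("secret id must already be migrated");
        dependent.insert(secret_ref.secret_id);
    }
    dependent
}
```

Deduplicating through a `HashSet` matters because a connector-level option and a format/encode option may point at the same secret.
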
1 change: 1 addition & 0 deletions src/frontend/src/catalog/source_catalog.rs
@@ -77,6 +77,7 @@ impl SourceCatalog {
version: self.version,
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
secret_refs: Default::default(),
}
}

1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -109,6 +109,7 @@ impl ToBatchPb for BatchIcebergScan {
.collect(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
split: vec![],
secret_refs: Default::default(),
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
@@ -131,6 +131,7 @@ impl ToBatchPb for BatchKafkaScan {
.collect(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
split: vec![],
secret_refs: Default::default(),
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -113,6 +113,7 @@ impl ToBatchPb for BatchSource {
.collect(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
split: vec![],
secret_refs: Default::default(),
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
@@ -115,6 +115,7 @@ impl StreamNode for StreamFsFetch {
.collect_vec(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
secret_refs: Default::default(),
});
NodeBody::StreamFsFetch(StreamFsFetchNode {
node_inner: source_inner,
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -109,6 +109,7 @@ impl StreamNode for StreamSource {
.collect_vec(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
secret_refs: Default::default(),
});
PbNodeBody::Source(SourceNode { source_inner })
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_source_scan.rs
@@ -157,6 +157,7 @@ impl StreamSourceScan {
.collect_vec(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
secret_refs: Default::default(),
};

let fields = self.schema().to_prost();
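The frontend plan nodes above all populate the new field with `Default::default()`, i.e. an empty map: part 1 threads the protos and catalog through, and the planner does not resolve secrets into plan nodes yet. A trivial sketch of what that default means:

```rust
use std::collections::BTreeMap;

use risingwave_pb::secret::PbSecretRef; // import path assumed

fn main() {
    // `Default::default()` on a prost map field is just the empty map;
    // downstream code treats "no entries" as "no secrets to resolve".
    let secret_refs: BTreeMap<String, PbSecretRef> = Default::default();
    assert!(secret_refs.is_empty());
}
```
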
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/sink.rs
@@ -129,7 +129,7 @@ impl From<PbSink> for ActiveModel {
sink_from_name: Set(pb_sink.sink_from_name),
sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())),
target_table: Set(pb_sink.target_table.map(|x| x as _)),
secret_ref: Set(Some(SecretRef::from(pb_sink.secret_ref))),
secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))),
}
}
}
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/source.rs
@@ -103,7 +103,7 @@ impl From<PbSource> for ActiveModel {
optional_associated_table_id: Set(optional_associated_table_id),
connection_id: Set(source.connection_id.map(|id| id as _)),
version: Set(source.version as _),
secret_ref: Set(None),
secret_ref: Set(Some(SecretRef::from(source.secret_refs))),
}
}
}
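On the meta-model side, the protobuf map is wrapped into the `SecretRef` column type when building the `ActiveModel` and unwrapped again when reconstructing the catalog message. Both `SecretRef::from(...)` and `to_protobuf()` appear in this diff; the import paths and the helper below are illustrative stand-ins:

```rust
use std::collections::BTreeMap;

use risingwave_meta_model_v2::SecretRef; // path assumed
use risingwave_pb::secret::PbSecretRef; // path assumed

// Both conversions appear in this diff; the function itself is a stand-in
// for the ActiveModel write path and the read path in `controller/mod.rs`.
fn roundtrip(pb_map: BTreeMap<String, PbSecretRef>) -> BTreeMap<String, PbSecretRef> {
    let column = SecretRef::from(pb_map); // wrapped for storage in the sea-orm model
    column.to_protobuf() // unwrapped when rebuilding PbSource/PbSink
}
```
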
7 changes: 6 additions & 1 deletion src/meta/src/controller/mod.rs
@@ -167,6 +167,10 @@ impl From<ObjectModel<table::Model>> for PbTable {

impl From<ObjectModel<source::Model>> for PbSource {
fn from(value: ObjectModel<source::Model>) -> Self {
let mut secret_ref_map = BTreeMap::new();
if let Some(secret_ref) = value.0.secret_ref {
secret_ref_map = secret_ref.to_protobuf();
}
Self {
id: value.0.source_id as _,
schema_id: value.1.schema_id.unwrap() as _,
@@ -195,6 +199,7 @@ impl From<ObjectModel<source::Model>> for PbSource {
.map(|id| PbOptionalAssociatedTableId::AssociatedTableId(id as _)),
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
secret_refs: secret_ref_map,
}
}
}
@@ -234,7 +239,7 @@ impl From<ObjectModel<sink::Model>> for PbSink {
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
create_type: PbCreateType::Foreground as _,
secret_ref: secret_ref_map,
secret_refs: secret_ref_map,
}
}
}
12 changes: 9 additions & 3 deletions src/meta/src/controller/streaming_job.rs
@@ -288,11 +288,17 @@ impl CatalogController {
}
}

// get dependent secret refs.
let dependent_secret_refs = streaming_job.dependent_secret_refs()?;

let dependent_objs = dependent_relations
.iter()
.chain(dependent_secret_refs.iter());
// record object dependency.
if !dependent_relations.is_empty() {
ObjectDependency::insert_many(dependent_relations.into_iter().map(|id| {
if !dependent_secret_refs.is_empty() || !dependent_relations.is_empty() {
ObjectDependency::insert_many(dependent_objs.map(|id| {
object_dependency::ActiveModel {
oid: Set(id as _),
oid: Set(*id as _),
used_by: Set(streaming_job.id() as _),
..Default::default()
}
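With the referent count in place, creating a streaming job records its secret dependencies alongside its relation dependencies in a single `insert_many`, presumably so a secret still in use cannot be dropped while `object_dependency` rows reference it. A simplified sketch of the chaining (the tuple row type is a stand-in for the sea-orm `ActiveModel`):

```rust
/// Build (oid, used_by) pairs for every object a new streaming job depends
/// on: upstream relations first, then the distinct secrets it references.
fn dependency_rows(
    dependent_relations: &[u32],
    dependent_secret_ids: &[u32],
    job_id: u32,
) -> Vec<(u32, u32)> {
    dependent_relations
        .iter()
        .chain(dependent_secret_ids.iter())
        .map(|oid| (*oid, job_id)) // maps onto object_dependency's (oid, used_by)
        .collect()
}
```
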