From 7d01beece4864954c8325818908acddf90f237d1 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Mon, 25 Dec 2023 15:01:27 +0800 Subject: [PATCH] feat(catalog): add support for redacted_definition --- Cargo.lock | 22 ++ Cargo.toml | 2 + proto/catalog.proto | 4 + src/connector/Cargo.toml | 2 + src/connector/src/sink/catalog/desc.rs | 2 + src/connector/src/sink/catalog/mod.rs | 4 + src/connector/src/source/cdc/mod.rs | 3 +- src/connector/src/source/datagen/mod.rs | 3 +- .../source/filesystem/opendal_source/mod.rs | 6 +- src/connector/src/source/filesystem/s3/mod.rs | 3 +- src/connector/src/source/google_pubsub/mod.rs | 3 +- src/connector/src/source/kafka/mod.rs | 3 +- src/connector/src/source/kinesis/mod.rs | 3 +- src/connector/src/source/nats/mod.rs | 3 +- src/connector/src/source/nexmark/mod.rs | 12 +- src/connector/src/source/pulsar/mod.rs | 3 +- src/connector/src/source/test_source.rs | 3 +- src/frontend/Cargo.toml | 2 + src/frontend/src/catalog/source_catalog.rs | 3 + src/frontend/src/catalog/table_catalog.rs | 6 + src/frontend/src/handler/create_source.rs | 2 + src/frontend/src/handler/create_table.rs | 1 + src/frontend/src/handler/mod.rs | 99 ++++++ .../optimizer/plan_node/stream_materialize.rs | 2 + src/frontend/src/optimizer/plan_node/utils.rs | 1 + .../src/scheduler/distributed/query.rs | 1 + src/meta/src/controller/mod.rs | 6 + .../src/delete_range_runner.rs | 1 + src/utils/redaction/redaction/Cargo.toml | 24 ++ src/utils/redaction/redaction/src/lib.rs | 18 + .../redaction/redaction_derive/Cargo.toml | 31 ++ .../redaction/redaction_derive/src/lib.rs | 326 ++++++++++++++++++ 32 files changed, 591 insertions(+), 13 deletions(-) create mode 100644 src/utils/redaction/redaction/Cargo.toml create mode 100644 src/utils/redaction/redaction/src/lib.rs create mode 100644 src/utils/redaction/redaction_derive/Cargo.toml create mode 100644 src/utils/redaction/redaction_derive/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 0c8b66655c0ec..1ffbf3d9a711a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5536,6 +5536,24 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "mark_redaction" +version = "1.5.0-alpha" +dependencies = [ + "workspace-hack", +] + +[[package]] +name = "mark_redaction_derive" +version = "1.5.0-alpha" +dependencies = [ + "mark_redaction", + "proc-macro2", + "quote", + "syn 1.0.109", + "workspace-hack", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -8461,6 +8479,8 @@ dependencies = [ "madsim-rdkafka", "madsim-tokio", "maplit", + "mark_redaction", + "mark_redaction_derive", "moka", "mysql_async", "mysql_common", @@ -8695,6 +8715,8 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "maplit", + "mark_redaction", + "mark_redaction_derive", "md5", "num-integer", "parking_lot 0.12.1", diff --git a/Cargo.toml b/Cargo.toml index 80d6d6092fb49..5e23acfb9cb17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,8 @@ members = [ "src/utils/sync-point", "src/utils/variables", "src/utils/with_options", + "src/utils/redaction/redaction", + "src/utils/redaction/redaction_derive", "src/utils/workspace-config", "src/workspace-hack", ] diff --git a/proto/catalog.proto b/proto/catalog.proto index 33d56224976ea..3f8271df982b9 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -100,6 +100,7 @@ message Source { optional uint64 initialized_at_epoch = 15; optional uint64 created_at_epoch = 16; + optional string redacted_definition = 17; // Per-source catalog version, used by schema change. uint64 version = 100; @@ -149,6 +150,7 @@ message Sink { // Target table id (only applicable for table sink) optional uint32 target_table = 21; + optional string redacted_definition = 22; } message Connection { @@ -295,6 +297,8 @@ message Table { // This field is used to mark the the sink into this table. repeated uint32 incoming_sinks = 34; + optional string redacted_definition = 35; + // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. TableVersion version = 100; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 92aa632201409..bd6aeb4b03f51 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -70,6 +70,8 @@ itertools = "0.12" jni = { version = "0.21.1", features = ["invocation"] } jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } maplit = "1.0.2" +mark_redaction = { path = "../utils/redaction/redaction" } +mark_redaction_derive = { path = "../utils/redaction/redaction_derive" } moka = { version = "0.12", features = ["future"] } mysql_async = { version = "0.33", default-features = false, features = [ "default", diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 3fa82e36f976b..cb199a32d0ccf 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -99,6 +99,8 @@ impl SinkDesc { db_name: self.db_name, sink_from_name: self.sink_from_name, target_table: self.target_table, + // TODO #14115: redact_statement is inaccessible because it's defined in risingwave_frontend. + redacted_definition: None, } } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 8ced168ce919d..e18583eef1a14 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -301,6 +301,8 @@ pub struct SinkCatalog { pub sink_from_name: String, pub target_table: Option, + + pub redacted_definition: Option, } impl SinkCatalog { @@ -339,6 +341,7 @@ impl SinkCatalog { sink_from_name: self.sink_from_name.clone(), stream_job_status: PbStreamJobStatus::Creating.into(), target_table: self.target_table.map(|table_id| table_id.table_id()), + redacted_definition: self.redacted_definition.clone(), } } @@ -428,6 +431,7 @@ impl From for SinkCatalog { db_name: pb.db_name, sink_from_name: pb.sink_from_name, target_table: pb.target_table.map(TableId::new), + redacted_definition: pb.redacted_definition, } } } diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index fb9f898445621..918b350d6ba2a 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -20,6 +20,7 @@ use std::marker::PhantomData; pub use enumerator::*; use itertools::Itertools; +use mark_redaction_derive::MarkRedaction; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_pb::catalog::PbSource; use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema}; @@ -68,7 +69,7 @@ impl CdcSourceType { } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, MarkRedaction)] pub struct CdcProperties { /// Properties specified in the WITH clause by user pub properties: HashMap, diff --git a/src/connector/src/source/datagen/mod.rs b/src/connector/src/source/datagen/mod.rs index af2dd2c388e92..573a25c9ca021 100644 --- a/src/connector/src/source/datagen/mod.rs +++ b/src/connector/src/source/datagen/mod.rs @@ -19,6 +19,7 @@ pub mod split; use std::collections::HashMap; pub use enumerator::*; +use mark_redaction_derive::MarkRedaction; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; pub use source::*; @@ -29,7 +30,7 @@ use crate::source::SourceProperties; pub const DATAGEN_CONNECTOR: &str = "datagen"; #[serde_as] -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, MarkRedaction)] pub struct DatagenProperties { /// split_num means data source partition #[serde(rename = "datagen.split.num")] diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index d72f94badb85c..54ba369ba26db 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -21,6 +21,8 @@ use serde::Deserialize; pub mod opendal_enumerator; pub mod opendal_reader; +use mark_redaction_derive::MarkRedaction; + use self::opendal_enumerator::OpendalEnumerator; use self::opendal_reader::OpendalReader; use super::{OpendalFsSplit, S3Properties}; @@ -30,7 +32,7 @@ pub const GCS_CONNECTOR: &str = "gcs"; // The new s3_v2 will use opendal. pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2"; -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, PartialEq, MarkRedaction)] pub struct GcsProperties { #[serde(rename = "gcs.bucket_name")] pub bucket_name: String, @@ -78,7 +80,7 @@ impl OpendalSource for OpendalGcs { } } -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, PartialEq, MarkRedaction)] pub struct OpendalS3Properties { pub s3_properties: S3Properties, #[serde(rename = "s3.assume_role", default)] diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs index c60f8994f76f1..f6ab8d132800c 100644 --- a/src/connector/src/source/filesystem/s3/mod.rs +++ b/src/connector/src/source/filesystem/s3/mod.rs @@ -15,6 +15,7 @@ pub mod enumerator; pub use enumerator::S3SplitEnumerator; mod source; +use mark_redaction_derive::MarkRedaction; use serde::Deserialize; pub use source::S3FileReader; @@ -24,7 +25,7 @@ use crate::source::SourceProperties; pub const S3_CONNECTOR: &str = "s3"; -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, PartialEq, MarkRedaction)] pub struct S3Properties { #[serde(rename = "s3.region_name")] pub region_name: String, diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs index e4eee129c36bc..4ffd84ad65c8c 100644 --- a/src/connector/src/source/google_pubsub/mod.rs +++ b/src/connector/src/source/google_pubsub/mod.rs @@ -19,6 +19,7 @@ pub mod source; pub mod split; pub use enumerator::*; +use mark_redaction_derive::MarkRedaction; use serde_with::{serde_as, DisplayFromStr}; pub use source::*; pub use split::*; @@ -29,7 +30,7 @@ use crate::source::SourceProperties; pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub"; #[serde_as] -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Hash, WithOptions)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Hash, WithOptions, MarkRedaction)] pub struct PubsubProperties { #[serde_as(as = "DisplayFromStr")] #[serde(rename = "pubsub.split_count")] diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 34443e4dfc42c..b89675cb0137e 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -22,6 +22,7 @@ pub mod split; pub mod stats; pub use enumerator::*; +use mark_redaction_derive::MarkRedaction; pub use private_link::*; pub use source::*; pub use split::*; @@ -87,7 +88,7 @@ pub struct RdKafkaPropertiesConsumer { pub enable_auto_commit: Option, } -#[derive(Clone, Debug, Deserialize, WithOptions)] +#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)] pub struct KafkaProperties { /// This parameter is not intended to be exposed to users. /// This parameter specifies only for one parallelism. The parallelism of kafka source diff --git a/src/connector/src/source/kinesis/mod.rs b/src/connector/src/source/kinesis/mod.rs index ebb3184cc501b..c860e674a298f 100644 --- a/src/connector/src/source/kinesis/mod.rs +++ b/src/connector/src/source/kinesis/mod.rs @@ -16,6 +16,7 @@ pub mod enumerator; pub mod source; pub mod split; +use mark_redaction_derive::MarkRedaction; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; @@ -29,7 +30,7 @@ use crate::source::SourceProperties; pub const KINESIS_CONNECTOR: &str = "kinesis"; #[serde_as] -#[derive(Clone, Debug, Deserialize, WithOptions)] +#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)] pub struct KinesisProperties { #[serde(rename = "scan.startup.mode", alias = "kinesis.scan.startup.mode")] // accepted values: "latest", "earliest", "timestamp" diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index cbc935c17c4ee..5f712012c16a9 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -16,6 +16,7 @@ pub mod enumerator; pub mod source; pub mod split; +use mark_redaction_derive::MarkRedaction; use serde::Deserialize; use with_options::WithOptions; @@ -26,7 +27,7 @@ use crate::source::SourceProperties; pub const NATS_CONNECTOR: &str = "nats"; -#[derive(Clone, Debug, Deserialize, WithOptions)] +#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)] pub struct NatsProperties { #[serde(flatten)] pub common: NatsCommon, diff --git a/src/connector/src/source/nexmark/mod.rs b/src/connector/src/source/nexmark/mod.rs index 474eeaf6f0619..48c67eaa91996 100644 --- a/src/connector/src/source/nexmark/mod.rs +++ b/src/connector/src/source/nexmark/mod.rs @@ -16,9 +16,11 @@ pub mod enumerator; pub mod source; pub mod split; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; pub use enumerator::*; +use mark_redaction::MarkRedaction; +use mark_redaction_derive::MarkRedaction; use nexmark::config::{NexmarkConfig, RateShape}; use nexmark::event::EventType; use serde::Deserialize; @@ -45,8 +47,14 @@ const fn none() -> Option { pub type NexmarkProperties = Box; +impl MarkRedaction for NexmarkProperties { + fn marks() -> HashSet { + NexmarkPropertiesInner::marks() + } +} + #[serde_as] -#[derive(Clone, Debug, Deserialize, WithOptions)] +#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)] pub struct NexmarkPropertiesInner { #[serde_as(as = "DisplayFromStr")] #[serde(rename = "nexmark.split.num", default = "identity_i32::<1>")] diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 4464e91b9e541..70e19f93430dc 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -18,6 +18,7 @@ pub mod split; pub mod topic; pub use enumerator::*; +use mark_redaction_derive::MarkRedaction; use serde::Deserialize; use serde_with::serde_as; pub use split::*; @@ -37,7 +38,7 @@ impl SourceProperties for PulsarProperties { const SOURCE_NAME: &'static str = PULSAR_CONNECTOR; } -#[derive(Clone, Debug, Deserialize, WithOptions)] +#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)] #[serde_as] pub struct PulsarProperties { #[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")] diff --git a/src/connector/src/source/test_source.rs b/src/connector/src/source/test_source.rs index 743ae3b179427..374c44a362cd6 100644 --- a/src/connector/src/source/test_source.rs +++ b/src/connector/src/source/test_source.rs @@ -17,6 +17,7 @@ use std::sync::{Arc, OnceLock}; use anyhow::anyhow; use async_trait::async_trait; +use mark_redaction_derive::MarkRedaction; use parking_lot::Mutex; use risingwave_common::types::JsonbVal; use serde_derive::{Deserialize, Serialize}; @@ -114,7 +115,7 @@ pub fn registry_test_source(box_source: BoxSource) -> TestSourceRegistryGuard { pub const TEST_CONNECTOR: &str = "test"; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, MarkRedaction)] pub struct TestSourceProperties { properties: HashMap, } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 73df1156cd035..bd4949fe0edd4 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -38,6 +38,8 @@ futures-async-stream = { workspace = true } iana-time-zone = "0.1" itertools = "0.12" maplit = "1" +mark_redaction = { path = "../utils/redaction/redaction" } +mark_redaction_derive = { path = "../utils/redaction/redaction_derive" } md5 = "0.7.0" num-integer = "0.1" parking_lot = "0.12" diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index c5fd8232cf0c1..1163d57908de6 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -39,6 +39,7 @@ pub struct SourceCatalog { pub watermark_descs: Vec, pub associated_table_id: Option, pub definition: String, + pub redacted_definition: Option, pub connection_id: Option, pub created_at_epoch: Option, pub initialized_at_epoch: Option, @@ -65,6 +66,7 @@ impl SourceCatalog { info: Some(self.info.clone()), watermark_descs: self.watermark_descs.clone(), definition: self.definition.clone(), + redacted_definition: self.redacted_definition.clone(), connection_id: self.connection_id, initialized_at_epoch: self.initialized_at_epoch.map(|x| x.0), created_at_epoch: self.created_at_epoch.map(|x| x.0), @@ -123,6 +125,7 @@ impl From<&PbSource> for SourceCatalog { watermark_descs, associated_table_id: associated_table_id.map(|x| x.into()), definition: prost.definition.clone(), + redacted_definition: prost.redacted_definition.clone(), connection_id, created_at_epoch: prost.created_at_epoch.map(Epoch::from), initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from), diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 9622ea3cfcedd..49d276ded1362 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -157,6 +157,8 @@ pub struct TableCatalog { /// Incoming sinks, used for sink into table pub incoming_sinks: Vec, + + pub redacted_definition: Option, } // How the stream job was created will determine @@ -445,6 +447,7 @@ impl TableCatalog { create_type: self.create_type.to_prost().into(), description: self.description.clone(), incoming_sinks: self.incoming_sinks.clone(), + redacted_definition: self.redacted_definition.clone(), } } @@ -560,6 +563,7 @@ impl From for TableCatalog { create_type: CreateType::from_prost(create_type), description: tb.description, incoming_sinks: tb.incoming_sinks.clone(), + redacted_definition: tb.redacted_definition.clone(), } } } @@ -639,6 +643,7 @@ mod tests { initialized_at_epoch: None, value_indices: vec![0], definition: "".into(), + redacted_definition: None, read_prefix_len_hint: 0, vnode_col_index: None, row_id_index: None, @@ -704,6 +709,7 @@ mod tests { row_id_index: None, value_indices: vec![0], definition: "".into(), + redacted_definition: None, conflict_behavior: ConflictBehavior::NoCheck, read_prefix_len_hint: 0, version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))), diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 8a9ef0edfbc38..3cde04bbcbbbd 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1371,6 +1371,7 @@ pub async fn handle_create_source( let connection_id = resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?; let definition = handler_args.normalized_sql.clone(); + let redacted_definition = handler_args.redacted_sql.clone(); let source = PbSource { id: TableId::placeholder().table_id, @@ -1390,6 +1391,7 @@ pub async fn handle_create_source( created_at_epoch: None, optional_associated_table_id: None, version: INITIAL_SOURCE_VERSION_ID, + redacted_definition, }; let catalog_writer = session.catalog_writer()?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index f7843310e482d..c2e1b7462c297 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -675,6 +675,7 @@ fn gen_table_plan_inner( owner: session.user_id(), watermark_descs: watermark_descs.clone(), definition: "".to_string(), + redacted_definition: None, connection_id, initialized_at_epoch: None, created_at_epoch: None, diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 25aec90be772a..d2703bcc65bed 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -24,6 +24,7 @@ use pgwire::pg_server::BoxedError; use pgwire::types::{Format, Row}; use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; +use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_sqlparser::ast::*; use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt}; @@ -117,6 +118,7 @@ pub struct HandlerArgs { pub sql: Arc, pub normalized_sql: String, pub with_options: WithOptions, + pub redacted_sql: Option, } impl HandlerArgs { @@ -126,6 +128,7 @@ impl HandlerArgs { sql, with_options: WithOptions::try_from(stmt)?, normalized_sql: Self::normalize_sql(stmt), + redacted_sql: redact_statement(stmt).as_ref().map(Self::normalize_sql), }) } @@ -698,3 +701,99 @@ pub async fn handle( _ => bail_not_implemented!("Unhandled statement: {}", stmt), } } + +pub fn redact_sql(definition: &str) -> Option { + let Ok(ast) = risingwave_sqlparser::parser::Parser::parse_sql(definition) else { + tracing::error!("failed to parse relation definition"); + return None; + }; + use itertools::Itertools; + let stmt = ast + .into_iter() + .exactly_one() + .expect("should contains only one statement"); + redact_statement(&stmt) + .as_ref() + .map(HandlerArgs::normalize_sql) +} + +pub fn redact_statement(stmt: &Statement) -> Option { + use mark_redaction::MarkRedaction; + use risingwave_connector::match_source_name_str; + use risingwave_connector::source::SourceProperties; + + let mut stmt = stmt.clone(); + let sql_options = match &mut stmt { + Statement::CreateSource { stmt } => &mut stmt.with_properties.0, + Statement::CreateTable { with_options, .. } => with_options, + Statement::CreateSink { stmt } => &mut stmt.with_properties.0, + _ => { + return Some(stmt); + } + }; + let Ok(mut with_properties) = + WithOptions::try_from(sql_options.as_slice()).map(WithOptions::into_inner) + else { + return None; + }; + let Some(connector) = with_properties.remove(UPSTREAM_SOURCE_KEY) else { + tracing::error!("Must specify 'connector' in WITH clause"); + return None; + }; + fn redact_marked_field(options: &mut Vec, marks: std::collections::HashSet) { + for option in options { + if marks.contains(&option.name.real_value()) { + option.value = Value::SingleQuotedString("[REDACTED]".into()); + } + } + // TODO #14115: redact table name and column names. + } + match_source_name_str!( + connector.to_lowercase().as_str(), + PropType, + { + let marked = PropType::marks(); + redact_marked_field(sql_options, marked); + }, + |_| {} + ); + Some(stmt) +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use itertools::Itertools; + use mark_redaction::MarkRedaction as MarkRedactionTrait; + use mark_redaction_derive::MarkRedaction; + + #[test] + fn test_redaction_options() { + #[derive(MarkRedaction)] + #[expect(dead_code)] + struct Bar { + i_a: i32, + #[mark_redaction] + i_b: i32, + } + + #[derive(MarkRedaction)] + #[expect(dead_code)] + struct Foo { + #[mark_redaction] + a: i32, + b: i32, + #[mark_redaction(rename = "c_rename", alias = "c_1", alias = "c_2")] + c: i32, + #[mark_redaction(alias = "d_1")] + d: i32, + #[mark_redaction(flatten)] + e: Bar, + } + + let marks: HashSet = Foo::marks(); + let expected_marks = ["a", "c_rename", "c_1", "c_2", "d", "d_1", "i_b"]; + assert!(marks.iter().sorted().eq(expected_marks.iter().sorted())); + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index efa059e50efd9..800fd747421ec 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -223,6 +223,7 @@ impl StreamMaterialize { // assert: `stream_key` is a subset of `table_pk` let read_prefix_len_hint = table_pk.len(); + let redacted_definition = crate::handler::redact_sql(&definition); Ok(TableCatalog { id: TableId::placeholder(), associated_source_id: None, @@ -254,6 +255,7 @@ impl StreamMaterialize { create_type: CreateType::Foreground, // Will be updated in the handler itself. description: None, incoming_sinks: vec![], + redacted_definition, }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 1ae1a21a04442..4330351b7a807 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -169,6 +169,7 @@ impl TableCatalogBuilder { .value_indices .unwrap_or_else(|| (0..self.columns.len()).collect_vec()), definition: "".into(), + redacted_definition: None, conflict_behavior: ConflictBehavior::NoCheck, read_prefix_len_hint, version: None, // the internal table is not versioned and can't be schema changed diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 25ba6ebcec7c3..2e126f5bc9f7e 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -579,6 +579,7 @@ pub(crate) mod tests { row_id_index: None, value_indices: vec![0, 1, 2], definition: "".to_string(), + redacted_definition: None, conflict_behavior: ConflictBehavior::NoCheck, read_prefix_len_hint: 0, version: None, diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 6a8fdac04e5fe..07bf2e5eb2045 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -137,6 +137,8 @@ impl From> for PbTable { description: None, // TODO: fix it for model v2. incoming_sinks: vec![], + // TODO #14115 + redacted_definition: None, } } } @@ -156,6 +158,8 @@ impl From> for PbSource { info: value.0.source_info.map(|info| info.0), watermark_descs: value.0.watermark_descs.0, definition: value.0.definition, + // TODO #14115 + redacted_definition: None, connection_id: value.0.connection_id.map(|id| id as _), // todo: using the timestamp from the database directly. initialized_at_epoch: Some( @@ -202,6 +206,8 @@ impl From> for PbSink { format_desc: value.0.sink_format_desc.map(|desc| desc.0), // todo: fix this for model v2 target_table: None, + // TODO #14115 + redacted_definition: None, } } } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index d2acd7c754c74..f7402eb7b3690 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -159,6 +159,7 @@ async fn compaction_test( create_type: PbCreateType::Foreground.into(), description: None, incoming_sinks: vec![], + redacted_definition: None, }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2; diff --git a/src/utils/redaction/redaction/Cargo.toml b/src/utils/redaction/redaction/Cargo.toml new file mode 100644 index 0000000000000..14756e3a43f1e --- /dev/null +++ b/src/utils/redaction/redaction/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "mark_redaction" +version.workspace = true +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] + +[lints] +workspace = true + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[target.'cfg(not(madsim))'.dependencies] +workspace-hack = { path = "../../../workspace-hack" } \ No newline at end of file diff --git a/src/utils/redaction/redaction/src/lib.rs b/src/utils/redaction/redaction/src/lib.rs new file mode 100644 index 0000000000000..3ef77b205157b --- /dev/null +++ b/src/utils/redaction/redaction/src/lib.rs @@ -0,0 +1,18 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub trait MarkRedaction { + /// Returns names of fields that need redaction. + fn marks() -> std::collections::HashSet; +} diff --git a/src/utils/redaction/redaction_derive/Cargo.toml b/src/utils/redaction/redaction_derive/Cargo.toml new file mode 100644 index 0000000000000..47a43282f8291 --- /dev/null +++ b/src/utils/redaction/redaction_derive/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "mark_redaction_derive" +version.workspace = true +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1.0" +quote = "1.0" +mark_redaction = { path="../redaction" } +syn = { version = "1.0", features = ["full"] } + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[lints] +workspace = true + +[target.'cfg(not(madsim))'.dependencies] +workspace-hack = { path = "../../../workspace-hack" } \ No newline at end of file diff --git a/src/utils/redaction/redaction_derive/src/lib.rs b/src/utils/redaction/redaction_derive/src/lib.rs new file mode 100644 index 0000000000000..082c1a60ea7c4 --- /dev/null +++ b/src/utils/redaction/redaction_derive/src/lib.rs @@ -0,0 +1,326 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![feature(let_chains)] + +use proc_macro::TokenStream; +use proc_macro2::TokenStream as TokenStream2; +use quote::{quote, ToTokens}; +use syn::{parse_macro_input, Data, DeriveInput, Fields, Meta, NestedMeta}; + +enum FieldRedactionAction { + NoAction, + Rename(String), + UseRustName, + Flatten, +} + +#[proc_macro_derive(MarkRedaction, attributes(mark_redaction))] +pub fn mark_redaction(input: TokenStream) -> TokenStream { + let input: DeriveInput = parse_macro_input!(input); + match mark_redaction_impl(input) { + Ok(token_stream) => token_stream.into(), + Err(e) => e.to_compile_error().into(), + } +} + +fn mark_redaction_impl(input: DeriveInput) -> Result { + let fields = match &input.data { + Data::Struct(data) => &data.fields, + _ => { + return Err(syn::Error::new_spanned( + &input, + "MarkRedaction can only annotate struct", + )); + } + }; + let Fields::Named(fields) = fields else { + return Err(syn::Error::new_spanned( + fields, + "MarkRedaction can only annotate struct with named fields", + )); + }; + let struct_name = input.ident; + let (impl_generics, type_generics, where_clause) = input.generics.split_for_impl(); + let mut marked_names = vec![]; + let mut field_types_to_expand = vec![]; + let helper_attr = "mark_redaction"; + let flatten_ident = "flatten"; + let alias_ident = "alias"; + let rename_ident = "rename"; + for field in &fields.named { + let mut action = FieldRedactionAction::NoAction; + let field_name = field + .ident + .clone() + .unwrap_or_else(|| panic!("named field must have name")) + .to_string(); + for attr in &field.attrs { + let meta = match attr.parse_meta() { + Ok(m) => m, + Err(e) => { + return Err(syn::Error::new_spanned( + attr, + format!("failed to parse attrs: {e}"), + )); + } + }; + match meta { + Meta::Path(path) => { + if !path.is_ident(helper_attr) { + continue; + } + // #[mark_redaction] + try_set_action(&mut action, FieldRedactionAction::UseRustName, &path)?; + } + Meta::List(list) => { + if !list.path.is_ident(helper_attr) { + continue; + } + // action may be override + try_set_action(&mut action, FieldRedactionAction::UseRustName, &list)?; + for nested_meta in list.nested { + let attr_meta = match nested_meta { + NestedMeta::Meta(m) => m, + NestedMeta::Lit(l) => { + return Err(syn::Error::new_spanned(&l, "unexpected attr literal")); + } + }; + let mut accepted = false; + match &attr_meta { + Meta::Path(path) => { + if path.is_ident(flatten_ident) { + // #[mark_redaction(flatten)] + try_set_action(&mut action, FieldRedactionAction::Flatten, &path)?; + field_types_to_expand.push(field.ty.clone()); + accepted = true; + } + } + Meta::NameValue(name_value) => { + if name_value.path.is_ident(alias_ident) { + if let syn::Lit::Str(l) = &name_value.lit { + // #[mark_redaction(alias="xxx")] + marked_names.push(l.value()); + accepted = true; + } + } else if name_value.path.is_ident(rename_ident) { + if let syn::Lit::Str(l) = &name_value.lit { + // #[mark_redaction(rename="xxx")] + try_set_action( + &mut action, + FieldRedactionAction::Rename(l.value()), + &name_value, + )?; + accepted = true; + } + } + } + Meta::List(_) => {} + } + if !accepted { + return Err(syn::Error::new_spanned( + &attr_meta, + "unexpected attr meta", + )); + } + } + } + Meta::NameValue(_) => {} + } + } + match action { + FieldRedactionAction::Rename(rename) => { + marked_names.push(rename); + } + FieldRedactionAction::UseRustName => { + marked_names.push(field_name); + } + _ => {} + } + } + + let output = quote! { + impl #impl_generics mark_redaction::MarkRedaction for #struct_name #type_generics #where_clause { + fn marks() -> std::collections::HashSet { + let mut ret: std::collections::HashSet = Default::default(); + let ident: Vec = vec![#(#marked_names.to_string(),)*]; + ret.extend(ident); + let expand: Vec> = vec![#(#field_types_to_expand::marks(),)*]; + ret.extend(expand.into_iter().flatten()); + ret + } + } + }; + + Ok(output) +} + +fn try_set_action( + current_action: &mut FieldRedactionAction, + new_action: FieldRedactionAction, + tokens: impl ToTokens, +) -> Result<(), syn::Error> { + let mut reject = false; + match current_action { + FieldRedactionAction::NoAction => { + *current_action = new_action; + } + FieldRedactionAction::Rename(_) => { + // rename is incompatible with all other actions. + reject = true; + } + FieldRedactionAction::UseRustName => { + if matches!(new_action, FieldRedactionAction::NoAction) { + // unset is not allowed + reject = true; + } else { + *current_action = new_action; + } + } + FieldRedactionAction::Flatten => { + // flatten is incompatible with all other actions. + reject = true; + } + } + if reject { + return Err(syn::Error::new_spanned( + tokens, + "incompatible redaction actions", + )); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use syn::{parse_quote, DeriveInput}; + + use crate::mark_redaction_impl; + + #[test] + fn test_basic() { + let input: DeriveInput = parse_quote!( + struct Foo {} + ); + mark_redaction_impl(input).unwrap(); + + let input: DeriveInput = parse_quote!( + struct Foo(i32); + ); + mark_redaction_impl(input).unwrap_err(); + + let input: DeriveInput = parse_quote!( + enum Foo { + A, + B, + } + ); + mark_redaction_impl(input).unwrap_err(); + + let input: DeriveInput = parse_quote!( + union Foo { + a: i32, + b: i32, + } + ); + mark_redaction_impl(input).unwrap_err(); + } + + #[test] + fn test_attr() { + let input: DeriveInput = parse_quote!( + struct Foo { + #[mark_redaction] + a: i32, + b: i32, + #[mark_redaction(rename = "c_rename", alias = "c_1", alias = "c_2")] + c: i32, + #[mark_redaction(alias = "d_1")] + d: i32, + #[mark_redaction(flatten)] + e: Bar, + #[mark_redaction(flatten)] + f: HashSet, + } + ); + mark_redaction_impl(input).unwrap(); + let input: DeriveInput = parse_quote!( + struct Foo { + #[mark_redaction] + a: i32, + #[mark_redaction] + b: T, + #[mark_redaction(flatten)] + c: T, + } + ); + mark_redaction_impl(input).unwrap(); + + let input: DeriveInput = parse_quote!( + struct Foo { + #[mark_redaction(rename = "c_rename", rename = "c_rename_2")] + a: i32, + } + ); + mark_redaction_impl(input).unwrap_err(); + + let input: DeriveInput = parse_quote!( + struct Foo { + #[mark_redaction(flatten, rename = "c_rename_2")] + a: Bar, + } + ); + mark_redaction_impl(input).unwrap_err(); + + let input: DeriveInput = parse_quote!( + struct Foo { + #[mark_redaction(flatten, flatten)] + a: Bar, + } + ); + mark_redaction_impl(input).unwrap_err(); + + let input: DeriveInput = parse_quote!( + struct Foo { + #[mark_redaction(non_sense)] + a: Bar, + } + ); + mark_redaction_impl(input).unwrap_err(); + + let input: DeriveInput = parse_quote!( + struct Foo { + #[mark_redaction(non_sense = "123")] + a: Bar, + } + ); + mark_redaction_impl(input).unwrap_err(); + + let input: DeriveInput = parse_quote!( + struct Foo { + #[mark_redaction(alias = 123)] + a: Bar, + } + ); + mark_redaction_impl(input).unwrap_err(); + + let input: DeriveInput = parse_quote!( + struct Foo { + #[mark_redaction(rename = 123)] + a: Bar, + } + ); + mark_redaction_impl(input).unwrap_err(); + } +}