Skip to content

Commit

Permalink
feat(catalog): add support for redacted_definition
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Dec 26, 2023
1 parent 4f354b2 commit a07b7b8
Show file tree
Hide file tree
Showing 32 changed files with 427 additions and 13 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ members = [
"src/utils/sync-point",
"src/utils/variables",
"src/utils/with_options",
"src/utils/redaction/redaction",
"src/utils/redaction/redaction_derive",
"src/utils/workspace-config",
"src/workspace-hack",
]
Expand Down
4 changes: 4 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ message Source {

optional uint64 initialized_at_epoch = 15;
optional uint64 created_at_epoch = 16;
optional string redacted_definition = 17;

// Per-source catalog version, used by schema change.
uint64 version = 100;
Expand Down Expand Up @@ -149,6 +150,7 @@ message Sink {

// Target table id (only applicable for table sink)
optional uint32 target_table = 21;
optional string redacted_definition = 22;
}

message Connection {
Expand Down Expand Up @@ -295,6 +297,8 @@ message Table {
// This field is used to mark the the sink into this table.
repeated uint32 incoming_sinks = 34;

optional string redacted_definition = 35;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ itertools = "0.12"
jni = { version = "0.21.1", features = ["invocation"] }
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
mark_redaction = { path = "../utils/redaction/redaction" }
mark_redaction_derive = { path = "../utils/redaction/redaction_derive" }
moka = { version = "0.12", features = ["future"] }
mysql_async = { version = "0.33", default-features = false, features = [
"default",
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ impl SinkDesc {
db_name: self.db_name,
sink_from_name: self.sink_from_name,
target_table: self.target_table,
// TODO #14115: redact_statement is inaccessible because it's defined in risingwave_frontend.
redacted_definition: None,
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ pub struct SinkCatalog {
pub sink_from_name: String,

pub target_table: Option<TableId>,

pub redacted_definition: Option<String>,
}

impl SinkCatalog {
Expand Down Expand Up @@ -339,6 +341,7 @@ impl SinkCatalog {
sink_from_name: self.sink_from_name.clone(),
stream_job_status: PbStreamJobStatus::Creating.into(),
target_table: self.target_table.map(|table_id| table_id.table_id()),
redacted_definition: self.redacted_definition.clone(),
}
}

Expand Down Expand Up @@ -428,6 +431,7 @@ impl From<PbSink> for SinkCatalog {
db_name: pb.db_name,
sink_from_name: pb.sink_from_name,
target_table: pb.target_table.map(TableId::new),
redacted_definition: pb.redacted_definition,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::marker::PhantomData;

pub use enumerator::*;
use itertools::Itertools;
use mark_redaction_derive::MarkRedaction;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_pb::catalog::PbSource;
use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
Expand Down Expand Up @@ -68,7 +69,7 @@ impl CdcSourceType {
}
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, MarkRedaction)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Properties specified in the WITH clause by user
pub properties: HashMap<String, String>,
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/datagen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod split;
use std::collections::HashMap;

pub use enumerator::*;
use mark_redaction_derive::MarkRedaction;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
pub use source::*;
Expand All @@ -29,7 +30,7 @@ use crate::source::SourceProperties;
pub const DATAGEN_CONNECTOR: &str = "datagen";

#[serde_as]
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, MarkRedaction)]
pub struct DatagenProperties {
/// split_num means data source partition
#[serde(rename = "datagen.split.num")]
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use serde::Deserialize;
pub mod opendal_enumerator;
pub mod opendal_reader;

use mark_redaction_derive::MarkRedaction;

use self::opendal_enumerator::OpendalEnumerator;
use self::opendal_reader::OpendalReader;
use super::{OpendalFsSplit, S3Properties};
Expand All @@ -30,7 +32,7 @@ pub const GCS_CONNECTOR: &str = "gcs";
// The new s3_v2 will use opendal.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, MarkRedaction)]
pub struct GcsProperties {
#[serde(rename = "gcs.bucket_name")]
pub bucket_name: String,
Expand Down Expand Up @@ -78,7 +80,7 @@ impl OpendalSource for OpendalGcs {
}
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, MarkRedaction)]
pub struct OpendalS3Properties {
pub s3_properties: S3Properties,
#[serde(rename = "s3.assume_role", default)]
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/filesystem/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod enumerator;

pub use enumerator::S3SplitEnumerator;
mod source;
use mark_redaction_derive::MarkRedaction;
use serde::Deserialize;
pub use source::S3FileReader;

Expand All @@ -24,7 +25,7 @@ use crate::source::SourceProperties;

pub const S3_CONNECTOR: &str = "s3";

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, MarkRedaction)]
pub struct S3Properties {
#[serde(rename = "s3.region_name")]
pub region_name: String,
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/google_pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod source;
pub mod split;

pub use enumerator::*;
use mark_redaction_derive::MarkRedaction;
use serde_with::{serde_as, DisplayFromStr};
pub use source::*;
pub use split::*;
Expand All @@ -29,7 +30,7 @@ use crate::source::SourceProperties;
pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub";

#[serde_as]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Hash, WithOptions)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Hash, WithOptions, MarkRedaction)]
pub struct PubsubProperties {
#[serde_as(as = "DisplayFromStr")]
#[serde(rename = "pubsub.split_count")]
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod split;
pub mod stats;

pub use enumerator::*;
use mark_redaction_derive::MarkRedaction;
pub use private_link::*;
pub use source::*;
pub use split::*;
Expand Down Expand Up @@ -87,7 +88,7 @@ pub struct RdKafkaPropertiesConsumer {
pub enable_auto_commit: Option<bool>,
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)]
pub struct KafkaProperties {
/// This parameter is not intended to be exposed to users.
/// This parameter specifies only for one parallelism. The parallelism of kafka source
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/kinesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod enumerator;
pub mod source;
pub mod split;

use mark_redaction_derive::MarkRedaction;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use with_options::WithOptions;
Expand All @@ -29,7 +30,7 @@ use crate::source::SourceProperties;
pub const KINESIS_CONNECTOR: &str = "kinesis";

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)]
pub struct KinesisProperties {
#[serde(rename = "scan.startup.mode", alias = "kinesis.scan.startup.mode")]
// accepted values: "latest", "earliest", "timestamp"
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod enumerator;
pub mod source;
pub mod split;

use mark_redaction_derive::MarkRedaction;
use serde::Deserialize;
use with_options::WithOptions;

Expand All @@ -26,7 +27,7 @@ use crate::source::SourceProperties;

pub const NATS_CONNECTOR: &str = "nats";

#[derive(Clone, Debug, Deserialize, WithOptions)]
#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)]
pub struct NatsProperties {
#[serde(flatten)]
pub common: NatsCommon,
Expand Down
12 changes: 10 additions & 2 deletions src/connector/src/source/nexmark/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ pub mod enumerator;
pub mod source;
pub mod split;

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

pub use enumerator::*;
use mark_redaction::MarkRedaction;
use mark_redaction_derive::MarkRedaction;
use nexmark::config::{NexmarkConfig, RateShape};
use nexmark::event::EventType;
use serde::Deserialize;
Expand All @@ -45,8 +47,14 @@ const fn none<T>() -> Option<T> {

pub type NexmarkProperties = Box<NexmarkPropertiesInner>;

impl MarkRedaction for NexmarkProperties {
fn marks() -> HashSet<String> {
NexmarkPropertiesInner::marks()
}
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)]
pub struct NexmarkPropertiesInner {
#[serde_as(as = "DisplayFromStr")]
#[serde(rename = "nexmark.split.num", default = "identity_i32::<1>")]
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/pulsar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod split;
pub mod topic;

pub use enumerator::*;
use mark_redaction_derive::MarkRedaction;
use serde::Deserialize;
use serde_with::serde_as;
pub use split::*;
Expand All @@ -37,7 +38,7 @@ impl SourceProperties for PulsarProperties {
const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
#[derive(Clone, Debug, Deserialize, WithOptions, MarkRedaction)]
#[serde_as]
pub struct PulsarProperties {
#[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/test_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::{Arc, OnceLock};

use anyhow::anyhow;
use async_trait::async_trait;
use mark_redaction_derive::MarkRedaction;
use parking_lot::Mutex;
use risingwave_common::types::JsonbVal;
use serde_derive::{Deserialize, Serialize};
Expand Down Expand Up @@ -114,7 +115,7 @@ pub fn registry_test_source(box_source: BoxSource) -> TestSourceRegistryGuard {

pub const TEST_CONNECTOR: &str = "test";

#[derive(Clone, Debug)]
#[derive(Clone, Debug, MarkRedaction)]
pub struct TestSourceProperties {
properties: HashMap<String, String>,
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ futures-async-stream = { workspace = true }
iana-time-zone = "0.1"
itertools = "0.12"
maplit = "1"
mark_redaction = { path = "../utils/redaction/redaction" }
mark_redaction_derive = { path = "../utils/redaction/redaction_derive" }
md5 = "0.7.0"
num-integer = "0.1"
parking_lot = "0.12"
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct SourceCatalog {
pub watermark_descs: Vec<WatermarkDesc>,
pub associated_table_id: Option<TableId>,
pub definition: String,
pub redacted_definition: Option<String>,
pub connection_id: Option<ConnectionId>,
pub created_at_epoch: Option<Epoch>,
pub initialized_at_epoch: Option<Epoch>,
Expand All @@ -65,6 +66,7 @@ impl SourceCatalog {
info: Some(self.info.clone()),
watermark_descs: self.watermark_descs.clone(),
definition: self.definition.clone(),
redacted_definition: self.redacted_definition.clone(),
connection_id: self.connection_id,
initialized_at_epoch: self.initialized_at_epoch.map(|x| x.0),
created_at_epoch: self.created_at_epoch.map(|x| x.0),
Expand Down Expand Up @@ -123,6 +125,7 @@ impl From<&PbSource> for SourceCatalog {
watermark_descs,
associated_table_id: associated_table_id.map(|x| x.into()),
definition: prost.definition.clone(),
redacted_definition: prost.redacted_definition.clone(),
connection_id,
created_at_epoch: prost.created_at_epoch.map(Epoch::from),
initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub static RW_SOURCES_COLUMNS: LazyLock<Vec<SystemCatalogColumnsDef<'_>>> = Lazy
(DataType::Boolean, "append_only"),
(DataType::Int32, "connection_id"),
(DataType::Varchar, "definition"),
(DataType::Varchar, "redacted_definition"),
(DataType::Varchar, "acl"),
(DataType::Timestamptz, "initialized_at"),
(DataType::Timestamptz, "created_at"),
Expand Down Expand Up @@ -100,6 +101,10 @@ impl SysCatalogReaderImpl {
Some(ScalarImpl::Bool(source.append_only)),
source.connection_id.map(|id| ScalarImpl::Int32(id as i32)),
Some(ScalarImpl::Utf8(source.create_sql().into())),
source
.redacted_definition
.clone()
.map(|d| ScalarImpl::Utf8(d.into())),
Some(
get_acl_items(
&Object::SourceId(source.id),
Expand Down
Loading

0 comments on commit a07b7b8

Please sign in to comment.