Skip to content

Commit

Permalink
database_name and table_name meta column
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed May 31, 2024
1 parent e154a37 commit 8a89ddb
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 18 deletions.
12 changes: 12 additions & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ message AdditionalColumnHeader {
data.DataType data_type = 2;
}


// Marker message: the additional column carries the upstream database name
// of the CDC source (derived from source metadata).
message AdditionalDatabaseName {}

// Marker message: the additional column carries the upstream table name
// of the CDC source (derived from source metadata).
message AdditionalTableName {}

// Marker message: the additional column carries the upstream collection name
// (MongoDB CDC; parsed from the message payload rather than source metadata).
message AdditionalCollectionName {}


// this type means we read all headers as a whole
message AdditionalColumnHeaders {}

Expand All @@ -215,6 +223,10 @@ message AdditionalColumn {
AdditionalColumnHeader header_inner = 5;
AdditionalColumnFilename filename = 6;
AdditionalColumnHeaders headers = 7;
// metadata column for cdc table
AdditionalDatabaseName database_name = 8;
AdditionalTableName table_name = 9;
AdditionalCollectionName collection_name = 10;
}
}

Expand Down
53 changes: 49 additions & 4 deletions src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType as PbDataType;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{
AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders,
AdditionalColumnKey, AdditionalColumnOffset, AdditionalColumnPartition,
AdditionalColumnTimestamp,
AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
AdditionalColumnPartition, AdditionalColumnTimestamp, AdditionalDatabaseName,
AdditionalTableName,
};

use crate::error::ConnectorResult;
Expand Down Expand Up @@ -66,7 +67,14 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet

// For CDC backfill table, the additional columns are added to the schema of `StreamCdcScan`
// For CDC backfill table, the additional columns are added to the schema of `StreamCdcScan`.
// NOTE: the scraped diff showed both the old single-entry initializer and the new
// one merged together; only the updated initializer below is valid.
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
    LazyLock::new(|| {
        Some(HashSet::from([
            "timestamp",
            "database_name",
            "table_name",
            "collection_name",
        ]))
    });

pub fn get_supported_additional_columns(
connector_name: &str,
Expand Down Expand Up @@ -201,6 +209,43 @@ pub fn build_additional_column_catalog(
is_hidden: false,
},
"header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
"database_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::DatabaseName(
AdditionalDatabaseName {},
)),
},
),
is_hidden: false,
},
"table_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
},
),
is_hidden: false,
},
"collection_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::CollectionName(
AdditionalCollectionName {},
)),
},
),
is_hidden: false,
},
_ => unreachable!(),
};

Expand Down
16 changes: 13 additions & 3 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::maxwell::MaxwellParser;
use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder;
use crate::parser::util::{
extract_header_inner_from_meta, extract_headers_from_meta, extreact_timestamp_from_meta,
extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
};
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::schema::InvalidOptionError;
Expand Down Expand Up @@ -400,12 +400,22 @@ impl SourceStreamChunkRowWriter<'_> {
.unwrap(), // handled all match cases in internal match, unwrap is safe
));
}
(_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
(
_,
&Some(ref col @ AdditionalColumnType::DatabaseName(_))
| &Some(ref col @ AdditionalColumnType::TableName(_))
| &Some(ref col @ AdditionalColumnType::Timestamp(_)),
) => match self.row_meta {
Some(row_meta) => Ok(A::output_for(
extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None),
extract_cdc_meta_column(row_meta.meta, col, desc.name.as_str())?
.unwrap_or(None),
)),
None => parse_field(desc), // parse from payload
},
(_, &Some(AdditionalColumnType::CollectionName(_))) => {
// collection name for `mongodb-cdc` should be parsed from the message payload
parse_field(desc)
}
(_, &Some(AdditionalColumnType::Partition(_))) => {
// the meta info does not involve spec connector
return Ok(A::output_for(
Expand Down
20 changes: 20 additions & 0 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,13 @@ pub struct DebeziumChangeEvent<A> {

const BEFORE: &str = "before";
const AFTER: &str = "after";

const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
const SOURCE_TABLE: &str = "table";
const SOURCE_COLLECTION: &str = "collection";

const OP: &str = "op";
pub const TRANSACTION_STATUS: &str = "status";
pub const TRANSACTION_ID: &str = "id";
Expand Down Expand Up @@ -210,6 +215,21 @@ where
.to_scalar_value()
}))
}
&ColumnType::DatabaseName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_DB], Some(&desc.data_type)),
&ColumnType::TableName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_TABLE], Some(&desc.data_type)),
&ColumnType::CollectionName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_COLLECTION], Some(&desc.data_type)),
_ => Err(AccessError::UnsupportedAdditionalColumn {
name: desc.name.clone(),
}),
Expand Down
29 changes: 24 additions & 5 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;

use anyhow::Context;
Expand Down Expand Up @@ -38,6 +39,10 @@ macro_rules! log_error {
};
}
pub(crate) use log_error;
use risingwave_pb::plan_common::additional_column;
use risingwave_pb::plan_common::additional_column::ColumnType;

use crate::parser::{AccessError, AccessResult};

/// get kafka topic name
pub(super) fn get_kafka_topic(props: &HashMap<String, String>) -> ConnectorResult<&String> {
Expand Down Expand Up @@ -127,11 +132,25 @@ pub(super) async fn bytes_from_url(
}
}

pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<Datum> {
match meta {
SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
SourceMeta::DebeziumCdc(debezium_meta) => debezium_meta.extract_timestamp(),
_ => None,
/// Extract a CDC metadata column (timestamp / database name / table name)
/// from the per-message source metadata.
///
/// Only `SourceMeta::DebeziumCdc` is accepted; callers route CDC-only column
/// types here, so any other meta variant is a programming error.
///
/// # Errors
/// Returns `AccessError::UnsupportedAdditionalColumn` for column types that
/// cannot be derived from CDC metadata (e.g. the collection name, which is
/// parsed from the message payload instead).
pub fn extract_cdc_meta_column(
    meta: &SourceMeta,
    column_type: &additional_column::ColumnType,
    column_name: &str,
) -> AccessResult<Option<Datum>> {
    // The invariant was previously checked twice (an `assert_matches!` plus a
    // `match` with an `unreachable!()` arm); a single `let ... else` states it once.
    let SourceMeta::DebeziumCdc(cdc_meta) = meta else {
        unreachable!("CDC meta column `{column_name}` requires DebeziumCdc source meta");
    };

    match column_type {
        ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()),
        ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()),
        ColumnType::TableName(_) => Ok(cdc_meta.extract_table_name()),
        _ => Err(AccessError::UnsupportedAdditionalColumn {
            name: column_name.to_string(),
        }),
    }
}

Expand Down
38 changes: 32 additions & 6 deletions src/connector/src/source/cdc/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Datum, Scalar, Timestamptz};
use risingwave_common::types::{Datum, Scalar, ScalarImpl, Timestamptz};
use risingwave_pb::connector_service::CdcMessage;

use crate::source::base::SourceMessage;
use crate::source::SourceMeta;

#[derive(Debug, Clone)]
pub struct DebeziumCdcMeta {
db_name_prefix_len: usize,

pub full_table_name: String,
// extracted from `payload.source.ts_ms`, the time that the change event was made in the database
pub source_ts_ms: i64,
Expand All @@ -36,6 +38,30 @@ impl DebeziumCdcMeta {
)
.into()
}

/// Upstream database name: the prefix of `full_table_name` that precedes the
/// `'.'` separator located by [`DebeziumCdcMeta::new`].
pub fn extract_database_name(&self) -> Option<Datum> {
    let db_name = self.full_table_name[..self.db_name_prefix_len].to_string();
    let datum: Datum = Some(ScalarImpl::from(db_name));
    Some(datum)
}

/// Upstream table name: the part of `full_table_name` after the `db.` prefix.
pub fn extract_table_name(&self) -> Option<Datum> {
    // `db_name_prefix_len` is the index of the '.' separator (0 when absent),
    // so the remainder starts with the '.' itself — strip it so we return
    // "table" rather than ".table".
    let rest = &self.full_table_name[self.db_name_prefix_len..];
    let table_name = rest.strip_prefix('.').unwrap_or(rest);
    Some(ScalarImpl::from(table_name.to_string())).into()
}

/// Build a `DebeziumCdcMeta`, caching where the database-name prefix of
/// `full_table_name` ends so the database/table parts can be sliced out
/// later without re-scanning the string.
pub fn new(full_table_name: String, source_ts_ms: i64, is_transaction_meta: bool) -> Self {
    // Index of the '.' separator in "db.table"; 0 when no separator exists.
    let db_name_prefix_len = match full_table_name.find('.') {
        Some(dot_pos) => dot_pos,
        None => 0,
    };
    Self {
        db_name_prefix_len,
        full_table_name,
        source_ts_ms,
        is_transaction_meta,
    }
}
}

impl From<CdcMessage> for SourceMessage {
Expand All @@ -53,11 +79,11 @@ impl From<CdcMessage> for SourceMessage {
},
offset: message.offset,
split_id: message.partition.into(),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: message.full_table_name,
source_ts_ms: message.source_ts_ms,
is_transaction_meta: message.is_transaction_meta,
}),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
message.full_table_name,
message.source_ts_ms,
message.is_transaction_meta,
)),
}
}
}
6 changes: 6 additions & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("plan_common.AdditionalColumnHeader", "#[derive(Eq, Hash)]")
.type_attribute("plan_common.AdditionalColumnHeaders", "#[derive(Eq, Hash)]")
.type_attribute("plan_common.AdditionalColumnOffset", "#[derive(Eq, Hash)]")
.type_attribute("plan_common.AdditionalDatabaseName", "#[derive(Eq, Hash)]")
.type_attribute("plan_common.AdditionalTableName", "#[derive(Eq, Hash)]")
.type_attribute(
"plan_common.AdditionalCollectionName",
"#[derive(Eq, Hash)]",
)
.type_attribute("common.ColumnOrder", "#[derive(Eq, Hash)]")
.type_attribute("common.OrderType", "#[derive(Eq, Hash)]")
.type_attribute("common.Buffer", "#[derive(Eq)]")
Expand Down

0 comments on commit 8a89ddb

Please sign in to comment.