Skip to content

Commit

Permalink
fix metadata extract
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Jun 3, 2024
1 parent 7cc8363 commit 417cd31
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
17 changes: 13 additions & 4 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::parser::maxwell::MaxwellParser;
use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder;
use crate::parser::util::{
extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
extreact_timestamp_from_meta,
};
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::schema::InvalidOptionError;
Expand Down Expand Up @@ -406,10 +407,18 @@ impl SourceStreamChunkRowWriter<'_> {
| &Some(ref col @ AdditionalColumnType::TableName(_))
| &Some(ref col @ AdditionalColumnType::Timestamp(_)),
) => match self.row_meta {
Some(row_meta) => Ok(A::output_for(
extract_cdc_meta_column(row_meta.meta, col, desc.name.as_str())?
.unwrap_or(None),
)),
Some(row_meta) => {
if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta {
Ok(A::output_for(
extract_cdc_meta_column(cdc_meta, col, desc.name.as_str())?
.unwrap_or(None),
))
} else {
Ok(A::output_for(
extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None),
))
}
}
None => parse_field(desc), // parse from payload
},
(_, &Some(AdditionalColumnType::CollectionName(_))) => {
Expand Down
17 changes: 9 additions & 8 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use risingwave_pb::plan_common::additional_column;
use risingwave_pb::plan_common::additional_column::ColumnType;

use crate::parser::{AccessError, AccessResult};
use crate::source::cdc::DebeziumCdcMeta;

/// get kafka topic name
pub(super) fn get_kafka_topic(props: &HashMap<String, String>) -> ConnectorResult<&String> {
Expand Down Expand Up @@ -132,18 +133,18 @@ pub(super) async fn bytes_from_url(
}
}

pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<Datum> {
match meta {
SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
_ => None,
}
}

pub fn extract_cdc_meta_column(
meta: &SourceMeta,
cdc_meta: &DebeziumCdcMeta,
column_type: &additional_column::ColumnType,
column_name: &str,
) -> AccessResult<Option<Datum>> {
assert_matches!(meta, &SourceMeta::DebeziumCdc(_));

let cdc_meta = match meta {
SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta,
_ => unreachable!(),
};

match column_type {
ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()),
ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()),
Expand Down

0 comments on commit 417cd31

Please sign in to comment.