Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Aug 22, 2024
1 parent c0192d5 commit 69a688a
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 23 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ impl PlainParser {
.generate_accessor(data)
.await?;

return match parse_schema_change(&accessor, &self.source_ctx.connector_props) {
return match parse_schema_change(
&accessor,
self.source_ctx.source_id.into(),
&self.source_ctx.connector_props,
) {
Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
Err(err) => Err(err)?,
};
Expand Down
15 changes: 4 additions & 11 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ const BEFORE: &str = "before";
const AFTER: &str = "after";

const UPSTREAM_DDL: &str = "ddl";
const CDC_SOURCE_NAME_PREFIX: &str = "RW_CDC_";
const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
Expand Down Expand Up @@ -156,8 +155,12 @@ macro_rules! jsonb_access_field {
};
}

/// Parse the schema change message from Debezium.
/// The layout of MySQL schema change message can refer to
/// https://debezium.io/documentation/reference/2.6/connectors/mysql.html#mysql-schema-change-topic
pub fn parse_schema_change(
accessor: &impl Access,
source_id: u32,
connector_props: &ConnectorProperties,
) -> AccessResult<SchemaChangeEnvelope> {
let mut schema_changes = vec![];
Expand All @@ -169,16 +172,6 @@ pub fn parse_schema_change(
.as_utf8()
.to_string();

let source_id = if let Some(ScalarRefImpl::Jsonb(source_field)) =
accessor.access(&[SOURCE], &DataType::Jsonb)?.to_datum_ref()
{
let name: String = jsonb_access_field!(source_field, "name", string);
let id = name.strip_prefix(CDC_SOURCE_NAME_PREFIX).unwrap_or("0");
id.parse::<u32>().unwrap_or_default()
} else {
0
};

if let Some(ScalarRefImpl::List(table_changes)) = accessor
.access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))?
.to_datum_ref()
Expand Down
1 change: 0 additions & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ prometheus = "0.13"
prometheus-http-query = "0.8"
prost = { workspace = true }
rand = { workspace = true }
regex = "1.10"
risingwave_backup = { workspace = true }
risingwave_common = { workspace = true }
risingwave_common_heap_profiling = { workspace = true }
Expand Down
9 changes: 0 additions & 9 deletions src/tests/simulation/tests/integration_tests/sink/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,12 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> {
Ok(())
}

fn init_logger() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_ansi(false)
.try_init();
}

#[tokio::test]
async fn test_sink_recovery() -> Result<()> {
init_logger();
recovery_test_inner(false).await
}

#[tokio::test]
async fn test_sink_decouple_recovery() -> Result<()> {
init_logger();
recovery_test_inner(true).await
}

0 comments on commit 69a688a

Please sign in to comment.