Skip to content

Commit

Permalink
feat: add new option ignore_option for debezium format (#15304) (#1…
Browse files Browse the repository at this point in the history
…5460)

Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Mar 7, 2024
1 parent ac93d43 commit 5809732
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 14 deletions.
20 changes: 20 additions & 0 deletions e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,18 @@ create table source_with_rdkafka_props (v1 int, v2 varchar) with (
properties.queued.max.messages.kbytes = 65536
) FORMAT PLAIN ENCODE JSON

statement ok
CREATE TABLE debezium_ignore_key (
order_id int primary key
) with (
connector = 'kafka',
topic = 'debezium_mess_key',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest')
FORMAT DEBEZIUM ENCODE JSON (
ignore_key = 'true'
)

statement ok
flush;

Expand Down Expand Up @@ -799,6 +811,11 @@ select * from s27
----
(9.5,7,12) 12.5 1 An ice sculpture {cold,ice}

query I
select count(order_id) from debezium_ignore_key
----
3

statement ok
drop materialized view source_mv1

Expand Down Expand Up @@ -888,3 +905,6 @@ drop table dbz_ignore_case_json;

statement ok
drop table source_with_rdkafka_props;

statement ok
drop table debezium_ignore_key;
4 changes: 4 additions & 0 deletions scripts/source/test_data/debezium_mess_key.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
****^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","price":null,"product_id":1,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424096,"transaction":null}
****^{"before":null,"after":{"order_id":2,"order_date":1558430840001,"customer_name":"Alice","price":null,"product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424102,"transaction":null}
****^{"before":null,"after":{"order_id":3,"order_date":1558430840002,"customer_name":"Alice","price":null,"product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"last","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424103,"transaction":null}
****^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","price":null,"product_id":3,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1686029203809,"snapshot":"false","db":"mydb","sequence":"[null,\"23911952\"]","schema":"public","table":"orders","txId":491,"lsn":23911952,"xmin":null},"op":"u","ts_ms":1686029204058,"transaction":null}
43 changes: 38 additions & 5 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};

Expand All @@ -33,6 +35,27 @@ pub struct DebeziumParser {
payload_builder: AccessBuilderImpl,
pub(crate) rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,

props: DebeziumProps,
}

/// Option name that controls whether the key part of a Debezium message is ignored.
pub const DEBEZIUM_IGNORE_KEY: &str = "ignore_key";

/// Options specific to the Debezium format.
#[derive(Debug, Clone, Default)]
pub struct DebeziumProps {
    /// Ignore the key part of the message.
    /// When enabled, the key part is not taken into the message accessor.
    pub ignore_key: bool,
}

impl DebeziumProps {
    /// Builds the properties from a string-to-string option map.
    ///
    /// `ignore_key` is enabled only when the [`DEBEZIUM_IGNORE_KEY`] entry is
    /// present and equals `"true"` (ASCII case-insensitive). A missing entry or
    /// any other value leaves it disabled.
    pub fn from(props: &BTreeMap<String, String>) -> Self {
        let ignore_key = matches!(
            props.get(DEBEZIUM_IGNORE_KEY),
            Some(raw) if raw.eq_ignore_ascii_case("true")
        );
        Self { ignore_key }
    }
}

async fn build_accessor_builder(
Expand Down Expand Up @@ -68,11 +91,20 @@ impl DebeziumParser {
let key_builder = build_accessor_builder(key_config, key_type).await?;
let payload_builder =
build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
props
} else {
unreachable!(
"expecting Debezium protocol properties but got {:?}",
props.protocol_config
)
};
Ok(Self {
key_builder,
payload_builder,
rw_columns,
source_ctx,
props: debezium_props,
})
}

Expand All @@ -82,7 +114,7 @@ impl DebeziumParser {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
}),
protocol_config: ProtocolProperties::Debezium,
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
Self::new(props, rw_columns, Default::default()).await
}
Expand All @@ -94,9 +126,10 @@ impl DebeziumParser {
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<ParseResult> {
// tombstone messages are handled implicitly by these accessors
let key_accessor = match key {
None => None,
Some(data) => Some(self.key_builder.generate_accessor(data).await?),
let key_accessor = match (key, self.props.ignore_key) {
(None, false) => None,
(Some(data), false) => Some(self.key_builder.generate_accessor(data).await?),
(_, true) => None,
};
let payload_accessor = match payload {
None => None,
Expand Down Expand Up @@ -186,7 +219,7 @@ mod tests {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
}),
protocol_config: ProtocolProperties::Debezium,
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
let mut source_ctx = SourceContext::default();
source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default());
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ mod tests {
use serde_json::Value;

use crate::parser::{
DebeziumParser, EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc,
SourceStreamChunkBuilder, SpecificParserConfig,
DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::SourceContextRef;
fn assert_json_eq(parse_result: &Option<ScalarImpl>, json_str: &str) {
Expand All @@ -94,7 +94,7 @@ mod tests {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
}),
protocol_config: ProtocolProperties::Debezium,
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
DebeziumParser::new(props, rw_columns, source_ctx)
.await
Expand Down
11 changes: 8 additions & 3 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ mod unified;
mod upsert_parser;
mod util;

pub use debezium::DEBEZIUM_IGNORE_KEY;

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
pub struct SourceStreamChunkBuilder {
descs: Vec<SourceColumnDesc>,
Expand Down Expand Up @@ -861,7 +863,7 @@ impl ByteStreamSourceParserImpl {
PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
Ok(Self::Plain(parser))
}
(ProtocolProperties::Debezium, _) => {
(ProtocolProperties::Debezium(_), _) => {
let parser =
DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
Ok(Self::Debezium(parser))
Expand Down Expand Up @@ -967,7 +969,7 @@ pub enum EncodingProperties {

#[derive(Debug, Default, Clone)]
pub enum ProtocolProperties {
Debezium,
Debezium(DebeziumProps),
DebeziumMongo,
Maxwell,
Canal,
Expand All @@ -988,7 +990,10 @@ impl SpecificParserConfig {
// in the future
let protocol_config = match format {
SourceFormat::Native => ProtocolProperties::Native,
SourceFormat::Debezium => ProtocolProperties::Debezium,
SourceFormat::Debezium => {
let debezium_props = DebeziumProps::from(&info.format_encode_options);
ProtocolProperties::Debezium(debezium_props)
}
SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
SourceFormat::Maxwell => ProtocolProperties::Maxwell,
SourceFormat::Canal => ProtocolProperties::Canal,
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use risingwave_connector::parser::additional_columns::{
};
use risingwave_connector::parser::{
schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig,
SpecificParserConfig,
SpecificParserConfig, DEBEZIUM_IGNORE_KEY,
};
use risingwave_connector::schema::schema_registry::{
name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
Expand Down Expand Up @@ -314,6 +314,10 @@ pub(crate) async fn bind_columns_from_source(
..Default::default()
};

if source_schema.format == Format::Debezium {
try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY);
}

let columns = match (&source_schema.format, &source_schema.row_encode) {
(Format::Native, Encode::Native)
| (Format::Plain, Encode::Bytes)
Expand Down
5 changes: 3 additions & 2 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::parser::{
DebeziumParser, EncodingProperties, JsonProperties, ProtocolProperties,
DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
SourceStreamChunkBuilder, SpecificParserConfig,
};
use risingwave_connector::source::cdc::external::CdcOffset;
Expand Down Expand Up @@ -510,7 +510,8 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
}),
protocol_config: ProtocolProperties::Debezium,
// the cdc message is generated internally so the key must exist.
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
let mut parser = DebeziumParser::new(
props,
Expand Down

0 comments on commit 5809732

Please sign in to comment.