feat(cdc): Add MongoDB CDC Source #14966

Merged
merged 27 commits · Feb 29, 2024
Changes from 1 commit
support delete
StrikeW committed Feb 3, 2024
commit 8e73db34aecab6156f135e382239ee55654aed7d
138 changes: 106 additions & 32 deletions src/connector/src/parser/debezium/mongo_json_parser.rs
@@ -14,17 +14,17 @@

use std::fmt::Debug;

use risingwave_common::error::ErrorCode::{self, ProtocolError};
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use simd_json::prelude::MutableObject;
use simd_json::BorrowedValue;

use crate::only_parse_payload;
use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjection};
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::simd_json_parser::{DebeziumJsonAccessBuilder, DebeziumMongoJsonAccessBuilder};
use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter};
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, JsonProperties, ParserFormat,
SourceStreamChunkRowWriter,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
@@ -33,10 +33,30 @@ pub struct DebeziumMongoJsonParser {
id_column: SourceColumnDesc,
payload_column: SourceColumnDesc,
source_ctx: SourceContextRef,
key_builder: AccessBuilderImpl,
payload_builder: AccessBuilderImpl,
}

// key and payload in DEBEZIUM_MONGO format are accessed in different ways
async fn build_accessor_builder(config: EncodingProperties) -> Result<AccessBuilderImpl> {
match config {
EncodingProperties::Json(_) => Ok(AccessBuilderImpl::DebeziumJson(
DebeziumJsonAccessBuilder::new()?,
)),
EncodingProperties::MongoJson(_) => Ok(AccessBuilderImpl::DebeziumMongoJson(
DebeziumMongoJsonAccessBuilder::new()?,
)),
_ => Err(RwError::from(ProtocolError(
"unsupported encoding for DEBEZIUM_MONGO format".to_string(),
))),
}
}

impl DebeziumMongoJsonParser {
pub fn new(rw_columns: Vec<SourceColumnDesc>, source_ctx: SourceContextRef) -> Result<Self> {
pub async fn new(
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> Result<Self> {
let id_column = rw_columns
.iter()
.find(|desc| {
@@ -69,37 +89,54 @@ impl DebeziumMongoJsonParser {
)));
}

let key_builder =
build_accessor_builder(EncodingProperties::MongoJson(JsonProperties::default()))
.await?;
let payload_builder =
build_accessor_builder(EncodingProperties::MongoJson(JsonProperties::default()))
.await?;

Ok(Self {
rw_columns,
id_column,
payload_column,

source_ctx,
key_builder,
payload_builder,
})
}

#[allow(clippy::unused_async)]
pub async fn parse_inner(
&self,
mut payload: Vec<u8>,
&mut self,
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<()> {
let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
let key_accessor = match key {
None => None,
Some(data) => Some(self.key_builder.generate_accessor(data).await?),
};
let payload_accessor = match payload {
None => None,
Some(data) => Some(self.payload_builder.generate_accessor(data).await?),
};

// let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload)
// .map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
//
// Event can be configured with and without the "payload" field present.
// See https://github.com/risingwavelabs/risingwave/issues/10178

let payload = if let Some(payload) = event.get_mut("payload") {
std::mem::take(payload)
} else {
event
};

let accessor = JsonAccess::new_with_options(payload, &JsonParseOptions::DEBEZIUM);

let row_op = DebeziumChangeEvent::with_value(MongoProjection::new(accessor));
// let payload = if let Some(payload) = event.get_mut("payload") {
// std::mem::take(payload)
// } else {
// event
// };
//
// let payload_accessor = JsonAccess::new_with_options(payload, &JsonParseOptions::DEBEZIUM);
// let row_op = DebeziumChangeEvent::with_value(MongoProjection::new(payload_accessor));

let row_op = DebeziumChangeEvent::new_mongodb_event(key_accessor, payload_accessor);
apply_row_operation_on_stream_chunk_writer(row_op, &mut writer).map_err(Into::into)
}
}
@@ -119,11 +156,12 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser {

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<()> {
only_parse_payload!(self, payload, writer)
// only_parse_payload!(self, payload, writer)
self.parse_inner(key, payload, writer).await
}
}

@@ -160,13 +198,44 @@ mod tests {
let a = extract_bson_id(&DataType::Varchar, &pld).unwrap();
assert_eq!(a, Some(ScalarImpl::Utf8("5d505646cf6d4fe581014ab2".into())));
}
fn get_columns() -> Vec<SourceColumnDesc> {
let descs = vec![
SourceColumnDesc::simple("_id", DataType::Int64, ColumnId::from(0)),

#[tokio::test]
async fn test_parse_delete_message() {
let (key, payload) = (
// key
br#"{"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}"#.to_vec(),
// payload
br#"{"schema":null,"payload":{"before":null,"after":null,"updateDescription":null,"source":{"version":"2.4.2.Final","connector":"mongodb","name":"RW_CDC_3001","ts_ms":1706968217000,"snapshot":"false","db":"dev","sequence":null,"rs":"rs0","collection":"test","ord":2,"lsid":null,"txnNumber":null,"wallTime":null},"op":"d","ts_ms":1706968217377,"transaction":null}}"#.to_vec()
);

let columns = vec![
SourceColumnDesc::simple("_id", DataType::Varchar, ColumnId::from(0)),
SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)),
];
let mut parser = DebeziumMongoJsonParser::new(columns.clone(), Default::default())
.await
.unwrap();

let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3);

let writer = builder.row_writer();
parser
.parse_inner(Some(key), Some(payload), writer)
.await
.unwrap();
let chunk = builder.finish();
let mut rows = chunk.rows();

descs
let (op, row) = rows.next().unwrap();
assert_eq!(op, Op::Delete);
// oid
assert_eq!(
row.datum_at(0).to_owned_datum(),
(Some(ScalarImpl::Utf8("65bc9fb6c485f419a7a877fe".into())))
);

// payload should be null
assert_eq!(row.datum_at(1).to_owned_datum(), None);
}

#[tokio::test]
Expand All @@ -177,14 +246,19 @@ mod tests {
// data without payload and schema field
br#"{"before":null,"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}","patch":null,"filter":null,"updateDescription":null,"source":{"version":"2.1.4.Final","connector":"mongodb","name":"dbserver1","ts_ms":1681879044000,"snapshot":"last","db":"inventory","sequence":null,"rs":"rs0","collection":"customers","ord":1,"lsid":null,"txnNumber":null},"op":"r","ts_ms":1681879054736,"transaction":null}"#.to_vec()];

let columns = get_columns();
let columns = vec![
SourceColumnDesc::simple("_id", DataType::Int64, ColumnId::from(0)),
SourceColumnDesc::simple("payload", DataType::Jsonb, ColumnId::from(1)),
];
for data in input {
let parser = DebeziumMongoJsonParser::new(columns.clone(), Default::default()).unwrap();
let mut parser = DebeziumMongoJsonParser::new(columns.clone(), Default::default())
.await
.unwrap();

let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3);

let writer = builder.row_writer();
parser.parse_inner(data, writer).await.unwrap();
parser.parse_inner(None, Some(data), writer).await.unwrap();
let chunk = builder.finish();
let mut rows = chunk.rows();
let (op, row) = rows.next().unwrap();
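For readers unfamiliar with the Debezium MongoDB format, the delete handling added above boils down to this: a delete event has "op": "d" and a null "after" image, so the deleted document's _id has to be recovered from the Kafka key rather than the value. The following is a minimal, self-contained sketch of that idea only; the helper name extract_deleted_oid and the use of serde_json are illustrative, and the real parser goes through AccessBuilderImpl / DebeziumChangeEvent instead.

// Sketch, not the PR's code: reduce a Debezium MongoDB delete event to the
// deleted document's `_id`, mirroring the key/payload split used above.
use serde_json::Value;

fn extract_deleted_oid(key: &[u8], payload: &[u8]) -> Option<String> {
    // Both key and value may arrive wrapped in a {"schema": ..., "payload": ...} envelope.
    let unwrap = |bytes: &[u8]| -> Option<Value> {
        let v: Value = serde_json::from_slice(bytes).ok()?;
        if let Some(inner) = v.get("payload") {
            Some(inner.clone())
        } else {
            Some(v)
        }
    };

    let key = unwrap(key)?;
    let payload = unwrap(payload)?;

    // Delete events carry "op": "d" and a null "after" image.
    if payload.get("op")?.as_str()? != "d" {
        return None;
    }

    // The key's "id" field is itself a JSON string such as {"$oid": "..."}.
    let id_doc: Value = serde_json::from_str(key.get("id")?.as_str()?).ok()?;
    Some(id_doc.get("$oid")?.as_str()?.to_owned())
}

Fed the key/payload pair from test_parse_delete_message above, this sketch would yield Some("65bc9fb6c485f419a7a877fe"), matching the Op::Delete row the test asserts.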
32 changes: 32 additions & 0 deletions src/connector/src/parser/debezium/simd_json_parser.rs
@@ -18,6 +18,7 @@ use risingwave_common::error::{ErrorCode, Result, RwError};
use simd_json::prelude::MutableObject;
use simd_json::BorrowedValue;

use crate::parser::unified::debezium::MongoJsonAccess;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::AccessBuilder;
@@ -54,6 +55,37 @@ impl AccessBuilder for DebeziumJsonAccessBuilder {
}
}

#[derive(Debug)]
pub struct DebeziumMongoJsonAccessBuilder {
value: Option<Vec<u8>>,
}

impl DebeziumMongoJsonAccessBuilder {
pub fn new() -> Result<Self> {
Ok(Self { value: None })
}
}

impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> Result<AccessImpl<'_, '_>> {
self.value = Some(payload);
let mut event: BorrowedValue<'_> =
simd_json::to_borrowed_value(self.value.as_mut().unwrap())
.map_err(|e| RwError::from(ErrorCode::ProtocolError(e.to_string())))?;

let payload = if let Some(payload) = event.get_mut("payload") {
std::mem::take(payload)
} else {
event
};

Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
JsonAccess::new_with_options(payload, &JsonParseOptions::DEBEZIUM),
)))
}
}

#[cfg(test)]
mod tests {
use std::convert::TryInto;
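The new builder owns the raw bytes (self.value) so the borrowed simd-json document can point into them, and it strips the optional "payload" envelope before wrapping the result in MongoJsonAccess. A hedged usage sketch, based only on the signatures visible in this diff and assuming an async context with the AccessBuilder trait in scope; the event literal and the demo function are illustrative:

// Sketch only: exercises DebeziumMongoJsonAccessBuilder as defined above.
async fn demo() -> risingwave_common::error::Result<()> {
    let mut builder = DebeziumMongoJsonAccessBuilder::new()?;
    // Envelope form; the builder also accepts events without the outer "payload".
    let event = br#"{"schema":null,"payload":{"op":"d","before":null,"after":null}}"#.to_vec();
    // generate_accessor stores the bytes, unwraps "payload" if present, and
    // returns AccessImpl::MongoJson over the inner document.
    let _accessor = builder.generate_accessor(event).await?;
    Ok(())
}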
8 changes: 7 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -50,6 +50,7 @@ use self::upsert_parser::UpsertParser;
use self::util::get_kafka_topic;
use crate::common::AwsAuthProps;
use crate::parser::maxwell::MaxwellParser;
use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder;
use crate::parser::unified::AccessError;
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
@@ -724,6 +725,7 @@ pub enum AccessBuilderImpl {
Bytes(BytesAccessBuilder),
DebeziumAvro(DebeziumAvroAccessBuilder),
DebeziumJson(DebeziumJsonAccessBuilder),
DebeziumMongoJson(DebeziumMongoJsonAccessBuilder),
}

impl AccessBuilderImpl {
@@ -756,6 +758,7 @@ impl AccessBuilderImpl {
Self::Bytes(builder) => builder.generate_accessor(payload).await?,
Self::DebeziumAvro(builder) => builder.generate_accessor(payload).await?,
Self::DebeziumJson(builder) => builder.generate_accessor(payload).await?,
Self::DebeziumMongoJson(builder) => builder.generate_accessor(payload).await?,
};
Ok(accessor)
}
@@ -803,7 +806,9 @@ impl ByteStreamSourceParserImpl {
CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
}
(ProtocolProperties::DebeziumMongo, EncodingProperties::Json(_)) => {
DebeziumMongoJsonParser::new(rw_columns, source_ctx).map(Self::DebeziumMongoJson)
DebeziumMongoJsonParser::new(rw_columns, source_ctx)
.await
.map(Self::DebeziumMongoJson)
}
(ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
@@ -917,6 +922,7 @@ pub enum EncodingProperties {
Protobuf(ProtobufProperties),
Csv(CsvProperties),
Json(JsonProperties),
MongoJson(JsonProperties),
Bytes(BytesProperties),
Native,
#[default]