-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(cdc): support dynamodb and dynamodb cdc #17650
Changes from all commits
a90f80b
5e785a1
e5ee68b
6ff6228
f807721
cf192fc
0406cbe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
// Copyright 2024 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use assert_matches::assert_matches; | ||
use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl, ToDatumRef}; | ||
|
||
use crate::error::ConnectorResult; | ||
use crate::only_parse_payload; | ||
use crate::parser::dynamodb::{build_dynamodb_json_accessor_builder, map_rw_type_to_dynamodb_type}; | ||
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; | ||
use crate::parser::unified::{ChangeEvent, ChangeEventOperation}; | ||
use crate::parser::{ | ||
Access, AccessBuilderImpl, AccessError, ByteStreamSourceParser, ParserFormat, | ||
SourceStreamChunkRowWriter, SpecificParserConfig, | ||
}; | ||
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; | ||
|
||
// See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Record.html | ||
// An example of a DynamoDB change event from Kinesis: | ||
// { | ||
// "awsRegion":"us-east-1", | ||
// "eventID":"26b1b865-1ee7-47f9-816e-d51cdeef9dc5", | ||
// "eventName":"MODIFY", | ||
// "userIdentity":null, | ||
// "recordFormat":"application/json", | ||
// "tableName":"wkx-test-orders", | ||
// "dynamodb":{ | ||
// "ApproximateCreationDateTime":1720046486486729, | ||
// "Keys":{ | ||
// "customer_name":{ | ||
// "S":"Bob Lee" | ||
// }, | ||
// "order_id":{ | ||
// "N":"3" | ||
// } | ||
// }, | ||
// "NewImage":{ | ||
// "order_status":{ | ||
// "N":"3" | ||
// }, | ||
// "order_date":{ | ||
// "N":"1720046486" | ||
// }, | ||
// "order_id":{ | ||
// "N":"3" | ||
// }, | ||
// "price":{ | ||
// "N":"63.06" | ||
// }, | ||
// "product_id":{ | ||
// "N":"2060" | ||
// }, | ||
// "customer_name":{ | ||
// "S":"Bob Lee" | ||
// } | ||
// }, | ||
// "OldImage":{ | ||
// "order_status":{ | ||
// "N":"3" | ||
// }, | ||
// "order_date":{ | ||
// "N":"1720037677" | ||
// }, | ||
// "order_id":{ | ||
// "N":"3" | ||
// }, | ||
// "price":{ | ||
// "N":"63.06" | ||
// }, | ||
// "product_id":{ | ||
// "N":"2060" | ||
// }, | ||
// "customer_name":{ | ||
// "S":"Bob Lee" | ||
// } | ||
// }, | ||
// "SizeBytes":192, | ||
// "ApproximateCreationDateTimePrecision":"MICROSECOND" | ||
// }, | ||
// "eventSource":"aws:dynamodb" | ||
// } | ||
#[derive(Debug)] | ||
pub struct DynamodbCdcJsonParser { | ||
KeXiangWang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
payload_builder: AccessBuilderImpl, | ||
pub(crate) rw_columns: Vec<SourceColumnDesc>, | ||
source_ctx: SourceContextRef, | ||
single_jsonb_column: String, | ||
} | ||
|
||
impl DynamodbCdcJsonParser { | ||
pub async fn new( | ||
props: SpecificParserConfig, | ||
rw_columns: Vec<SourceColumnDesc>, | ||
source_ctx: SourceContextRef, | ||
) -> ConnectorResult<Self> { | ||
// the key of Dynamodb CDC are embedded value of primary key and partition key, which is not used here. | ||
let (payload_builder, single_jsonb_column) = | ||
build_dynamodb_json_accessor_builder(props.encoding_config).await?; | ||
Ok(Self { | ||
payload_builder, | ||
rw_columns, | ||
source_ctx, | ||
single_jsonb_column, | ||
}) | ||
} | ||
|
||
pub async fn parse_inner( | ||
&mut self, | ||
payload: Vec<u8>, | ||
mut writer: SourceStreamChunkRowWriter<'_>, | ||
) -> ConnectorResult<()> { | ||
let payload_accessor = self.payload_builder.generate_accessor(payload).await?; | ||
let row_op = DynamodbChangeEvent::new(payload_accessor, self.single_jsonb_column.clone()); | ||
match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) { | ||
Ok(_) => Ok(()), | ||
Err(err) => Err(err)?, | ||
} | ||
} | ||
} | ||
|
||
impl ByteStreamSourceParser for DynamodbCdcJsonParser { | ||
fn columns(&self) -> &[SourceColumnDesc] { | ||
&self.rw_columns | ||
} | ||
|
||
fn source_ctx(&self) -> &SourceContext { | ||
&self.source_ctx | ||
} | ||
|
||
fn parser_format(&self) -> ParserFormat { | ||
ParserFormat::DynamodbCdcJson | ||
} | ||
|
||
async fn parse_one<'a>( | ||
&'a mut self, | ||
_key: Option<Vec<u8>>, | ||
payload: Option<Vec<u8>>, | ||
writer: SourceStreamChunkRowWriter<'a>, | ||
) -> ConnectorResult<()> { | ||
only_parse_payload!(self, payload, writer) | ||
} | ||
} | ||
|
||
struct DynamodbChangeEvent<A> { | ||
value_accessor: A, | ||
single_jsonb_column: String, | ||
} | ||
|
||
const OLD_IMAGE: &str = "OldImage"; | ||
const NEW_IMAGE: &str = "NewImage"; | ||
const DYNAMODB: &str = "dynamodb"; | ||
|
||
const DYNAMODB_CREATE_OP: &str = "INSERT"; | ||
const DYNAMODB_MODIFY_OP: &str = "MODIFY"; | ||
const DYNAMODB_REMOVE_OP: &str = "REMOVE"; | ||
|
||
const OP: &str = "eventName"; | ||
|
||
impl<A> DynamodbChangeEvent<A> | ||
where | ||
A: Access, | ||
{ | ||
pub fn new(value_accessor: A, single_jsonb_column: String) -> Self { | ||
Self { | ||
value_accessor, | ||
single_jsonb_column, | ||
} | ||
} | ||
} | ||
|
||
impl<A> ChangeEvent for DynamodbChangeEvent<A> | ||
where | ||
A: Access, | ||
{ | ||
fn access_field(&self, desc: &SourceColumnDesc) -> crate::parser::AccessResult<DatumCow<'_>> { | ||
if desc.name == self.single_jsonb_column { | ||
assert_matches!(desc.data_type, DataType::Jsonb); | ||
match self.op()? { | ||
ChangeEventOperation::Delete => self | ||
.value_accessor | ||
.access(&[DYNAMODB, OLD_IMAGE], &desc.data_type), | ||
ChangeEventOperation::Upsert => self | ||
.value_accessor | ||
.access(&[DYNAMODB, NEW_IMAGE], &desc.data_type), | ||
} | ||
} else { | ||
let dynamodb_type = map_rw_type_to_dynamodb_type(&desc.data_type)?; | ||
match self.op()? { | ||
ChangeEventOperation::Delete => self.value_accessor.access( | ||
&[DYNAMODB, OLD_IMAGE, &desc.name, dynamodb_type.as_str()], | ||
&desc.data_type, | ||
), | ||
ChangeEventOperation::Upsert => self.value_accessor.access( | ||
&[DYNAMODB, NEW_IMAGE, &desc.name, dynamodb_type.as_str()], | ||
&desc.data_type, | ||
), | ||
} | ||
} | ||
} | ||
|
||
fn op(&self) -> Result<ChangeEventOperation, AccessError> { | ||
if let Some(ScalarRefImpl::Utf8(op)) = self | ||
.value_accessor | ||
.access(&[OP], &DataType::Varchar)? | ||
.to_datum_ref() | ||
{ | ||
match op { | ||
DYNAMODB_CREATE_OP | DYNAMODB_MODIFY_OP => return Ok(ChangeEventOperation::Upsert), | ||
DYNAMODB_REMOVE_OP => return Ok(ChangeEventOperation::Delete), | ||
_ => panic!("Unknown dynamodb event operation: {}", op), | ||
} | ||
} | ||
Err(super::AccessError::Undefined { | ||
name: "op".into(), | ||
path: String::from(OP), | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
// Copyright 2024 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use assert_matches::assert_matches; | ||
use risingwave_common::types::DataType; | ||
use risingwave_connector_codec::decoder::Access; | ||
|
||
use crate::error::ConnectorResult; | ||
use crate::only_parse_payload; | ||
use crate::parser::dynamodb::{build_dynamodb_json_accessor_builder, map_rw_type_to_dynamodb_type}; | ||
use crate::parser::{ | ||
AccessBuilderImpl, ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter, | ||
SpecificParserConfig, | ||
}; | ||
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; | ||
|
||
const ITEM: &str = "Item"; | ||
|
||
// An example of a DynamoDB event on S3: | ||
// { | ||
// "Item":{ | ||
// "customer_name":{ | ||
// "S":"Bob Lee" | ||
// }, | ||
// "order_id":{ | ||
// "N":"3" | ||
// }, | ||
// "price":{ | ||
// "N":"63.06" | ||
// }, | ||
// "order_status":{ | ||
// "N":"3" | ||
// }, | ||
// "product_id":{ | ||
// "N":"2060" | ||
// }, | ||
// "order_date":{ | ||
// "N":"1720037677" | ||
// } | ||
// } | ||
// } | ||
#[derive(Debug)] | ||
pub struct DynamodbJsonParser { | ||
payload_builder: AccessBuilderImpl, | ||
pub(crate) rw_columns: Vec<SourceColumnDesc>, | ||
source_ctx: SourceContextRef, | ||
single_jsonb_column: String, | ||
} | ||
|
||
impl DynamodbJsonParser { | ||
pub async fn new( | ||
props: SpecificParserConfig, | ||
rw_columns: Vec<SourceColumnDesc>, | ||
source_ctx: SourceContextRef, | ||
) -> ConnectorResult<Self> { | ||
let (payload_builder, single_jsonb_column) = | ||
build_dynamodb_json_accessor_builder(props.encoding_config).await?; | ||
Ok(Self { | ||
payload_builder, | ||
rw_columns, | ||
source_ctx, | ||
single_jsonb_column, | ||
}) | ||
} | ||
|
||
pub async fn parse_inner( | ||
&mut self, | ||
payload: Vec<u8>, | ||
mut writer: SourceStreamChunkRowWriter<'_>, | ||
) -> ConnectorResult<()> { | ||
let payload_accessor = self.payload_builder.generate_accessor(payload).await?; | ||
writer.do_insert(|column| { | ||
if column.name == self.single_jsonb_column { | ||
assert_matches!(column.data_type, DataType::Jsonb); | ||
payload_accessor.access(&[ITEM], &column.data_type) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a little concerned about this: In the result jsonb, user will need to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. It's inevitable anyway, as DynamoDB doesn't not enforce the type of attributes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, correct. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I have concerns about value range. Can you help confirm that data from DynamoDB can be correctly represented in JSON? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why it's "inevitable"? Actually I had thought introducing BTW, how does other systems' dynamodb CDC's behavior? Will they also need to access the type field? I'm not arguing we should do that. Just express confusion. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://www.youtube.com/live/JwDbdsQPWak by tinybird There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As they are not a database, they didn't try to understand the json though, basically they only parsed the top layer into the table. We can also parse the system info (like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @xxchan You are right in some senses. Thanks for bringing this up. Essentially every field in dynamodb is like a Alternatively, we could:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It is why we did not encode the whole row into a single jsonb column.
Agree, I prefer letting users define the schema themselves. Wrapping it as an individual format is overkill and I don't see a great advantage from it.
But RisingWave serves as a relational database for streaming jobs and analysis. How do users run a streaming job without a clear definition of what the job does? If the user is unsure about a column's content, keeping it as bytes may keep the best compatibility, which we already supported. |
||
} else { | ||
let dynamodb_type = map_rw_type_to_dynamodb_type(&column.data_type)?; | ||
payload_accessor.access( | ||
&[ITEM, &column.name, dynamodb_type.as_str()], | ||
&column.data_type, | ||
) | ||
} | ||
})?; | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl ByteStreamSourceParser for DynamodbJsonParser { | ||
fn columns(&self) -> &[SourceColumnDesc] { | ||
&self.rw_columns | ||
} | ||
|
||
fn source_ctx(&self) -> &SourceContext { | ||
&self.source_ctx | ||
} | ||
|
||
fn parser_format(&self) -> ParserFormat { | ||
ParserFormat::DynamodbJson | ||
} | ||
|
||
async fn parse_one<'a>( | ||
&'a mut self, | ||
_key: Option<Vec<u8>>, | ||
payload: Option<Vec<u8>>, | ||
writer: SourceStreamChunkRowWriter<'a>, | ||
) -> ConnectorResult<()> { | ||
only_parse_payload!(self, payload, writer) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a chance that they just keep the fields in
Keys
instead of the whole row?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They don't seem to provide this option.