feat(cdc): support dynamodb and dynamodb cdc #17650

Closed · wants to merge 7 commits
3 changes: 3 additions & 0 deletions proto/plan_common.proto
@@ -137,6 +137,8 @@ enum FormatType {
FORMAT_TYPE_UPSERT = 6;
FORMAT_TYPE_PLAIN = 7;
FORMAT_TYPE_NONE = 8;
FORMAT_TYPE_DYNAMODB = 9;
FORMAT_TYPE_DYNAMODB_CDC = 10;
}

enum EncodeType {
@@ -152,6 +154,7 @@ enum EncodeType {
ENCODE_TYPE_TEXT = 9;
}

// Deprecated. Use FormatType and EncodeType instead.
enum RowFormatType {
ROW_UNSPECIFIED = 0;
JSON = 1;
3 changes: 3 additions & 0 deletions src/connector/src/parser/debezium/debezium_parser.rs
@@ -116,6 +116,7 @@ impl DebeziumParser {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_jsonb_column: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
@@ -226,6 +227,7 @@ mod tests {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_jsonb_column: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
@@ -298,6 +300,7 @@ mod tests {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_jsonb_column: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
1 change: 1 addition & 0 deletions src/connector/src/parser/debezium/simd_json_parser.rs
@@ -133,6 +133,7 @@ mod tests {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_jsonb_column: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
229 changes: 229 additions & 0 deletions src/connector/src/parser/dynamodb/cdc_json_parser.rs
@@ -0,0 +1,229 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use assert_matches::assert_matches;
use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl, ToDatumRef};

use crate::error::ConnectorResult;
use crate::only_parse_payload;
use crate::parser::dynamodb::{build_dynamodb_json_accessor_builder, map_rw_type_to_dynamodb_type};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::unified::{ChangeEvent, ChangeEventOperation};
use crate::parser::{
Access, AccessBuilderImpl, AccessError, ByteStreamSourceParser, ParserFormat,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

// See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Record.html
// An example of a DynamoDB change event from Kinesis:
// {
// "awsRegion":"us-east-1",
// "eventID":"26b1b865-1ee7-47f9-816e-d51cdeef9dc5",
// "eventName":"MODIFY",
// "userIdentity":null,
// "recordFormat":"application/json",
// "tableName":"wkx-test-orders",
// "dynamodb":{
// "ApproximateCreationDateTime":1720046486486729,
// "Keys":{
// "customer_name":{
// "S":"Bob Lee"
// },
// "order_id":{
// "N":"3"
// }
// },
// "NewImage":{
// "order_status":{
// "N":"3"
// },
// "order_date":{
// "N":"1720046486"
// },
// "order_id":{
// "N":"3"
// },
// "price":{
// "N":"63.06"
// },
// "product_id":{
// "N":"2060"
// },
// "customer_name":{
// "S":"Bob Lee"
// }
// },
// "OldImage":{
// "order_status":{
// "N":"3"
// },
// "order_date":{
// "N":"1720037677"
// },
// "order_id":{
// "N":"3"
// },
// "price":{
// "N":"63.06"
// },
// "product_id":{
// "N":"2060"
// },
// "customer_name":{
// "S":"Bob Lee"
// }
Comment on lines +68 to +86

Contributor: Is there a chance that they just keep the fields in Keys instead of the whole row?

@KeXiangWang (author, Jul 14, 2024): They don't seem to provide this option.
// },
// "SizeBytes":192,
// "ApproximateCreationDateTimePrecision":"MICROSECOND"
// },
// "eventSource":"aws:dynamodb"
// }
#[derive(Debug)]
pub struct DynamodbCdcJsonParser {
payload_builder: AccessBuilderImpl,
pub(crate) rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
single_jsonb_column: String,
}

impl DynamodbCdcJsonParser {
pub async fn new(
props: SpecificParserConfig,
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
// The key of a DynamoDB CDC event embeds the primary-key (partition/sort key) values, which are not used here.
let (payload_builder, single_jsonb_column) =
build_dynamodb_json_accessor_builder(props.encoding_config).await?;
Ok(Self {
payload_builder,
rw_columns,
source_ctx,
single_jsonb_column,
})
}

pub async fn parse_inner(
&mut self,
payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> ConnectorResult<()> {
let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
let row_op = DynamodbChangeEvent::new(payload_accessor, self.single_jsonb_column.clone());
match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
Ok(_) => Ok(()),
Err(err) => Err(err)?,
}
}
}

impl ByteStreamSourceParser for DynamodbCdcJsonParser {
fn columns(&self) -> &[SourceColumnDesc] {
&self.rw_columns
}

fn source_ctx(&self) -> &SourceContext {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::DynamodbCdcJson
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> ConnectorResult<()> {
only_parse_payload!(self, payload, writer)
}
}

struct DynamodbChangeEvent<A> {
value_accessor: A,
single_jsonb_column: String,
}

const OLD_IMAGE: &str = "OldImage";
const NEW_IMAGE: &str = "NewImage";
const DYNAMODB: &str = "dynamodb";

const DYNAMODB_CREATE_OP: &str = "INSERT";
const DYNAMODB_MODIFY_OP: &str = "MODIFY";
const DYNAMODB_REMOVE_OP: &str = "REMOVE";

const OP: &str = "eventName";

impl<A> DynamodbChangeEvent<A>
where
A: Access,
{
pub fn new(value_accessor: A, single_jsonb_column: String) -> Self {
Self {
value_accessor,
single_jsonb_column,
}
}
}

impl<A> ChangeEvent for DynamodbChangeEvent<A>
where
A: Access,
{
fn access_field(&self, desc: &SourceColumnDesc) -> crate::parser::AccessResult<DatumCow<'_>> {
if desc.name == self.single_jsonb_column {
assert_matches!(desc.data_type, DataType::Jsonb);
match self.op()? {
ChangeEventOperation::Delete => self
.value_accessor
.access(&[DYNAMODB, OLD_IMAGE], &desc.data_type),
ChangeEventOperation::Upsert => self
.value_accessor
.access(&[DYNAMODB, NEW_IMAGE], &desc.data_type),
}
} else {
let dynamodb_type = map_rw_type_to_dynamodb_type(&desc.data_type)?;
match self.op()? {
ChangeEventOperation::Delete => self.value_accessor.access(
&[DYNAMODB, OLD_IMAGE, &desc.name, dynamodb_type.as_str()],
&desc.data_type,
),
ChangeEventOperation::Upsert => self.value_accessor.access(
&[DYNAMODB, NEW_IMAGE, &desc.name, dynamodb_type.as_str()],
&desc.data_type,
),
}
}
}

fn op(&self) -> Result<ChangeEventOperation, AccessError> {
if let Some(ScalarRefImpl::Utf8(op)) = self
.value_accessor
.access(&[OP], &DataType::Varchar)?
.to_datum_ref()
{
match op {
DYNAMODB_CREATE_OP | DYNAMODB_MODIFY_OP => return Ok(ChangeEventOperation::Upsert),
DYNAMODB_REMOVE_OP => return Ok(ChangeEventOperation::Delete),
_ => panic!("Unknown dynamodb event operation: {}", op),
}
}
Err(super::AccessError::Undefined {
name: "op".into(),
path: String::from(OP),
})
}
}
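
To make the access paths above concrete, here is a minimal, self-contained sketch (an editor's illustration, not code from this PR) that mimics the parser's path-based lookups against the example record using serde_json. The connector's real accessor also converts the leaf value into the column's RisingWave type (e.g. "63.06" into a decimal), which is elided here.

```rust
// Editor's sketch, not part of this PR: mimic the accessor paths used by
// DynamodbCdcJsonParser with plain serde_json lookups.
use serde_json::Value;

/// Walk nested JSON objects one key at a time, like value_accessor.access(&[...]).
fn access<'a>(root: &'a Value, path: &[&str]) -> Option<&'a Value> {
    path.iter().try_fold(root, |v, key| v.get(*key))
}

fn main() {
    let event: Value = serde_json::from_str(
        r#"{
            "eventName": "MODIFY",
            "dynamodb": {
                "OldImage": { "price": { "N": "63.06" } },
                "NewImage": { "price": { "N": "63.06" } }
            }
        }"#,
    )
    .unwrap();

    // The operation comes from the top-level "eventName" field:
    // INSERT/MODIFY map to upsert, REMOVE maps to delete.
    assert_eq!(
        access(&event, &["eventName"]).and_then(Value::as_str),
        Some("MODIFY")
    );

    // A typed column such as `price: decimal` is read from the image with the
    // DynamoDB type tag ("N" for numbers) appended as the last path segment.
    assert_eq!(
        access(&event, &["dynamodb", "NewImage", "price", "N"]).and_then(Value::as_str),
        Some("63.06")
    );

    // The configured single jsonb column instead takes the whole image subtree.
    assert!(access(&event, &["dynamodb", "NewImage"]).is_some());
}
```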
120 changes: 120 additions & 0 deletions src/connector/src/parser/dynamodb/json_parser.rs
@@ -0,0 +1,120 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use assert_matches::assert_matches;
use risingwave_common::types::DataType;
use risingwave_connector_codec::decoder::Access;

use crate::error::ConnectorResult;
use crate::only_parse_payload;
use crate::parser::dynamodb::{build_dynamodb_json_accessor_builder, map_rw_type_to_dynamodb_type};
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter,
SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

const ITEM: &str = "Item";

// An example of a DynamoDB event on S3:
// {
// "Item":{
// "customer_name":{
// "S":"Bob Lee"
// },
// "order_id":{
// "N":"3"
// },
// "price":{
// "N":"63.06"
// },
// "order_status":{
// "N":"3"
// },
// "product_id":{
// "N":"2060"
// },
// "order_date":{
// "N":"1720037677"
// }
// }
// }
#[derive(Debug)]
pub struct DynamodbJsonParser {
payload_builder: AccessBuilderImpl,
pub(crate) rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
single_jsonb_column: String,
}

impl DynamodbJsonParser {
pub async fn new(
props: SpecificParserConfig,
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
let (payload_builder, single_jsonb_column) =
build_dynamodb_json_accessor_builder(props.encoding_config).await?;
Ok(Self {
payload_builder,
rw_columns,
source_ctx,
single_jsonb_column,
})
}

pub async fn parse_inner(
&mut self,
payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> ConnectorResult<()> {
let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
writer.do_insert(|column| {
if column.name == self.single_jsonb_column {
assert_matches!(column.data_type, DataType::Jsonb);
payload_accessor.access(&[ITEM], &column.data_type)
Comment on this line:

Member: I'm a little concerned about this: in the resulting jsonb, will the user need `data->field->S/N` to access the field?

Contributor: Yes. It's inevitable anyway, as DynamoDB doesn't enforce the type of attributes: `data->field` can be either an `S` or an `N`, so `->S`/`->N` can be viewed as a manual type cast.

Author: Yes, correct.

Contributor: I have concerns about the value range. Can you help confirm that data from DynamoDB can be correctly represented in JSON?

Member:

> It's inevitable anyway

Why is it "inevitable"? Actually I had thought we introduced FORMAT DYNAMODB because we want to treat it specially (e.g. no need to access the type field; the parser does it implicitly). But now it seems it's no different from FORMAT PLAIN (just syntactic sugar?).

BTW, how do other systems' DynamoDB CDC implementations behave? Do they also need to access the type field? I'm not arguing we should do that, just expressing confusion.

@KeXiangWang (author, Jul 12, 2024): See https://www.youtube.com/live/JwDbdsQPWak by Tinybird, at 27:05. You can see they just keep the whole event as JSON.

Author: As they are not a database, they didn't try to understand the JSON; basically they only parsed the top layer into the table. We can also parse the system info (like eventID and awsRegion) besides the primary keys and main events, if necessary.

@neverchanje (Jul 12, 2024): @xxchan You are right in some senses. Thanks for bringing this up.

Essentially every field in DynamoDB is like a jsonb, since they are not strictly typed. If the user wants to make sure the returned result is numeric, a manual cast is inevitable, i.e. `v::numeric`, which is basically equivalent to `->>'N'`. However, I agree that mapping every field internally to JSONB instead of `->>'N'` or `->>'S'` can avoid exposing details to users. A downside of this approach is that it involves a copy and rewrite of the whole record, but I think that's acceptable in terms of performance. cc @KeXiangWang

Alternatively, we could:

  • Define a schema beforehand in RW, and skip records that do not match the schema. But I think this goes against DynamoDB's NoSQL design. (Decodable)
  • Automatically infer the schema from a historical snapshot, and skip records that do not match it. An anti-pattern as well. (Airbyte, Hevo)
  • Inherently allow multiple types in one column. This is technically hard to implement in our current type system. (Rockset)

Contributor:

> Numbers can be positive, negative, or zero. Numbers can have up to 38 digits of precision. Exceeding this results in an exception. If you need greater precision than 38 digits, you can use strings.

Thanks @tabVersion for the reminder. DynamoDB indeed has higher precision than our decimals (28 digits). 😢 How did we typically address such cases? cc @xiangjinwu

It is why we did not encode the whole row into a single jsonb column: JSON has lower precision in its standard, and there is little we can do about it as an application.

> Why is it "inevitable"? Actually I had thought we introduced FORMAT DYNAMODB because we want to treat it specially. But now it seems it's no different from FORMAT PLAIN (just syntactic sugar?).

Agree. I prefer letting users define the schema themselves; wrapping this up as an individual format is overkill, and I don't see a great advantage to it. We may pivot to supporting tools that generate a CREATE SOURCE statement from one message instead.

> Define a schema beforehand in RW, and skip records that do not match the schema.

But RisingWave serves as a relational database for streaming jobs and analysis. How do users run a streaming job without a clear definition of what the job does? If the user is unsure about a column's content, keeping it as bytes may offer the best compatibility, which we already support.
} else {
let dynamodb_type = map_rw_type_to_dynamodb_type(&column.data_type)?;
payload_accessor.access(
&[ITEM, &column.name, dynamodb_type.as_str()],
&column.data_type,
)
}
})?;
Ok(())
}
}

impl ByteStreamSourceParser for DynamodbJsonParser {
fn columns(&self) -> &[SourceColumnDesc] {
&self.rw_columns
}

fn source_ctx(&self) -> &SourceContext {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::DynamodbJson
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> ConnectorResult<()> {
only_parse_payload!(self, payload, writer)
}
}
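
Both parsers call map_rw_type_to_dynamodb_type, whose body is outside this diff. The sketch below is an editor's guess at the shape of such a helper, based only on DynamoDB's documented attribute-type tags (S, N, B, BOOL, ...); RwType is a hypothetical local stand-in for risingwave_common::types::DataType so the snippet compiles on its own.

```rust
// Editor's sketch: a plausible shape for map_rw_type_to_dynamodb_type, which
// is referenced by both parsers but not shown in this diff. The exact mapping
// is an assumption, not the PR's actual code.
#[derive(Debug)]
enum RwType {
    Varchar,
    Boolean,
    Bytea,
    Int64,
    Decimal,
    Jsonb,
}

/// Return the DynamoDB attribute-type tag used as the final path segment when
/// reading a typed column, e.g. ["Item", "price", "N"] in the plain parser.
fn map_rw_type_to_dynamodb_type(ty: &RwType) -> Result<&'static str, String> {
    match ty {
        RwType::Varchar => Ok("S"),
        RwType::Boolean => Ok("BOOL"),
        RwType::Bytea => Ok("B"),
        // DynamoDB tags every number, integral or decimal, as "N".
        RwType::Int64 | RwType::Decimal => Ok("N"),
        // Jsonb is handled by the single_jsonb_column branch, never here.
        other => Err(format!("no DynamoDB attribute tag for {other:?}")),
    }
}

fn main() {
    assert_eq!(map_rw_type_to_dynamodb_type(&RwType::Decimal), Ok("N"));
    assert_eq!(map_rw_type_to_dynamodb_type(&RwType::Varchar), Ok("S"));
    assert!(map_rw_type_to_dynamodb_type(&RwType::Jsonb).is_err());
}
```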