feat(cdc): support dynamodb and dynamodb cdc #17650
Conversation
if let Some(mode) = config.timestamptz_handling {
    json_parse_options.timestamptz_handling = mode;
}
I guess the timestamp format is a fixed one, not arbitrary?
Currently, this part is not well-designed. 🤔 Since we require users to use `single_blob_column`, we don't really need to parse the timestamp data. The only usage for this is when users want to parse timestamps as primary keys, or, in the future, to include system info like `dynamodb.ApproximateCreationDateTime` as additional include columns. I chose `GuessNumberUnit` because it accepts more formats, including the format of `dynamodb.ApproximateCreationDateTime`.
writer.do_insert(|column| {
    if column.name == self.single_blob_column {
        assert_matches!(column.data_type, DataType::Jsonb);
        payload_accessor.access(&[ITEM], &column.data_type)
I'm a little concerned about this: in the resulting jsonb, will the user need `data->field->'S'/'N'` to access a field?
Yes. It's inevitable anyway, as DynamoDB doesn't enforce the type of attributes. `data->field` can either be an `S` or an `N`. `->'S'/'N'` can be viewed as a manual type cast.
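For concreteness, a minimal sketch of what this access pattern looks like on the query side (the table `orders` and its attributes are hypothetical, borrowed from the sample event later in this thread):

```sql
-- Each attribute in the jsonb payload carries its DynamoDB type tag,
-- so the query selects the tag explicitly and casts as needed.
SELECT
    data -> 'customer_name' ->> 'S'        AS customer_name,
    (data -> 'price' ->> 'N')::numeric     AS price
FROM orders;
```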
Yes, correct.
> Yes. It's inevitable anyway, as DynamoDB doesn't enforce the type of attributes. `data->field` can either be an `S` or an `N`. `->'S'/'N'` can be viewed as a manual type cast.
I have concerns about value range. Can you help confirm that data from DynamoDB can be correctly represented in JSON?
> It's inevitable anyway

Why is it "inevitable"? Actually, I had thought we introduced `FORMAT DYNAMODB` because we want to treat it specially (e.g., no need to access the type tag; the parser does it implicitly). But now it seems it's no different from `FORMAT PLAIN` (just syntactic sugar?).

BTW, how do other systems' DynamoDB CDC implementations behave? Do they also need to access the type field?

I'm not arguing we should do that, just expressing confusion.
https://www.youtube.com/live/JwDbdsQPWak by Tinybird, at 27:05. You can see they just keep the whole event as a JSON.
As they are not a database, they didn't try to understand the JSON; basically, they only parsed the top layer into the table. We can also parse the system info (like `eventID` and `awsRegion`) besides the primary keys and main events, if necessary.
@xxchan You are right in some sense. Thanks for bringing this up.

Essentially, every field in DynamoDB is like a `jsonb`, since fields are not strictly typed. If the user wants to make sure the returned result is a numeric, a manual cast is inevitable, i.e., `v::numeric`; it's basically equivalent to `->>'N'`.

However, I agree that mapping every field internally to JSONB instead of `->>'N'` or `->>'S'` can avoid exposing details to users. A downside of this approach is that it involves a copy and rewrite of the whole record, but I think it's acceptable in terms of performance. cc @KeXiangWang

Alternatively, we could:

- Define a schema beforehand in RW, and skip records that do not match the schema. But I think this goes against DynamoDB's NoSQL design. (Decodable)
- Automatically infer the schema from the historical snapshot, and skip records that do not match the schema. An anti-pattern as well. (Airbyte, Hevo)
- Inherently allow multiple types in one column. This is technically hard to implement in our current type system. (Rockset)
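To make the trade-off concrete, a hypothetical comparison of the two access styles (the internal-JSONB mapping is not implemented in this PR, so the second query only illustrates the proposal):

```sql
-- Current behavior: the type tag is exposed, so the user strips it manually.
SELECT (data -> 'price' ->> 'N')::numeric FROM orders;

-- Proposed behavior: each field is mapped to a plain jsonb value internally,
-- so only the final cast would remain visible to the user.
SELECT (data ->> 'price')::numeric FROM orders;
```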
> Numbers can be positive, negative, or zero. Numbers can have up to 38 digits of precision. Exceeding this results in an exception. If you need greater precision than 38 digits, you can use strings.

Thanks @tabVersion for the reminder. It indeed has higher precision than our decimals (28 digits). 😢

How do we typically address such cases? cc @xiangjinwu
That is why we did not encode the whole row into a single jsonb column: JSON has lower precision in its standard, and we can do little about it as an application.
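A small illustration of where the 38-vs-28-digit boundary bites (the attribute name is hypothetical, taken from the sample payload later in this thread). Since DynamoDB serializes numbers as strings (`"N": "..."`), the jsonb value itself stays lossless text; precision only becomes a concern at the cast:

```sql
-- The raw "N" value is kept as text inside the jsonb, so nothing is lost yet.
SELECT data -> 'large_number' ->> 'N' FROM orders;

-- Casting to our decimal type is where values beyond 28 digits of
-- precision may error or lose digits.
SELECT (data -> 'large_number' ->> 'N')::numeric FROM orders;
```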
> Why is it "inevitable"? Actually, I had thought we introduced `FORMAT DYNAMODB` because we want to treat it specially (e.g., no need to access the type tag; the parser does it implicitly). But now it seems it's no different from `FORMAT PLAIN` (just syntactic sugar?).

Agree. I prefer letting users define the schema themselves; wrapping it as an individual format is overkill, and I don't see a great advantage from it. We may pivot to supporting tools that generate a `CREATE SOURCE` statement from one message instead.
> Define a schema beforehand in RW, and skip records that do not match the schema. But I think this goes against DynamoDB's NoSQL design. (Decodable)

But RisingWave serves as a relational database for streaming jobs and analysis. How do users run a streaming job without a clear definition of what the job does? If the user is unsure about a column's content, keeping it as bytes may offer the best compatibility, which we already support (see the sketch below).
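For reference, a minimal sketch of that bytes fallback, using the existing `FORMAT PLAIN ENCODE BYTES` support (the table name and connector property values are illustrative):

```sql
-- Ingest each Kinesis record as a single raw bytea column and defer all
-- interpretation of the payload to downstream jobs.
CREATE TABLE orders_raw (data BYTEA)
WITH (
    connector = 'kinesis',
    stream = 'orders-ddb-stream',   -- hypothetical stream name
    aws.region = 'us-east-1'
) FORMAT PLAIN ENCODE BYTES;
```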
> Given that DynamoDB doesn't have a strict schema, the user is required to specify at least one primary key that corresponds to Dynamo's partition key

I don't fully get this. What's the relationship between "strict schema" and "requiring a pk"?
pub const DYNAMODB: JsonParseOptions = JsonParseOptions {
    bytea_handling: ByteaHandling::Standard,
    time_handling: TimeHandling::Micro,
    timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
    json_value_handling: JsonValueHandling::AsValue,
    numeric_handling: NumericHandling::Relax {
        string_parsing: true,
    },
    boolean_handling: BooleanHandling::Relax {
        string_parsing: true,
        string_integer_parsing: true,
    },
    varchar_handling: VarcharHandling::Strict,
    struct_handling: StructHandling::Strict,
    ignoring_keycase: true,
};
Did you verify the configs are correct? (At least the "backward-compatible" comment is wrong 😂)
Better to add some comments to demonstrate each choice.
Mostly no specific reasons: we don't really need to parse the data, and we cannot predict its real format. In DynamoDB and Kinesis, the values are just strings. The only thing I'm trying to ensure is that all these handling modes accept strings as input.
If you don't understand what these configs do, or why we need a specific config (e.g., you found example data where the default doesn't work), perhaps you should just use `DEFAULT`. @xiangjinwu what do you think?
I just took a quick attempt; `DEFAULT` works as well.
Thanks for the hard work on the urgent feature! I really hope we could have more time to discuss and review, though... Problems are commented inline; the others are not too harmful to me.

Sorry if I didn't express it clearly. I've updated the explanation.

BTW, we still have time to discuss the syntax. No worries. cc @xxchan
// "OldImage":{ | ||
// "order_status":{ | ||
// "N":"3" | ||
// }, | ||
// "order_date":{ | ||
// "N":"1720037677" | ||
// }, | ||
// "order_id":{ | ||
// "N":"3" | ||
// }, | ||
// "price":{ | ||
// "N":"63.06" | ||
// }, | ||
// "product_id":{ | ||
// "N":"2060" | ||
// }, | ||
// "customer_name":{ | ||
// "S":"Bob Lee" | ||
// } |
Is there a chance that they just keep the fields in `Keys` instead of the whole row?
They don't seem to provide this option.
Things are different here. MongoDB stores JSON internally (actually BSON, but almost the same), which means RisingWave is not responsible for the data precision.

^ This is the more important issue.

(Less important, but continuing the previous discussion:)
Example:

{
    "large_number": {"N": "1152921504606846976"},
    "bytes": {"B": "3q2+7w=="},
    "str": {"S": "hello"},
    "ok": {"BOOL": true},
    "unknown": {"NULL": true},
    "list": {"L": [{"N": "12.5"}, {"S": "hello"}, {"B": "wN4="}]},
    "map": {"M": {"foo": {"N": "-3"}, "bar": {"S": "dummy"}}},
    "num_set": {"NS": ["7.4", "8.9"]}
}

Note that
Do we plan to add support for DynamoDB Streams as well, since it doesn't require an extra Kinesis setup?
@himanshpal Supporting DynamoDB Streams requires an extra source implementation, while for Kinesis streams we can reuse the existing Kinesis source. We are currently discussing DynamoDB internally; the final solution will be posted later.
This PR has been open for 60 days with no activity. If it's blocked by code review, feel free to ping a reviewer or ask someone else to review it. If you think it is still relevant today, and you have time to work on it in the near future, you can comment to update the status, or just manually remove the stale label.

You can also confidently close this PR to keep our backlog clean. (If no further action is taken, the PR will be automatically closed after 7 days. Sorry! 🙏)
Closing this PR, as no further action was taken after it was marked as stale for 7 days. Sorry! 🙏 You can reopen it when you have time to continue working on it.
We basically have two solutions for this type of requirement, i.e., ingesting both bounded historical data and unbounded stream data in one source:

If we decide to use the second solution, the above PR can help resolve the DynamoDB feature request. However, we still haven't reached an agreement on the final solution, so we are suspending this PR for now.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Edited by @neverchanje
Below is the overall workflow, which comprises 3 SQL commands. First, the user creates an S3 table to load the historical snapshot exported from DynamoDB. Second, the user creates a Kinesis table for the incremental data. Finally, the user merges the incremental data with the snapshot using CREATE SINK INTO.
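A hedged sketch of what the three commands could look like. All table names, columns, and connector property values here are hypothetical, and the placement of the `single_blob_column` option as an ENCODE option is an assumption, not confirmed syntax:

```sql
-- 1. Snapshot table over the S3 export, parsed with FORMAT DYNAMODB.
CREATE TABLE orders_snapshot (
    order_id VARCHAR PRIMARY KEY,            -- maps to Dynamo's partition key
    data JSONB                               -- the single blob column
) WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'my-dynamodb-export',   -- hypothetical bucket
    match_pattern = '*.json'
) FORMAT DYNAMODB ENCODE JSON (single_blob_column = 'data');

-- 2. CDC table over the Kinesis stream, parsed with FORMAT DYNAMODB_CDC.
CREATE TABLE orders_cdc (
    order_id VARCHAR PRIMARY KEY,
    data JSONB
) WITH (
    connector = 'kinesis',
    stream = 'orders-ddb-stream',            -- hypothetical stream name
    aws.region = 'us-east-1'
) FORMAT DYNAMODB_CDC ENCODE JSON (single_blob_column = 'data');

-- 3. Merge the incremental data into the snapshot table.
CREATE SINK orders_merge INTO orders_snapshot AS
SELECT * FROM orders_cdc;
```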
This PR introduces several new syntaxes to support DynamoDB CDC:

1. A new option `single_blob_column` that enables the user to specify a column to store the whole JSON payload. As DynamoDB doesn't have a strict schema, the user is only required to specify at least one primary key that corresponds to Dynamo's partition key, plus a single JSONB column as the value. DynamoDB's sort key is optional, so it's not always required to have 2 primary keys. Note that we also plan to support `single_blob_column` for plain JSON sources to enable schemaless definition. This will address the requirement of https://www.notion.so/risingwave-labs/Source-a-data-JSONB-field-to-store-the-whole-record-03324d1bd06a40d8af0147de03d02351?pvs=4
2. Two new formats, `DYNAMODB_CDC` and `DYNAMODB`. `DYNAMODB` is used to parse the S3 snapshot dumped from DynamoDB, while `DYNAMODB_CDC` is used to parse DynamoDB's CDC events in Kinesis (see the sketch above).
Checklist

- `./risedev check` (or alias, `./risedev c`)

Documentation

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.