
feat(cdc): support dynamodb and dynamodb cdc #17650

Closed · wants to merge 7 commits
Conversation

@KeXiangWang (Contributor) commented Jul 10, 2024:

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Edited by @neverchanje

Below is the overall workflow, which comprises three SQL statements. First, the user creates an S3 table to load the historical snapshot exported from DynamoDB. Second, the user creates a Kinesis table for the incremental data. Finally, the user merges the incremental data with the snapshot using CREATE SINK INTO.

CREATE TABLE orders (
    order_id INT,
    customer_name VARCHAR,
    data JSONB,
    PRIMARY KEY (order_id, customer_name)
) WITH (
    connector = 's3_v2',
    match_pattern = 'orders/AWSDynamoDB/012345-abc/data/*',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'wkx-dynamodb',
    s3.credentials.access = 'ABCDEFG',
    s3.credentials.secret = 'abcdefg',
    s3.endpoint_url = 'https://s3.us-east-1.amazonaws.com/'
) FORMAT DYNAMODB ENCODE JSON (
    single_blob_column = 'data'
);

CREATE TABLE orders_kinesis (
    order_id INT,
    customer_name VARCHAR,
    data JSONB,
    PRIMARY KEY (order_id, customer_name)
) WITH (
  connector = 'kinesis',
  stream = 'wkx-dynamo-orders',
  scan.startup.mode = 'earliest',
  aws.region = 'us-east-1',
  kinesis.credentials.access = 'ABCDEFG',
  kinesis.credentials.secret = 'abcdefg'
) FORMAT DYNAMODB_CDC ENCODE JSON (
    single_blob_column = 'data'
);

CREATE SINK orders_kinesis_sink INTO orders FROM orders_kinesis;

This PR introduces several new syntaxes to support DynamoDB CDC.

  1. Introduce a format property single_blob_column that enables the user to specify a column to store the whole JSON payload. As DynamoDB doesn't have a strict schema, the user is only required to specify at least one primary key, corresponding to DynamoDB's partition key, and a single JSONB column as the value. DynamoDB's sort key is optional, so it's not always necessary to have two primary keys.
FORMAT DYNAMODB_CDC ENCODE JSON (
    single_blob_column = 'data'
)

Note that we also plan to support single_blob_column for plain JSON sources to enable schemaless definition. This will address the requirement of https://www.notion.so/risingwave-labs/Source-a-data-JSONB-field-to-store-the-whole-record-03324d1bd06a40d8af0147de03d02351?pvs=4

  2. Introduce two format parsers, DYNAMODB_CDC and DYNAMODB.

DYNAMODB is used to parse the S3 snapshot dumped from DynamoDB:

{"Item":{"user_name":{"S":"charlie_brown"},"user_id":{"S":"003"},"created_at":{"S":"2024-03-20T11:15:00Z"},"email":{"S":"[email protected]"}}}
{"Item":{"user_name":{"S":"bob_jones"},"user_id":{"S":"002"},"created_at":{"S":"2024-02-15T09:30:00Z"},"email":{"S":"[email protected]"}}}
{"Item":{"user_name":{"S":"daisy_johnson"},"user_id":{"S":"004"},"created_at":{"S":"2024-04-25T14:00:00Z"},"email":{"S":"[email protected]"}}}

DYNAMODB_CDC is used to parse DynamoDB's CDC events in Kinesis.

{
  "awsRegion":"ap-southeast-1",
  "dynamodb":{
    "ApproximateCreationDateTime":1719569460436205,
    "ApproximateCreationDateTimePrecision":"MICROSECOND",
    "Keys":{
      "user_id":{
        "S":"003"
      },
      "user_name":{
        "S":"charlie_brown"
      }
    },
    "NewImage":{
      "created_at":{
        "S":"2024-03-20T11:15:00Z"
      },
      "email":{
        "S":"[email protected]"
      },
      "user_id":{
        "S":"003"
      },
      "user_name":{
        "S":"charlie_brown"
      }
    },
    "SizeBytes":124
  },
  "eventID":"e30bcac7-e409-4c98-b0c0-03ffce123a84",
  "eventName":"INSERT",
  "eventSource":"aws:dynamodb",
  "recordFormat":"application/json",
  "tableName":"website_users",
  "userIdentity":null
}
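
For illustration, with single_blob_column = 'data', the parser stores the entire DynamoDB item, with its type annotations, in the data column. A hypothetical query over a table ingesting the website_users snapshot above (table and column names assumed):

SELECT data FROM website_users LIMIT 1;
-- {"created_at": {"S": "2024-03-20T11:15:00Z"}, "email": {"S": "[email protected]"}, "user_id": {"S": "003"}, "user_name": {"S": "charlie_brown"}}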

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see Sqlsmith: Sql feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@KeXiangWang requested review from hzxa21 and fuyufjh, July 11, 2024 03:16
Comment on lines +86 to +88
if let Some(mode) = config.timestamptz_handling {
    json_parse_options.timestamptz_handling = mode;
}
Member:

I guess the timestamp format is a fixed one, not arbitrary?

Contributor Author:

Currently, this part is not well-designed. 🤔 Since we force users to use single_blob_column, we don't really need to parse the timestamp data. The only use for this is when users want to parse timestamps as primary keys, or, in the future, when they want to include system info like dynamodb.ApproximateCreationDateTime as additional INCLUDE columns. I chose GuessNumberUnit because it accepts more formats, including dynamodb.ApproximateCreationDateTime's format.
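
For intuition: ApproximateCreationDateTime in the CDC example above is a microsecond-resolution epoch number, and GuessNumberUnit presumably infers the unit (seconds/milliseconds/microseconds) from the magnitude of the value. A rough SQL sketch of the equivalent conversion:

-- to_timestamp takes seconds, so the microsecond epoch is divided by 1e6
SELECT to_timestamp(1719569460436205 / 1000000.0);
-- ≈ 2024-06-28 10:11:00.436205+00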

writer.do_insert(|column| {
    if column.name == self.single_blob_column {
        assert_matches!(column.data_type, DataType::Jsonb);
        payload_accessor.access(&[ITEM], &column.data_type)
Member:

I'm a little concerned about this: in the resulting jsonb, will the user need data->field->S/N to access a field?

Contributor:

Yes. It's inevitable anyway, as DynamoDB doesn't enforce the type of attributes:
data->field can be either an S or an N.
->S/N can be viewed as a manual type cast.
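
For example, a hypothetical query (assuming price is a DynamoDB N attribute and customer_name an S attribute):

SELECT data->'customer_name'->>'S'    AS customer_name,
       (data->'price'->>'N')::numeric AS price
FROM orders;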

Contributor Author:

Yes, correct.

Contributor:

Yes. It's inevitable anyway, as DynamoDB doesn't enforce the type of attributes:
data->field can be either an S or an N.
->S/N can be viewed as a manual type cast.

I have concerns about value range. Can you help confirm that data from DynamoDB can be correctly represented in JSON?

Member:

It's inevitable anyway

Why is it "inevitable"? Actually, I had thought we introduced FORMAT DYNAMODB because we want to treat it specially (e.g., no need to access the type field; the parser does it implicitly). But now it seems it's no different from FORMAT PLAIN (just syntactic sugar?).

BTW, how do other systems' DynamoDB CDC behave? Do they also need to access the type field?

I'm not arguing we should do that. Just expressing confusion.

@KeXiangWang (Contributor Author), Jul 12, 2024:

https://www.youtube.com/live/JwDbdsQPWak by tinybird, at 27:05.
You can see they just keep the whole event as a JSON.

Contributor Author:

As they are not a database, they didn't try to understand the JSON; basically, they only parsed the top layer into the table. We can also parse the system info (like eventID and awsRegion) besides the primary keys and the main events, if necessary.

@neverchanje (Contributor), Jul 12, 2024:

@xxchan You are right in some sense. Thanks for bringing this up.

Essentially, every field in DynamoDB is like a jsonb, since they are not strictly typed.
If the user wants to make sure the returned result is a numeric, a manual cast is inevitable, i.e., v::numeric, which is basically equivalent to ->>'N'.
However, I agree that mapping every field internally to JSONB instead of ->>'N' or ->>'S' can avoid exposing details to users (see the sketch after the list below). A downside of this approach is that it involves a copy and rewrite of the whole record, but I think that's acceptable in terms of performance. cc @KeXiangWang

Alternatively, we could:

  • Define a schema beforehand in RW, and skip records that do not match the schema. But I think this goes against dynamodb's nosql design. (Decodable)
  • Automatically infer the schema from historical snapshot, and skip records that do not match the schema. Anti-pattern as well. (Airbyte, Hevo)
  • Inherently allow multiple types in one column. This is technically hard to implement for now in our current type system. (Rockset)
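
To make the difference concrete, a hypothetical comparison (assuming a numeric price attribute):

-- current approach: the S/N annotations are kept, so the query peels off 'N' explicitly
SELECT (data->'price'->>'N')::numeric FROM orders;

-- if fields were internally flattened to plain JSONB values,
-- the same query would not need to touch the type annotation
SELECT (data->>'price')::numeric FROM orders;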

Contributor:

Numbers can be positive, negative, or zero. Numbers can have up to 38 digits of precision. Exceeding this results in an exception. If you need greater precision than 38 digits, you can use strings.

Thanks @tabVersion for the reminder.
It indeed has higher precision than our decimals (28 digits). 😢
How do we typically address such cases? cc @xiangjinwu

This is why we did not encode the whole row into a single jsonb column: JSON has lower precision in its standard, and we can do little about that as an application.

Why is it "inevitable"? Actually, I had thought we introduced FORMAT DYNAMODB because we want to treat it specially (e.g., no need to access the type field; the parser does it implicitly). But now it seems it's no different from FORMAT PLAIN (just syntactic sugar?).

Agreed. I prefer letting users define the schema themselves; wrapping this up as an individual format is overkill, and I don't see a great advantage to it. We may pivot to supporting tools that generate a CREATE SOURCE statement from one message instead.

Define a schema beforehand in RW, and skip records that do not match the schema. But I think this goes against DynamoDB's NoSQL design. (Decodable)

But RisingWave serves as a relational database for streaming jobs and analysis. How do users run a streaming job without a clear definition of what the job does? If the user is unsure about a column's content, keeping it as bytes may offer the best compatibility, which we already support.

@xxchan (Member) left a comment:

Given that DynamoDB doesn't have a strict schema, the user is required to specify at least one primary key that corresponds to Dynamo's partition key

Don't fully get this. What's the relationship between "strict schema" and "require pk"?

Comment on lines +184 to +199
pub const DYNAMODB: JsonParseOptions = JsonParseOptions {
    bytea_handling: ByteaHandling::Standard,
    time_handling: TimeHandling::Micro,
    timestamptz_handling: TimestamptzHandling::GuessNumberUnit, // backward-compatible
    json_value_handling: JsonValueHandling::AsValue,
    numeric_handling: NumericHandling::Relax {
        string_parsing: true,
    },
    boolean_handling: BooleanHandling::Relax {
        string_parsing: true,
        string_integer_parsing: true,
    },
    varchar_handling: VarcharHandling::Strict,
    struct_handling: StructHandling::Strict,
    ignoring_keycase: true,
};
Member:

Did you verify the configs are correct? (At least the "backward-compatible" comment is wrong 😂)

Member:

Better to add some comments to demonstrate each choice.

@KeXiangWang (Contributor Author), Jul 11, 2024:

Mostly no specific reasons, as we don't really need to parse the data, and we cannot predict the real formats of the data: in DynamoDB and Kinesis, they are just strings. The only thing I'm trying to ensure is that all these handling modes accept strings as input.

@xxchan (Member), Jul 12, 2024:

If you don't understand what these configs do, or why we need a specific config (e.g., you found example data where the default doesn't work), perhaps you should just use DEFAULT. @xiangjinwu what do you think?

Contributor Author:

I just took a quick attempt. DEFAULT works as well.

@xxchan (Member) left a comment:

Thanks for the hard work on this urgent feature! I really hope we could have more time to discuss and review, though...

Problems are commented inline; the others are not too harmful to me.

@neverchanje (Contributor) commented Jul 11, 2024:

Given that DynamoDB doesn't have a strict schema, the user is required to specify at least one primary key that corresponds to Dynamo's partition key

Don't fully get this. What's the relationship between "strict schema" and "require pk"?

Sorry if I didn't express it clearly. I've updated the explanation.
I was trying to clarify two points:

  • For schemaless databases like MongoDB, RisingWave typically maps records to jsonb. We can follow the same approach for DynamoDB.
  • DynamoDB uses two keys: the partition key and the sort key. Both should be defined in RisingWave.
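
For example, a source table whose DynamoDB table has only a partition key needs just a single-column primary key (a hypothetical definition; connector options elided):

CREATE TABLE users (
    user_id VARCHAR,
    data JSONB,
    PRIMARY KEY (user_id)   -- partition key only; no sort key
) WITH (...) FORMAT DYNAMODB ENCODE JSON (
    single_blob_column = 'data'
);

With a sort key, both keys form the composite primary key, as in PRIMARY KEY (order_id, customer_name) in the PR description.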

@neverchanje (Contributor) commented Jul 11, 2024:

I really hope we could have more time to discuss and review though...

BTW we still have time to discuss the syntax. No worries. cc @xxchan

Comment on lines +68 to +86
// "OldImage":{
// "order_status":{
// "N":"3"
// },
// "order_date":{
// "N":"1720037677"
// },
// "order_id":{
// "N":"3"
// },
// "price":{
// "N":"63.06"
// },
// "product_id":{
// "N":"2060"
// },
// "customer_name":{
// "S":"Bob Lee"
// }
Contributor:

Is there a chance that they just keep the fields in Keys instead of the whole row?

@KeXiangWang (Contributor Author), Jul 14, 2024:

They don't seem to provide this option.

@tabVersion (Contributor):

For schemaless databases like MongoDB, RisingWave typically maps records to jsonb. We can follow the same approach for DynamoDB.

Things are different here. MongoDB stores JSON internally (actually BSON, but almost the same), which means RisingWave is not responsible for the data precision.
But clearly, DynamoDB has a wider data range and does not claim to be compatible with JSON. We may need another way.

@xiangjinwu (Contributor):

  • An offline discussion found another issue with this orders + orders_kinesis sink-into-table approach: a later delete from Kinesis would not be able to find the historical row from S3. We will have a discussion about it.

^ This is the more important issue

(Less important but continuing the previous discussion:)

  • Regarding data types, I have checked the DynamoDB docs and agree to keep the S/N annotations as-is and require users to cast later in SQL. Specifically:
    • N is encoded as a JSON string, so there would be no precision issue for large numbers during ingestion. In the case of 28- to 38-digit N values, the cast would raise a normal overflow error.
    • B is also encoded as a JSON string using base64. Users can just decode("Item" ->> 'foo', 'base64') to get a bytea.
    • S / BOOL correspond to JSON string and boolean trivially.
    • NULL: see the example directly; non-trivial but easy to handle.
    • L/M correspond to JSON array and object trivially. Notably, array elements and object values can be of heterogeneous types; object keys are always JSON strings.
    • NS/BS/SS: encoded as JSON arrays. It is the producer's responsibility to handle ordering and uniqueness.

Example:

{
  "large_number": {"N": "1152921504606846976"},
  "bytes": {"B": "3q2+7w=="},
  "str": {"S": "hello"},
  "ok": {"BOOL": true},
  "unknown": {"NULL": true},
  "list": {"L": [{"N": "12.5"}, {"S": "hello"}, {"B": "wN4="}]},
  "map": {"M": {"foo":{"N": "-3"}, "bar": {"S": "dummy"}}},
  "num_set": {"NS": ["7.4", "8.9"]}
}

Note that S/N/B is necessary to differentiate the values when they appear in a heterogeneous list:

  • {"L": [{"N": "12.5"}, {"S": "hello"}, {"B": "wN4="}]} -> [12.5, "hello", "wN4="]: JSON numbers lose precision beyond 2^53-1
  • {"L": [{"N": "12.5"}, {"S": "hello"}, {"B": "wN4="}]} -> ["12.5", "hello", "wN4="]: indistinguishable from the case below
  • {"L": [{"S": "12.5"}, {"S": "hello"}, {"B": "wN4="}]} -> ["12.5", "hello", "wN4="]

@himanshpal commented Jul 16, 2024:

Do we plan to add support for DynamoDB Streams as well, since it does not require an extra Kinesis setup?

@KeXiangWang (Contributor Author):

Do we plan to add support for DynamoDB Streams as well, since it does not require an extra Kinesis setup?

@himanshpal Supporting DynamoDB Streams requires an extra source implementation, whereas for Kinesis streams we can reuse the existing Kinesis source. We are currently discussing DynamoDB internally; the final solution will be posted later.

github-actions (bot):

This PR has been open for 60 days with no activity.

If it's blocked by code review, feel free to ping a reviewer or ask someone else to review it.

If you think it is still relevant today, and have time to work on it in the near future, you can comment to update the status, or just manually remove the no-pr-activity label.

You can also confidently close this PR to keep our backlog clean. (If no further action is taken, the PR will be automatically closed after 7 days. Sorry! 🙏)
Don't worry if you think the PR is still valuable to continue in the future.
It's searchable and can be reopened when it's time. 😄

github-actions (bot):

Closing this PR, as no further action was taken after it was marked as stale for 7 days. Sorry! 🙏

You can reopen it when you have time to continue working on it.

github-actions bot closed this Sep 22, 2024
@KeXiangWang (Contributor Author) commented Sep 22, 2024:

We basically have two solutions for this type of requirement, i.e., ingesting both bounded historical data and an unbounded stream in one source:

  1. Design a new source paradigm that can ingest from two types of data sources.
  2. Use pause-on-create and CREATE SINK INTO.

If we decide on the second solution, this PR can help resolve the DynamoDB feature request. However, we still haven't reached an agreement on the final solution, so we are suspending this PR for now.
