feat(cdc): support INCLUDE TIMESTAMP for MySQL, PG and MongoDB cdc table (#16833)
StrikeW authored May 27, 2024
1 parent a0d7323 commit 1401d56
Showing 28 changed files with 487 additions and 171 deletions.
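In short, a CDC table can now carry an upstream commit-timestamp column via `INCLUDE TIMESTAMP AS <alias>`. Below is a minimal sketch of the syntax, assuming a shared MySQL CDC source named `mysql_mytest` and an upstream table `mytest.products`; the names are taken from the e2e tests in this commit, not from a fixed API.

CREATE TABLE rw.products_test (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id)
) INCLUDE TIMESTAMP AS commit_ts
FROM mysql_mytest TABLE 'mytest.products';

-- Snapshot (historical) rows are backfilled with the Unix epoch timestamp,
-- so they can be told apart from rows replicated from the binlog:
SELECT count(*) FROM rw.products_test
WHERE commit_ts = '1970-01-01 00:00:00+00:00';

-- Rows ingested after the snapshot carry the upstream commit time:
SELECT name, description FROM rw.products_test
WHERE commit_ts > '1970-01-01 00:00:00+00:00'
ORDER BY id;

The same clause is exercised for the Postgres shared source (`FROM pg_source TABLE 'public.person'`) and, in the `WITH (connector = 'mongodb-cdc', ...)` form, for MongoDB, as the test changes below show.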
3 changes: 2 additions & 1 deletion ci/scripts/e2e-source-test.sh
@@ -32,6 +32,7 @@ mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-inline-source-test
risedev slt './e2e_test/source_inline/**/*.slt'
echo "--- Kill cluster"
@@ -55,7 +56,7 @@ createdb
psql < ./e2e_test/source/cdc/postgres_cdc.sql

echo "--- starting risingwave cluster"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-1cn-1fe-with-recovery

echo "--- mongodb cdc test"
4 changes: 2 additions & 2 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -15,7 +15,7 @@ select cnt from shipments_cnt;
4

query ITTTT
select * from person_new order by id;
SELECT id,name,email_address,credit_card,city from person_new order by id;
----
1000 vicky noris [email protected] 7878 5821 1864 2539 cheyenne
1001 peter white [email protected] 1781 2313 8157 6974 boise
@@ -68,7 +68,7 @@ SELECT * from orders_test_cnt
5

query ITT
SELECT * FROM rw.products_test order by id limit 3
SELECT id,name,description FROM rw.products_test order by id limit 3
----
101 RW Small 2-wheel scooter
102 RW 12V car battery
37 changes: 33 additions & 4 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -41,7 +41,7 @@ create table rw.products_test ( id INT,
name STRING,
description STRING,
PRIMARY KEY (id)
) from mysql_mytest table 'mytest.products';
) include timestamp as commit_ts from mysql_mytest table 'mytest.products';

system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES (default,'Milk','Milk is a white liquid food');
@@ -145,12 +145,25 @@ SELECT * from orders_test_cnt
4

query ITT
SELECT * FROM rw.products_test order by id limit 3
SELECT id,name,description FROM rw.products_test order by id limit 3
----
101 scooter Small 2-wheel scooter
102 car battery 12V car battery
103 12-pack drill bits 12-pack of drill bits with sizes ranging from #40 to #3

# commit_ts of historical records should be '1970-01-01 00:00:00+00:00'
query I
SELECT count(*) as cnt from rw.products_test where commit_ts = '1970-01-01 00:00:00+00:00'
----
9

# commit_ts of new records should be greater than '1970-01-01 00:00:00+00:00'
query TTT
SELECT name,description FROM rw.products_test where commit_ts > '1970-01-01 00:00:00+00:00' order by id
----
Milk Milk is a white liquid food
Juice 100ml Juice

query ITTT
SELECT order_id,order_date,customer_name,product_id FROM orders_test order by order_id limit 3
----
@@ -230,7 +243,7 @@ CREATE TABLE person_new (
credit_card varchar,
city varchar,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.person';
) INCLUDE TIMESTAMP AS commit_ts FROM pg_source TABLE 'public.person';

statement ok
CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;
@@ -260,7 +273,7 @@ SELECT * from person_new_cnt
6

query ITTTT
SELECT * from person_new order by id;
SELECT id,name,email_address,credit_card,city from person_new order by id;
----
1000 vicky noris [email protected] 7878 5821 1864 2539 cheyenne
1001 peter white [email protected] 1781 2313 8157 6974 boise
@@ -269,6 +282,22 @@ SELECT * from person_new order by id;
1101 white [email protected] 8157 6974 se
1102 spencer [email protected] 9481 6270 angeles

# historical data
query ITTTT
SELECT id,name,email_address,credit_card,city from person_new where commit_ts = '1970-01-01 00:00:00+00:00' order by id;
----
1000 vicky noris [email protected] 7878 5821 1864 2539 cheyenne
1001 peter white [email protected] 1781 2313 8157 6974 boise
1002 sarah spencer [email protected] 3453 4987 9481 6270 los angeles

# incremental data
query ITTTT
SELECT id,name,email_address,credit_card,city from person_new where commit_ts > '1970-01-01 00:00:00+00:00' order by id;
----
1100 noris [email protected] 1864 2539 enne
1101 white [email protected] 8157 6974 se
1102 spencer [email protected] 9481 6270 angeles

statement ok
CREATE TABLE numeric_to_rw_int256_shared (
id int,
8 changes: 7 additions & 1 deletion e2e_test/source/cdc/mongodb/mongodb_basic.slt
@@ -2,7 +2,7 @@
control substitution on

statement ok
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) INCLUDE TIMESTAMP as commit_ts WITH (
connector = 'mongodb-cdc',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'random_data.*'
@@ -24,5 +24,11 @@ select count(*) from normalized_users;
----
55

# historical data
query I
select count(*) from users where commit_ts = '1970-01-01 00:00:00+00:00';
----
55

statement ok
DROP TABLE users cascade
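The MongoDB test above only asserts on snapshot rows; a sketch of the analogous check for change-stream rows (hypothetical here, since it assumes documents are inserted into `random_data` after the table is created) would be:

-- incremental data: rows from the change stream carry the upstream commit time
select count(*) from users where commit_ts > '1970-01-01 00:00:00+00:00';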
7 changes: 7 additions & 0 deletions e2e_test/source/cdc_inline/postgres_create_drop.slt
@@ -23,6 +23,13 @@ create table tt1 (v1 int,
slot.name = 'tt1_slot',
);

sleep 3s

query IT
SELECT * FROM tt1;
----
1 2023-10-23 10:00:00+00:00

statement ok
drop table tt1;

@@ -20,12 +20,18 @@
import com.risingwave.proto.Data.DataType.TypeName;
import com.risingwave.proto.PlanCommon;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableSchema {

static final Logger LOG = LoggerFactory.getLogger(TableSchema.class);

private final List<String> columnNames;
private final Map<String, TypeName> columns;
private final Map<String, Integer> columnIndices;
@@ -80,16 +86,20 @@ public Object getFromRow(String columnName, SinkRow row) {
}

public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchema) {
return new TableSchema(
tableSchema.getColumnsList().stream()
.map(PlanCommon.ColumnDesc::getName)
.collect(Collectors.toList()),
tableSchema.getColumnsList().stream()
.map(PlanCommon.ColumnDesc::getColumnType)
.collect(Collectors.toList()),
tableSchema.getPkIndicesList().stream()
.map(i -> tableSchema.getColumns(i).getName())
.collect(Collectors.toList()));
// filter out additional columns
var instance =
new TableSchema(
tableSchema.getColumnsList().stream()
.map(PlanCommon.ColumnDesc::getName)
.collect(Collectors.toList()),
tableSchema.getColumnsList().stream()
.map(PlanCommon.ColumnDesc::getColumnType)
.collect(Collectors.toList()),
tableSchema.getPkIndicesList().stream()
.map(i -> tableSchema.getColumns(i).getName())
.collect(Collectors.toList()));
LOG.info("table column names: {}", Arrays.toString(instance.getColumnNames()));
return instance;
}

public List<String> getPrimaryKeys() {
@@ -153,8 +153,9 @@ private static boolean isStreamingRunning(String connector, String server, Strin
mbeanServer.getAttribute(
getStreamingMetricsObjectName(connector, server, contextName),
"Connected");
} catch (JMException ex) {
LOG.warn("Failed to get streaming metrics", ex);
} catch (JMException _ex) {
// ignore the exception, as it is expected when the streaming source
// (aka. binlog client) is not ready
}
return false;
}
2 changes: 0 additions & 2 deletions src/common/src/catalog/external_table.rs
@@ -40,8 +40,6 @@ pub struct CdcTableDesc {
/// Column indices for primary keys.
pub stream_key: Vec<usize>,

pub value_indices: Vec<usize>,

/// properties will be passed into the `StreamScanNode`
pub connect_properties: BTreeMap<String, String>,
}
7 changes: 6 additions & 1 deletion src/compute/tests/cdc_tests.rs
@@ -183,7 +183,6 @@ async fn test_cdc_backfill() -> StreamResult<()> {
table_schema.clone(),
table_pk_order_types,
table_pk_indices.clone(),
vec![0, 1],
);

let actor_id = 0x1a;
@@ -214,6 +213,11 @@
)
.await;

let output_columns = vec![
ColumnDesc::named("id", ColumnId::new(1), DataType::Int64), // primary key
ColumnDesc::named("price", ColumnId::new(2), DataType::Float64),
];

let cdc_backfill = StreamExecutor::new(
ExecutorInfo {
schema: table_schema.clone(),
@@ -225,6 +229,7 @@
external_table,
mock_offset_executor,
vec![0, 1],
output_columns,
None,
Arc::new(StreamingMetrics::unused()),
state_table,
27 changes: 25 additions & 2 deletions src/connector/src/parser/additional_columns.rs
@@ -28,6 +28,7 @@ use risingwave_pb::plan_common::{
};

use crate::error::ConnectorResult;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
S3_CONNECTOR,
@@ -55,9 +56,29 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
(OPENDAL_S3_CONNECTOR, HashSet::from(["file", "offset"])),
(S3_CONNECTOR, HashSet::from(["file", "offset"])),
(GCS_CONNECTOR, HashSet::from(["file", "offset"])),
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
HashSet::from(["timestamp", "partition", "offset"]),
),
])
});

// For CDC backfill table, the additional columns are added to the schema of `StreamCdcScan`
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
LazyLock::new(|| Some(HashSet::from(["timestamp"])));

pub fn get_supported_additional_columns(
connector_name: &str,
is_cdc_backfill: bool,
) -> Option<&HashSet<&'static str>> {
if is_cdc_backfill {
CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref()
} else {
COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name)
}
}

pub fn gen_default_addition_col_name(
connector_name: &str,
additional_col_type: &str,
@@ -87,9 +108,10 @@ pub fn build_additional_column_catalog(
inner_field_name: Option<&str>,
data_type: Option<&str>,
reject_unknown_connector: bool,
is_cdc_backfill_table: bool,
) -> ConnectorResult<ColumnCatalog> {
let compatible_columns = match (
COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name),
get_supported_additional_columns(connector_name, is_cdc_backfill_table),
reject_unknown_connector,
) {
(Some(compat_cols), _) => compat_cols,
@@ -190,7 +212,7 @@
/// ## Returns
/// - `columns_exist`: whether 1. `partition`/`file` and 2. `offset` columns are included in `columns`.
/// - `additional_columns`: The `ColumnCatalog` for `partition`/`file` and `offset` columns.
pub fn add_partition_offset_cols(
pub fn source_add_partition_offset_cols(
columns: &[ColumnCatalog],
connector_name: &str,
) -> ([bool; 2], [ColumnCatalog; 2]) {
@@ -219,6 +241,7 @@ pub fn add_partition_offset_cols(
None,
None,
false,
false,
)
.unwrap(),
)
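In DDL terms, the effect of `get_supported_additional_columns` is roughly the following (a sketch reusing statements from the e2e tests above, with columns trimmed for brevity; it is not an exhaustive statement of the rule): a table defined directly on the `mongodb-cdc` connector may request any of the `timestamp`, `partition`, or `offset` additional columns, while a CDC backfill table created `FROM` a shared source only has `timestamp` in its supported set.

-- Direct mongodb-cdc table: INCLUDE TIMESTAMP is one of the accepted
-- additional columns (partition and offset are also in the set).
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB)
INCLUDE TIMESTAMP AS commit_ts
WITH (
    connector = 'mongodb-cdc',
    mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
    collection.name = 'random_data.*'
);

-- CDC backfill table from a shared source: only the timestamp column is in
-- CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS, so this is the only INCLUDE form
-- the new code path admits for such tables.
CREATE TABLE person_new (
    id int PRIMARY KEY,
    name varchar
) INCLUDE TIMESTAMP AS commit_ts
FROM pg_source TABLE 'public.person';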
68 changes: 68 additions & 0 deletions src/connector/src/parser/debezium/debezium_parser.rs
@@ -198,6 +198,11 @@ mod tests {
use std::sync::Arc;

use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
use risingwave_common::row::Row;
use risingwave_common::types::Timestamptz;
use risingwave_pb::plan_common::{
additional_column, AdditionalColumn, AdditionalColumnTimestamp,
};

use super::*;
use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl};
@@ -266,4 +271,67 @@
_ => panic!("unexpected parse result: {:?}", res),
}
}

#[tokio::test]
async fn test_parse_additional_columns() {
let columns = vec![
ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
ColumnDesc::named_with_additional_column(
"commit_ts",
ColumnId::new(6),
DataType::Timestamptz,
AdditionalColumn {
column_type: Some(additional_column::ColumnType::Timestamp(
AdditionalColumnTimestamp {},
)),
},
),
];

let columns = columns
.iter()
.map(SourceColumnDesc::from)
.collect::<Vec<_>>();

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
let source_ctx = SourceContext {
connector_props: ConnectorProperties::PostgresCdc(Box::default()),
..SourceContext::dummy()
};
let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx))
.await
.unwrap();
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);

let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#;

let res = parser
.parse_one_with_txn(
None,
Some(payload.as_bytes().to_vec()),
builder.row_writer(),
)
.await;
match res {
Ok(ParseResult::Rows) => {
let chunk = builder.finish();
for (_, row) in chunk.rows() {
let commit_ts = row.datum_at(5).unwrap().into_timestamptz();
assert_eq!(commit_ts, Timestamptz::from_millis(1695277757000).unwrap());
}
}
_ => panic!("unexpected parse result: {:?}", res),
}
}
}
(Diffs for the remaining changed files in this commit are not shown here.)