test(connector): add kafka-cdc-sink compatible test (#13607)
xuefengze authored Nov 23, 2023
1 parent 38a78d8 commit e2f7fdb
Showing 10 changed files with 163 additions and 6 deletions.
12 changes: 11 additions & 1 deletion integration_tests/kafka-cdc-sink/Makefile.toml
@@ -5,7 +5,7 @@ dependencies = ["clean-all"]
[tasks.start-cluster]
script = """
docker compose up -d;
-sleep 20
+sleep 10
"""
dependencies = ["build-image"]

@@ -75,3 +75,13 @@ dependencies = [
"check-mysql",
"check-flink-pg"
]

[tasks.check-com-pg]
script = '''
docker exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from types"'
'''

[tasks.check-flink-com-pg]
script = '''
docker exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from flink_types"'
'''
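
The new tasks follow the same cargo-make layout as the other targets in this Makefile.toml, so they can presumably be run on their own once the compose stack is up. A hedged usage sketch (assuming cargo-make is installed and the cluster is running):

    cargo make check-com-pg
    cargo make check-flink-com-pg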
27 changes: 27 additions & 0 deletions integration_tests/kafka-cdc-sink/compatibility-rw-flink.sql

Large diffs are not rendered by default.
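
The file contents are not rendered above. As an illustration only — assuming the script mirrors the pattern of create_sink.sql and the flink_types column list added to postgres_prepare.sql below; the table and sink names here are hypothetical — such a script would create a RisingWave table covering the Flink-compatible types, sink it to the flinktypes topic as Debezium JSON, and insert a test row:

-- Hypothetical sketch; the actual compatibility-rw-flink.sql is not shown in this diff.
CREATE TABLE rw_flink_types (
    id INT PRIMARY KEY,
    c_boolean BOOLEAN,
    c_smallint SMALLINT,
    c_integer INT,
    c_bigint BIGINT,
    c_decimal DECIMAL,
    c_real REAL,
    c_double_precision DOUBLE PRECISION,
    c_varchar VARCHAR,
    c_bytea BYTEA
);

CREATE SINK rw_flink_types_sink FROM rw_flink_types
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'flinktypes',
    primary_key = 'id'
) FORMAT DEBEZIUM ENCODE JSON;

INSERT INTO rw_flink_types VALUES
    (1, true, 1, 10, 100, 3.14, 1.5, 2.5, 'hello', '\x01'::bytea);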

33 changes: 33 additions & 0 deletions integration_tests/kafka-cdc-sink/compatibility-rw.sql

Large diffs are not rendered by default.
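
Also not rendered; presumably the analogous script for the richer types table (adding date, time, timestamp, timestamptz, interval and jsonb columns) that sinks to the types topic consumed by the Debezium JDBC sink connector. A hedged sketch of just the sink definition, with hypothetical relation names:

-- Hypothetical sketch; the actual compatibility-rw.sql is not shown. The column list would follow the types table in postgres_prepare.sql.
CREATE SINK rw_types_sink FROM rw_types
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'types',
    primary_key = 'id'
) FORMAT DEBEZIUM ENCODE JSON;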

3 changes: 1 addition & 2 deletions integration_tests/kafka-cdc-sink/create_sink.sql
@@ -4,6 +4,5 @@ WITH (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
topic = 'counts',
-type = 'debezium',
primary_key = 'id'
-);
+) FORMAT DEBEZIUM ENCODE JSON;
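
For context, the full statement after this change would read roughly as below; the head of the file sits outside the hunk, so the sink and source names here are assumptions:

-- Approximate resulting create_sink.sql; only the tail of the file appears in the hunk above.
CREATE SINK counts_sink FROM counts
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'counts',
    primary_key = 'id'
) FORMAT DEBEZIUM ENCODE JSON;

FORMAT DEBEZIUM ENCODE JSON is the newer RisingWave sink syntax that replaces the legacy type = 'debezium' WITH option.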
3 changes: 2 additions & 1 deletion integration_tests/kafka-cdc-sink/flink/Dockerfile
@@ -3,4 +3,5 @@ FROM flink:1.17.0-scala_2.12
COPY flink-sql-connector-kafka-1.17.0.jar /opt/flink/lib
COPY flink-connector-jdbc-3.0.0-1.16.jar /opt/flink/lib
COPY postgresql-42.6.0.jar /opt/flink/lib
-COPY flink.sql /tmp
+COPY flink.sql /tmp
+COPY compatibility-flink.sql /tmp
41 changes: 41 additions & 0 deletions integration_tests/kafka-cdc-sink/flink/compatibility-flink.sql
@@ -0,0 +1,41 @@
CREATE TABLE topic_types (
id INT,
c_boolean BOOLEAN,
c_smallint SMALLINT,
c_integer INT,
c_bigint BIGINT,
c_decimal decimal,
c_real FLOAT,
c_double_precision DOUBLE,
c_varchar VARCHAR,
c_bytea BYTES
) WITH (
'connector' = 'kafka',
'topic' = 'flinktypes',
'properties.bootstrap.servers' = 'message_queue:29092',
'properties.group.id' = 'test-flink-client-1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json',
'debezium-json.schema-include' = 'true'
);

CREATE TABLE pg_types (
id INT,
c_boolean BOOLEAN,
c_smallint SMALLINT,
c_integer INT,
c_bigint BIGINT,
c_decimal decimal,
c_real FLOAT,
c_double_precision DOUBLE,
c_varchar VARCHAR,
c_bytea BYTES,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456',
'table-name' = 'flink_types'
);

INSERT INTO pg_types
SELECT * FROM topic_types;
2 changes: 1 addition & 1 deletion integration_tests/kafka-cdc-sink/jsons/pg-sink.json
@@ -3,7 +3,7 @@
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "counts",
"topics": "counts, types",
"connection.url": "jdbc:postgresql://postgres:5432/mydb",
"connection.username": "myuser",
"connection.password": "123456",
2 changes: 1 addition & 1 deletion integration_tests/kafka-cdc-sink/pg_check
@@ -1 +1 @@
-counts,flinkcounts
+counts,flinkcounts,types,flink_types
34 changes: 34 additions & 0 deletions integration_tests/kafka-cdc-sink/postgres_prepare.sql
@@ -9,3 +9,37 @@ CREATE TABLE flinkcounts (
sum bigint,
primary key (id)
);

CREATE TABLE types (
id int,
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal text,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
c_date date,
c_time time,
c_timestamp timestamp,
c_timestamptz timestamptz,
c_interval text,
c_jsonb jsonb,
primary key (id)
);

CREATE TABLE flink_types (
id int,
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal decimal,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
primary key (id)
);
12 changes: 12 additions & 0 deletions integration_tests/kafka-cdc-sink/prepare.sh
@@ -6,9 +6,21 @@ set -euo pipefail
docker compose exec message_queue \
/usr/bin/rpk topic create -p 2 counts

docker compose exec message_queue \
/usr/bin/rpk topic create -p 1 types

docker compose exec message_queue \
/usr/bin/rpk topic create -p 1 flinktypes

# setup flink
docker compose run flink-sql-client \
/opt/flink/bin/sql-client.sh -f /tmp/flink.sql
docker compose run flink-sql-client \
/opt/flink/bin/sql-client.sh -f /tmp/compatibility-flink.sql

# setup connect
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jsons/pg-sink.json

# set rw for compatibility test
psql -h localhost -p 4566 -d dev -U root -f compatibility-rw.sql
psql -h localhost -p 4566 -d dev -U root -f compatibility-rw-flink.sql
