Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(connector): add kafka-cdc-sink compatible test #13607

Merged
merged 2 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion integration_tests/kafka-cdc-sink/Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dependencies = ["clean-all"]
[tasks.start-cluster]
script = """
docker compose up -d;
sleep 20
sleep 10
"""
dependencies = ["build-image"]

Expand Down Expand Up @@ -75,3 +75,13 @@ dependencies = [
"check-mysql",
"check-flink-pg"
]

[tasks.check-com-pg]
script = '''
docker exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from types"'
'''

[tasks.check-flink-com-pg]
script = '''
docker exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from flink_types"'
'''
27 changes: 27 additions & 0 deletions integration_tests/kafka-cdc-sink/compatibility-rw-flink.sql

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions integration_tests/kafka-cdc-sink/compatibility-rw.sql

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions integration_tests/kafka-cdc-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ WITH (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
topic = 'counts',
type = 'debezium',
primary_key = 'id'
);
) FORMAT DEBEZIUM ENCODE JSON;
3 changes: 2 additions & 1 deletion integration_tests/kafka-cdc-sink/flink/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ FROM flink:1.17.0-scala_2.12
COPY flink-sql-connector-kafka-1.17.0.jar /opt/flink/lib
COPY flink-connector-jdbc-3.0.0-1.16.jar /opt/flink/lib
COPY postgresql-42.6.0.jar /opt/flink/lib
COPY flink.sql /tmp
COPY flink.sql /tmp
COPY compatibility-flink.sql /tmp
41 changes: 41 additions & 0 deletions integration_tests/kafka-cdc-sink/flink/compatibility-flink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
CREATE TABLE topic_types (
id INT,
c_boolean BOOLEAN,
c_smallint SMALLINT,
c_integer INT,
c_bigint BIGINT,
c_decimal decimal,
c_real FLOAT,
c_double_precision DOUBLE,
c_varchar VARCHAR,
c_bytea BYTES
) WITH (
'connector' = 'kafka',
'topic' = 'flinktypes',
'properties.bootstrap.servers' = 'message_queue:29092',
'properties.group.id' = 'test-flink-client-1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json',
'debezium-json.schema-include' = 'true'
);

CREATE TABLE pg_types (
id INT,
c_boolean BOOLEAN,
c_smallint SMALLINT,
c_integer INT,
c_bigint BIGINT,
c_decimal decimal,
c_real FLOAT,
c_double_precision DOUBLE,
c_varchar VARCHAR,
c_bytea BYTES,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456',
'table-name' = 'flink_types'
);

INSERT INTO pg_types
SELECT * FROM topic_types;
2 changes: 1 addition & 1 deletion integration_tests/kafka-cdc-sink/jsons/pg-sink.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "counts",
"topics": "counts, types",
"connection.url": "jdbc:postgresql://postgres:5432/mydb",
"connection.username": "myuser",
"connection.password": "123456",
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/kafka-cdc-sink/pg_check
Original file line number Diff line number Diff line change
@@ -1 +1 @@
counts,flinkcounts
counts,flinkcounts,types,flink_types
34 changes: 34 additions & 0 deletions integration_tests/kafka-cdc-sink/postgres_prepare.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,37 @@ CREATE TABLE flinkcounts (
sum bigint,
primary key (id)
);

CREATE TABLE types (
id int,
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal text,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
c_date date,
c_time time,
c_timestamp timestamp,
c_timestamptz timestamptz,
c_interval text,
c_jsonb jsonb,
primary key (id)
);

CREATE TABLE flink_types (
id int,
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal decimal,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
primary key (id)
);
12 changes: 12 additions & 0 deletions integration_tests/kafka-cdc-sink/prepare.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,21 @@ set -euo pipefail
docker compose exec message_queue \
/usr/bin/rpk topic create -p 2 counts

docker compose exec message_queue \
/usr/bin/rpk topic create -p 1 types

docker compose exec message_queue \
/usr/bin/rpk topic create -p 1 flinktypes

# setup flink
docker compose run flink-sql-client \
/opt/flink/bin/sql-client.sh -f /tmp/flink.sql
docker compose run flink-sql-client \
/opt/flink/bin/sql-client.sh -f /tmp/compatibility-flink.sql

# setup connect
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jsons/pg-sink.json

# set rw for compatibility test
psql -h localhost -p 4566 -d dev -U root -f compatibility-rw.sql
psql -h localhost -p 4566 -d dev -U root -f compatibility-rw-flink.sql
Loading