Skip to content

Commit

Permalink
test(connector): add debezium-pg compatible test (#14862)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuefengze authored Feb 1, 2024
1 parent 9749e4c commit 4c70f3d
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 58 deletions.
25 changes: 23 additions & 2 deletions integration_tests/debezium-postgres/create_source.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
CREATE TABLE orders (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'postgres.public.orders',
kafka.topic = 'pg.public.orders',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = 'http://message_queue:8081');
) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = 'http://message_queue:8081');

CREATE TABLE pg_all_data_types (PRIMARY KEY(id)) with (
connector = 'kafka',
kafka.topic = 'pg.public.pg_all_data_types',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = 'http://message_queue:8081');

CREATE TABLE pg_types2 (PRIMARY KEY(id)) with (
connector = 'kafka',
kafka.topic = 'pg.public.pg_types2',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = 'http://message_queue:8081');

CREATE TABLE pg_types3 (PRIMARY KEY(id)) with (
connector = 'kafka',
kafka.topic = 'pg.public.pg_types3',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = 'http://message_queue:8081');
2 changes: 1 addition & 1 deletion integration_tests/debezium-postgres/data_check
Original file line number Diff line number Diff line change
@@ -1 +1 @@
orders,product_count
orders,product_count,pg_all_data_types,pg_types2,pg_types3
49 changes: 14 additions & 35 deletions integration_tests/debezium-postgres/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ services:
service: message_queue

postgres:
image: debezium/postgres:13
image: debezium/postgres:16-alpine
ports:
- 5432:5432
environment:
Expand All @@ -46,9 +46,11 @@ services:
container_name: postgres
volumes:
- ./postgres/postgres_bootstrap.sql:/docker-entrypoint-initdb.d/postgres_bootstrap.sql
- "./postgres_prepare.sql:/postgres_prepare.sql"

debezium:
image: debezium/connect:1.9
image: debezium-connect
build: .
environment:
BOOTSTRAP_SERVERS: message_queue:29092
GROUP_ID: 1
Expand All @@ -69,40 +71,17 @@ services:
postgres: { condition: service_healthy }
container_name: debezium

debezium_deploy:
image: debezium/connect:1.9
depends_on:
debezium:
condition: service_healthy
volumes:
- ./postgres/postgres_dbz.sh:/postgres_dbz.sh
entrypoint: [ bash, -c, /postgres_dbz.sh ]
container_name: debezium_deploy
restart: on-failure

datagen:
image: postgres
depends_on:
postgres: { condition: service_healthy }
command:
- /bin/sh
- -c
- psql "postgresql://postgresuser:postgrespw@postgres:5432/mydb" -f ./postgres_prepare.sql
volumes:
- "./postgres_prepare.sql:/postgres_prepare.sql"
container_name: datagen
restart: on-failure

# Check out the connectors via 127.0.0.1:8000
kafka-connect-ui:
image: landoop/kafka-connect-ui
ports:
- 8000:8000
environment:
CONNECT_URL: http://debezium:8083
container_name: kafka-connect-ui
depends_on:
message_queue: { condition: service_healthy }
# kafka-connect-ui:
# image: landoop/kafka-connect-ui
# platform: linux/amd64
# ports:
# - 8000:8000
# environment:
# CONNECT_URL: http://debezium:8083
# container_name: kafka-connect-ui
# depends_on:
# message_queue: { condition: service_healthy }

volumes:
message_queue:
Expand Down
18 changes: 18 additions & 0 deletions integration_tests/debezium-postgres/dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM quay.io/debezium/connect:2.5.0.Final

ENV AVRO_PLUGION_DIR=$KAFKA_CONNECT_PLUGINS_DIR/confluent-avro-plugin

ARG CONFLUENT_PLUGIN_VERSION=5.3.0

RUN mkdir $AVRO_PLUGION_DIR && cd $AVRO_PLUGION_DIR && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/$CONFLUENT_PLUGIN_VERSION/kafka-connect-avro-converter-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-data/$CONFLUENT_PLUGIN_VERSION/kafka-connect-avro-data-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/common-config/$CONFLUENT_PLUGIN_VERSION/common-config-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/common-utils/$CONFLUENT_PLUGIN_VERSION/common-utils-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/$CONFLUENT_PLUGIN_VERSION/kafka-schema-serializer-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/$CONFLUENT_PLUGIN_VERSION/kafka-schema-registry-client-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/$CONFLUENT_PLUGIN_VERSION/kafka-avro-serializer-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://repo1.maven.org/maven2/com/google/guava/guava/31.0.1-jre/guava-31.0.1-jre.jar && \
curl -sO https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar && \
curl -sO https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar && \
curl -sO https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar
19 changes: 0 additions & 19 deletions integration_tests/debezium-postgres/postgres/postgres_dbz.sh

This file was deleted.

81 changes: 80 additions & 1 deletion integration_tests/debezium-postgres/postgres_prepare.sql

Large diffs are not rendered by default.

83 changes: 83 additions & 0 deletions integration_tests/debezium-postgres/prepare.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/bin/bash

set -euo pipefail

# setup postgres
docker compose exec postgres bash -c "psql postgresql://postgresuser:postgrespw@postgres:5432/mydb < postgres_prepare.sql"

echo "Deploying Debezium Postgres connector"

# default handling mode
curl -S -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/pg-default/config \
-d '{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "pg",
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgresuser",
"database.password": "postgrespw",
"database.dbname": "mydb",
"database.server.name": "postgres",
"database.schema": "public",
"database.history.kafka.bootstrap.servers": "message_queue:29092",
"database.history.kafka.topic": "postgres-history",
"time.precision.mode": "adaptive",
"decimal.handling.mode": "precise",
"interval.handling.mode": "numeric",
"include.schema.changes": false,
"slot.name": "debezium_1",
"table.include.list": "public.orders,public.pg_all_data_types"
}'

echo "Deploying Debezium Postgres connector"
# time: adaptive_time_microseconds
# interval: string
# decimal: double
curl -S -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/pg-double-microseconds/config \
-d '{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "pg",
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgresuser",
"database.password": "postgrespw",
"database.dbname": "mydb",
"database.server.name": "postgres",
"database.schema": "public",
"database.history.kafka.bootstrap.servers": "message_queue:29092",
"database.history.kafka.topic": "postgres-history",
"time.precision.mode": "adaptive_time_microseconds",
"interval.handling.mode": "string",
"decimal.handling.mode": "double",
"include.schema.changes": false,
"slot.name": "debezium_2",
"table.include.list": "public.pg_types2"
}'

echo "Deploying Debezium Postgres connector"
# time: connect
# decimal: string
curl -S -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/pg-connect-string/config \
-d '{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "pg",
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgresuser",
"database.password": "postgrespw",
"database.dbname": "mydb",
"database.server.name": "postgres",
"database.schema": "public",
"database.history.kafka.bootstrap.servers": "message_queue:29092",
"database.history.kafka.topic": "postgres-history",
"time.precision.mode": "connect",
"decimal.handling.mode": "string",
"include.schema.changes": false,
"slot.name": "debezium_3",
"table.include.list": "public.pg_types3"
}'

echo 'sleep two minutes wait for debezium create all topics.'
sleep 120

echo 'Done'

0 comments on commit 4c70f3d

Please sign in to comment.