Skip to content

Commit

Permalink
test(connector): add PG CDC data compatibility test (#13176)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyliu0 authored Nov 2, 2023
1 parent 6bd797e commit fdfbf53
Show file tree
Hide file tree
Showing 7 changed files with 458 additions and 4 deletions.
37 changes: 37 additions & 0 deletions integration_tests/postgres-cdc/compatibility-pg.sql

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions integration_tests/postgres-cdc/compatibility-rw.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
DROP TABLE IF EXISTS postgres_all_types;
CREATE TABLE IF NOT EXISTS postgres_all_types(
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal decimal,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
c_date date,
c_time time,
c_timestamp timestamp,
c_timestamptz timestamptz,
c_interval interval,
c_jsonb jsonb,
c_boolean_array boolean[],
c_smallint_array smallint[],
c_integer_array integer[],
c_bigint_array bigint[],
c_decimal_array decimal[],
c_real_array real[],
c_double_precision_array double precision[],
c_varchar_array varchar[],
c_bytea_array bytea[],
c_date_array date[],
c_time_array time[],
c_timestamp_array timestamp[],
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
PRIMARY KEY (c_boolean,c_bigint,c_date)
) WITH (
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'myuser',
password = '123456',
database.name = 'mydb',
schema.name = 'public',
table.name = 'postgres_all_types',
slot.name = 'postgres_all_types'
);
2 changes: 1 addition & 1 deletion integration_tests/postgres-cdc/data_check
Original file line number Diff line number Diff line change
@@ -1 +1 @@
person,city_population,nexmark_q8
person,city_population,nexmark_q8,postgres_all_types
11 changes: 8 additions & 3 deletions integration_tests/postgres-cdc/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,25 @@ services:
command:
- /bin/sh
- -c
- "psql postgresql://myuser:123456@postgres:5432/mydb < postgres_prepare.sql"
- "psql postgresql://myuser:123456@postgres:5432/mydb < postgres_prepare.sql &&
psql postgresql://myuser:123456@postgres:5432/mydb < compatibility-pg.sql &&
sleep 5 &&
psql postgresql://root:@frontend-node-0:4566/dev < compatibility-rw.sql"
volumes:
- "./postgres_prepare.sql:/postgres_prepare.sql"
- "./compatibility-pg.sql:/compatibility-pg.sql"
- "./compatibility-rw.sql:/compatibility-rw.sql"
container_name: postgres_prepare
restart: on-failure
datagen_tpch:
image: ghcr.io/risingwavelabs/go-tpc:v0.1
depends_on: [postgres]
depends_on: [ postgres ]
command: tpch prepare --sf 1 --threads 4 -d postgres -U myuser -p '123456' -H postgres -D mydb -P 5432 --conn-params sslmode=disable
container_name: datagen_tpch
restart: on-failure
datagen_kafka:
build: ../datagen
depends_on: [message_queue]
depends_on: [ message_queue ]
command:
- /bin/sh
- -c
Expand Down
86 changes: 86 additions & 0 deletions integration_tests/postgres-cdc/postgresql-datatypes.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
pk_types:
- boolean
- bigint
- date
datatypes:
- name: boolean
aliases:
- bool
zero: false
minimum: false
maximum: true
rw_type: boolean
- name: smallint
zero: 0
minimum: -32767
maximum: 32767
rw_type:
- name: integer
aliases:
- int
zero: 0
minimum: -2147483647
maximum: 2147483647
- name: bigint
zero: 0
minimum: -9223372036854775807
maximum: 9223372036854775807
- name: decimal
aliases:
- numeric
zero: 0
minimum: -9.9999999999999999999999999999999
maximum: -9.9999999999999999999999999999999
- name: real
zero: 0
minimum: -9999.999999
maximum: 9999.999999
- name: double precision
zero: 0
minimum: -9999.99999999999999
maximum: 9999.99999999999999
- name: varchar
aliases:
- character varying
- string
zero: "''"
minimum: "''"
maximum_gen_py: "\"'{}'\".format('z'*65535)"
- name: bytea
zero: "'\\x00'"
minimum: "'\\x00'"
maximum_gen_py: "\"'{}'\".format('\\\\x'+'f'*65534)"
- name: date
zero: "'0001-01-01'"
minimum: "'0001-01-01'"
maximum: "'9999-12-31'"
- name: time
aliases:
- time without time zone
zero: "'00:00:00'"
minimum: "'00:00:00'"
maximum: "'23:59:59'"
- name: timestamp
aliases:
- timestamp without time zone
zero: "'0001-01-01 00:00:00'::timestamp"
minimum: "'0001-01-01 00:00:00'::timestamp"
maximum: "'9999-12-31 23:59:59'::timestamp"
- name: timestamptz
aliases:
- timestamp with time zone
zero: "'0001-01-01 00:00:00'::timestamptz"
minimum: "'0001-01-01 00:00:00'::timestamptz"
maximum: "'9999-12-31 23:59:59'::timestamptz"
- name: interval
zero: "interval '0 second'"
minimum: "interval '0 second'"
maximum: "interval '9990 year'"
- name: jsonb
zero: "'{}'"
minimum: "'{}'"
maximum: "'{\"whatever\":\"meaningless\"}'"




82 changes: 82 additions & 0 deletions integration_tests/scripts/compatibility/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import click
from compatibility import *


@click.group()
def cli():
pass


@click.command()
@click.option("--datatype-file", default="./compatibility/risingwave-datatypes.yml", help="data type file")
@click.option("--database-type", default="postgres", help="database type")
def gen_select_sql(datatype_file: str, database_type: str):
database_type = database_type.lower()
with open(datatype_file) as f:
datatypes_map = yaml.safe_load(f)
datatype_list = []
for data_type in datatypes_map["datatypes"]:
new_datatype = DataType(**data_type)
if database_type == "mysql":
new_datatype = MysqlDataType(**data_type)
datatype_list.append(new_datatype)
print(new_datatype.select_zero_sql())
print(new_datatype.select_min_sql())
print(new_datatype.select_max_sql())
if data_type in ["postgres", "risingwave"]:
print(new_datatype.select_array_zero_sql())
print(new_datatype.select_array_min_sql())
print(new_datatype.select_array_max_sql())


@click.command()
@click.option("--datatype-file", default="./compatibility/risingwave-datatypes.yml", help="data type file")
@click.option("--database-type", default="postgres", help="database type")
def gen_ddl_dml(datatype_file: str, database_type: str):
database_type = database_type.lower()
with open(datatype_file) as f:
datatypes_map = yaml.safe_load(f)
datatype_list = []
for data_type in datatypes_map["datatypes"]:
new_datatype = DataType(**data_type)
if database_type == "mysql":
new_datatype = MysqlDataType(**data_type)
datatype_list.append(new_datatype)
table_sql_generator = TableSqlGenerator(
name='{}_all_types'.format(database_type),
enable_array=False,
enable_struct=False,
pk_types=datatypes_map.get("pk_types", []),
datatypes=datatype_list
)
if database_type == "mysql":
pass
elif database_type == "postgres":
table_sql_generator = PostgresTableSqlGenerator(
name='{}_all_types'.format(database_type),
enable_array=True,
enable_struct=False,
pk_types=datatypes_map.get("pk_types", []),
datatypes=datatype_list
)
elif database_type == "risingwave":
table_sql_generator = RisingwaveTableSqlGenerator(
name='{}_all_types'.format(database_type),
enable_array=True,
enable_struct=True,
pk_types=datatypes_map.get("pk_types", []),
datatypes=datatype_list
)

print(table_sql_generator.drop_table_sql())
print(table_sql_generator.create_table_sql())
print(table_sql_generator.insert_zero_sql())
print(table_sql_generator.insert_min_sql())
print(table_sql_generator.insert_max_sql())


cli.add_command(gen_select_sql)
cli.add_command(gen_ddl_dml)

if __name__ == '__main__':
cli()
Loading

0 comments on commit fdfbf53

Please sign in to comment.