Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(connector): add PG CDC data compatibility test #13176

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions integration_tests/postgres-cdc/compatibility-pg.sql

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions integration_tests/postgres-cdc/compatibility-rw.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- RisingWave-side table for the PG CDC data compatibility test.
-- It is fed by the 'postgres-cdc' connector from public.postgres_all_types in
-- the upstream PostgreSQL database 'mydb' (see the WITH options below).
DROP TABLE IF EXISTS postgres_all_types;
CREATE TABLE IF NOT EXISTS postgres_all_types(
-- Scalar columns: one per data type under test.
c_boolean boolean,
c_smallint smallint,
c_integer integer,
c_bigint bigint,
c_decimal decimal,
c_real real,
c_double_precision double precision,
c_varchar varchar,
c_bytea bytea,
c_date date,
c_time time,
c_timestamp timestamp,
c_timestamptz timestamptz,
c_interval interval,
c_jsonb jsonb,
-- Array columns: one per scalar type above, in the same order.
c_boolean_array boolean[],
c_smallint_array smallint[],
c_integer_array integer[],
c_bigint_array bigint[],
c_decimal_array decimal[],
c_real_array real[],
c_double_precision_array double precision[],
c_varchar_array varchar[],
c_bytea_array bytea[],
c_date_array date[],
c_time_array time[],
c_timestamp_array timestamp[],
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
-- Composite key over the boolean, bigint and date columns; these are the same
-- types listed under pk_types in postgresql-datatypes.yml.
PRIMARY KEY (c_boolean,c_bigint,c_date)
) WITH (
-- Connection settings for the upstream PostgreSQL service; host and
-- credentials match the psql URL used in docker-compose.yml.
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'myuser',
password = '123456',
database.name = 'mydb',
schema.name = 'public',
table.name = 'postgres_all_types',
-- NOTE(review): presumably the name of the replication slot used for this
-- table — confirm against the connector documentation.
slot.name = 'postgres_all_types'
);
2 changes: 1 addition & 1 deletion integration_tests/postgres-cdc/data_check
Original file line number Diff line number Diff line change
@@ -1 +1 @@
person,city_population,nexmark_q8
person,city_population,nexmark_q8,postgres_all_types
11 changes: 8 additions & 3 deletions integration_tests/postgres-cdc/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,25 @@ services:
command:
- /bin/sh
- -c
- "psql postgresql://myuser:123456@postgres:5432/mydb < postgres_prepare.sql"
- "psql postgresql://myuser:123456@postgres:5432/mydb < postgres_prepare.sql &&
psql postgresql://myuser:123456@postgres:5432/mydb < compatibility-pg.sql &&
sleep 5 &&
psql postgresql://root:@frontend-node-0:4566/dev < compatibility-rw.sql"
volumes:
- "./postgres_prepare.sql:/postgres_prepare.sql"
- "./compatibility-pg.sql:/compatibility-pg.sql"
- "./compatibility-rw.sql:/compatibility-rw.sql"
container_name: postgres_prepare
restart: on-failure
datagen_tpch:
image: ghcr.io/risingwavelabs/go-tpc:v0.1
depends_on: [postgres]
depends_on: [ postgres ]
command: tpch prepare --sf 1 --threads 4 -d postgres -U myuser -p '123456' -H postgres -D mydb -P 5432 --conn-params sslmode=disable
container_name: datagen_tpch
restart: on-failure
datagen_kafka:
build: ../datagen
depends_on: [message_queue]
depends_on: [ message_queue ]
command:
- /bin/sh
- -c
Expand Down
86 changes: 86 additions & 0 deletions integration_tests/postgres-cdc/postgresql-datatypes.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Data type specification consumed by integration_tests/scripts/compatibility/cli.py.
#
# pk_types: the types whose columns form the primary key of the generated table
#           (read by cli.py via datatypes_map.get("pk_types", [])).
# datatypes: one entry per type; each entry is passed to DataType(**entry), so
#            every key here must be an accepted constructor argument.
#   zero / minimum / maximum: SQL literals for the three test rows.
#   maximum_gen_py: NOTE(review): looks like a Python expression evaluated to
#                   build a maximum too long to spell out — confirm in DataType.
pk_types:
  - boolean
  - bigint
  - date
datatypes:
  - name: boolean
    aliases:
      - bool
    zero: false
    minimum: false
    maximum: true
    rw_type: boolean
  - name: smallint
    zero: 0
    minimum: -32767
    maximum: 32767
    # NOTE(review): rw_type is left empty here (loads as null) and is only set
    # on boolean — confirm whether a value is intended.
    rw_type:
  - name: integer
    aliases:
      - int
    zero: 0
    minimum: -2147483647
    maximum: 2147483647
  - name: bigint
    zero: 0
    minimum: -9223372036854775807
    maximum: 9223372036854775807
  - name: decimal
    aliases:
      - numeric
    zero: 0
    # NOTE(review): these values are unquoted, so a YAML loader reads them as
    # floats and may not keep every digit — confirm the generated SQL.
    minimum: -9.9999999999999999999999999999999
    # Positive bound: the maximum must mirror the minimum, not repeat it.
    maximum: 9.9999999999999999999999999999999
  - name: real
    zero: 0
    minimum: -9999.999999
    maximum: 9999.999999
  - name: double precision
    zero: 0
    minimum: -9999.99999999999999
    maximum: 9999.99999999999999
  - name: varchar
    aliases:
      - character varying
      - string
    zero: "''"
    minimum: "''"
    maximum_gen_py: "\"'{}'\".format('z'*65535)"
  - name: bytea
    zero: "'\\x00'"
    minimum: "'\\x00'"
    maximum_gen_py: "\"'{}'\".format('\\\\x'+'f'*65534)"
  - name: date
    zero: "'0001-01-01'"
    minimum: "'0001-01-01'"
    maximum: "'9999-12-31'"
  - name: time
    aliases:
      - time without time zone
    zero: "'00:00:00'"
    minimum: "'00:00:00'"
    maximum: "'23:59:59'"
  - name: timestamp
    aliases:
      - timestamp without time zone
    zero: "'0001-01-01 00:00:00'::timestamp"
    minimum: "'0001-01-01 00:00:00'::timestamp"
    maximum: "'9999-12-31 23:59:59'::timestamp"
  - name: timestamptz
    aliases:
      - timestamp with time zone
    zero: "'0001-01-01 00:00:00'::timestamptz"
    minimum: "'0001-01-01 00:00:00'::timestamptz"
    maximum: "'9999-12-31 23:59:59'::timestamptz"
  - name: interval
    zero: "interval '0 second'"
    minimum: "interval '0 second'"
    maximum: "interval '9990 year'"
  - name: jsonb
    zero: "'{}'"
    minimum: "'{}'"
    maximum: "'{\"whatever\":\"meaningless\"}'"




82 changes: 82 additions & 0 deletions integration_tests/scripts/compatibility/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import click
from compatibility import *


@click.group()
def cli():
    # Root click command group. It does no work itself; the subcommands
    # registered on it via cli.add_command() provide the functionality.
    # (A comment rather than a docstring, so the --help text is unchanged.)
    pass


@click.command()
@click.option("--datatype-file", default="./compatibility/risingwave-datatypes.yml", help="data type file")
@click.option("--database-type", default="postgres", help="database type")
def gen_select_sql(datatype_file: str, database_type: str):
    """Print SELECT statements for the zero, minimum and maximum value of each data type."""
    # datatype_file: path to a YAML file with a top-level "datatypes" list;
    #                each entry is passed as keyword arguments to the data
    #                type class.
    # database_type: target database name, compared case-insensitively.
    database_type = database_type.lower()
    with open(datatype_file, encoding="utf-8") as f:
        datatypes_map = yaml.safe_load(f)

    # Array statements are emitted only for these database types. The check
    # must look at the database type: the loop variable below is the per-type
    # dict from the YAML file and never equals a database name.
    emit_arrays = database_type in ("postgres", "risingwave")

    for data_type in datatypes_map["datatypes"]:
        if database_type == "mysql":
            new_datatype = MysqlDataType(**data_type)
        else:
            new_datatype = DataType(**data_type)
        print(new_datatype.select_zero_sql())
        print(new_datatype.select_min_sql())
        print(new_datatype.select_max_sql())
        if emit_arrays:
            print(new_datatype.select_array_zero_sql())
            print(new_datatype.select_array_min_sql())
            print(new_datatype.select_array_max_sql())


@click.command()
@click.option("--datatype-file", default="./compatibility/risingwave-datatypes.yml", help="data type file")
@click.option("--database-type", default="postgres", help="database type")
def gen_ddl_dml(datatype_file: str, database_type: str):
    # Prints, in order: DROP TABLE, CREATE TABLE, and the INSERTs for the
    # zero, minimum and maximum rows of a "<database_type>_all_types" table
    # described by the YAML data type file.
    # (Comments rather than a docstring, so the --help text is unchanged.)
    database_type = database_type.lower()
    with open(datatype_file) as spec_file:
        spec = yaml.safe_load(spec_file)

    # One data type object per YAML entry; mysql replaces the generic class.
    column_types = []
    for entry in spec["datatypes"]:
        column_type = DataType(**entry)
        if database_type == "mysql":
            column_type = MysqlDataType(**entry)
        column_types.append(column_type)

    def build(generator_cls, enable_array, enable_struct):
        # All generators take the same arguments; only the class and the
        # array/struct switches vary between database types.
        return generator_cls(
            name='{}_all_types'.format(database_type),
            enable_array=enable_array,
            enable_struct=enable_struct,
            pk_types=spec.get("pk_types", []),
            datatypes=column_types,
        )

    # The generic generator is the fallback (used for mysql and any other
    # database type); postgres and risingwave substitute their own.
    table_sql_generator = build(TableSqlGenerator, False, False)
    if database_type == "postgres":
        table_sql_generator = build(PostgresTableSqlGenerator, True, False)
    elif database_type == "risingwave":
        table_sql_generator = build(RisingwaveTableSqlGenerator, True, True)

    for step in (
        "drop_table_sql",
        "create_table_sql",
        "insert_zero_sql",
        "insert_min_sql",
        "insert_max_sql",
    ):
        print(getattr(table_sql_generator, step)())


# Register both generators as subcommands of the root group.
cli.add_command(gen_select_sql)
cli.add_command(gen_ddl_dml)

# Run the command group only when this file is executed as a script.
if __name__ == '__main__':
    cli()
Loading
Loading