From 6fb4a1107903ab6c45c9e2254e44ed427f937657 Mon Sep 17 00:00:00 2001 From: xfz Date: Wed, 17 Apr 2024 15:59:08 +0800 Subject: [PATCH 1/3] test: add kafka-cdc compatible test --- ci/scripts/gen-integration-test-yaml.py | 81 +++---- ci/scripts/notify.py | 1 + integration_tests/datagen/Dockerfile | 2 + .../compatible_data/compatible_data.go | 199 ++++++++++++++++++ integration_tests/datagen/gen/generator.go | 3 + integration_tests/datagen/load_gen.go | 10 + integration_tests/datagen/main.go | 9 +- integration_tests/kafka-cdc/create_source.sql | 39 ++++ integration_tests/kafka-cdc/data_check | 1 + .../kafka-cdc/docker-compose.yml | 51 +++++ 10 files changed, 355 insertions(+), 41 deletions(-) create mode 100644 integration_tests/datagen/compatible_data/compatible_data.go create mode 100644 integration_tests/kafka-cdc/create_source.sql create mode 100644 integration_tests/kafka-cdc/data_check create mode 100644 integration_tests/kafka-cdc/docker-compose.yml diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py index 6332b98ecca9..43cfecaec59c 100644 --- a/ci/scripts/gen-integration-test-yaml.py +++ b/ci/scripts/gen-integration-test-yaml.py @@ -3,46 +3,47 @@ import subprocess CASES_MAP = { - 'ad-click': ['json'], - 'ad-ctr': ['json'], - 'cdn-metrics': ['json'], - 'clickstream': ['json'], - 'livestream': ['json', 'protobuf'], - 'prometheus': ['json'], - 'schema-registry': ['json'], - 'mysql-cdc': ['json'], - 'postgres-cdc': ['json'], - 'mongodb-cdc': ['json'], - 'mysql-sink': ['json'], - 'postgres-sink': ['json'], - 'iceberg-cdc': ['json'], - 'iceberg-sink': ['none'], - 'iceberg-source': ['none'], - 'twitter': ['json', 'protobuf'], - 'twitter-pulsar': ['json'], - 'debezium-mysql': ['json'], - 'debezium-postgres': ['json'], - 'debezium-sqlserver': ['json'], - 'tidb-cdc-sink': ['json'], - 'citus-cdc': ['json'], - 'kinesis-s3-source': ['json'], - 'clickhouse-sink': ['json'], - 'cockroach-sink': ['json'], - 'kafka-cdc-sink': ['json'], - 'cassandra-and-scylladb-sink': ['json'], - 'elasticsearch-sink': ['json'], - 'redis-sink': ['json'], - 'big-query-sink': ['json'], - 'mindsdb': ['json'], - 'vector': ['json'], - 'nats': ['json', 'protobuf'], - 'mqtt': ['json'], - 'doris-sink': ['json'], - 'starrocks-sink': ['json'], - 'deltalake-sink': ['json'], - 'pinot-sink': ['json'], - 'presto-trino': ['json'], - 'client-library': ['none'], + # 'ad-click': ['json'], + # 'ad-ctr': ['json'], + # 'cdn-metrics': ['json'], + # 'clickstream': ['json'], + # 'livestream': ['json', 'protobuf'], + # 'prometheus': ['json'], + # 'schema-registry': ['json'], + # 'mysql-cdc': ['json'], + # 'postgres-cdc': ['json'], + # 'mongodb-cdc': ['json'], + # 'mysql-sink': ['json'], + # 'postgres-sink': ['json'], + # 'iceberg-cdc': ['json'], + # 'iceberg-sink': ['none'], + # 'iceberg-source': ['none'], + # 'twitter': ['json', 'protobuf'], + # 'twitter-pulsar': ['json'], + # 'debezium-mysql': ['json'], + # 'debezium-postgres': ['json'], + # 'debezium-sqlserver': ['json'], + # 'tidb-cdc-sink': ['json'], + # 'citus-cdc': ['json'], + # 'kinesis-s3-source': ['json'], + # 'clickhouse-sink': ['json'], + # 'cockroach-sink': ['json'], + # 'kafka-cdc-sink': ['json'], + # 'cassandra-and-scylladb-sink': ['json'], + # 'elasticsearch-sink': ['json'], + # 'redis-sink': ['json'], + # 'big-query-sink': ['json'], + # 'mindsdb': ['json'], + # 'vector': ['json'], + # 'nats': ['json', 'protobuf'], + # 'mqtt': ['json'], + # 'doris-sink': ['json'], + # 'starrocks-sink': ['json'], + # 'deltalake-sink': ['json'], + # 'pinot-sink': ['json'], + # 'presto-trino': ['json'], + # 'client-library': ['none'], + 'kafka-cdc': ['json'], } def gen_pipeline_steps(): diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py index 0ebd01f8621e..8823c2f0ee92 100755 --- a/ci/scripts/notify.py +++ b/ci/scripts/notify.py @@ -103,6 +103,7 @@ "pinot-sink-json": ["yiming"], "presto-trino-json": ["wutao"], "client-library-none": ["wutao"], + "kafka-cdc-json": ["bohan"], } def get_failed_tests(get_test_status, test_map): diff --git a/integration_tests/datagen/Dockerfile b/integration_tests/datagen/Dockerfile index 97947c23f45e..fa5cf1c8a850 100644 --- a/integration_tests/datagen/Dockerfile +++ b/integration_tests/datagen/Dockerfile @@ -1,5 +1,7 @@ FROM golang as builder +ENV http_proxy=http://host.docker.internal:7890 +ENV https_proxy=http://host.docker.internal:7890 ADD . /datagen-src RUN cd /datagen-src && gofmt -s -w . && go build diff --git a/integration_tests/datagen/compatible_data/compatible_data.go b/integration_tests/datagen/compatible_data/compatible_data.go new file mode 100644 index 000000000000..a04441cb4366 --- /dev/null +++ b/integration_tests/datagen/compatible_data/compatible_data.go @@ -0,0 +1,199 @@ +package compatible_data + +import ( + "context" + "datagen/gen" + "datagen/sink" + "encoding/json" + "fmt" + "math" + "strings" +) + +type Struct struct { + S_int32 int32 `json:"s_int32"` + S_bool bool `json:"s_bool"` +} + +type compatibleData struct { + sink.BaseSinkRecord + + Id int32 `json:"id"` + C_boolean bool `json:"c_boolean"` + C_smallint int16 `json:"c_smallint"` + C_integer int32 `json:"c_integer"` + C_bigint int64 `json:"c_bigint"` + C_decimal string `json:"c_decimal"` + C_real float32 `json:"c_real"` + C_double_precision float64 `json:"c_double_precision"` + C_varchar string `json:"c_varchar"` + C_bytea string `json:"c_bytea"` + C_date string `json:"c_date"` + C_time string `json:"c_time"` + C_timestamp string `json:"c_timestamp"` + C_timestamptz string `json:"c_timestamptz"` + C_interval string `json:"c_interval"` + C_jsonb string `json:"c_jsonb"` + C_boolean_array []bool `json:"c_boolean_array"` + C_smallint_array []int16 `json:"c_smallint_array"` + C_integer_array []int32 `json:"c_integer_array"` + C_bigint_array []int64 `json:"c_bigint_array"` + C_decimal_array []string `json:"c_decimal_array"` + C_real_array []float32 `json:"c_real_array"` + C_double_precision_array []float64 `json:"c_double_precision_array"` + C_varchar_array []string `json:"c_varchar_array"` + C_bytea_array []string `json:"c_bytea_array"` + C_date_array []string `json:"c_date_array"` + C_time_array []string `json:"c_time_array"` + C_timestamp_array []string `json:"c_timestamp_array"` + C_timestamptz_array []string `json:"c_timestamptz_array"` + C_interval_array []string `json:"c_interval_array"` + C_jsonb_array []string `json:"c_jsonb_array"` + C_struct Struct `json:"c_struct"` +} + +func (c *compatibleData) Topic() string { + return "compatible_data" +} + +func (c *compatibleData) Key() string { + return fmt.Sprintf("%d", c.Id) +} + +func (c *compatibleData) ToPostgresSql() string { + panic("unimplemented") +} + +func (c *compatibleData) ToJson() []byte { + data, err := json.Marshal(c) + if err != nil { + panic("failed to marshal compatible data to JSON") + } + return data +} + +func (c *compatibleData) ToProtobuf() []byte { + panic("unimplemented") +} + +func (c *compatibleData) ToAvro() []byte { + panic("unimplemented") +} + +type compatibleDataGen struct { + recordSum int32 +} + +func NewCompatibleDataGen() gen.LoadGenerator { + return &compatibleDataGen{} +} + +func (g *compatibleDataGen) GenData() compatibleData { + g.recordSum++ + recordType := g.recordSum % 3 + if recordType == 0 { + return compatibleData{ + Id: g.recordSum, + C_boolean: true, + C_smallint: 0, + C_integer: 0, + C_bigint: 0, + C_decimal: "nan", + C_real: 0, + C_double_precision: 0, + C_varchar: "", + C_bytea: "", + C_date: "0001-01-01", + C_time: "00:00:00", + C_timestamp: "0001-01-01 00:00:00", + C_timestamptz: "0001-01-01 00:00:00Z", + C_interval: "P0Y0M0DT0H0M0S", + C_jsonb: "{}", + } + } else if recordType == 1 { + return compatibleData{ + Id: g.recordSum, + C_boolean: false, + C_smallint: math.MinInt16, + C_integer: math.MinInt32, + C_bigint: math.MinInt64, + C_decimal: "-123456789.123456789", + C_real: -9999.999999, + C_double_precision: -10000.0, + C_varchar: "a", + C_bytea: "a", + C_date: "1970-01-01", + C_time: "00:00:00.123456", + C_timestamp: "1970-01-01 00:00:00.123456", + C_timestamptz: "1970-01-01 00:00:00.123456Z", + C_interval: "P1Y2M3DT4H5M6S", + C_jsonb: "{}", + C_boolean_array: []bool{true, false}, + C_smallint_array: []int16{1}, + C_integer_array: []int32{1}, + C_bigint_array: []int64{1}, + C_decimal_array: []string{"1.0"}, + C_real_array: []float32{1.0}, + C_double_precision_array: []float64{1.0}, + C_varchar_array: []string{"aa"}, + C_bytea_array: []string{"aa"}, + C_date_array: []string{"1970-01-01"}, + C_time_array: []string{"00:00:00.123456"}, + C_timestamp_array: []string{"1970-01-01 00:00:00.123456"}, + C_timestamptz_array: []string{"1970-01-01 00:00:00.123456Z"}, + C_interval_array: []string{"P0Y0M0DT0H0M2S"}, + C_jsonb_array: []string{"{}"}, + C_struct: Struct{1, true}, + } + } else { + return compatibleData{ + Id: g.recordSum, + C_boolean: true, + C_smallint: math.MaxInt16, + C_integer: math.MaxInt32, + C_bigint: math.MaxInt64, + C_decimal: "123456789.123456789", + C_real: 9999.999999, + C_double_precision: 10000.0, + C_varchar: strings.Repeat("a", 100), + C_bytea: strings.Repeat("b", 100), + C_date: "9999-12-31", + C_time: "23:59:59.999999", + C_timestamp: "9999-12-31 23:59:59.999999", + C_timestamptz: "9999-12-31 23:59:59.999999Z", + C_interval: "P1Y2M3DT4H5M6S", + C_jsonb: "{\"mean\":1}", + C_boolean_array: []bool{true, false}, + C_smallint_array: []int16{1}, + C_integer_array: []int32{1}, + C_bigint_array: []int64{1}, + C_decimal_array: []string{"1.0"}, + C_real_array: []float32{1.0}, + C_double_precision_array: []float64{1.0}, + C_varchar_array: []string{"aa"}, + C_bytea_array: []string{"aa"}, + C_date_array: []string{"1970-01-01"}, + C_time_array: []string{"00:00:00.123456"}, + C_timestamp_array: []string{"1970-01-01 00:00:00.123456"}, + C_timestamptz_array: []string{"1970-01-01 00:00:00.123456Z"}, + C_interval_array: []string{"P1Y2M3DT4H5M6S"}, + C_jsonb_array: []string{"{}"}, + C_struct: Struct{-1, false}, + } + } +} + +func (g *compatibleDataGen) KafkaTopics() []string { + return []string{"compatible_data"} +} + +func (g *compatibleDataGen) Load(ctx context.Context, outCh chan<- sink.SinkRecord) { + for { + record := g.GenData() + select { + case <-ctx.Done(): + return + case outCh <- &record: + } + } +} diff --git a/integration_tests/datagen/gen/generator.go b/integration_tests/datagen/gen/generator.go index 5cc7dfd2acef..28b5245cdf9a 100644 --- a/integration_tests/datagen/gen/generator.go +++ b/integration_tests/datagen/gen/generator.go @@ -42,6 +42,9 @@ type GeneratorConfig struct { // The topic to filter. If not specified, all topics will be used. Topic string + + // The total number of events to generate. + TotalEvents int64 } type LoadGenerator interface { diff --git a/integration_tests/datagen/load_gen.go b/integration_tests/datagen/load_gen.go index 77735983fb6d..a0df429482b0 100644 --- a/integration_tests/datagen/load_gen.go +++ b/integration_tests/datagen/load_gen.go @@ -6,6 +6,7 @@ import ( "datagen/ad_ctr" "datagen/cdn_metrics" "datagen/clickstream" + "datagen/compatible_data" "datagen/delivery" "datagen/ecommerce" "datagen/gen" @@ -67,6 +68,8 @@ func newGen(cfg gen.GeneratorConfig) (gen.LoadGenerator, error) { return livestream.NewLiveStreamMetricsGen(cfg), nil } else if cfg.Mode == "nexmark" { return nexmark.NewNexmarkGen(cfg), nil + } else if cfg.Mode == "compatible-data" { + return compatible_data.NewCompatibleDataGen(), nil } else { return nil, fmt.Errorf("invalid mode: %s", cfg.Mode) } @@ -148,6 +151,13 @@ func generateLoad(ctx context.Context, cfg gen.GeneratorConfig) error { return err } } + if cfg.TotalEvents > 0 && count >= cfg.TotalEvents { + if err := sinkImpl.Flush(ctx); err != nil { + return err + } + log.Printf("Sent %d records in total (Elapsed: %s)", count, time.Since(initTime).String()) + return nil + } } } } diff --git a/integration_tests/datagen/main.go b/integration_tests/datagen/main.go index 135e470ea204..cd3335925a04 100644 --- a/integration_tests/datagen/main.go +++ b/integration_tests/datagen/main.go @@ -244,7 +244,7 @@ func main() { }, cli.StringFlag{ Name: "mode", - Usage: "ad-click | ad-ctr | twitter | cdn-metrics | clickstream | ecommerce | delivery | livestream", + Usage: "ad-click | ad-ctr | twitter | cdn-metrics | clickstream | ecommerce | delivery | livestream | compatible-data", Required: true, Destination: &cfg.Mode, }, @@ -267,6 +267,13 @@ func main() { Required: false, Destination: &cfg.Topic, }, + cli.Int64Flag{ + Name: "total_event", + Usage: "The total number of events to generate. If not specified, the generator will run indefinitely.", + Value: 0, + Required: false, + Destination: &cfg.TotalEvents, + }, }, } err := app.Run(os.Args) diff --git a/integration_tests/kafka-cdc/create_source.sql b/integration_tests/kafka-cdc/create_source.sql new file mode 100644 index 000000000000..cbc64ad7362d --- /dev/null +++ b/integration_tests/kafka-cdc/create_source.sql @@ -0,0 +1,39 @@ +CREATE TABLE compatible_data ( + id integer PRIMARY KEY, + c_boolean boolean, + c_smallint smallint, + c_integer integer, + c_bigint bigint, + c_decimal decimal, + c_real real, + c_double_precision double precision, + c_varchar varchar, + c_bytea bytea, + c_date date, + c_time time, + c_timestamp timestamp, + c_timestamptz timestamptz, + c_interval interval, + c_jsonb jsonb, + c_boolean_array boolean[], + c_smallint_array smallint[], + c_integer_array integer[], + c_bigint_array bigint[], + c_decimal_array decimal[], + c_real_array real[], + c_double_precision_array double precision[], + c_varchar_array varchar[], + c_bytea_array bytea[], + c_date_array date[], + c_time_array time[], + c_timestamp_array timestamp[], + c_timestamptz_array timestamptz[], + c_interval_array interval[], + c_jsonb_array jsonb[], + c_struct STRUCT, +) WITH ( + connector = 'kafka', + topic = 'compatible_data', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; diff --git a/integration_tests/kafka-cdc/data_check b/integration_tests/kafka-cdc/data_check new file mode 100644 index 000000000000..3c3ac8c12f43 --- /dev/null +++ b/integration_tests/kafka-cdc/data_check @@ -0,0 +1 @@ +compatible_data diff --git a/integration_tests/kafka-cdc/docker-compose.yml b/integration_tests/kafka-cdc/docker-compose.yml new file mode 100644 index 000000000000..d62fa2acd9df --- /dev/null +++ b/integration_tests/kafka-cdc/docker-compose.yml @@ -0,0 +1,51 @@ +--- +version: "3" +services: + risingwave-standalone: + extends: + file: ../../docker/docker-compose.yml + service: risingwave-standalone + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 + message_queue: + extends: + file: ../../docker/docker-compose.yml + service: message_queue + datagen: + build: ../datagen + depends_on: [message_queue] + command: + - /bin/sh + - -c + - /datagen --mode compatible-data --qps 2 --total_event 3 kafka --brokers message_queue:29092 + restart: always + container_name: datagen + +volumes: + risingwave-standalone: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose From 933bdf124d4db548f504b35b426c5531110b45bf Mon Sep 17 00:00:00 2001 From: xfz Date: Wed, 17 Apr 2024 16:03:03 +0800 Subject: [PATCH 2/3] fix dockerfile --- integration_tests/datagen/Dockerfile | 2 -- 1 file changed, 2 deletions(-) diff --git a/integration_tests/datagen/Dockerfile b/integration_tests/datagen/Dockerfile index fa5cf1c8a850..97947c23f45e 100644 --- a/integration_tests/datagen/Dockerfile +++ b/integration_tests/datagen/Dockerfile @@ -1,7 +1,5 @@ FROM golang as builder -ENV http_proxy=http://host.docker.internal:7890 -ENV https_proxy=http://host.docker.internal:7890 ADD . /datagen-src RUN cd /datagen-src && gofmt -s -w . && go build From 0295ca854bc7078e11e7edc889189e0bfabaf1b3 Mon Sep 17 00:00:00 2001 From: xfz Date: Wed, 17 Apr 2024 16:13:23 +0800 Subject: [PATCH 3/3] update --- ci/scripts/gen-integration-test-yaml.py | 80 ++++++++++++------------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py index 43cfecaec59c..61fd7721acc3 100644 --- a/ci/scripts/gen-integration-test-yaml.py +++ b/ci/scripts/gen-integration-test-yaml.py @@ -3,46 +3,46 @@ import subprocess CASES_MAP = { - # 'ad-click': ['json'], - # 'ad-ctr': ['json'], - # 'cdn-metrics': ['json'], - # 'clickstream': ['json'], - # 'livestream': ['json', 'protobuf'], - # 'prometheus': ['json'], - # 'schema-registry': ['json'], - # 'mysql-cdc': ['json'], - # 'postgres-cdc': ['json'], - # 'mongodb-cdc': ['json'], - # 'mysql-sink': ['json'], - # 'postgres-sink': ['json'], - # 'iceberg-cdc': ['json'], - # 'iceberg-sink': ['none'], - # 'iceberg-source': ['none'], - # 'twitter': ['json', 'protobuf'], - # 'twitter-pulsar': ['json'], - # 'debezium-mysql': ['json'], - # 'debezium-postgres': ['json'], - # 'debezium-sqlserver': ['json'], - # 'tidb-cdc-sink': ['json'], - # 'citus-cdc': ['json'], - # 'kinesis-s3-source': ['json'], - # 'clickhouse-sink': ['json'], - # 'cockroach-sink': ['json'], - # 'kafka-cdc-sink': ['json'], - # 'cassandra-and-scylladb-sink': ['json'], - # 'elasticsearch-sink': ['json'], - # 'redis-sink': ['json'], - # 'big-query-sink': ['json'], - # 'mindsdb': ['json'], - # 'vector': ['json'], - # 'nats': ['json', 'protobuf'], - # 'mqtt': ['json'], - # 'doris-sink': ['json'], - # 'starrocks-sink': ['json'], - # 'deltalake-sink': ['json'], - # 'pinot-sink': ['json'], - # 'presto-trino': ['json'], - # 'client-library': ['none'], + 'ad-click': ['json'], + 'ad-ctr': ['json'], + 'cdn-metrics': ['json'], + 'clickstream': ['json'], + 'livestream': ['json', 'protobuf'], + 'prometheus': ['json'], + 'schema-registry': ['json'], + 'mysql-cdc': ['json'], + 'postgres-cdc': ['json'], + 'mongodb-cdc': ['json'], + 'mysql-sink': ['json'], + 'postgres-sink': ['json'], + 'iceberg-cdc': ['json'], + 'iceberg-sink': ['none'], + 'iceberg-source': ['none'], + 'twitter': ['json', 'protobuf'], + 'twitter-pulsar': ['json'], + 'debezium-mysql': ['json'], + 'debezium-postgres': ['json'], + 'debezium-sqlserver': ['json'], + 'tidb-cdc-sink': ['json'], + 'citus-cdc': ['json'], + 'kinesis-s3-source': ['json'], + 'clickhouse-sink': ['json'], + 'cockroach-sink': ['json'], + 'kafka-cdc-sink': ['json'], + 'cassandra-and-scylladb-sink': ['json'], + 'elasticsearch-sink': ['json'], + 'redis-sink': ['json'], + 'big-query-sink': ['json'], + 'mindsdb': ['json'], + 'vector': ['json'], + 'nats': ['json', 'protobuf'], + 'mqtt': ['json'], + 'doris-sink': ['json'], + 'starrocks-sink': ['json'], + 'deltalake-sink': ['json'], + 'pinot-sink': ['json'], + 'presto-trino': ['json'], + 'client-library': ['none'], 'kafka-cdc': ['json'], }