diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py
index 6332b98ecca9f..61fd7721acc3f 100644
--- a/ci/scripts/gen-integration-test-yaml.py
+++ b/ci/scripts/gen-integration-test-yaml.py
@@ -43,6 +43,7 @@
     'pinot-sink': ['json'],
     'presto-trino': ['json'],
     'client-library': ['none'],
+    'kafka-cdc': ['json'],
 }
 
 def gen_pipeline_steps():
diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py
index 0ebd01f8621eb..8823c2f0ee923 100755
--- a/ci/scripts/notify.py
+++ b/ci/scripts/notify.py
@@ -103,6 +103,7 @@
     "pinot-sink-json": ["yiming"],
     "presto-trino-json": ["wutao"],
     "client-library-none": ["wutao"],
+    "kafka-cdc-json": ["bohan"],
 }
 
 def get_failed_tests(get_test_status, test_map):
diff --git a/integration_tests/datagen/compatible_data/compatible_data.go b/integration_tests/datagen/compatible_data/compatible_data.go
new file mode 100644
index 0000000000000..a04441cb43661
--- /dev/null
+++ b/integration_tests/datagen/compatible_data/compatible_data.go
@@ -0,0 +1,199 @@
+package compatible_data
+
+import (
+	"context"
+	"datagen/gen"
+	"datagen/sink"
+	"encoding/json"
+	"fmt"
+	"math"
+	"strings"
+)
+
+type Struct struct {
+	S_int32 int32 `json:"s_int32"`
+	S_bool  bool  `json:"s_bool"`
+}
+
+type compatibleData struct {
+	sink.BaseSinkRecord
+
+	Id                       int32     `json:"id"`
+	C_boolean                bool      `json:"c_boolean"`
+	C_smallint               int16     `json:"c_smallint"`
+	C_integer                int32     `json:"c_integer"`
+	C_bigint                 int64     `json:"c_bigint"`
+	C_decimal                string    `json:"c_decimal"`
+	C_real                   float32   `json:"c_real"`
+	C_double_precision       float64   `json:"c_double_precision"`
+	C_varchar                string    `json:"c_varchar"`
+	C_bytea                  string    `json:"c_bytea"`
+	C_date                   string    `json:"c_date"`
+	C_time                   string    `json:"c_time"`
+	C_timestamp              string    `json:"c_timestamp"`
+	C_timestamptz            string    `json:"c_timestamptz"`
+	C_interval               string    `json:"c_interval"`
+	C_jsonb                  string    `json:"c_jsonb"`
+	C_boolean_array          []bool    `json:"c_boolean_array"`
+	C_smallint_array         []int16   `json:"c_smallint_array"`
+	C_integer_array          []int32   `json:"c_integer_array"`
+	C_bigint_array           []int64   `json:"c_bigint_array"`
+	C_decimal_array          []string  `json:"c_decimal_array"`
+	C_real_array             []float32 `json:"c_real_array"`
+	C_double_precision_array []float64 `json:"c_double_precision_array"`
+	C_varchar_array          []string  `json:"c_varchar_array"`
+	C_bytea_array            []string  `json:"c_bytea_array"`
+	C_date_array             []string  `json:"c_date_array"`
+	C_time_array             []string  `json:"c_time_array"`
+	C_timestamp_array        []string  `json:"c_timestamp_array"`
+	C_timestamptz_array      []string  `json:"c_timestamptz_array"`
+	C_interval_array         []string  `json:"c_interval_array"`
+	C_jsonb_array            []string  `json:"c_jsonb_array"`
+	C_struct                 Struct    `json:"c_struct"`
+}
+
+func (c *compatibleData) Topic() string {
+	return "compatible_data"
+}
+
+func (c *compatibleData) Key() string {
+	return fmt.Sprintf("%d", c.Id)
+}
+
+func (c *compatibleData) ToPostgresSql() string {
+	panic("unimplemented")
+}
+
+func (c *compatibleData) ToJson() []byte {
+	data, err := json.Marshal(c)
+	if err != nil {
+		panic("failed to marshal compatible data to JSON")
+	}
+	return data
+}
+
+func (c *compatibleData) ToProtobuf() []byte {
+	panic("unimplemented")
+}
+
+func (c *compatibleData) ToAvro() []byte {
+	panic("unimplemented")
+}
+
+type compatibleDataGen struct {
+	recordSum int32
+}
+
+func NewCompatibleDataGen() gen.LoadGenerator {
+	return &compatibleDataGen{}
+}
+
+func (g *compatibleDataGen) GenData() compatibleData {
+	g.recordSum++
+	recordType := g.recordSum % 3
+	if recordType == 0 {
+		return compatibleData{
+			Id:                 g.recordSum,
+			C_boolean:          true,
+			C_smallint:         0,
+			C_integer:          0,
+			C_bigint:           0,
+			C_decimal:          "nan",
+			C_real:             0,
+			C_double_precision: 0,
+			C_varchar:          "",
+			C_bytea:            "",
+			C_date:             "0001-01-01",
+			C_time:             "00:00:00",
+			C_timestamp:        "0001-01-01 00:00:00",
+			C_timestamptz:      "0001-01-01 00:00:00Z",
+			C_interval:         "P0Y0M0DT0H0M0S",
+			C_jsonb:            "{}",
+		}
+	} else if recordType == 1 {
+		return compatibleData{
+			Id:                       g.recordSum,
+			C_boolean:                false,
+			C_smallint:               math.MinInt16,
+			C_integer:                math.MinInt32,
+			C_bigint:                 math.MinInt64,
+			C_decimal:                "-123456789.123456789",
+			C_real:                   -9999.999999,
+			C_double_precision:       -10000.0,
+			C_varchar:                "a",
+			C_bytea:                  "a",
+			C_date:                   "1970-01-01",
+			C_time:                   "00:00:00.123456",
+			C_timestamp:              "1970-01-01 00:00:00.123456",
+			C_timestamptz:            "1970-01-01 00:00:00.123456Z",
+			C_interval:               "P1Y2M3DT4H5M6S",
+			C_jsonb:                  "{}",
+			C_boolean_array:          []bool{true, false},
+			C_smallint_array:         []int16{1},
+			C_integer_array:          []int32{1},
+			C_bigint_array:           []int64{1},
+			C_decimal_array:          []string{"1.0"},
+			C_real_array:             []float32{1.0},
+			C_double_precision_array: []float64{1.0},
+			C_varchar_array:          []string{"aa"},
+			C_bytea_array:            []string{"aa"},
+			C_date_array:             []string{"1970-01-01"},
+			C_time_array:             []string{"00:00:00.123456"},
+			C_timestamp_array:        []string{"1970-01-01 00:00:00.123456"},
+			C_timestamptz_array:      []string{"1970-01-01 00:00:00.123456Z"},
+			C_interval_array:         []string{"P0Y0M0DT0H0M2S"},
+			C_jsonb_array:            []string{"{}"},
+			C_struct:                 Struct{1, true},
+		}
+	} else {
+		return compatibleData{
+			Id:                       g.recordSum,
+			C_boolean:                true,
+			C_smallint:               math.MaxInt16,
+			C_integer:                math.MaxInt32,
+			C_bigint:                 math.MaxInt64,
+			C_decimal:                "123456789.123456789",
+			C_real:                   9999.999999,
+			C_double_precision:       10000.0,
+			C_varchar:                strings.Repeat("a", 100),
+			C_bytea:                  strings.Repeat("b", 100),
+			C_date:                   "9999-12-31",
+			C_time:                   "23:59:59.999999",
+			C_timestamp:              "9999-12-31 23:59:59.999999",
+			C_timestamptz:            "9999-12-31 23:59:59.999999Z",
+			C_interval:               "P1Y2M3DT4H5M6S",
+			C_jsonb:                  "{\"mean\":1}",
+			C_boolean_array:          []bool{true, false},
+			C_smallint_array:         []int16{1},
+			C_integer_array:          []int32{1},
+			C_bigint_array:           []int64{1},
+			C_decimal_array:          []string{"1.0"},
+			C_real_array:             []float32{1.0},
+			C_double_precision_array: []float64{1.0},
+			C_varchar_array:          []string{"aa"},
+			C_bytea_array:            []string{"aa"},
+			C_date_array:             []string{"1970-01-01"},
+			C_time_array:             []string{"00:00:00.123456"},
+			C_timestamp_array:        []string{"1970-01-01 00:00:00.123456"},
+			C_timestamptz_array:      []string{"1970-01-01 00:00:00.123456Z"},
+			C_interval_array:         []string{"P1Y2M3DT4H5M6S"},
+			C_jsonb_array:            []string{"{}"},
+			C_struct:                 Struct{-1, false},
+		}
+	}
+}
+
+func (g *compatibleDataGen) KafkaTopics() []string {
+	return []string{"compatible_data"}
+}
+
+func (g *compatibleDataGen) Load(ctx context.Context, outCh chan<- sink.SinkRecord) {
+	for {
+		record := g.GenData()
+		select {
+		case <-ctx.Done():
+			return
+		case outCh <- &record:
+		}
+	}
+}
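Note: `GenData` cycles through three record shapes keyed by `recordSum % 3` — all-minimum, all-maximum, and zero/empty values (the zero-value shape leaves the array columns unset, so they serialize as JSON `null`). Below is a hypothetical smoke harness for eyeballing the payloads; it would need to live inside `integration_tests/datagen` so the local `datagen/...` import paths resolve, and it assumes `gen.LoadGenerator` and `sink.SinkRecord` expose the `Load`/`Key`/`ToJson` methods implemented above.

```go
// Hypothetical harness: drain one full cycle of the three record shapes
// and print the key/JSON pairs that would be produced to the Kafka topic.
package main

import (
	"context"
	"datagen/compatible_data"
	"datagen/sink"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	out := make(chan sink.SinkRecord)
	go compatible_data.NewCompatibleDataGen().Load(ctx, out)

	// ids 1..3 map to the min, max, and zero/empty shapes respectively.
	for i := 0; i < 3; i++ {
		rec := <-out
		fmt.Printf("key=%s payload=%s\n", rec.Key(), rec.ToJson())
	}
}
```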
diff --git a/integration_tests/datagen/gen/generator.go b/integration_tests/datagen/gen/generator.go
index 5cc7dfd2acefa..28b5245cdf9a2 100644
--- a/integration_tests/datagen/gen/generator.go
+++ b/integration_tests/datagen/gen/generator.go
@@ -42,6 +42,9 @@ type GeneratorConfig struct {
 
 	// The topic to filter. If not specified, all topics will be used.
 	Topic string
+
+	// The total number of events to generate.
+	TotalEvents int64
 }
 
 type LoadGenerator interface {
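The new `TotalEvents` field turns the previously unbounded generator into a fixed-size batch producer; zero keeps the old run-forever behavior. A minimal sketch of setting it programmatically — only the fields this diff shows are used, everything else in `gen.GeneratorConfig` keeps its defaults:

```go
// Sketch: build the config the way the CLI layer does (see main.go below).
package main

import (
	"datagen/gen"
	"fmt"
)

func main() {
	cfg := gen.GeneratorConfig{
		Mode:        "compatible-data",
		TotalEvents: 3, // stop after three records; 0 means run indefinitely
	}
	fmt.Printf("%+v\n", cfg)
}
```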
diff --git a/integration_tests/datagen/load_gen.go b/integration_tests/datagen/load_gen.go
index 77735983fb6d1..a0df429482b09 100644
--- a/integration_tests/datagen/load_gen.go
+++ b/integration_tests/datagen/load_gen.go
@@ -6,6 +6,7 @@ import (
 	"datagen/ad_ctr"
 	"datagen/cdn_metrics"
 	"datagen/clickstream"
+	"datagen/compatible_data"
 	"datagen/delivery"
 	"datagen/ecommerce"
 	"datagen/gen"
@@ -67,6 +68,8 @@ func newGen(cfg gen.GeneratorConfig) (gen.LoadGenerator, error) {
 		return livestream.NewLiveStreamMetricsGen(cfg), nil
 	} else if cfg.Mode == "nexmark" {
 		return nexmark.NewNexmarkGen(cfg), nil
+	} else if cfg.Mode == "compatible-data" {
+		return compatible_data.NewCompatibleDataGen(), nil
 	} else {
 		return nil, fmt.Errorf("invalid mode: %s", cfg.Mode)
 	}
@@ -148,6 +151,13 @@ func generateLoad(ctx context.Context, cfg gen.GeneratorConfig) error {
 					return err
 				}
 			}
+			if cfg.TotalEvents > 0 && count >= cfg.TotalEvents {
+				if err := sinkImpl.Flush(ctx); err != nil {
+					return err
+				}
+				log.Printf("Sent %d records in total (Elapsed: %s)", count, time.Since(initTime).String())
+				return nil
+			}
 		}
 	}
 }
diff --git a/integration_tests/datagen/main.go b/integration_tests/datagen/main.go
index 135e470ea204a..cd3335925a041 100644
--- a/integration_tests/datagen/main.go
+++ b/integration_tests/datagen/main.go
@@ -244,7 +244,7 @@ func main() {
 		},
 		cli.StringFlag{
 			Name: "mode",
-			Usage: "ad-click | ad-ctr | twitter | cdn-metrics | clickstream | ecommerce | delivery | livestream",
+			Usage: "ad-click | ad-ctr | twitter | cdn-metrics | clickstream | ecommerce | delivery | livestream | compatible-data",
 			Required: true,
 			Destination: &cfg.Mode,
 		},
@@ -267,6 +267,13 @@ func main() {
 			Required: false,
 			Destination: &cfg.Topic,
 		},
+		cli.Int64Flag{
+			Name: "total_event",
+			Usage: "The total number of events to generate. If not specified, the generator will run indefinitely.",
+			Value: 0,
+			Required: false,
+			Destination: &cfg.TotalEvents,
+		},
 	},
 }
 err := app.Run(os.Args)
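With the flag plumbed through, `generateLoad` now stops once `count` reaches the budget: it flushes the sink, logs the total, and returns instead of looping forever, while the `Load` goroutine is torn down via context cancellation. A self-contained sketch of that producer/consumer shutdown pattern — the names here are illustrative, not the actual datagen APIs:

```go
// Sketch of the stop-after-N pattern generateLoad uses: the producer runs
// until its context is cancelled; the consumer stops at its event budget.
package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancelling the context is what stops the producer

	ch := make(chan int)
	go func() {
		// Producer: like LoadGenerator.Load, it only exits on ctx.Done().
		for i := 1; ; i++ {
			select {
			case <-ctx.Done():
				return
			case ch <- i:
			}
		}
	}()

	var count, totalEvents int64 = 0, 3 // totalEvents == 0 would mean "no limit"
	for v := range ch {
		count++
		fmt.Println("sent record", v)
		if totalEvents > 0 && count >= totalEvents {
			// generateLoad flushes the sink here before returning.
			break
		}
	}
}
```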
diff --git a/integration_tests/kafka-cdc/create_source.sql b/integration_tests/kafka-cdc/create_source.sql
new file mode 100644
index 0000000000000..cbc64ad7362d5
--- /dev/null
+++ b/integration_tests/kafka-cdc/create_source.sql
@@ -0,0 +1,39 @@
+CREATE TABLE compatible_data (
+    id integer PRIMARY KEY,
+    c_boolean boolean,
+    c_smallint smallint,
+    c_integer integer,
+    c_bigint bigint,
+    c_decimal decimal,
+    c_real real,
+    c_double_precision double precision,
+    c_varchar varchar,
+    c_bytea bytea,
+    c_date date,
+    c_time time,
+    c_timestamp timestamp,
+    c_timestamptz timestamptz,
+    c_interval interval,
+    c_jsonb jsonb,
+    c_boolean_array boolean[],
+    c_smallint_array smallint[],
+    c_integer_array integer[],
+    c_bigint_array bigint[],
+    c_decimal_array decimal[],
+    c_real_array real[],
+    c_double_precision_array double precision[],
+    c_varchar_array varchar[],
+    c_bytea_array bytea[],
+    c_date_array date[],
+    c_time_array time[],
+    c_timestamp_array timestamp[],
+    c_timestamptz_array timestamptz[],
+    c_interval_array interval[],
+    c_jsonb_array jsonb[],
+    c_struct STRUCT<s_int32 integer, s_bool boolean>
+) WITH (
+    connector = 'kafka',
+    topic = 'compatible_data',
+    properties.bootstrap.server = 'message_queue:29092',
+    scan.startup.mode = 'earliest'
+) FORMAT PLAIN ENCODE JSON;
diff --git a/integration_tests/kafka-cdc/data_check b/integration_tests/kafka-cdc/data_check
new file mode 100644
index 0000000000000..3c3ac8c12f438
--- /dev/null
+++ b/integration_tests/kafka-cdc/data_check
@@ -0,0 +1 @@
+compatible_data
diff --git a/integration_tests/kafka-cdc/docker-compose.yml b/integration_tests/kafka-cdc/docker-compose.yml
new file mode 100644
index 0000000000000..d62fa2acd9df3
--- /dev/null
+++ b/integration_tests/kafka-cdc/docker-compose.yml
@@ -0,0 +1,51 @@
+---
+version: "3"
+services:
+  risingwave-standalone:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: risingwave-standalone
+  etcd-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: etcd-0
+  grafana-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: grafana-0
+  minio-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: minio-0
+  prometheus-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: prometheus-0
+  message_queue:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: message_queue
+  datagen:
+    build: ../datagen
+    depends_on: [message_queue]
+    command:
+      - /bin/sh
+      - -c
+      - /datagen --mode compatible-data --qps 2 --total_event 3 kafka --brokers message_queue:29092
+    restart: always
+    container_name: datagen
+
+volumes:
+  risingwave-standalone:
+    external: false
+  etcd-0:
+    external: false
+  grafana-0:
+    external: false
+  minio-0:
+    external: false
+  prometheus-0:
+    external: false
+  message_queue:
+    external: false
+name: risingwave-compose
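The one-line `data_check` file lists the relations the CI harness verifies after the demo comes up. For a manual spot check, something like the sketch below should work against the standalone container, assuming RisingWave's usual Postgres-compatible defaults (port 4566, user `root`, database `dev`) and that a stock Postgres driver such as `lib/pq` can connect — both are assumptions, not shown in this diff:

```go
// Hypothetical spot check over RisingWave's Postgres wire protocol.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumption: a stock Postgres driver can talk to RisingWave
)

func main() {
	// Assumed standalone defaults: host port 4566, user root, database dev.
	db, err := sql.Open("postgres", "postgres://root@localhost:4566/dev?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var n int
	if err := db.QueryRow("SELECT count(*) FROM compatible_data").Scan(&n); err != nil {
		log.Fatal(err)
	}
	fmt.Println("rows in compatible_data:", n) // expect 3 once the datagen batch lands
}
```

Note that the datagen container exits after its three events and `restart: always` re-runs the same `id` 1-3 batch; since `id` is the table's primary key, re-ingestion should be idempotent, with repeated keys overwriting rather than accumulating.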