test(source): add kafka-cdc compatibility test #16360

Merged · 3 commits · Apr 18, 2024
1 change: 1 addition & 0 deletions ci/scripts/gen-integration-test-yaml.py
@@ -43,6 +43,7 @@
    'pinot-sink': ['json'],
    'presto-trino': ['json'],
    'client-library': ['none'],
    'kafka-cdc': ['json'],
}

def gen_pipeline_steps():
1 change: 1 addition & 0 deletions ci/scripts/notify.py
@@ -103,6 +103,7 @@
"pinot-sink-json": ["yiming"],
"presto-trino-json": ["wutao"],
"client-library-none": ["wutao"],
"kafka-cdc-json": ["bohan"],
}

def get_failed_tests(get_test_status, test_map):
199 changes: 199 additions & 0 deletions integration_tests/datagen/compatible_data/compatible_data.go
@@ -0,0 +1,199 @@
package compatible_data

import (
"context"
"datagen/gen"
"datagen/sink"
"encoding/json"
"fmt"
"math"
"strings"
)

type Struct struct {
	S_int32 int32 `json:"s_int32"`
	S_bool bool `json:"s_bool"`
}

type compatibleData struct {
	sink.BaseSinkRecord

	Id int32 `json:"id"`
	C_boolean bool `json:"c_boolean"`
	C_smallint int16 `json:"c_smallint"`
	C_integer int32 `json:"c_integer"`
	C_bigint int64 `json:"c_bigint"`
	C_decimal string `json:"c_decimal"`
	C_real float32 `json:"c_real"`
	C_double_precision float64 `json:"c_double_precision"`
	C_varchar string `json:"c_varchar"`
	C_bytea string `json:"c_bytea"`
	C_date string `json:"c_date"`
	C_time string `json:"c_time"`
	C_timestamp string `json:"c_timestamp"`
	C_timestamptz string `json:"c_timestamptz"`
	C_interval string `json:"c_interval"`
	C_jsonb string `json:"c_jsonb"`
	C_boolean_array []bool `json:"c_boolean_array"`
	C_smallint_array []int16 `json:"c_smallint_array"`
	C_integer_array []int32 `json:"c_integer_array"`
	C_bigint_array []int64 `json:"c_bigint_array"`
	C_decimal_array []string `json:"c_decimal_array"`
	C_real_array []float32 `json:"c_real_array"`
	C_double_precision_array []float64 `json:"c_double_precision_array"`
	C_varchar_array []string `json:"c_varchar_array"`
	C_bytea_array []string `json:"c_bytea_array"`
	C_date_array []string `json:"c_date_array"`
	C_time_array []string `json:"c_time_array"`
	C_timestamp_array []string `json:"c_timestamp_array"`
	C_timestamptz_array []string `json:"c_timestamptz_array"`
	C_interval_array []string `json:"c_interval_array"`
	C_jsonb_array []string `json:"c_jsonb_array"`
	C_struct Struct `json:"c_struct"`
}

func (c *compatibleData) Topic() string {
	return "compatible_data"
}

func (c *compatibleData) Key() string {
	return fmt.Sprintf("%d", c.Id)
}

func (c *compatibleData) ToPostgresSql() string {
	panic("unimplemented")
}

func (c *compatibleData) ToJson() []byte {
	data, err := json.Marshal(c)
	if err != nil {
		panic("failed to marshal compatible data to JSON")
	}
	return data
}

func (c *compatibleData) ToProtobuf() []byte {
	panic("unimplemented")
}

func (c *compatibleData) ToAvro() []byte {
	panic("unimplemented")
}

type compatibleDataGen struct {
	recordSum int32
}

func NewCompatibleDataGen() gen.LoadGenerator {
	return &compatibleDataGen{}
}

func (g *compatibleDataGen) GenData() compatibleData {
	g.recordSum++
	// Rotate among three payload shapes: zero/default values, lower-boundary
	// values, and upper-boundary values.
	recordType := g.recordSum % 3
	if recordType == 0 {
		return compatibleData{
			Id: g.recordSum,
			C_boolean: true,
			C_smallint: 0,
			C_integer: 0,
			C_bigint: 0,
			C_decimal: "nan",
			C_real: 0,
			C_double_precision: 0,
			C_varchar: "",
			C_bytea: "",
			C_date: "0001-01-01",
			C_time: "00:00:00",
			C_timestamp: "0001-01-01 00:00:00",
			C_timestamptz: "0001-01-01 00:00:00Z",
			C_interval: "P0Y0M0DT0H0M0S",
			C_jsonb: "{}",
		}
	} else if recordType == 1 {
		return compatibleData{
			Id: g.recordSum,
			C_boolean: false,
			C_smallint: math.MinInt16,
			C_integer: math.MinInt32,
			C_bigint: math.MinInt64,
			C_decimal: "-123456789.123456789",
			C_real: -9999.999999,
			C_double_precision: -10000.0,
			C_varchar: "a",
			C_bytea: "a",
			C_date: "1970-01-01",
			C_time: "00:00:00.123456",
			C_timestamp: "1970-01-01 00:00:00.123456",
			C_timestamptz: "1970-01-01 00:00:00.123456Z",
			C_interval: "P1Y2M3DT4H5M6S",
			C_jsonb: "{}",
			C_boolean_array: []bool{true, false},
			C_smallint_array: []int16{1},
			C_integer_array: []int32{1},
			C_bigint_array: []int64{1},
			C_decimal_array: []string{"1.0"},
			C_real_array: []float32{1.0},
			C_double_precision_array: []float64{1.0},
			C_varchar_array: []string{"aa"},
			C_bytea_array: []string{"aa"},
			C_date_array: []string{"1970-01-01"},
			C_time_array: []string{"00:00:00.123456"},
			C_timestamp_array: []string{"1970-01-01 00:00:00.123456"},
			C_timestamptz_array: []string{"1970-01-01 00:00:00.123456Z"},
			C_interval_array: []string{"P0Y0M0DT0H0M2S"},
			C_jsonb_array: []string{"{}"},
			C_struct: Struct{1, true},
		}
	} else {
		return compatibleData{
			Id: g.recordSum,
			C_boolean: true,
			C_smallint: math.MaxInt16,
			C_integer: math.MaxInt32,
			C_bigint: math.MaxInt64,
			C_decimal: "123456789.123456789",
			C_real: 9999.999999,
			C_double_precision: 10000.0,
			C_varchar: strings.Repeat("a", 100),
			C_bytea: strings.Repeat("b", 100),
			C_date: "9999-12-31",
			C_time: "23:59:59.999999",
			C_timestamp: "9999-12-31 23:59:59.999999",
			C_timestamptz: "9999-12-31 23:59:59.999999Z",
			C_interval: "P1Y2M3DT4H5M6S",
			C_jsonb: "{\"mean\":1}",
			C_boolean_array: []bool{true, false},
			C_smallint_array: []int16{1},
			C_integer_array: []int32{1},
			C_bigint_array: []int64{1},
			C_decimal_array: []string{"1.0"},
			C_real_array: []float32{1.0},
			C_double_precision_array: []float64{1.0},
			C_varchar_array: []string{"aa"},
			C_bytea_array: []string{"aa"},
			C_date_array: []string{"1970-01-01"},
			C_time_array: []string{"00:00:00.123456"},
			C_timestamp_array: []string{"1970-01-01 00:00:00.123456"},
			C_timestamptz_array: []string{"1970-01-01 00:00:00.123456Z"},
			C_interval_array: []string{"P1Y2M3DT4H5M6S"},
			C_jsonb_array: []string{"{}"},
			C_struct: Struct{-1, false},
		}
	}
}

func (g *compatibleDataGen) KafkaTopics() []string {
	return []string{"compatible_data"}
}

func (g *compatibleDataGen) Load(ctx context.Context, outCh chan<- sink.SinkRecord) {
	for {
		record := g.GenData()
		select {
		case <-ctx.Done():
			return
		case outCh <- &record:
		}
	}
}
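
For orientation, here is a minimal sketch (not part of this PR) of how the new generator could be driven on its own. It assumes that sink.SinkRecord exposes the Topic(), Key(), and ToJson() methods implemented above, and that gen.LoadGenerator exposes the Load method shown above.

package main

import (
	"context"
	"datagen/compatible_data"
	"datagen/sink"
	"fmt"
	"time"
)

func main() {
	// Drain a few records from the compatible-data generator and print their
	// Kafka topic, key, and JSON payload.
	loadGen := compatible_data.NewCompatibleDataGen()
	outCh := make(chan sink.SinkRecord)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go loadGen.Load(ctx, outCh)

	// GenData rotates through three payload shapes (zero values, lower-boundary
	// values, upper-boundary values), so three records cover every branch.
	for i := 0; i < 3; i++ {
		record := <-outCh
		fmt.Printf("topic=%s key=%s payload=%s\n", record.Topic(), record.Key(), record.ToJson())
	}
}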
3 changes: 3 additions & 0 deletions integration_tests/datagen/gen/generator.go
@@ -42,6 +42,9 @@ type GeneratorConfig struct {

	// The topic to filter. If not specified, all topics will be used.
	Topic string

	// The total number of events to generate.
	TotalEvents int64
}

type LoadGenerator interface {
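
As a point of reference, the new field composes with the existing configuration roughly as follows. This is a hypothetical snippet, not code from this PR; it mirrors the --mode/--total_event combination used by the docker-compose service added below.

	// A bounded run of the new compatible-data mode. Mode already exists on
	// GeneratorConfig; TotalEvents is the field added in this PR. All other
	// fields are left at their zero values for brevity.
	cfg := gen.GeneratorConfig{
		Mode:        "compatible-data",
		TotalEvents: 3,
	}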
10 changes: 10 additions & 0 deletions integration_tests/datagen/load_gen.go
@@ -6,6 +6,7 @@ import (
"datagen/ad_ctr"
"datagen/cdn_metrics"
"datagen/clickstream"
"datagen/compatible_data"
"datagen/delivery"
"datagen/ecommerce"
"datagen/gen"
@@ -67,6 +68,8 @@ func newGen(cfg gen.GeneratorConfig) (gen.LoadGenerator, error) {
		return livestream.NewLiveStreamMetricsGen(cfg), nil
	} else if cfg.Mode == "nexmark" {
		return nexmark.NewNexmarkGen(cfg), nil
	} else if cfg.Mode == "compatible-data" {
		return compatible_data.NewCompatibleDataGen(), nil
	} else {
		return nil, fmt.Errorf("invalid mode: %s", cfg.Mode)
	}
@@ -148,6 +151,13 @@ func generateLoad(ctx context.Context, cfg gen.GeneratorConfig) error {
					return err
				}
			}
			if cfg.TotalEvents > 0 && count >= cfg.TotalEvents {
				if err := sinkImpl.Flush(ctx); err != nil {
					return err
				}
				log.Printf("Sent %d records in total (Elapsed: %s)", count, time.Since(initTime).String())
				return nil
			}
		}
	}
}
9 changes: 8 additions & 1 deletion integration_tests/datagen/main.go
@@ -244,7 +244,7 @@ func main() {
			},
			cli.StringFlag{
				Name: "mode",
				Usage: "ad-click | ad-ctr | twitter | cdn-metrics | clickstream | ecommerce | delivery | livestream",
				Usage: "ad-click | ad-ctr | twitter | cdn-metrics | clickstream | ecommerce | delivery | livestream | compatible-data",
				Required: true,
				Destination: &cfg.Mode,
			},
@@ -267,6 +267,13 @@
				Required: false,
				Destination: &cfg.Topic,
			},
			cli.Int64Flag{
				Name: "total_event",
				Usage: "The total number of events to generate. If not specified, the generator will run indefinitely.",
				Value: 0,
				Required: false,
				Destination: &cfg.TotalEvents,
			},
		},
	}
	err := app.Run(os.Args)
39 changes: 39 additions & 0 deletions integration_tests/kafka-cdc/create_source.sql
@@ -0,0 +1,39 @@
CREATE TABLE compatible_data (
    id integer PRIMARY KEY,
    c_boolean boolean,
    c_smallint smallint,
    c_integer integer,
    c_bigint bigint,
    c_decimal decimal,
    c_real real,
    c_double_precision double precision,
    c_varchar varchar,
    c_bytea bytea,
    c_date date,
    c_time time,
    c_timestamp timestamp,
    c_timestamptz timestamptz,
    c_interval interval,
    c_jsonb jsonb,
    c_boolean_array boolean[],
    c_smallint_array smallint[],
    c_integer_array integer[],
    c_bigint_array bigint[],
    c_decimal_array decimal[],
    c_real_array real[],
    c_double_precision_array double precision[],
    c_varchar_array varchar[],
    c_bytea_array bytea[],
    c_date_array date[],
    c_time_array time[],
    c_timestamp_array timestamp[],
    c_timestamptz_array timestamptz[],
    c_interval_array interval[],
    c_jsonb_array jsonb[],
    c_struct STRUCT<s_int32 INTEGER, s_bool boolean>,
) WITH (
    connector = 'kafka',
    topic = 'compatible_data',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
1 change: 1 addition & 0 deletions integration_tests/kafka-cdc/data_check
@@ -0,0 +1 @@
compatible_data
51 changes: 51 additions & 0 deletions integration_tests/kafka-cdc/docker-compose.yml
@@ -0,0 +1,51 @@
---
version: "3"
services:
  risingwave-standalone:
    extends:
      file: ../../docker/docker-compose.yml
      service: risingwave-standalone
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
  message_queue:
    extends:
      file: ../../docker/docker-compose.yml
      service: message_queue
  datagen:
    build: ../datagen
    depends_on: [message_queue]
    # Send exactly 3 records (one per GenData payload shape) at 2 records per second.
    command:
      - /bin/sh
      - -c
      - /datagen --mode compatible-data --qps 2 --total_event 3 kafka --brokers message_queue:29092
    restart: always
    container_name: datagen

volumes:
  risingwave-standalone:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
  message_queue:
    external: false
name: risingwave-compose