From 10eb5b2bc5b7d25d0d36622274b46e7fdaad60e5 Mon Sep 17 00:00:00 2001 From: Tao Wu Date: Fri, 1 Mar 2024 15:00:41 +0800 Subject: [PATCH] test: support nats for datagen --- integration_tests/datagen/gen/generator.go | 2 + integration_tests/datagen/go.mod | 12 ++-- integration_tests/datagen/go.sum | 23 ++++--- integration_tests/datagen/load_gen.go | 3 + integration_tests/datagen/main.go | 22 ++++++ integration_tests/datagen/sink/nats/nats.go | 76 +++++++++++++++++++++ integration_tests/nats/create_source.sql | 75 ++++++++++++-------- integration_tests/nats/data_check | 2 +- integration_tests/nats/docker-compose.yml | 9 +++ integration_tests/nats/pb/create_source.sql | 12 ++++ integration_tests/nats/schema | 18 +++++ src/frontend/src/handler/create_source.rs | 2 +- 12 files changed, 213 insertions(+), 43 deletions(-) create mode 100644 integration_tests/datagen/sink/nats/nats.go create mode 100644 integration_tests/nats/pb/create_source.sql create mode 100644 integration_tests/nats/schema diff --git a/integration_tests/datagen/gen/generator.go b/integration_tests/datagen/gen/generator.go index f84ffe3fcdea..5cc7dfd2acef 100644 --- a/integration_tests/datagen/gen/generator.go +++ b/integration_tests/datagen/gen/generator.go @@ -6,6 +6,7 @@ import ( "datagen/sink/kafka" "datagen/sink/kinesis" "datagen/sink/mysql" + "datagen/sink/nats" "datagen/sink/postgres" "datagen/sink/pulsar" "datagen/sink/s3" @@ -21,6 +22,7 @@ type GeneratorConfig struct { Pulsar pulsar.PulsarConfig Kinesis kinesis.KinesisConfig S3 s3.S3Config + Nats nats.NatsConfig // Whether to print the content of every event. PrintInsert bool diff --git a/integration_tests/datagen/go.mod b/integration_tests/datagen/go.mod index 89299416c084..2cf89ae8d350 100644 --- a/integration_tests/datagen/go.mod +++ b/integration_tests/datagen/go.mod @@ -10,6 +10,7 @@ require ( github.com/go-sql-driver/mysql v1.7.0 github.com/lib/pq v1.10.7 github.com/linkedin/goavro/v2 v2.9.8 + github.com/nats-io/nats.go v1.33.1 github.com/urfave/cli v1.22.10 go.uber.org/ratelimit v0.2.0 gonum.org/v1/gonum v0.12.0 @@ -48,10 +49,12 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d // indirect - github.com/klauspost/compress v1.15.11 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mtibben/percent v0.2.1 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pierrec/lz4 v2.0.5+incompatible // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -67,12 +70,13 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/testify v1.8.0 // indirect go.uber.org/atomic v1.7.0 // indirect - golang.org/x/crypto v0.17.0 // indirect + golang.org/x/crypto v0.18.0 // indirect golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect - golang.org/x/sys v0.15.0 // indirect - golang.org/x/term v0.15.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/term v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/integration_tests/datagen/go.sum b/integration_tests/datagen/go.sum index 0a9025ec0a9d..e35807572608 100644 --- a/integration_tests/datagen/go.sum +++ b/integration_tests/datagen/go.sum @@ -285,8 +285,8 @@ github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNr github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= -github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -326,6 +326,12 @@ github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= +github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -449,8 +455,8 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -620,12 +626,12 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= -golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -636,6 +642,7 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/integration_tests/datagen/load_gen.go b/integration_tests/datagen/load_gen.go index 6a67500077b3..77735983fb6d 100644 --- a/integration_tests/datagen/load_gen.go +++ b/integration_tests/datagen/load_gen.go @@ -15,6 +15,7 @@ import ( "datagen/sink/kafka" "datagen/sink/kinesis" "datagen/sink/mysql" + "datagen/sink/nats" "datagen/sink/postgres" "datagen/sink/pulsar" "datagen/sink/s3" @@ -39,6 +40,8 @@ func createSink(ctx context.Context, cfg gen.GeneratorConfig) (sink.Sink, error) return kinesis.OpenKinesisSink(cfg.Kinesis) } else if cfg.Sink == "s3" { return s3.OpenS3Sink(cfg.S3) + } else if cfg.Sink == "nats" { + return nats.OpenNatsSink(cfg.Nats) } else { return nil, fmt.Errorf("invalid sink type: %s", cfg.Sink) } diff --git a/integration_tests/datagen/main.go b/integration_tests/datagen/main.go index 9c4d35226980..135e470ea204 100644 --- a/integration_tests/datagen/main.go +++ b/integration_tests/datagen/main.go @@ -205,6 +205,28 @@ func main() { }, HelpName: "datagen s3", }, + { + Name: "nats", + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "jetstream", + Usage: "Whether to use JetStream", + Required: false, + Destination: &cfg.Nats.JetStream, + }, + cli.StringFlag{ + Name: "url", + Usage: "The URL of the NATS server", + Required: true, + Destination: &cfg.Nats.Url, + }, + }, + Action: func(c *cli.Context) error { + cfg.Sink = "nats" + return runCommand() + }, + HelpName: "datagen nats", + }, }, Flags: []cli.Flag{ cli.BoolFlag{ diff --git a/integration_tests/datagen/sink/nats/nats.go b/integration_tests/datagen/sink/nats/nats.go new file mode 100644 index 000000000000..90df55a516fd --- /dev/null +++ b/integration_tests/datagen/sink/nats/nats.go @@ -0,0 +1,76 @@ +package nats + +import ( + "context" + "datagen/sink" + "fmt" + "time" + + "github.com/nats-io/nats.go" + _ "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +type NatsConfig struct { + Url string + JetStream bool +} + +type NatsSink struct { + nc *nats.Conn + config NatsConfig + js jetstream.JetStream +} + +func OpenNatsSink(config NatsConfig) (*NatsSink, error) { + nc, err := nats.Connect(config.Url) + if err != nil { + return nil, fmt.Errorf("failed to connect to NATS server: %v", err) + } + js, err := jetstream.New(nc) + if err != nil { + return nil, fmt.Errorf("failed to create JetStream instance: %v", err) + } + return &NatsSink{nc, config, js}, nil +} + +func (p *NatsSink) Prepare(topics []string) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if p.config.JetStream { + _, err := p.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: "risingwave", + Subjects: topics, + }) + if err != nil { + return fmt.Errorf("failed to create JetStream stream: %v", err) + } + } + return nil +} + +func (p *NatsSink) Close() error { + p.nc.Close() + return nil +} + +func (p *NatsSink) WriteRecord(ctx context.Context, format string, record sink.SinkRecord) error { + data := sink.Encode(record, format) + + if p.config.JetStream { + _, err := p.js.Publish(ctx, record.Topic(), data) + if err != nil { + return fmt.Errorf("failed to publish record to JetStream: %v", err) + } + } else { + err := p.nc.Publish(record.Topic(), data) + if err != nil { + return fmt.Errorf("failed to request NATS server: %v", err) + } + } + return nil +} + +func (p *NatsSink) Flush(ctx context.Context) error { + return p.nc.Flush() +} diff --git a/integration_tests/nats/create_source.sql b/integration_tests/nats/create_source.sql index 0b15d9d859f6..633f7a6d41bd 100644 --- a/integration_tests/nats/create_source.sql +++ b/integration_tests/nats/create_source.sql @@ -1,21 +1,24 @@ -CREATE TABLE personnel (id integer, name varchar,); +CREATE TABLE + personnel (id integer, name varchar); -CREATE TABLE nats_source_table -( - id integer, - name varchar, -) -WITH ( - connector='nats', - server_url='nats-server:4222', - subject='subject1', + +CREATE TABLE + nats_source_table (id integer, name varchar) +WITH + ( + connector = 'nats', + server_url = 'nats-server:4222', + subject = 'subject1', stream = 'my_stream', - connect_mode='plain' -) FORMAT PLAIN ENCODE JSON; + connect_mode = 'plain' + ) FORMAT PLAIN ENCODE JSON; + CREATE SINK nats_sink FROM - personnel WITH ( + personnel +WITH + ( connector = 'nats', server_url = 'nats-server:4222', subject = 'subject1', @@ -24,28 +27,42 @@ FROM connect_mode = 'plain' ); -INSERT INTO - personnel -VALUES - (1, 'Alice'), - (2, 'Bob'); INSERT INTO personnel VALUES + (1, 'Alice'), + (2, 'Bob'), (3, 'Tom'), - (4, 'Jerry'); - -INSERT INTO - personnel -VALUES + (4, 'Jerry'), (5, 'Araminta'), - (6, 'Clover'); - -INSERT INTO - personnel -VALUES + (6, 'Clover'), (7, 'Posey'), (8, 'Waverly'); -FLUSH; \ No newline at end of file + +FLUSH; + + +CREATE TABLE live_stream_metrics ( + client_ip VARCHAR, + user_agent VARCHAR, + user_id VARCHAR, + room_id VARCHAR, + video_bps BIGINT, + video_fps BIGINT, + video_rtt BIGINT, + video_lost_pps BIGINT, + video_longest_freeze_duration BIGINT, + video_total_freeze_duration BIGINT, + report_timestamp TIMESTAMPTZ, + country VARCHAR +) +WITH + ( + connector = 'nats', + server_url = 'nats-server:4222', + subject = 'live_stream_metrics', + stream = 'risingwave', + connect_mode = 'plain' + ) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/integration_tests/nats/data_check b/integration_tests/nats/data_check index d95978e1a873..27fd2594faf6 100644 --- a/integration_tests/nats/data_check +++ b/integration_tests/nats/data_check @@ -1 +1 @@ -personnel,nats_source_table \ No newline at end of file +personnel,nats_source_table,live_stream_metrics \ No newline at end of file diff --git a/integration_tests/nats/docker-compose.yml b/integration_tests/nats/docker-compose.yml index 7c70f79e0ab1..d1f8d747d355 100644 --- a/integration_tests/nats/docker-compose.yml +++ b/integration_tests/nats/docker-compose.yml @@ -30,6 +30,15 @@ services: extends: file: ../../docker/docker-compose.yml service: message_queue + datagen: + build: ../datagen + depends_on: [message_queue] + command: + - /bin/sh + - -c + - /datagen --mode livestream --qps 10 nats --url nats-server:4222 --jetstream + restart: always + container_name: datagen volumes: compute-node-0: external: false diff --git a/integration_tests/nats/pb/create_source.sql b/integration_tests/nats/pb/create_source.sql new file mode 100644 index 000000000000..b37c5634b00a --- /dev/null +++ b/integration_tests/nats/pb/create_source.sql @@ -0,0 +1,12 @@ +CREATE TABLE live_stream_metrics +WITH + ( + connector = 'nats', + server_url = 'nats-server:4222', + subject = 'live_stream_metrics', + stream = 'risingwave', + connect_mode = 'plain' + ) FORMAT PLAIN ENCODE PROTOBUF ( + message = 'livestream.schema.LiveStreamMetrics', + schema.location = 'http://file_server:8080/schema' + ); \ No newline at end of file diff --git a/integration_tests/nats/schema b/integration_tests/nats/schema new file mode 100644 index 000000000000..08b5cd4852c7 --- /dev/null +++ b/integration_tests/nats/schema @@ -0,0 +1,18 @@ + +‰ +livestream.protolivestream.schema"Å +LiveStreamMetrics + client_ip ( RclientIp + +user_agent ( R userAgent +user_id ( RuserId +room_id ( RroomId + video_bps (RvideoBps + video_fps (RvideoFps + video_rtt (RvideoRtt$ +video_lost_pps (R videoLostPpsA +video_longest_freeze_duration (RvideoLongestFreezeDuration= +video_total_freeze_duration + (RvideoTotalFreezeDuration) +report_timestamp (RreportTimestamp +country ( RcountryBZlivestream/protobproto3 \ No newline at end of file diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 501f64205048..364ae3cb80d9 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -992,7 +992,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], ), NATS_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Json], + Format::Plain => vec![Encode::Json, Encode::Protobuf], ), TEST_CONNECTOR => hashmap!( Format::Plain => vec![Encode::Json],