Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: support protobuf for nats source #15378

Merged
merged 1 commit into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions integration_tests/datagen/gen/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"datagen/sink/kafka"
"datagen/sink/kinesis"
"datagen/sink/mysql"
"datagen/sink/nats"
"datagen/sink/postgres"
"datagen/sink/pulsar"
"datagen/sink/s3"
Expand All @@ -21,6 +22,7 @@ type GeneratorConfig struct {
Pulsar pulsar.PulsarConfig
Kinesis kinesis.KinesisConfig
S3 s3.S3Config
Nats nats.NatsConfig

// Whether to print the content of every event.
PrintInsert bool
Expand Down
12 changes: 8 additions & 4 deletions integration_tests/datagen/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/go-sql-driver/mysql v1.7.0
github.com/lib/pq v1.10.7
github.com/linkedin/goavro/v2 v2.9.8
github.com/nats-io/nats.go v1.33.1
github.com/urfave/cli v1.22.10
go.uber.org/ratelimit v0.2.0
gonum.org/v1/gonum v0.12.0
Expand Down Expand Up @@ -48,10 +49,12 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -67,12 +70,13 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/testify v1.8.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
23 changes: 15 additions & 8 deletions integration_tests/datagen/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNr
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
Expand Down Expand Up @@ -326,6 +326,12 @@ github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs
github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
Expand Down Expand Up @@ -449,8 +455,8 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -620,12 +626,12 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -636,6 +642,7 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/datagen/load_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"datagen/sink/kafka"
"datagen/sink/kinesis"
"datagen/sink/mysql"
"datagen/sink/nats"
"datagen/sink/postgres"
"datagen/sink/pulsar"
"datagen/sink/s3"
Expand All @@ -39,6 +40,8 @@ func createSink(ctx context.Context, cfg gen.GeneratorConfig) (sink.Sink, error)
return kinesis.OpenKinesisSink(cfg.Kinesis)
} else if cfg.Sink == "s3" {
return s3.OpenS3Sink(cfg.S3)
} else if cfg.Sink == "nats" {
return nats.OpenNatsSink(cfg.Nats)
} else {
return nil, fmt.Errorf("invalid sink type: %s", cfg.Sink)
}
Expand Down
22 changes: 22 additions & 0 deletions integration_tests/datagen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,28 @@ func main() {
},
HelpName: "datagen s3",
},
{
Name: "nats",
Flags: []cli.Flag{
cli.BoolFlag{
Name: "jetstream",
Usage: "Whether to use JetStream",
Required: false,
Destination: &cfg.Nats.JetStream,
},
cli.StringFlag{
Name: "url",
Usage: "The URL of the NATS server",
Required: true,
Destination: &cfg.Nats.Url,
},
},
Action: func(c *cli.Context) error {
cfg.Sink = "nats"
return runCommand()
},
HelpName: "datagen nats",
},
},
Flags: []cli.Flag{
cli.BoolFlag{
Expand Down
76 changes: 76 additions & 0 deletions integration_tests/datagen/sink/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package nats

import (
"context"
"datagen/sink"
"fmt"
"time"

"github.com/nats-io/nats.go"
_ "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

type NatsConfig struct {
Url string
JetStream bool
}

type NatsSink struct {
nc *nats.Conn
config NatsConfig
js jetstream.JetStream
}

func OpenNatsSink(config NatsConfig) (*NatsSink, error) {
nc, err := nats.Connect(config.Url)
if err != nil {
return nil, fmt.Errorf("failed to connect to NATS server: %v", err)
}
js, err := jetstream.New(nc)
if err != nil {
return nil, fmt.Errorf("failed to create JetStream instance: %v", err)
}
return &NatsSink{nc, config, js}, nil
}

func (p *NatsSink) Prepare(topics []string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if p.config.JetStream {
_, err := p.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "risingwave",
Subjects: topics,
})
if err != nil {
return fmt.Errorf("failed to create JetStream stream: %v", err)
}
}
return nil
}

func (p *NatsSink) Close() error {
p.nc.Close()
return nil
}

func (p *NatsSink) WriteRecord(ctx context.Context, format string, record sink.SinkRecord) error {
data := sink.Encode(record, format)

if p.config.JetStream {
_, err := p.js.Publish(ctx, record.Topic(), data)
if err != nil {
return fmt.Errorf("failed to publish record to JetStream: %v", err)
}
} else {
err := p.nc.Publish(record.Topic(), data)
if err != nil {
return fmt.Errorf("failed to request NATS server: %v", err)
}
}
return nil
}

func (p *NatsSink) Flush(ctx context.Context) error {
return p.nc.Flush()
}
75 changes: 46 additions & 29 deletions integration_tests/nats/create_source.sql
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
CREATE TABLE personnel (id integer, name varchar,);
CREATE TABLE
personnel (id integer, name varchar);

CREATE TABLE nats_source_table
(
id integer,
name varchar,
)
WITH (
connector='nats',
server_url='nats-server:4222',
subject='subject1',

CREATE TABLE
nats_source_table (id integer, name varchar)
WITH
(
connector = 'nats',
server_url = 'nats-server:4222',
subject = 'subject1',
stream = 'my_stream',
connect_mode='plain'
) FORMAT PLAIN ENCODE JSON;
connect_mode = 'plain'
) FORMAT PLAIN ENCODE JSON;


CREATE SINK nats_sink
FROM
personnel WITH (
personnel
WITH
(
connector = 'nats',
server_url = 'nats-server:4222',
subject = 'subject1',
Expand All @@ -24,28 +27,42 @@ FROM
connect_mode = 'plain'
);

INSERT INTO
personnel
VALUES
(1, 'Alice'),
(2, 'Bob');

INSERT INTO
personnel
VALUES
(1, 'Alice'),
(2, 'Bob'),
(3, 'Tom'),
(4, 'Jerry');

INSERT INTO
personnel
VALUES
(4, 'Jerry'),
(5, 'Araminta'),
(6, 'Clover');

INSERT INTO
personnel
VALUES
(6, 'Clover'),
(7, 'Posey'),
(8, 'Waverly');

FLUSH;

FLUSH;


CREATE TABLE live_stream_metrics (
client_ip VARCHAR,
user_agent VARCHAR,
user_id VARCHAR,
room_id VARCHAR,
video_bps BIGINT,
video_fps BIGINT,
video_rtt BIGINT,
video_lost_pps BIGINT,
video_longest_freeze_duration BIGINT,
video_total_freeze_duration BIGINT,
report_timestamp TIMESTAMPTZ,
country VARCHAR
)
WITH
(
connector = 'nats',
server_url = 'nats-server:4222',
subject = 'live_stream_metrics',
stream = 'risingwave',
connect_mode = 'plain'
) FORMAT PLAIN ENCODE JSON;
2 changes: 1 addition & 1 deletion integration_tests/nats/data_check
Original file line number Diff line number Diff line change
@@ -1 +1 @@
personnel,nats_source_table
personnel,nats_source_table,live_stream_metrics
9 changes: 9 additions & 0 deletions integration_tests/nats/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ services:
extends:
file: ../../docker/docker-compose.yml
service: message_queue
datagen:
build: ../datagen
depends_on: [message_queue]
command:
- /bin/sh
- -c
- /datagen --mode livestream --qps 10 nats --url nats-server:4222 --jetstream
restart: always
container_name: datagen
volumes:
compute-node-0:
external: false
Expand Down
12 changes: 12 additions & 0 deletions integration_tests/nats/pb/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE live_stream_metrics
WITH
(
connector = 'nats',
server_url = 'nats-server:4222',
subject = 'live_stream_metrics',
stream = 'risingwave',
connect_mode = 'plain'
) FORMAT PLAIN ENCODE PROTOBUF (
message = 'livestream.schema.LiveStreamMetrics',
schema.location = 'http://file_server:8080/schema'
);
18 changes: 18 additions & 0 deletions integration_tests/nats/schema
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

‰
livestream.protolivestream.schema"Å
LiveStreamMetrics
client_ip ( RclientIp

user_agent ( R userAgent
user_id ( RuserId
room_id ( RroomId
video_bps (RvideoBps
video_fps (RvideoFps
video_rtt (RvideoRtt$
video_lost_pps (R videoLostPpsA
video_longest_freeze_duration (RvideoLongestFreezeDuration=
video_total_freeze_duration
(RvideoTotalFreezeDuration)
report_timestamp (RreportTimestamp
country ( RcountryBZlivestream/protobproto3
Loading
Loading