Skip to content

Commit

Permalink
Merge branch 'main' into wrj/parser-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
TennyZhuang authored May 31, 2024
2 parents e36bdd4 + 669358e commit a35cd37
Show file tree
Hide file tree
Showing 63 changed files with 1,357 additions and 782 deletions.
26 changes: 26 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Usage: install pre-commit, and then run `pre-commit install` to install git hooks
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
hooks:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: local
hooks:
- id: rustfmt
name: rustfmt
entry: rustfmt --edition 2021
language: system
types: [rust]
- id: typos
name: typos
entry: typos -w
language: system
- id: cargo sort
name: cargo sort
entry: cargo sort -g -w
language: system
files: 'Cargo.toml'
pass_filenames: false
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ inout = "inout" # This is a SQL keyword!
numer = "numer" # numerator
nd = "nd" # N-dimensional / 2nd
steam = "stream" # You played with Steam games too much.
ser = "ser" # Serialization
# Some weird short variable names
ot = "ot"
bui = "bui" # BackwardUserIterator
Expand Down
2 changes: 1 addition & 1 deletion ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ENV LANG en_US.utf8
RUN sed -i 's|http://archive.ubuntu.com/ubuntu|http://us-east-2.ec2.archive.ubuntu.com/ubuntu/|g' /etc/apt/sources.list
RUN apt-get update -yy && \
DEBIAN_FRONTEND=noninteractive apt-get -y install sudo make build-essential cmake protobuf-compiler curl parallel python3 python3-pip python3-venv software-properties-common \
openssl libssl-dev libsasl2-dev libcurl4-openssl-dev pkg-config bash openjdk-11-jdk wget unzip git tmux lld postgresql-client kcat netcat-openbsd mysql-client \
openssl libssl-dev libsasl2-dev libcurl4-openssl-dev pkg-config bash openjdk-17-jdk wget unzip git tmux lld postgresql-client kcat netcat-openbsd mysql-client \
maven zstd libzstd-dev locales \
python3.12 python3.12-dev \
&& rm -rf /var/lib/{apt,dpkg,cache,log}/
Expand Down
4 changes: 3 additions & 1 deletion ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ services:
- "29092:29092"
- "9092:9092"
- "9644:9644"
- "8081:8081"
# Don't use Redpanda's schema registry; use the separate service instead
# - "8081:8081"
environment: {}
container_name: message_queue
healthcheck:
Expand Down Expand Up @@ -89,6 +90,7 @@ services:
- mysql
- db
- message_queue
- schemaregistry
- elasticsearch
- clickhouse-server
- redis-server
Expand Down
8 changes: 4 additions & 4 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,16 @@ cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
rpk topic create test-rw-sink-append-only-protobuf
rpk topic create test-rw-sink-append-only-protobuf-csr-a
rpk topic create test-rw-sink-append-only-protobuf-csr-hi
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
rpk topic delete test-rw-sink-append-only-protobuf
rpk topic delete test-rw-sink-append-only-protobuf-csr-a
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

echo "testing avro"
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
rpk topic create test-rw-sink-upsert-avro
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
rpk topic delete test-rw-sink-upsert-avro
8 changes: 4 additions & 4 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ export RISINGWAVE_CI=true
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-1cn-1fe
python3 -m pip install --break-system-packages requests protobuf confluent-kafka
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 20 user
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 20 user
echo "make sure google/protobuf/source_context.proto is NOT in schema registry"
curl --silent 'http://message_queue:8081/subjects'; echo
# curl --silent --head -X GET 'http://message_queue:8081/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404
curl --silent 'http://message_queue:8081/subjects' | grep -v 'google/protobuf/source_context.proto'
curl --silent 'http://schemaregistry:8082/subjects'; echo
# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404
curl --silent 'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto'
risedev slt './e2e_test/schema_registry/pb.slt'
risedev slt './e2e_test/schema_registry/alter_sr.slt'

Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ echo "--- Install dependencies"
dnf install -y perl-core wget python3 python3-devel cyrus-sasl-devel rsync openssl-devel

echo "--- Install java and maven"
dnf install -y java-11-openjdk java-11-openjdk-devel
dnf install -y java-17-openjdk java-17-openjdk-devel
pip3 install toml-cli
wget https://rw-ci-deps-dist.s3.amazonaws.com/apache-maven-3.9.3-bin.tar.gz && tar -zxvf apache-maven-3.9.3-bin.tar.gz
export PATH="${REPO_ROOT}/apache-maven-3.9.3/bin:$PATH"
Expand Down
6 changes: 3 additions & 3 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ TEST_DIR=$PWD/e2e_test
BACKGROUND_DDL_DIR=$TEST_DIR/background_ddl
COMMON_DIR=$BACKGROUND_DDL_DIR/common

CLUSTER_PROFILE='ci-1cn-1fe-kafka-with-recovery'
CLUSTER_PROFILE='ci-1cn-1fe-user-kafka-with-recovery'
echo "--- Configuring cluster profiles"
if [[ -n "${BUILDKITE:-}" ]]; then
echo "Running in buildkite"
Expand Down Expand Up @@ -187,14 +187,14 @@ test_sink_backfill_recovery() {

# Restart
restart_cluster
sleep 3
sleep 5

# Sink back into rw
run_sql "CREATE TABLE table_kafka (v1 int primary key)
WITH (
connector = 'kafka',
topic = 's_kafka',
properties.bootstrap.server = 'localhost:29092',
properties.bootstrap.server = 'message_queue:29092',
) FORMAT DEBEZIUM ENCODE JSON;"

sleep 10
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ steps:
- "build"
plugins:
- docker-compose#v5.1.0:
run: rw-build-env
run: source-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ steps:
- "build"
plugins:
- docker-compose#v5.1.0:
run: rw-build-env
run: source-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM ubuntu:24.04 AS base
ENV LANG en_US.utf8

RUN apt-get update \
&& apt-get -y install ca-certificates build-essential libsasl2-dev openjdk-11-jdk software-properties-common python3.12 python3.12-dev openssl pkg-config
&& apt-get -y install ca-certificates build-essential libsasl2-dev openjdk-17-jdk software-properties-common python3.12 python3.12-dev openssl pkg-config

FROM base AS rust-base

Expand Down
4 changes: 2 additions & 2 deletions docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM ubuntu:24.04 AS base
ENV LANG en_US.utf8

RUN apt-get update \
&& apt-get -y install ca-certificates build-essential libsasl2-dev openjdk-11-jdk software-properties-common python3.12 python3.12-dev openssl pkg-config
&& apt-get -y install ca-certificates build-essential libsasl2-dev openjdk-17-jdk software-properties-common python3.12 python3.12-dev openssl pkg-config

FROM base AS dashboard-builder

Expand Down Expand Up @@ -113,7 +113,7 @@ RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true -Dno-build-rust
tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node

FROM ubuntu:24.04 as image-base
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk wget libsasl2-dev && rm -rf /var/lib/{apt,dpkg,cache,log}/
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-17-jdk wget libsasl2-dev && rm -rf /var/lib/{apt,dpkg,cache,log}/

FROM image-base as risingwave
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/backfill/sink/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ from t x join t y
on x.v1 = y.v1
with (
connector='kafka',
properties.bootstrap.server='localhost:29092',
properties.bootstrap.server='message_queue:29092',
topic='s_kafka',
primary_key='v1',
allow.auto.create.topics=true,
Expand Down
8 changes: 4 additions & 4 deletions e2e_test/schema_registry/alter_sr.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CREATE SOURCE src_user WITH (
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.User'
);

Expand All @@ -24,7 +24,7 @@ CREATE TABLE t_user WITH (
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.User'
);

Expand All @@ -36,7 +36,7 @@ SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields

sleep 5s

Expand All @@ -58,7 +58,7 @@ SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields

sleep 5s

Expand Down
4 changes: 2 additions & 2 deletions e2e_test/schema_registry/pb.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ create table sr_pb_test with (
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.User'
);

Expand All @@ -21,7 +21,7 @@ create table sr_pb_test_bk with (
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = 'http://message_queue:8081,http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082,http://schemaregistry:8082',
message = 'test.User'
);

Expand Down
12 changes: 6 additions & 6 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ with (
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');
schema.registry = 'http://schemaregistry:8082');

statement ok
create table into_kafka (
Expand Down Expand Up @@ -40,7 +40,7 @@ create sink sink0 from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');
schema.registry = 'http://schemaregistry:8082');

sleep 2s

Expand Down Expand Up @@ -72,7 +72,7 @@ create sink sink_err from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');
schema.registry = 'http://schemaregistry:8082');

statement error field not in avro
create sink sink_err as select 1 as extra_column, * from into_kafka with (
Expand All @@ -81,7 +81,7 @@ create sink sink_err as select 1 as extra_column, * from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');
schema.registry = 'http://schemaregistry:8082');

statement error unrecognized
create sink sink_err from into_kafka with (
Expand All @@ -90,7 +90,7 @@ create sink sink_err from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
schema.registry.name.strategy = 'typo');

statement error empty field key.message
Expand All @@ -100,7 +100,7 @@ create sink sink_err from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
schema.registry.name.strategy = 'record_name_strategy');

statement ok
Expand Down
8 changes: 4 additions & 4 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ create table from_kafka_csr_trivial with (
topic = 'test-rw-sink-append-only-protobuf-csr-a',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageA');

statement ok
Expand All @@ -22,7 +22,7 @@ create table from_kafka_csr_nested with (
topic = 'test-rw-sink-append-only-protobuf-csr-hi',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageH.MessageI');

statement ok
Expand Down Expand Up @@ -68,7 +68,7 @@ create sink sink_csr_trivial as select string_field as field_a from into_kafka w
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageA');

statement ok
Expand All @@ -78,7 +78,7 @@ create sink sink_csr_nested as select sint32_field as field_i from into_kafka wi
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.registry = 'http://message_queue:8081',
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageH.MessageI');

sleep 2s
Expand Down
Loading

0 comments on commit a35cd37

Please sign in to comment.