Merge branch 'main' into yiming/iter-with-kv-ref
wenym1 committed Mar 8, 2024
2 parents f34418a + 2ef0ff2 commit 3bd9ea9
Showing 205 changed files with 3,754 additions and 1,605 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

15 changes: 14 additions & 1 deletion ci/scripts/common.sh
@@ -99,4 +99,17 @@ function filter_stack_trace() {
| sed -E '/ at ...cargo/d' > tmp
cp tmp "$1"
rm tmp
}
}

get_latest_kafka_version() {
local versions=$(curl -s https://downloads.apache.org/kafka/ | grep -Eo 'href="[0-9]+\.[0-9]+\.[0-9]+/"' | grep -Eo "[0-9]+\.[0-9]+\.[0-9]+")
# Sort the version numbers and get the latest one
local latest_version=$(echo "$versions" | sort -V | tail -n1)
echo $latest_version
}

get_latest_kafka_download_url() {
local latest_version=$(get_latest_kafka_version)
local download_url="https://downloads.apache.org/kafka/${latest_version}/kafka_2.13-${latest_version}.tgz"
echo $download_url
}
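
The gen-flamegraph.sh hunk below is the first consumer of these helpers. A minimal sketch of the call pattern, assuming the CI script sources common.sh as the other CI scripts do (the output filename is illustrative):

source ci/scripts/common.sh
# Resolves to e.g. https://downloads.apache.org/kafka/<ver>/kafka_2.13-<ver>.tgz
url=$(get_latest_kafka_download_url)
wget "$url" -O kafka_latest.tgz
tar -zxvf kafka_latest.tgz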
4 changes: 2 additions & 2 deletions ci/scripts/gen-flamegraph.sh
@@ -97,8 +97,8 @@ install_all() {
promql --version

echo ">>> Installing Kafka"
wget https://archive.apache.org/dist/kafka/3.4.1/kafka_2.13-3.4.1.tgz
tar -zxvf kafka_2.13-3.4.1.tgz
wget $(get_latest_kafka_download_url) -O kafka_latest.tgz
tar -zxvf kafka_latest.tgz

echo ">>> Installing nexmark bench"
buildkite-agent artifact download nexmark-server /usr/local/bin
3 changes: 2 additions & 1 deletion ci/scripts/gen-integration-test-yaml.py
@@ -16,6 +16,7 @@
'postgres-sink': ['json'],
'iceberg-cdc': ['json'],
'iceberg-sink': ['none'],
'iceberg-source': ['none'],
'twitter': ['json', 'protobuf'],
'twitter-pulsar': ['json'],
'debezium-mysql': ['json'],
@@ -33,7 +34,7 @@
'big-query-sink': ['json'],
'mindsdb': ['json'],
'vector': ['json'],
'nats': ['json'],
'nats': ['json', 'protobuf'],
'doris-sink': ['json'],
'starrocks-sink': ['json'],
'deltalake-sink': ['json'],
2 changes: 2 additions & 0 deletions ci/scripts/notify.py
@@ -87,6 +87,8 @@
"deltalake-sink-json": ["xinhao"],
"pinot-sink-json": ["yiming"],
"client-library-none": ["tao"],
"nats-json": ["tao"],
"nats-protobuf": ["tao"],
}

def get_failed_tests(get_test_status, test_map):
4 changes: 3 additions & 1 deletion ci/scripts/run-e2e-test.sh
@@ -88,7 +88,9 @@ echo "--- e2e, $mode, batch"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cluster_start
sqllogictest -p 4566 -d dev './e2e_test/ddl/**/*.slt' --junit "batch-ddl-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/background_ddl/basic.slt' --junit "batch-ddl-${profile}"
if [[ "$mode" != "single-node" ]]; then
sqllogictest -p 4566 -d dev './e2e_test/background_ddl/basic.slt' --junit "batch-ddl-${profile}"
fi
sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt'
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
@@ -87,7 +87,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 60
timeout_in_minutes: 65
retry: *auto-retry

- label: "end-to-end test (parallel) (release)"
54 changes: 54 additions & 0 deletions docker/Dockerfile.hdfs
@@ -12,6 +12,55 @@ RUN add-apt-repository ppa:deadsnakes/ppa -y && \
DEBIAN_FRONTEND=noninteractive apt-get install python3.12 python3.12-dev -yy
ENV PYO3_PYTHON=python3.12


FROM base AS dashboard-builder

RUN apt-get update && apt-get install -y curl gnupg protobuf-compiler && mkdir -p /etc/apt/keyrings \
&& curl -fsSL https://deb.nodesource.com/gpgkey/nodesource-repo.gpg.key | gpg --dearmor -o /etc/apt/keyrings/nodesource.gpg \
&& echo "deb [signed-by=/etc/apt/keyrings/nodesource.gpg] https://deb.nodesource.com/node_18.x nodistro main" | tee /etc/apt/sources.list.d/nodesource.list \
&& apt-get update && apt-get install -y nodejs

COPY ./dashboard/ /risingwave/dashboard
COPY ./proto /risingwave/proto

RUN cd /risingwave/dashboard && npm i && npm run build-static && rm -rf node_modules


FROM base AS java-planner

RUN mkdir -p /risingwave
WORKDIR /risingwave

COPY java /risingwave/java/

# Move java/**/pom.xml to poms/**/pom.xml
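# (presumably so the `mvn dependency:go-offline` layer in the java-builder stage below stays cached until a pom changes)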
RUN find . -name pom.xml -exec bash -c 'mkdir -p poms/$(dirname {}); mv {} poms/{}' \;

# We use rust-maven-plugin to build java-binding. So it's FROM rust-base
FROM rust-base AS java-builder

RUN apt-get update && apt-get -y install maven

RUN mkdir -p /risingwave
WORKDIR /risingwave/java

# 1. copy only poms
COPY --from=java-planner /risingwave/poms /risingwave/java/

# 2. start downloading dependencies
RUN mvn dependency:go-offline --fail-never

# 3. add all source code and start compiling
# TODO: only add java related code so that changing rust code won't recompile java code
# Currently java-binding depends on the workspace Cargo.toml, which depends on the whole rust codebase
# Besides, rust-maven-plugin sets --target-dir, so the dependencies are built twice. How to dedup?
COPY ./ /risingwave

RUN mvn -B package -Dmaven.test.skip=true -Dno-build-rust && \
mkdir -p /risingwave/bin/connector-node && \
tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node


FROM base AS builder

RUN apt-get update && apt-get -y install make cmake protobuf-compiler curl bash lld maven unzip
@@ -20,6 +69,11 @@ SHELL ["/bin/bash", "-c"]

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-modify-path --default-toolchain none -y

ENV PATH /root/.cargo/bin/:$PATH
ENV CARGO_INCREMENTAL=0

COPY rust-toolchain rust-toolchain

RUN mkdir -p /risingwave

WORKDIR /risingwave
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

9 changes: 9 additions & 0 deletions e2e_test/batch/config.slt
@@ -8,3 +8,12 @@ query T
show application_name;
----
slt

statement ok
set synchronize_seqscans to on;

statement ok
set synchronize_seqscans to f;

statement ok
set synchronize_seqscans to default;
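
These cases exercise PostgreSQL-style boolean literals for session configs: 'on', the single-letter 'f', and 'default' to reset. A hedged sketch of further spellings that presumably parse under the same convention (untested guesses, not part of this commit):

statement ok
set synchronize_seqscans to off;

statement ok
set synchronize_seqscans to true;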
2 changes: 1 addition & 1 deletion e2e_test/error_ui/simple/main.slt
@@ -80,7 +80,7 @@ db error: ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: Failed to get/set session config
2: Invalid value `maybe` for `rw_implicit_flush`
3: provided string was not `true` or `false`
3: Invalid bool


statement error
71 changes: 40 additions & 31 deletions e2e_test/schema_registry/alter_sr.slt
@@ -16,59 +16,68 @@ FORMAT PLAIN ENCODE PROTOBUF(
statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

# Changing type is not allowed
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(city: character varying\)
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithNewType'
);

# Changing format/encode is not allowed
statement error Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Protobuf, and altering them is not supported yet
ALTER SOURCE src_user FORMAT NATIVE ENCODE PROTOBUF(
statement ok
CREATE TABLE t_user WITH (
connector = 'kafka',
topic = 'sr_pb_test',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

statement ok
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithMoreFields'
);
statement error
SELECT age FROM mv_user;

# Dropping columns is not allowed
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(age: integer\)
statement error
SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields

sleep 5s

# Refresh source schema
statement ok
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

statement ok
CREATE MATERIALIZED VIEW mv_more_fields AS SELECT * FROM src_user;
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;

# Refresh table schema
statement ok
ALTER TABLE t_user REFRESH SCHEMA;

query IIII
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
----
25 4 0 10

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields

sleep 10s
sleep 5s

query I
SELECT COUNT(*) FROM mv_user;
query IIII
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user;
----
25

statement error
SELECT SUM(age) FROM mv_user;
30 4 0 10

query III
SELECT COUNT(*), MAX(age), MIN(age) FROM mv_more_fields;
----
25 4 0
statement ok
DROP MATERIALIZED VIEW mv_user_more;

statement ok
DROP MATERIALIZED VIEW mv_user;
DROP TABLE t_user;

statement ok
DROP MATERIALIZED VIEW mv_more_fields;
DROP MATERIALIZED VIEW mv_user;

statement ok
DROP SOURCE src_user;
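
Taken together, the reworked test pins down the schema-evolution flow: produce records with extra fields, refresh the schema, then read the new columns from freshly created objects. In sketch form, using the names from the test above:

ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(schema.registry = '...', message = 'test.User');  -- re-fetches the latest registered schema
ALTER TABLE t_user REFRESH SCHEMA;  -- table counterpart of the source refresh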
27 changes: 9 additions & 18 deletions e2e_test/schema_registry/pb.py
@@ -1,6 +1,6 @@
from protobuf import user_pb2
from google.protobuf.source_context_pb2 import SourceContext
import sys
import importlib
from google.protobuf.source_context_pb2 import SourceContext
from confluent_kafka import Producer
from confluent_kafka.serialization import (
SerializationContext,
@@ -26,7 +26,7 @@ def get_user(i):
)

def get_user_with_more_fields(i):
return user_pb2.UserWithMoreFields(
return user_pb2.User(
id=i,
name="User_{}".format(i),
address="Address_{}".format(i),
@@ -36,16 +36,6 @@ def get_user_with_more_fields(i):
age=i,
)

def get_user_with_new_type(i):
return user_pb2.UserWithNewType(
id=i,
name="User_{}".format(i),
address="Address_{}".format(i),
city=i,
gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE,
sc=SourceContext(file_name="source/context_{:03}.proto".format(i)),
)

def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message):
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
serializer = ProtobufSerializer(
@@ -69,7 +59,7 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u


if __name__ == "__main__":
if len(sys.argv) < 5:
if len(sys.argv) < 6:
print("pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_message>")
exit(1)

@@ -79,10 +69,11 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u
num_records = int(sys.argv[4])
pb_message = sys.argv[5]

user_pb2 = importlib.import_module(f'protobuf.{pb_message}_pb2')

all_pb_messages = {
'user': (get_user, user_pb2.User),
'user_with_more_fields': (get_user_with_more_fields, user_pb2.UserWithMoreFields),
'user_with_new_type': (get_user_with_new_type, user_pb2.UserWithNewType),
'user': get_user,
'user_with_more_fields': get_user_with_more_fields,
}

assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}'
@@ -91,7 +82,7 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u
producer_conf = {"bootstrap.servers": broker_list}

try:
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, *all_pb_messages[pb_message])
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, all_pb_messages[pb_message], user_pb2.User)
except Exception as e:
print("Send Protobuf data to schema registry and kafka failed {}", e)
exit(1)
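
The refactor replaces the per-message constructors with a dynamic import: each variant now lives in its own generated module (protobuf/user_pb2.py, protobuf/user_with_more_fields_pb2.py), and each module names its message `User`, so the producer code stays generic. A minimal sketch of the dispatch pattern, assuming those generated modules are importable:

import importlib

def load_user_pb2(pb_message):
    # "user_with_more_fields" -> protobuf/user_with_more_fields_pb2.py,
    # whose `User` message carries the extra `age` field.
    return importlib.import_module(f"protobuf.{pb_message}_pb2")

user_pb2 = load_user_pb2("user")
print(user_pb2.User(id=1, name="User_1"))

Invocation is unchanged, as in the slt test above:

python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields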
19 changes: 0 additions & 19 deletions e2e_test/schema_registry/protobuf/user.proto
@@ -17,22 +17,3 @@ enum Gender {
MALE = 0;
FEMALE = 1;
}

message UserWithMoreFields {
int32 id = 1;
string name = 2;
string address = 3;
string city = 4;
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
int32 age = 7; // new field here
}

message UserWithNewType {
int32 id = 1;
string name = 2;
string address = 3;
int32 city = 4; // change the type from string to int32
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
}