
Commit

Merge branch 'main' into yiming/separate-connector-crate
wenym1 committed Oct 20, 2023
2 parents 21a5201 + fa66cbd commit 2151b86
Showing 229 changed files with 5,687 additions and 5,039 deletions.
134 changes: 77 additions & 57 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Cargo.toml
@@ -7,6 +7,7 @@ members = [
"src/cmd_all",
"src/common",
"src/common/common_service",
"src/common/heap_profiling",
"src/compute",
"src/connector",
"src/connector/connector_common",
@@ -20,6 +21,8 @@ members = [
"src/java_binding",
"src/jni_core",
"src/meta",
"src/meta/node",
"src/meta/service",
"src/meta/src/model_v2/migration",
"src/object_store",
"src/prost",
@@ -108,7 +111,7 @@ hashbrown = { version = "0.14.0", features = [
] }
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.0" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.0" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "16dab0e36ab337e58ee8002d828def2d212fa116" }
arrow-array = "47"
@@ -142,6 +145,8 @@ risingwave_hummock_sdk = { path = "./src/storage/hummock_sdk" }
risingwave_hummock_test = { path = "./src/storage/hummock_test" }
risingwave_hummock_trace = { path = "./src/storage/hummock_trace" }
risingwave_meta = { path = "./src/meta" }
risingwave_meta_service = { path = "./src/meta/service" }
risingwave_meta_node = { path = "./src/meta/node" }
risingwave_object_store = { path = "./src/object_store" }
risingwave_pb = { path = "./src/prost" }
risingwave_rpc_client = { path = "./src/rpc_client" }
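
Note on the workspace changes above: `src/meta/node` and `src/meta/service` become standalone workspace members, consumed elsewhere through the new `risingwave_meta_node` and `risingwave_meta_service` path dependencies. A minimal sketch for exercising the split crates, assuming the Cargo package names match those dependency entries:

    # Hedged sketch: check the newly split meta crates individually.
    # Package names are assumed to match the workspace entries above.
    cargo check -p risingwave_meta_node
    cargo check -p risingwave_meta_service
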
1 change: 1 addition & 0 deletions Makefile.toml
@@ -406,6 +406,7 @@ condition = { env_set = [
"ENABLE_BUILD_DASHBOARD",
], files_modified = { input = [
"./dashboard/**/*.js",
"./dashboard/**/*.ts*",
"./dashboard/package.json",
"./dashboard/next.config.js",
], output = [
28 changes: 28 additions & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -5,6 +5,7 @@ set -euo pipefail

./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
@@ -28,6 +29,15 @@ if [ $? -ne 0 ]; then
exit 1
fi

# test upsert kafka sink with schema
echo "testing upsert kafka sink with schema"
diff ./e2e_test/sink/kafka/upsert_schema1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink
echo "testing debezium kafka sink"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
@@ -62,6 +72,15 @@ if [ $? -ne 0 ]; then
exit 1
fi

# test upsert kafka sink with schema after update
echo "testing upsert kafka sink with schema after updating data"
diff ./e2e_test/sink/kafka/upsert_schema2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after update
echo "testing debezium kafka sink after updating data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
@@ -87,6 +106,15 @@ if [ $? -ne 0 ]; then
exit 1
fi

# test upsert kafka sink with schema after delete
echo "testing upsert kafka sink with schema after deleting data"
diff ./e2e_test/sink/kafka/upsert_schema3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after delete
echo "testing debezium kafka sink after deleting data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
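
Note on the checks above: each test consumes a fixed number of messages, sorts them for determinism, and diffs against a golden `.result` file; `--property print.key=true` makes key changes visible as well. The `-schema` topic is fed by a sink created with `schemas.enable = true` (see `e2e_test/sink/kafka/create_sink.slt` below). A hedged one-off probe, assuming the schema-enabled JSON carries top-level "schema" and "payload" fields and that `jq` is available (neither assumption is part of the script):

    # Illustrative only: peek at one message from the schema-enabled
    # topic and assert the assumed envelope shape.
    ./.risingwave/bin/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server 127.0.0.1:29092 \
      --topic test-rw-sink-upsert-schema \
      --from-beginning --max-messages 1 2> /dev/null \
      | jq -e 'has("schema") and has("payload")'
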
8 changes: 5 additions & 3 deletions ci/scripts/release.sh
@@ -11,7 +11,7 @@ if [ "${BUILDKITE_SOURCE}" != "schedule" ] && [ "${BUILDKITE_SOURCE}" != "webhoo
fi

echo "--- Install java and maven"
yum install -y java-11-openjdk wget python3 cyrus-sasl-devel
yum install -y java-11-openjdk java-11-openjdk-devel wget python3 cyrus-sasl-devel
pip3 install toml-cli
wget https://ci-deps-dist.s3.amazonaws.com/apache-maven-3.9.3-bin.tar.gz && tar -zxvf apache-maven-3.9.3-bin.tar.gz
export PATH="${REPO_ROOT}/apache-maven-3.9.3/bin:$PATH"
@@ -64,6 +64,10 @@ elif [[ -n "${BINARY_NAME+x}" ]]; then
aws s3 cp risingwave-${BINARY_NAME}-x86_64-unknown-linux.tar.gz s3://risingwave-nightly-pre-built-binary
fi

echo "--- Build connector node"
cd ${REPO_ROOT}/java && mvn -B package -Dmaven.test.skip=true -Dno-build-rust
cd ${REPO_ROOT} && mv ${REPO_ROOT}/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz risingwave-connector-"${BUILDKITE_TAG}".tar.gz

if [[ -n "${BUILDKITE_TAG}" ]]; then
echo "--- Install gh cli"
yum install -y dnf
@@ -87,8 +91,6 @@ if [[ -n "${BUILDKITE_TAG}" ]]; then
gh release upload "${BUILDKITE_TAG}" risectl-"${BUILDKITE_TAG}"-x86_64-unknown-linux.tar.gz

echo "--- Release build and upload risingwave connector node jar asset"
cd ${REPO_ROOT}/java && mvn -B package -Dmaven.test.skip=true -Dno-build-rust
cd connector-node/assembly/target && mv risingwave-connector-1.0.0.tar.gz risingwave-connector-"${BUILDKITE_TAG}".tar.gz
gh release upload "${BUILDKITE_TAG}" risingwave-connector-"${BUILDKITE_TAG}".tar.gz
fi

17 changes: 17 additions & 0 deletions ci/scripts/run-micro-benchmarks.sh
@@ -10,6 +10,12 @@ set -euo pipefail
# Make sure the added benchmark has a unique name.
BENCHMARKS="stream_hash_agg json_parser bench_block_iter bench_compactor bench_lru_cache bench_merge_iter"

# Reference: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
get_instance_type() {
TOKEN=`curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600"` \
&& curl -H "X-aws-ec2-metadata-token: $TOKEN" -v http://169.254.169.254/latest/meta-data/instance-type
}

# cargo criterion --bench stream_hash_agg --message-format=json
bench() {
BENCHMARK_NAME=$1
@@ -34,6 +40,17 @@ bench() {
}

main() {
# FIXME(kwannoel): This is a workaround
# Microbenchmarks need to be namespaced by instance types,
# the result upload endpoint needs to be parameterized by instance type as well to support this.
echo "--- Getting aws instance type"
local instance_type=$(get_instance_type)
echo "instance_type: $instance_type"
if [[ $instance_type != "m6i.4xlarge" ]]; then
echo "Only m6i.4xlarge is supported, skipping microbenchmark"
exit 0
fi

# We need cargo criterion to generate machine-readable benchmark results from
# microbench.
echo "--- Installing cargo criterion"
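
Note on `get_instance_type` above: it follows the IMDSv2 flow, where a PUT first mints a short-lived session token and the metadata GET then presents that token in a header; the benchmark exits early on anything but `m6i.4xlarge` so results stay comparable across runs. A hedged generalization of the same pattern (`imds_get` is a hypothetical helper, not part of the script):

    # Hypothetical helper: fetch any EC2 metadata path via IMDSv2.
    imds_get() {
      local token
      token=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" \
        -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
      curl -s -H "X-aws-ec2-metadata-token: ${token}" \
        "http://169.254.169.254/latest/meta-data/$1"
    }
    # e.g. imds_get instance-type; imds_get placement/availability-zone
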
33 changes: 32 additions & 1 deletion ci/workflows/pull-request.yml
@@ -529,4 +529,35 @@ steps:
if: build.pull_request.labels includes "ci/skip-ci" && !build.pull_request.draft
commands:
- echo "ci/skip-ci is only usable for draft Pull Requests"
- exit 1
- exit 1

- label: "micro benchmark"
command: "ci/scripts/run-micro-benchmarks.sh"
key: "run-micro-benchmarks"
if: build.pull_request.labels includes "ci/run-micro-benchmarks"
plugins:
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 60
retry: *auto-retry

- label: "upload micro-benchmark"
if: build.pull_request.labels includes "ci/run-upload-micro-benchmark"
command:
- "BUILDKITE_BUILD_NUMBER=$BUILDKITE_BUILD_NUMBER ci/scripts/upload-micro-bench-results.sh"
depends_on: "run-micro-benchmarks"
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
BUILDKITE_TOKEN: buildkite_token
GITHUB_TOKEN: github-token
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- BUILDKITE_TOKEN
- GITHUB_TOKEN
timeout_in_minutes: 5
31 changes: 22 additions & 9 deletions dashboard/pages/await_tree.tsx
@@ -36,22 +36,32 @@ import { getClusterInfoComputeNode } from "./api/cluster"
import useFetch from "./api/fetch"

const SIDEBAR_WIDTH = 200
const ALL_COMPUTE_NODES = ""

export default function AwaitTreeDump() {
const { response: computeNodes } = useFetch(getClusterInfoComputeNode)

const [computeNodeId, setComputeNodeId] = useState<number>()
const [dump, setDump] = useState<string | undefined>("")
const [computeNodeId, setComputeNodeId] = useState<string>()
const [dump, setDump] = useState<string>("")

useEffect(() => {
if (computeNodes && !computeNodeId && computeNodes.length > 0) {
setComputeNodeId(computeNodes[0].id)
if (computeNodes && !computeNodeId) {
setComputeNodeId(ALL_COMPUTE_NODES)
}
}, [computeNodes, computeNodeId])

const dumpTree = async () => {
const title = `Await-Tree Dump of Compute Node ${computeNodeId}:`
setDump(undefined)
if (computeNodeId === undefined) {
return
}

let title
if (computeNodeId === ALL_COMPUTE_NODES) {
title = "Await-Tree Dump of All Compute Nodes:"
} else {
title = `Await-Tree Dump of Compute Node ${computeNodeId}:`
}
setDump("Loading...")

let result

@@ -92,10 +102,13 @@ export default function AwaitTreeDump() {
<FormLabel>Compute Nodes</FormLabel>
<VStack>
<Select
onChange={(event) =>
setComputeNodeId(parseInt(event.target.value))
}
onChange={(event) => setComputeNodeId(event.target.value)}
>
{computeNodes && (
<option value={ALL_COMPUTE_NODES} key={ALL_COMPUTE_NODES}>
All
</option>
)}
{computeNodes &&
computeNodes.map((n) => (
<option value={n.id} key={n.id}>
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions e2e_test/ddl/alter_rename_relation.slt
@@ -128,6 +128,11 @@ public.mv_on_v1 CREATE MATERIALIZED VIEW mv_on_v1 AS SELECT * FROM v5 AS v1
statement ok
ALTER INDEX idx RENAME TO idx1;

query TT
SHOW CREATE INDEX idx1;
----
public.idx1 CREATE INDEX idx1 ON t2(v1)

statement ok
INSERT INTO t2 VALUES(1,(1,(1,2)));

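
Note on the added `SHOW CREATE INDEX` check above: it verifies the rename is reflected in the stored index definition. To run this file locally, the invocation below mirrors the `sqllogictest` usage in `ci/scripts/e2e-kafka-sink-test.sh` earlier in this commit (a frontend on port 4566 with database `dev` is an assumption carried over from there):

    # Hedged local run, mirroring the CI invocation above:
    sqllogictest -p 4566 -d dev 'e2e_test/ddl/alter_rename_relation.slt'
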
5 changes: 1 addition & 4 deletions e2e_test/s3/fs_source_v2.py
@@ -39,10 +39,7 @@ def format_csv(data, with_header):
writer.writeheader()
for item_data in file_data:
writer.writerow(item_data)
# For now paser can only handle \n line seperator,
# and tailing white spaces are not allowed.
# TODO: remove replace and rstrip later
csv_files.append(ostream.getvalue().replace('\r', '').rstrip())
csv_files.append(ostream.getvalue())
return csv_files

def do_test(config, file_num, item_num_per_file, prefix, fmt):
10 changes: 10 additions & 0 deletions e2e_test/sink/kafka/create_sink.slt
@@ -97,6 +97,16 @@ create sink si_kafka_upsert from t_kafka with (
primary_key = 'id',
);

statement ok
create sink si_kafka_upsert_schema from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
topic = 'test-rw-sink-upsert-schema',
primary_key = 'id',
) format upsert encode json (
schemas.enable = true
);

statement ok
create sink si_kafka_debezium from t_kafka with (
connector = 'kafka',
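
Note on `si_kafka_upsert_schema` above: `format upsert encode json (schemas.enable = true)` is what distinguishes it from the plain `si_kafka_upsert` sink and what the new `-schema` e2e checks exercise. A hedged way to confirm the sink was created, assuming a standard `psql` connection to the frontend and that `SHOW SINKS` is supported:

    # Illustrative only; connection parameters are assumptions.
    psql -h localhost -p 4566 -d dev -U root -c 'SHOW SINKS;'
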
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/drop_sink.slt
@@ -7,5 +7,8 @@ drop sink si_kafka_upsert;
statement ok
drop sink si_kafka_debezium;

statement ok
drop sink si_kafka_upsert_schema;

statement ok
drop table t_kafka;
