Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into wrj/flat-json
Browse files Browse the repository at this point in the history
  • Loading branch information
wangrunji0408 committed Oct 20, 2023
2 parents ed10b36 + bbd2852 commit b49c78c
Show file tree
Hide file tree
Showing 90 changed files with 3,868 additions and 2,441 deletions.
18 changes: 8 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ hashbrown = { version = "0.14.0", features = [
] }
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.0" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.0" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "16dab0e36ab337e58ee8002d828def2d212fa116" }
arrow-array = "47"
Expand Down
1 change: 1 addition & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ condition = { env_set = [
"ENABLE_BUILD_DASHBOARD",
], files_modified = { input = [
"./dashboard/**/*.js",
"./dashboard/**/*.ts*",
"./dashboard/package.json",
"./dashboard/next.config.js",
], output = [
Expand Down
28 changes: 28 additions & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ set -euo pipefail

./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
Expand All @@ -28,6 +29,15 @@ if [ $? -ne 0 ]; then
exit 1
fi

# test upsert kafka sink with schema
echo "testing upsert kafka sink with schema"
diff ./e2e_test/sink/kafka/upsert_schema1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink
echo "testing debezium kafka sink"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
Expand Down Expand Up @@ -62,6 +72,15 @@ if [ $? -ne 0 ]; then
exit 1
fi

# test upsert kafka sink with schema after update
echo "testing upsert kafka sink with schema after updating data"
diff ./e2e_test/sink/kafka/upsert_schema2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after update
echo "testing debezium kafka sink after updating data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
Expand All @@ -87,6 +106,15 @@ if [ $? -ne 0 ]; then
exit 1
fi

# test upsert kafka sink with schema after delete
echo "testing upsert kafka sink with schema after deleting data"
diff ./e2e_test/sink/kafka/upsert_schema3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after delete
echo "testing debezium kafka sink after deleting data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
Expand Down
8 changes: 5 additions & 3 deletions ci/scripts/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ if [ "${BUILDKITE_SOURCE}" != "schedule" ] && [ "${BUILDKITE_SOURCE}" != "webhoo
fi

echo "--- Install java and maven"
yum install -y java-11-openjdk wget python3 cyrus-sasl-devel
yum install -y java-11-openjdk java-11-openjdk-devel wget python3 cyrus-sasl-devel
pip3 install toml-cli
wget https://ci-deps-dist.s3.amazonaws.com/apache-maven-3.9.3-bin.tar.gz && tar -zxvf apache-maven-3.9.3-bin.tar.gz
export PATH="${REPO_ROOT}/apache-maven-3.9.3/bin:$PATH"
Expand Down Expand Up @@ -64,6 +64,10 @@ elif [[ -n "${BINARY_NAME+x}" ]]; then
aws s3 cp risingwave-${BINARY_NAME}-x86_64-unknown-linux.tar.gz s3://risingwave-nightly-pre-built-binary
fi

echo "--- Build connector node"
cd ${REPO_ROOT}/java && mvn -B package -Dmaven.test.skip=true -Dno-build-rust
cd ${REPO_ROOT} && mv ${REPO_ROOT}/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz risingwave-connector-"${BUILDKITE_TAG}".tar.gz

if [[ -n "${BUILDKITE_TAG}" ]]; then
echo "--- Install gh cli"
yum install -y dnf
Expand All @@ -87,8 +91,6 @@ if [[ -n "${BUILDKITE_TAG}" ]]; then
gh release upload "${BUILDKITE_TAG}" risectl-"${BUILDKITE_TAG}"-x86_64-unknown-linux.tar.gz

echo "--- Release build and upload risingwave connector node jar asset"
cd ${REPO_ROOT}/java && mvn -B package -Dmaven.test.skip=true -Dno-build-rust
cd connector-node/assembly/target && mv risingwave-connector-1.0.0.tar.gz risingwave-connector-"${BUILDKITE_TAG}".tar.gz
gh release upload "${BUILDKITE_TAG}" risingwave-connector-"${BUILDKITE_TAG}".tar.gz
fi

Expand Down
31 changes: 22 additions & 9 deletions dashboard/pages/await_tree.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,32 @@ import { getClusterInfoComputeNode } from "./api/cluster"
import useFetch from "./api/fetch"

const SIDEBAR_WIDTH = 200
const ALL_COMPUTE_NODES = ""

export default function AwaitTreeDump() {
const { response: computeNodes } = useFetch(getClusterInfoComputeNode)

const [computeNodeId, setComputeNodeId] = useState<number>()
const [dump, setDump] = useState<string | undefined>("")
const [computeNodeId, setComputeNodeId] = useState<string>()
const [dump, setDump] = useState<string>("")

useEffect(() => {
if (computeNodes && !computeNodeId && computeNodes.length > 0) {
setComputeNodeId(computeNodes[0].id)
if (computeNodes && !computeNodeId) {
setComputeNodeId(ALL_COMPUTE_NODES)
}
}, [computeNodes, computeNodeId])

const dumpTree = async () => {
const title = `Await-Tree Dump of Compute Node ${computeNodeId}:`
setDump(undefined)
if (computeNodeId === undefined) {
return
}

let title
if (computeNodeId === ALL_COMPUTE_NODES) {
title = "Await-Tree Dump of All Compute Nodes:"
} else {
title = `Await-Tree Dump of Compute Node ${computeNodeId}:`
}
setDump("Loading...")

let result

Expand Down Expand Up @@ -92,10 +102,13 @@ export default function AwaitTreeDump() {
<FormLabel>Compute Nodes</FormLabel>
<VStack>
<Select
onChange={(event) =>
setComputeNodeId(parseInt(event.target.value))
}
onChange={(event) => setComputeNodeId(event.target.value)}
>
{computeNodes && (
<option value={ALL_COMPUTE_NODES} key={ALL_COMPUTE_NODES}>
All
</option>
)}
{computeNodes &&
computeNodes.map((n) => (
<option value={n.id} key={n.id}>
Expand Down
10 changes: 10 additions & 0 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ create sink si_kafka_upsert from t_kafka with (
primary_key = 'id',
);

statement ok
create sink si_kafka_upsert_schema from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
topic = 'test-rw-sink-upsert-schema',
primary_key = 'id',
) format upsert encode json (
schemas.enable = true
);

statement ok
create sink si_kafka_debezium from t_kafka with (
connector = 'kafka',
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/drop_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@ drop sink si_kafka_upsert;
statement ok
drop sink si_kafka_debezium;

statement ok
drop sink si_kafka_upsert_schema;

statement ok
drop table t_kafka;
10 changes: 10 additions & 0 deletions e2e_test/sink/kafka/upsert_schema1.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"payload":{"id":10},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":10,"v_bigint":20674,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_smallint":26951,"v_timestamp":1681404058888,"v_varchar":"0oVqRIHqkb"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":1,"v_bigint":1872,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_smallint":31031,"v_timestamp":1681453634104,"v_varchar":"8DfUFencLe"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
{"payload":{"id":2},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":2,"v_bigint":4598,"v_double":31923.077305746086,"v_float":27031.224609375,"v_integer":4598,"v_smallint":22690,"v_timestamp":1681429444869,"v_varchar":"sIo1XXVeHZ"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
{"payload":{"id":3},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":3,"v_bigint":14894,"v_double":9742.475509566086,"v_float":2660.290283203125,"v_integer":5894,"v_smallint":5985,"v_timestamp":1681429011269,"v_varchar":"LVLAhd1pQv"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
{"payload":{"id":4},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":4,"v_bigint":24962,"v_double":3119.719721891862,"v_float":21217.77734375,"v_integer":7406,"v_smallint":6306,"v_timestamp":1681434727993,"v_varchar":"ORjwy3oMNb"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
{"payload":{"id":5},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":5,"v_bigint":9253,"v_double":17464.91553421121,"v_float":22749.5,"v_integer":9253,"v_smallint":22765,"v_timestamp":1681444642324,"v_varchar":"sSkKswxrYd"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
{"payload":{"id":6},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":6,"v_bigint":28842,"v_double":11210.458724794062,"v_float":5885.3681640625,"v_integer":10844,"v_smallint":4014,"v_timestamp":1681382522137,"v_varchar":"V4y71v4Gip"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
{"payload":{"id":7},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":7,"v_bigint":15914,"v_double":10967.182297153104,"v_float":3946.743408203125,"v_integer":12652,"v_smallint":10324,"v_timestamp":1681447263083,"v_varchar":"YIVLnWxHyf"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
{"payload":{"id":8},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":8,"v_bigint":28641,"v_double":993.408963466774,"v_float":13652.0732421875,"v_integer":19036,"v_smallint":194,"v_timestamp":1681393929356,"v_varchar":"lv7Eq3g8hx"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
{"payload":{"id":9},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":9,"v_bigint":24837,"v_double":11615.276406159757,"v_float":20699.55859375,"v_integer":20090,"v_smallint":10028,"v_timestamp":1681389642487,"v_varchar":"nwRq4zejSQ"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"}],"name":"dev.t_kafka","optional":false,"type":"struct"}}
Loading

0 comments on commit b49c78c

Please sign in to comment.