diff --git a/Makefile.toml b/Makefile.toml
index b0c8e2c4b993b..07e83dbff2d58 100644
--- a/Makefile.toml
+++ b/Makefile.toml
@@ -4,7 +4,6 @@ extend = [
   { path = "src/risedevtool/minio.toml" },
   { path = "src/risedevtool/etcd.toml" },
   { path = "src/risedevtool/tempo.toml" },
-  { path = "src/risedevtool/kafka.toml" },
   { path = "src/risedevtool/gcloud-pubsub.toml" },
   { path = "src/risedevtool/redis.toml" },
   { path = "src/risedevtool/connector.toml" },
@@ -498,7 +497,6 @@ dependencies = [
   "download-etcd",
   "download-grafana",
   "download-tempo",
-  "download-kafka",
   "download-mcli",
   "download-minio",
   "download-prometheus",
@@ -556,7 +554,6 @@ dependencies = [
   "download-grafana",
   "download-prometheus",
   "download-tempo",
-  "download-kafka",
   "download-redis",
 ]
 
@@ -707,19 +704,6 @@ script = '''
 set -euo pipefail
 
-wait_kafka_exit() {
-  # Follow kafka-server-stop.sh
-  while [[ -n "$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')" ]]; do
-    echo "Waiting for kafka to exit"
-    sleep 1
-  done
-}
-
-kill_kafka() {
-  ${PREFIX_BIN}/kafka/bin/kafka-server-stop.sh
-  wait_kafka_exit
-}
-
 if ! ${TMUX} ls &>/dev/null ; then
   echo "No risedev cluster to kill. Exiting..."
   exit 0
@@ -727,7 +711,6 @@ fi
 
 # Kill other components with Ctrl+C/Ctrl+D
 ${TMUX} list-windows -F "#{window_name} #{pane_id}" \
-| grep --invert-match --extended-regexp '(kafka)' \
 | awk '{ print $2 }' \
 | xargs -I {} ${TMUX} send-keys -t {} C-c C-d
 
@@ -738,13 +721,6 @@ if [[ -n ${containers} ]]; then
   docker stop ${containers}
 fi
 
-# Kill kafka cleanly. Ctrl+C will lose data.
-if [[ -n $(${TMUX} list-windows | grep kafka) ]];
-then
-  echo "kill kafka"
-  kill_kafka || true
-fi
-
 ${TMUX} kill-server
 test $? -eq 0 || { echo "Failed to stop all RiseDev components."; exit 1; }
 '''
diff --git a/ci/risedev-components.ci.benchmark.env b/ci/risedev-components.ci.benchmark.env
index 67171fe10bc28..7761d6ce969d0 100644
--- a/ci/risedev-components.ci.benchmark.env
+++ b/ci/risedev-components.ci.benchmark.env
@@ -1,7 +1,6 @@
 RISEDEV_CONFIGURED=true
 ENABLE_MINIO=true
 ENABLE_ETCD=true
-ENABLE_KAFKA=true
 ENABLE_BUILD_RUST=true
 ENABLE_RELEASE_PROFILE=true
 ENABLE_PROMETHEUS_GRAFANA=true
diff --git a/ci/risedev-components.ci.source.env b/ci/risedev-components.ci.source.env
index 255b873eab94d..22755d4c6f3d8 100644
--- a/ci/risedev-components.ci.source.env
+++ b/ci/risedev-components.ci.source.env
@@ -1,5 +1,4 @@
 RISEDEV_CONFIGURED=true
 ENABLE_MINIO=true
 ENABLE_ETCD=true
-ENABLE_KAFKA=true
 ENABLE_PUBSUB=true
diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh
index 559da130b7b61..206ce4ba1d75d 100755
--- a/ci/scripts/e2e-kafka-sink-test.sh
+++ b/ci/scripts/e2e-kafka-sink-test.sh
@@ -3,20 +3,22 @@
 # Exits as soon as any line fails.
 set -euo pipefail
 
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-text-key-id --create > /dev/null 2>&1
+export RPK_BROKERS="message_queue:29092"
+
+rpk topic create test-rw-sink-append-only
+rpk topic create test-rw-sink-upsert
+rpk topic create test-rw-sink-upsert-schema
+rpk topic create test-rw-sink-debezium
+rpk topic create test-rw-sink-without-snapshot
+rpk topic create test-rw-sink-text-key-id
 
 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
 sleep 2
 
 # test append-only kafka sink
 echo "testing append-only kafka sink"
-diff ./e2e_test/sink/kafka/append_only1.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null)
+diff -b ./e2e_test/sink/kafka/append_only1.result \
+<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 10 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for append-only sink is not as expected."
   exit 1
@@ -24,8 +26,8 @@ fi
 
 # test upsert kafka sink
 echo "testing upsert kafka sink"
-diff ./e2e_test/sink/kafka/upsert1.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
+diff -b ./e2e_test/sink/kafka/upsert1.result \
+<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink is not as expected."
   exit 1
@@ -33,8 +35,8 @@ fi
 
 # test upsert kafka sink with schema
 echo "testing upsert kafka sink with schema"
-diff ./e2e_test/sink/kafka/upsert_schema1.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
+diff -b ./e2e_test/sink/kafka/upsert_schema1.result \
+<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink with schema is not as expected."
   exit 1
@@ -42,7 +44,7 @@ fi
 
 # test debezium kafka sink
 echo "testing debezium kafka sink"
-(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
+(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
 python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium1.result e2e_test/sink/kafka/debezium1.tmp.result
 if [ $? -ne 0 ]; then
   echo "The output for debezium sink is not as expected."
@@ -58,8 +60,8 @@ psql -h localhost -p 4566 -d dev -U root -c "update t_kafka set v_varchar = '',
 
 # test append-only kafka sink after update
 echo "testing append-only kafka sink after updating data"
-diff ./e2e_test/sink/kafka/append_only2.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null)
+diff -b ./e2e_test/sink/kafka/append_only2.result \
+<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 11 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for append-only sink after update is not as expected."
   exit 1
@@ -67,8 +69,8 @@ fi
 
 # test upsert kafka sink after update
 echo "testing upsert kafka sink after updating data"
-diff ./e2e_test/sink/kafka/upsert2.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
+diff -b ./e2e_test/sink/kafka/upsert2.result \
+<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink after update is not as expected."
   exit 1
@@ -76,8 +78,8 @@ fi
 
 # test upsert kafka sink with schema after update
 echo "testing upsert kafka sink with schema after updating data"
-diff ./e2e_test/sink/kafka/upsert_schema2.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
+diff -b ./e2e_test/sink/kafka/upsert_schema2.result \
+<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink with schema is not as expected."
   exit 1
@@ -85,7 +87,7 @@ fi
 
 # test debezium kafka sink after update
 echo "testing debezium kafka sink after updating data"
-(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
+(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
 python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium2.result e2e_test/sink/kafka/debezium2.tmp.result
 if [ $? -ne 0 ]; then
   echo "The output for debezium sink after update is not as expected."
@@ -97,8 +99,8 @@ fi
 
 # test without-snapshot kafka sink
 echo "testing without-snapshot kafka sink"
-diff ./e2e_test/sink/kafka/without_snapshot.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --from-beginning --max-messages 3 | sort) 2> /dev/null)
+diff -b ./e2e_test/sink/kafka/without_snapshot.result \
+<((rpk topic consume test-rw-sink-without-snapshot --offset start --format '%v\n' --num 3 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for append-only sink is not as expected."
   exit 1
@@ -110,8 +112,8 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;"
 
 # test upsert kafka sink after delete
 echo "testing upsert kafka sink after deleting data"
-diff ./e2e_test/sink/kafka/upsert3.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
+diff -b ./e2e_test/sink/kafka/upsert3.result \
+<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink after update is not as expected."
   exit 1
@@ -119,8 +121,8 @@ fi
 
 # test upsert kafka sink with schema after delete
 echo "testing upsert kafka sink with schema after deleting data"
-diff ./e2e_test/sink/kafka/upsert_schema3.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
+diff -b ./e2e_test/sink/kafka/upsert_schema3.result \
+<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink with schema is not as expected."
   exit 1
@@ -128,7 +130,7 @@ fi
 
 # test debezium kafka sink after delete
 echo "testing debezium kafka sink after deleting data"
-(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
+(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
 python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium3.result e2e_test/sink/kafka/debezium3.tmp.result
 if [ $? -ne 0 ]; then
   echo "The output for debezium sink after delete is not as expected."
@@ -139,9 +141,9 @@ else
 fi
 
 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
+rpk topic delete test-rw-sink-append-only
+rpk topic delete test-rw-sink-upsert
+rpk topic delete test-rw-sink-debezium
 
 # test different encoding
 echo "preparing confluent schema registry"
@@ -149,19 +151,19 @@ python3 -m pip install --break-system-packages requests confluent-kafka
 
 echo "testing protobuf"
 cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1
+rpk topic create test-rw-sink-append-only-protobuf
+rpk topic create test-rw-sink-append-only-protobuf-csr-a
+rpk topic create test-rw-sink-append-only-protobuf-csr-hi
 python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
 python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
+rpk topic delete test-rw-sink-append-only-protobuf
+rpk topic delete test-rw-sink-append-only-protobuf-csr-a
+rpk topic delete test-rw-sink-append-only-protobuf-csr-hi
 
 echo "testing avro"
 python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
 python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
+rpk topic create test-rw-sink-upsert-avro
 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --delete > /dev/null 2>&1
+rpk topic delete test-rw-sink-upsert-avro
diff --git a/e2e_test/sink/kafka/debezium.py b/e2e_test/sink/kafka/debezium.py
index 4ec5e31281d88..5dbac1ae80216 100644
--- a/e2e_test/sink/kafka/debezium.py
+++ b/e2e_test/sink/kafka/debezium.py
@@ -11,9 +11,9 @@
         # debezium sink sends k/v pair
         kv = line.split()
         key = json.loads(kv[0])
-        # kafka consumer outputs string "null" for null payload
-        if kv[1] == "null":
-            value = kv[1]
+        # rpk outputs nothing for a null payload
+        if len(kv) == 1:
+            value = None
         else:
             value = json.loads(kv[1])
         # The `ts_ms` field may vary, so we delete it from the json object
@@ -26,12 +26,10 @@
 with open(test_output_file) as file:
     for line in file:
         kv = line.split()
-        if len(kv) != 2:
-            print(line)
-            assert(len(kv) == 2)
         key = json.loads(kv[0])
-        if kv[1] == "null":
-            value = kv[1]
+        # rpk outputs nothing for a null payload
+        if len(kv) == 1:
+            value = None
         else:
             value = json.loads(kv[1])
         # Assert `ts_ms` is an integer here.
diff --git a/e2e_test/sink/kafka/debezium3.result b/e2e_test/sink/kafka/debezium3.result
index 196037c88b33b..b2d0235c2cfd2 100644
--- a/e2e_test/sink/kafka/debezium3.result
+++ b/e2e_test/sink/kafka/debezium3.result
@@ -1,5 +1,5 @@
 {"payload":{"id":10},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":{"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":0,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01T00:00:00.123456Z","v_varchar":"0oVqRIHqkb"},"before":null,"op":"c","source":{"db":"dev","table":"t_kafka","ts_ms":1700210120041},"ts_ms":1700210120041},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTi
mestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}} -{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} null +{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":null,"before":{"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":-719162,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":""},"op":"d","source":{"db":"dev","table":"t_kafka","ts_ms":1700210132903},"ts_ms":1700210132903},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optiona
l":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":{"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":-719162,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":""},"before":{"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":-719162,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"8DfUFencLe"},"op":"u","source":{"db":"dev","table":"t_kafka","ts_ms":1700210127905},"ts_ms":1700210127905},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields
":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"RisingWave.dev.t_kafka.Key","optional":false,"type":"struct"}} {"payload":{"after":{"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":-719162,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"8DfUFencLe"},"before":null,"op":"c","source":{"db":"dev","table":"t_kafka","ts_ms":1700210120041},"ts_ms":1700210120041},"schema":{"fields":[{"field":"before","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","name":"org.apache.kafka.connect.data.Timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","name":"io.debezium.time.Interval","optional":true,"type":"string"},{"field":"v_date","name":"org.apache.kafka.connect.data.Date","optional":true,"type":"int32"},{"field":"v_time","name":"org.apache.kafka.connect.data.Time","optional":true,"type":"int64"},{"field":"v_timestamptz","name":"io.debezium.time.ZonedTimestamp","optional":true,"type":"string"},{"field":"v_jsonb","name":"io.debezium.data.Json","optional":true,"type":"string"}],"name":"RisingWave.dev.t_kafka.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.dev.t_kafka.Envelope",
"optional":false,"type":"struct"}} diff --git a/e2e_test/sink/kafka/upsert3.result b/e2e_test/sink/kafka/upsert3.result index dde36e5f444eb..fe845aff27301 100644 --- a/e2e_test/sink/kafka/upsert3.result +++ b/e2e_test/sink/kafka/upsert3.result @@ -1,5 +1,5 @@ {"id":10} {"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":719163,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01 00:00:00.123456","v_varchar":"0oVqRIHqkb"} -{"id":1} null +{"id":1} {"id":1} {"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":1,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":""} {"id":1} {"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":1,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"8DfUFencLe"} {"id":2} {"id":2,"v_bigint":4598,"v_bytea":"AA==","v_date":719163,"v_double":31923.077305746086,"v_float":27031.224609375,"v_integer":4598,"v_interval":"P0Y0M0DT4H0M0S","v_jsonb":"{}","v_smallint":22690,"v_time":1000,"v_timestamp":1681429444869,"v_timestamptz":"0001-01-01 00:00:00.123456","v_varchar":"sIo1XXVeHZ"} diff --git a/e2e_test/sink/kafka/upsert_schema3.result b/e2e_test/sink/kafka/upsert_schema3.result index 18675dcbb76d3..ccbb7fe542b37 100644 --- a/e2e_test/sink/kafka/upsert_schema3.result +++ b/e2e_test/sink/kafka/upsert_schema3.result @@ -1,5 +1,5 @@ {"payload":{"id":10},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":10,"v_bigint":20674,"v_bytea":"AA==","v_date":719163,"v_double":9042.404483827513,"v_float":19387.23828125,"v_integer":20674,"v_interval":"P0Y0M0DT5H1M0.123457S","v_jsonb":"{}","v_smallint":26951,"v_time":1000,"v_timestamp":1681404058888,"v_timestamptz":"1970-01-01T00:00:00.123456Z","v_varchar":"0oVqRIHqkb"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","optional":true,"type":"string"},{"field":"v_date","optional":true,"type":"int32"},{"field":"v_time","optional":true,"type":"int64"},{"field":"v_timestamptz","optional":true,"type":"string"},{"field":"v_jsonb","optional":true,"type":"string"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} -{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} null +{"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} 
{"payload":{"id":1,"v_bigint":0,"v_bytea":"AA==","v_date":1,"v_double":0.0,"v_float":0.0,"v_integer":0,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":0,"v_time":1000,"v_timestamp":0,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":""},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","optional":true,"type":"string"},{"field":"v_date","optional":true,"type":"int32"},{"field":"v_time","optional":true,"type":"int64"},{"field":"v_timestamptz","optional":true,"type":"string"},{"field":"v_jsonb","optional":true,"type":"string"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":1},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":1,"v_bigint":1872,"v_bytea":"AA==","v_date":1,"v_double":23956.39329760601,"v_float":26261.416015625,"v_integer":1872,"v_interval":"P0Y0M0DT0H0M0S","v_jsonb":"{}","v_smallint":31031,"v_time":1000,"v_timestamp":1681453634104,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"8DfUFencLe"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","optional":true,"type":"string"},{"field":"v_date","optional":true,"type":"int32"},{"field":"v_time","optional":true,"type":"int64"},{"field":"v_timestamptz","optional":true,"type":"string"},{"field":"v_jsonb","optional":true,"type":"string"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} {"payload":{"id":2},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} 
{"payload":{"id":2,"v_bigint":4598,"v_bytea":"AA==","v_date":719163,"v_double":31923.077305746086,"v_float":27031.224609375,"v_integer":4598,"v_interval":"P0Y0M0DT4H0M0S","v_jsonb":"{}","v_smallint":22690,"v_time":1000,"v_timestamp":1681429444869,"v_timestamptz":"0001-01-01T00:00:00.123456Z","v_varchar":"sIo1XXVeHZ"},"schema":{"fields":[{"field":"id","optional":true,"type":"int32"},{"field":"v_varchar","optional":true,"type":"string"},{"field":"v_smallint","optional":true,"type":"int16"},{"field":"v_integer","optional":true,"type":"int32"},{"field":"v_bigint","optional":true,"type":"int64"},{"field":"v_float","optional":true,"type":"float"},{"field":"v_double","optional":true,"type":"double"},{"field":"v_timestamp","optional":true,"type":"int64"},{"field":"v_bytea","optional":true,"type":"bytes"},{"field":"v_interval","optional":true,"type":"string"},{"field":"v_date","optional":true,"type":"int32"},{"field":"v_time","optional":true,"type":"int64"},{"field":"v_timestamptz","optional":true,"type":"string"},{"field":"v_jsonb","optional":true,"type":"string"}],"name":"dev.t_kafka","optional":false,"type":"struct"}} diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml index fd9163f8747f7..48342bceafd42 100644 --- a/e2e_test/source_inline/commands.toml +++ b/e2e_test/source_inline/commands.toml @@ -20,11 +20,6 @@ script = ''' #!/usr/bin/env sh set -e -if [ ! -d "${PREFIX_BIN}/kafka" ]; then - echo "Kafka is not installed in ${PREFIX_BIN}/kafka. Did you enable Kafka using $(tput setaf 4)\`./risedev configure\`$(tput sgr0)?" - exit 1 -fi - if [ -z "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then echo "RISEDEV_KAFKA_BOOTSTRAP_SERVERS is not set in risedev-env file. Did you start Kafka using $(tput setaf 4)\`./risedev d\`$(tput sgr0)?" exit 1 @@ -47,42 +42,6 @@ else fi ''' -[tasks.kafka-topics] -category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["check-kafka"] -script = """ -#!/usr/bin/env sh -set -e -${PREFIX_BIN}/kafka/bin/kafka-topics.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" -""" - -[tasks.kafka-produce] -category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["check-kafka"] -script = """ -#!/usr/bin/env sh -set -e -${PREFIX_BIN}/kafka/bin/kafka-console-producer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" -""" - -[tasks.kafka-consume] -category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["check-kafka"] -script = """ -#!/usr/bin/env sh -set -e -${PREFIX_BIN}/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" -""" - -[tasks.kafka-consumer-groups] -category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["check-kafka"] -script = """ -#!/usr/bin/env sh -set -e -${PREFIX_BIN}/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" -""" - # rpk tools [tasks.rpk] category = "RiseDev - Test - Source Test - Kafka" diff --git a/risedev.yml b/risedev.yml index 6127eab7e8f12..65f84882c682c 100644 --- a/risedev.yml +++ b/risedev.yml @@ -1418,7 +1418,7 @@ template: # access key, secret key and region should be set in aws config (either by env var or .aws/config) - # Apache Kafka service + # Apache Kafka service backed by docker. kafka: # Id to be picked-up by services id: kafka-${port} @@ -1435,6 +1435,9 @@ template: # Listen address listen-address: ${address} + # The docker image. Can be overridden to use a different version. 
+    image: "confluentinc/cp-kafka:7.6.1"
+
     # If set to true, data will be persisted at data/{id}.
     persist-data: true
 
diff --git a/scripts/source/prepare_ci_kafka.sh b/scripts/source/prepare_ci_kafka.sh
index c3d72cca3d250..117e3a72401cb 100755
--- a/scripts/source/prepare_ci_kafka.sh
+++ b/scripts/source/prepare_ci_kafka.sh
@@ -37,10 +37,10 @@ for filename in $kafka_data_files; do
     # always ok
     echo "Drop topic $topic"
-    risedev kafka-topics --topic "$topic" --delete || true
+    risedev rpk topic delete "$topic" || true
 
     echo "Recreate topic $topic with partition $partition"
-    risedev kafka-topics --topic "$topic" --create --partitions "$partition") &
+    risedev rpk topic create "$topic" --partitions "$partition") &
 done
 wait
diff --git a/src/cmd_all/scripts/e2e-full-standalone-demo.sh b/src/cmd_all/scripts/e2e-full-standalone-demo.sh
index 28469aaddbe70..409c543e03afa 100755
--- a/src/cmd_all/scripts/e2e-full-standalone-demo.sh
+++ b/src/cmd_all/scripts/e2e-full-standalone-demo.sh
@@ -20,17 +20,14 @@ set -euo pipefail
 
 insert_json_kafka() {
-  echo $1 | \
-    $KAFKA_PATH/bin/kafka-console-producer.sh \
-    --topic source_kafka \
-    --bootstrap-server localhost:29092
+  echo $1 |
+    RPK_BROKERS=localhost:29092 \
+    rpk topic produce source_kafka -f "%v"
 }
 
 create_topic_kafka() {
-  "$KAFKA_PATH"/bin/kafka-topics.sh \
-    --create \
-    --topic source_kafka \
-    --bootstrap-server localhost:29092
+  RPK_BROKERS=localhost:29092 \
+    rpk topic create source_kafka
 }
 
 # Make sure we start on clean cluster
diff --git a/src/risedevtool/config/src/main.rs b/src/risedevtool/config/src/main.rs
index d69aad43f2dac..27a676c2bfb3e 100644
--- a/src/risedevtool/config/src/main.rs
+++ b/src/risedevtool/config/src/main.rs
@@ -62,7 +62,6 @@ pub enum Components {
     Hdfs,
     PrometheusAndGrafana,
     Etcd,
-    Kafka,
     Pubsub,
     Redis,
     Tracing,
@@ -89,7 +88,6 @@ impl Components {
             Self::Hdfs => "[Component] Hummock: Hdfs Backend",
             Self::PrometheusAndGrafana => "[Component] Metrics: Prometheus + Grafana",
             Self::Etcd => "[Component] Etcd",
-            Self::Kafka => "[Component] Kafka",
             Self::Pubsub => "[Component] Google Pubsub",
             Self::Redis => "[Component] Redis",
             Self::BuildConnectorNode => "[Build] Build RisingWave Connector (Java)",
@@ -130,11 +128,6 @@ Required if you want to view metrics."
 Required if you want to persistent meta-node data.
                 "
             }
-            Self::Kafka => {
-                "
-Required if you want to create source from Kafka.
-                "
-            }
             Self::Pubsub => {
                 "
 Required if you want to create source from Emulated Google Pub/sub.
@@ -224,7 +217,6 @@ With this option enabled, RiseDev will not set `RUST_BACKTRACE` when launching n
             "ENABLE_HDFS" => Some(Self::Hdfs),
             "ENABLE_PROMETHEUS_GRAFANA" => Some(Self::PrometheusAndGrafana),
             "ENABLE_ETCD" => Some(Self::Etcd),
-            "ENABLE_KAFKA" => Some(Self::Kafka),
             "ENABLE_PUBSUB" => Some(Self::Pubsub),
             "ENABLE_BUILD_RUST" => Some(Self::RustComponents),
             "ENABLE_BUILD_DASHBOARD" => Some(Self::Dashboard),
@@ -252,7 +244,6 @@ With this option enabled, RiseDev will not set `RUST_BACKTRACE` when launching n
             Self::Hdfs => "ENABLE_HDFS",
             Self::PrometheusAndGrafana => "ENABLE_PROMETHEUS_GRAFANA",
             Self::Etcd => "ENABLE_ETCD",
-            Self::Kafka => "ENABLE_KAFKA",
             Self::Pubsub => "ENABLE_PUBSUB",
             Self::Redis => "ENABLE_REDIS",
             Self::RustComponents => "ENABLE_BUILD_RUST",
diff --git a/src/risedevtool/kafka.toml b/src/risedevtool/kafka.toml
deleted file mode 100644
index 4353e30aef919..0000000000000
--- a/src/risedevtool/kafka.toml
+++ /dev/null
@@ -1,32 +0,0 @@
-extend = "common.toml"
-
-[env]
-KAFKA_DOWNLOAD_PATH = "${PREFIX_TMP}/kafka.tgz"
-
-[tasks.download-kafka]
-private = true
-category = "RiseDev - Components"
-dependencies = ["prepare"]
-condition = { env_set = [ "ENABLE_KAFKA" ] }
-description = "Download and extract Kafka"
-script = '''
-#!/usr/bin/env bash
-set -e
-if [ -d "${PREFIX_BIN}/kafka" ]; then
-  exit 0
-fi
-
-get_latest_kafka_version() {
-  local versions=$(curl -s https://downloads.apache.org/kafka/ | grep -Eo 'href="[0-9]+\.[0-9]+\.[0-9]+/"' | grep -Eo "[0-9]+\.[0-9]+\.[0-9]+")
-  # Sort the version numbers and get the latest one
-  local latest_version=$(echo "$versions" | sort -V | tail -n1)
-  echo $latest_version
-}
-
-echo "Kafka not found, downloading..."
-latest_version=$(get_latest_kafka_version)
-curl -fL -o "${KAFKA_DOWNLOAD_PATH}" "https://downloads.apache.org/kafka/${latest_version}/kafka_2.13-${latest_version}.tgz"
-tar -xf "${KAFKA_DOWNLOAD_PATH}" -C "${PREFIX_TMP}"
-mv "${PREFIX_TMP}/kafka_2.13-${latest_version}" "${PREFIX_BIN}/kafka"
-rm ${KAFKA_DOWNLOAD_PATH}
-'''
diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs
index a858e35026fd4..5a7ab843ddae2 100644
--- a/src/risedevtool/src/bin/risedev-dev.rs
+++ b/src/risedevtool/src/bin/risedev-dev.rs
@@ -272,7 +272,7 @@ fn task_main(
             ServiceConfig::Kafka(c) => {
                 let mut ctx =
                     ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
-                let mut service = KafkaService::new(c.clone())?;
+                let mut service = KafkaService::new(c.clone());
                 service.execute(&mut ctx)?;
                 let mut task = risedev::KafkaReadyCheckTask::new(c.clone())?;
                 task.execute(&mut ctx)?;
diff --git a/src/risedevtool/src/config_gen.rs b/src/risedevtool/src/config_gen.rs
index d8aaa07b07aed..5b931a5d238cc 100644
--- a/src/risedevtool/src/config_gen.rs
+++ b/src/risedevtool/src/config_gen.rs
@@ -16,7 +16,5 @@ mod prometheus_gen;
 pub use prometheus_gen::*;
 mod grafana_gen;
 pub use grafana_gen::*;
-mod kafka_gen;
-pub use kafka_gen::*;
 mod tempo_gen;
 pub use tempo_gen::*;
diff --git a/src/risedevtool/src/config_gen/kafka_gen.rs b/src/risedevtool/src/config_gen/kafka_gen.rs
deleted file mode 100644
index b1b1443e907e8..0000000000000
--- a/src/risedevtool/src/config_gen/kafka_gen.rs
+++ /dev/null
@@ -1,166 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use crate::KafkaConfig;
-
-pub struct KafkaGen;
-
-impl KafkaGen {
-    pub fn gen_server_properties(&self, config: &KafkaConfig, kafka_log_dirs: &str) -> String {
-        let kafka_listen_host = &config.listen_address;
-        let kafka_advertise_host = &config.address;
-        let kafka_port = &config.port;
-        let controller_port = &config.controller_port;
-        let kafka_node_id = config.node_id;
-
-        // https://github.com/apache/kafka/blob/trunk/config/kraft/server.properties
-        format!(
-            r#"# --- THIS FILE IS AUTO GENERATED BY RISEDEV ---
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# see kafka.server.KafkaConfig for additional details and defaults
-
-############################# Server Basics #############################
-
-# The role of this server. Setting this puts us in KRaft mode
-process.roles=controller,broker
-
-# The node id associated with this instance's roles
-node.id={kafka_node_id}
-
-# The connect string for the controller quorum
-controller.quorum.voters={kafka_node_id}@{kafka_advertise_host}:{controller_port}
-
-############################# Socket Server Settings #############################
-
-# The address the socket server listens on. It will get the value returned from
-# java.net.InetAddress.getCanonicalHostName() if not configured.
-# FORMAT:
-#   listeners = listener_name://host_name:port
-# EXAMPLE:
-#   listeners = PLAINTEXT://your.host.name:9092
-listeners=PLAINTEXT://{kafka_listen_host}:{kafka_port},CONTROLLER://{kafka_listen_host}:{controller_port}
-
-# A comma-separated list of the names of the listeners used by the controller.
-# This is required if running in KRaft mode.
-controller.listener.names=CONTROLLER
-
-# Hostname and port the broker will advertise to producers and consumers. If not set,
-# it uses the value for "listeners" if configured. Otherwise, it will use the value
-# returned from java.net.InetAddress.getCanonicalHostName().
-advertised.listeners=PLAINTEXT://{kafka_advertise_host}:{kafka_port}
-
-# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
-listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
-
-# The number of threads that the server uses for receiving requests from the network and sending responses to the network
-num.network.threads=3
-
-# The number of threads that the server uses for processing requests, which may include disk I/O
-num.io.threads=8
-
-# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=102400
-
-# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=102400
-
-# The maximum size of a request that the socket server will accept (protection against OOM)
-socket.request.max.bytes=104857600
-
-############################# Log Basics #############################
-
-# A comma separated list of directories under which to store log files
-log.dirs={kafka_log_dirs}
-
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
-num.partitions=1
-
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
-num.recovery.threads.per.data.dir=1
-
-############################# Internal Topic Settings #############################
-# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
-# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
-offsets.topic.replication.factor=1
-transaction.state.log.replication.factor=1
-transaction.state.log.min.isr=1
-
-############################# Log Flush Policy #############################
-
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk.
-# There are a few important trade-offs here:
-# 1. Durability: Unflushed data may be lost if you are not using replication.
-# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
-#log.flush.interval.messages=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
-#log.flush.interval.ms=1000
-
-############################# Log Retention Policy #############################
-
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion due to age
-log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
-# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
-#log.retention.bytes=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes=1073741824
-
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
-log.retention.check.interval.ms=300000
-
-############################# Group Coordinator Settings #############################
-
-# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
-# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
-# The default value for this is 3 seconds.
-# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
-# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
-group.initial.rebalance.delay.ms=0
-"#
-        )
-    }
-}
diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs
index 4811ef96f5b87..88c1594fb1153 100644
--- a/src/risedevtool/src/service_config.rs
+++ b/src/risedevtool/src/service_config.rs
@@ -278,6 +278,7 @@ pub struct KafkaConfig {
     pub controller_port: u16,
     pub listen_address: String,
 
+    pub image: String,
     pub persist_data: bool,
     pub node_id: u32,
 
diff --git a/src/risedevtool/src/task/kafka_service.rs b/src/risedevtool/src/task/kafka_service.rs
index 293fcc6cecc82..52bdd227a72a4 100644
--- a/src/risedevtool/src/task/kafka_service.rs
+++ b/src/risedevtool/src/task/kafka_service.rs
@@ -12,108 +12,61 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::env;
-use std::path::{Path, PathBuf};
-use std::process::Command;
+use super::docker_service::{DockerService, DockerServiceConfig};
+use crate::KafkaConfig;
 
-use anyhow::{anyhow, Result};
-
-use super::{ExecuteContext, Task};
-use crate::{KafkaConfig, KafkaGen};
-
-pub struct KafkaService {
-    config: KafkaConfig,
-}
-
-impl KafkaService {
-    pub fn new(config: KafkaConfig) -> Result<Self> {
-        Ok(Self { config })
+impl DockerServiceConfig for KafkaConfig {
+    fn id(&self) -> String {
+        self.id.clone()
     }
 
-    fn kafka_path(&self) -> Result<PathBuf> {
-        let prefix_bin = env::var("PREFIX_BIN")?;
-        Ok(Path::new(&prefix_bin)
-            .join("kafka")
-            .join("bin")
-            .join("kafka-server-start.sh"))
+    fn is_user_managed(&self) -> bool {
+        self.user_managed
     }
 
-    fn kafka(&self) -> Result<Command> {
-        Ok(Command::new(self.kafka_path()?))
+    fn image(&self) -> String {
+        self.image.clone()
     }
 
-    /// Format kraft storage. This is a necessary step to start a fresh Kafka service.
-    fn kafka_storage_format(&self) -> Result<Command> {
-        let prefix_bin = env::var("PREFIX_BIN")?;
-        let path = Path::new(&prefix_bin)
-            .join("kafka")
-            .join("bin")
-            .join("kafka-storage.sh");
-
-        let mut cmd = Command::new(path);
-        cmd.arg("format").arg("-t").arg("risedev-kafka").arg("-c"); // the remaining arg is the path to the config file
-        Ok(cmd)
+    fn envs(&self) -> Vec<(String, String)> {
+        vec![
+            ("KAFKA_NODE_ID".to_owned(), self.node_id.to_string()),
+            (
+                "KAFKA_PROCESS_ROLES".to_owned(),
+                "controller,broker".to_owned(),
+            ),
+            (
+                "KAFKA_LISTENERS".to_owned(),
+                "PLAINTEXT://:9092,CONTROLLER://:9093".to_owned(),
+            ),
+            (
+                "KAFKA_ADVERTISED_LISTENERS".to_owned(),
+                format!("PLAINTEXT://{}:{}", self.address, self.port),
+            ),
+            (
+                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
+                "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT".to_owned(),
+            ),
+            (
+                "KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(),
+                format!("{}@localhost:9093", self.node_id),
+            ),
+            (
+                "KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(),
+                "CONTROLLER".to_owned(),
+            ),
+            ("CLUSTER_ID".to_owned(), "RiseDevRiseDevRiseDev1".to_owned()),
+        ]
     }
-}
-
-impl Task for KafkaService {
-    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
-        ctx.service(self);
-
-        if self.config.user_managed {
-            ctx.pb.set_message("user managed");
-            writeln!(
-                &mut ctx.log,
-                "Please start your Kafka at {}:{}\n\n",
-                self.config.address, self.config.port
-            )?;
-            return Ok(());
-        }
-
-        ctx.pb.set_message("starting...");
-        let path = self.kafka_path()?;
-        if !path.exists() {
-            return Err(anyhow!("Kafka binary not found in {:?}\nDid you enable kafka feature in `./risedev configure`?", path));
-        }
-
-        let prefix_config = env::var("PREFIX_CONFIG")?;
-
-        let path = if self.config.persist_data {
-            Path::new(&env::var("PREFIX_DATA")?).join(self.id())
-        } else {
-            let path = Path::new("/tmp/risedev").join(self.id());
-            fs_err::remove_dir_all(&path).ok();
-            path
-        };
-        fs_err::create_dir_all(&path)?;
-
-        let config_path = Path::new(&prefix_config).join(format!("{}.properties", self.id()));
-        fs_err::write(
-            &config_path,
-            KafkaGen.gen_server_properties(&self.config, &path.to_string_lossy()),
-        )?;
-
-        // Format storage if empty.
-        if path.read_dir()?.next().is_none() {
-            let mut cmd = self.kafka_storage_format()?;
-            cmd.arg(&config_path);
-
-            ctx.pb.set_message("formatting storage...");
-            ctx.run_command(cmd)?;
-        }
-
-        let mut cmd = self.kafka()?;
-        cmd.arg(config_path);
-
-        ctx.pb.set_message("starting kafka...");
-        ctx.run_command(ctx.tmux_run(cmd)?)?;
-
-        ctx.pb.set_message("started");
-
-        Ok(())
+    fn ports(&self) -> Vec<(String, String)> {
+        vec![(self.port.to_string(), "9092".to_owned())]
     }
 
-    fn id(&self) -> String {
-        self.config.id.clone()
+    fn data_path(&self) -> Option<String> {
+        self.persist_data.then(|| "/var/lib/kafka/data".to_owned())
     }
 }
+
+/// Docker-backed Kafka service.
+pub type KafkaService = DockerService<KafkaConfig>;
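The rpk invocations introduced throughout this patch replace the Kafka console tools one-to-one: RPK_BROKERS stands in for --bootstrap-server, and an explicit --format directive stands in for print.key=true. Below is a minimal sketch of that correspondence, assuming rpk is installed and a broker answers at message_queue:29092 (the address the CI scripts above use); the topic name demo-topic is hypothetical and only for illustration.

    #!/usr/bin/env bash
    set -euo pipefail

    # RPK_BROKERS replaces --bootstrap-server on every invocation.
    export RPK_BROKERS="message_queue:29092"

    # Was: kafka-topics.sh --create --topic demo-topic --partitions 1
    rpk topic create demo-topic --partitions 1

    # Was: kafka-console-producer.sh --topic demo-topic
    # -f "%v" reads each stdin line as one record value.
    echo '{"id": 1}' | rpk topic produce demo-topic -f "%v"

    # Was: kafka-console-consumer.sh --from-beginning --property print.key=true --max-messages 1
    # Note: for a tombstone (null value), %v expands to an empty string rather
    # than the literal "null" the console consumer printed, which is why
    # debezium.py above now treats a key-only line as a null payload.
    rpk topic consume demo-topic --offset start --format '%k\t%v\n' --num 1

    # Was: kafka-topics.sh --delete --topic demo-topic
    rpk topic delete demo-topic

The tab separator emitted by '%k\t%v\n' differs from the console consumer's whitespace, which is also why the result comparisons above switched from plain diff to diff -b (ignore whitespace differences).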