diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 60d2d8946717c..15274be94be9b 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -61,7 +61,8 @@ services: - "29092:29092" - "9092:9092" - "9644:9644" - - "8081:8081" + # Don't use Redpanda's schema registry, use the separated service instead + # - "8081:8081" environment: {} container_name: message_queue healthcheck: @@ -89,6 +90,7 @@ services: - mysql - db - message_queue + - schemaregistry - elasticsearch - clickhouse-server - redis-server diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index 206ce4ba1d75d..7cab1ae1f76f7 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -154,16 +154,16 @@ cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive rpk topic create test-rw-sink-append-only-protobuf rpk topic create test-rw-sink-append-only-protobuf-csr-a rpk topic create test-rw-sink-append-only-protobuf-csr-hi -python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto -python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto +python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto +python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt' rpk topic delete test-rw-sink-append-only-protobuf rpk topic delete test-rw-sink-append-only-protobuf-csr-a rpk topic delete test-rw-sink-append-only-protobuf-csr-hi echo "testing avro" -python3 e2e_test/sink/kafka/register_schema.py 
'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc -python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field' +python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc +python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field' rpk topic create test-rw-sink-upsert-avro sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt' rpk topic delete test-rw-sink-upsert-avro diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 5127731256c6b..35b7965f12bb3 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -137,11 +137,11 @@ export RISINGWAVE_CI=true RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-1cn-1fe python3 -m pip install --break-system-packages requests protobuf confluent-kafka -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 20 user +python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 20 user echo "make sure google/protobuf/source_context.proto is NOT in schema registry" -curl --silent 'http://message_queue:8081/subjects'; echo -# curl --silent --head -X GET 'http://message_queue:8081/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404 -curl --silent 'http://message_queue:8081/subjects' | grep -v 'google/protobuf/source_context.proto' +curl --silent 'http://schemaregistry:8082/subjects'; echo +# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404 +curl --silent 
'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto' risedev slt './e2e_test/schema_registry/pb.slt' risedev slt './e2e_test/schema_registry/alter_sr.slt' diff --git a/e2e_test/schema_registry/alter_sr.slt b/e2e_test/schema_registry/alter_sr.slt index 8daf41d87b633..d703c0401a35e 100644 --- a/e2e_test/schema_registry/alter_sr.slt +++ b/e2e_test/schema_registry/alter_sr.slt @@ -9,7 +9,7 @@ CREATE SOURCE src_user WITH ( scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', message = 'test.User' ); @@ -24,7 +24,7 @@ CREATE TABLE t_user WITH ( scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', message = 'test.User' ); @@ -36,7 +36,7 @@ SELECT age FROM t_user; # Push more events with extended fields system ok -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields +python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields sleep 5s @@ -58,7 +58,7 @@ SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more; # Push more events with extended fields system ok -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields +python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields sleep 5s diff --git a/e2e_test/schema_registry/pb.slt b/e2e_test/schema_registry/pb.slt index d9c0edca1b21c..7b60b4fa8d7a4 100644 --- a/e2e_test/schema_registry/pb.slt +++ b/e2e_test/schema_registry/pb.slt @@ -9,7 +9,7 @@ create table sr_pb_test with ( properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest') FORMAT plain ENCODE protobuf( - 
schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', message = 'test.User' ); @@ -21,7 +21,7 @@ create table sr_pb_test_bk with ( properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest') FORMAT plain ENCODE protobuf( - schema.registry = 'http://message_queue:8081,http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082,http://schemaregistry:8082', message = 'test.User' ); diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt index d9fa53bc589ac..1cf27b811d9be 100644 --- a/e2e_test/sink/kafka/avro.slt +++ b/e2e_test/sink/kafka/avro.slt @@ -6,7 +6,7 @@ with ( topic = 'test-rw-sink-upsert-avro', properties.bootstrap.server = 'message_queue:29092') format upsert encode avro ( - schema.registry = 'http://message_queue:8081'); + schema.registry = 'http://schemaregistry:8082'); statement ok create table into_kafka ( @@ -40,7 +40,7 @@ create sink sink0 from into_kafka with ( properties.bootstrap.server = 'message_queue:29092', primary_key = 'int32_field,string_field') format upsert encode avro ( - schema.registry = 'http://message_queue:8081'); + schema.registry = 'http://schemaregistry:8082'); sleep 2s @@ -72,7 +72,7 @@ create sink sink_err from into_kafka with ( properties.bootstrap.server = 'message_queue:29092', primary_key = 'int32_field,string_field') format upsert encode avro ( - schema.registry = 'http://message_queue:8081'); + schema.registry = 'http://schemaregistry:8082'); statement error field not in avro create sink sink_err as select 1 as extra_column, * from into_kafka with ( @@ -81,7 +81,7 @@ create sink sink_err as select 1 as extra_column, * from into_kafka with ( properties.bootstrap.server = 'message_queue:29092', primary_key = 'int32_field,string_field') format upsert encode avro ( - schema.registry = 'http://message_queue:8081'); + schema.registry = 'http://schemaregistry:8082'); statement error unrecognized create sink sink_err from 
into_kafka with ( @@ -90,7 +90,7 @@ create sink sink_err from into_kafka with ( properties.bootstrap.server = 'message_queue:29092', primary_key = 'int32_field,string_field') format upsert encode avro ( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', schema.registry.name.strategy = 'typo'); statement error empty field key.message @@ -100,7 +100,7 @@ create sink sink_err from into_kafka with ( properties.bootstrap.server = 'message_queue:29092', primary_key = 'int32_field,string_field') format upsert encode avro ( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', schema.registry.name.strategy = 'record_name_strategy'); statement ok diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt index 0c74cc8a0b369..c3f6f0d3ad8e2 100644 --- a/e2e_test/sink/kafka/protobuf.slt +++ b/e2e_test/sink/kafka/protobuf.slt @@ -13,7 +13,7 @@ create table from_kafka_csr_trivial with ( topic = 'test-rw-sink-append-only-protobuf-csr-a', properties.bootstrap.server = 'message_queue:29092') format plain encode protobuf ( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', message = 'test.package.MessageA'); statement ok @@ -22,7 +22,7 @@ create table from_kafka_csr_nested with ( topic = 'test-rw-sink-append-only-protobuf-csr-hi', properties.bootstrap.server = 'message_queue:29092') format plain encode protobuf ( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', message = 'test.package.MessageH.MessageI'); statement ok @@ -68,7 +68,7 @@ create sink sink_csr_trivial as select string_field as field_a from into_kafka w properties.bootstrap.server = 'message_queue:29092') format plain encode protobuf ( force_append_only = true, - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', message = 'test.package.MessageA'); statement ok @@ 
-78,7 +78,7 @@ create sink sink_csr_nested as select sint32_field as field_i from into_kafka wi properties.bootstrap.server = 'message_queue:29092') format plain encode protobuf ( force_append_only = true, - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', message = 'test.package.MessageH.MessageI'); sleep 2s diff --git a/e2e_test/source/basic/nosim_kafka.slt b/e2e_test/source/basic/nosim_kafka.slt index 12626b6926fdf..f143471e0f269 100644 --- a/e2e_test/source/basic/nosim_kafka.slt +++ b/e2e_test/source/basic/nosim_kafka.slt @@ -1,3 +1,6 @@ +control substitution on + +# FIXME: does this really work?? # Start with nosim to avoid running in deterministic test @@ -7,18 +10,18 @@ CREATE TABLE upsert_avro_json_default_key ( primary key (rw_key) ) INCLUDE KEY AS rw_key WITH ( connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', + properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'upsert_avro_json') -FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081'); +FORMAT UPSERT ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); statement ok CREATE TABLE upsert_student_avro_json ( primary key (rw_key) ) INCLUDE KEY AS rw_key WITH ( connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', + properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'upsert_student_avro_json') -FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081'); +FORMAT UPSERT ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); # TODO: Uncomment this when we add test data kafka key with format `"ID":id` @@ -28,35 +31,35 @@ FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081'); # ) # WITH ( # connector = 'kafka', -# properties.bootstrap.server = 'message_queue:29092', +# properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', # topic = 'upsert_avro_json') -# FORMAT UPSERT ENCODE AVRO 
(schema.registry = 'http://message_queue:8081'); +# FORMAT UPSERT ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); statement ok CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with ( connector = 'kafka', kafka.topic = 'debezium_non_compact_avro_json', - kafka.brokers = 'message_queue:29092', + kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', kafka.scan.startup.mode = 'earliest' -) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = 'http://message_queue:8081'); +) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); statement ok CREATE TABLE debezium_compact (PRIMARY KEY(order_id)) with ( connector = 'kafka', kafka.topic = 'debezium_compact_avro_json', - kafka.brokers = 'message_queue:29092', + kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', kafka.scan.startup.mode = 'earliest' -) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = 'http://message_queue:8081'); +) FORMAT DEBEZIUM ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); statement ok CREATE TABLE kafka_json_schema_plain with ( connector = 'kafka', kafka.topic = 'kafka_json_schema', - kafka.brokers = 'message_queue:29092', + kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', kafka.scan.startup.mode = 'earliest' -) FORMAT PLAIN ENCODE JSON (schema.registry = 'http://schemaregistry:8082'); +) FORMAT PLAIN ENCODE JSON (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); statement ok CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(rw_key)) @@ -64,9 +67,9 @@ INCLUDE KEY AS rw_key with ( connector = 'kafka', kafka.topic = 'kafka_upsert_json_schema', - kafka.brokers = 'message_queue:29092', + kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', kafka.scan.startup.mode = 'earliest' -) FORMAT UPSERT ENCODE JSON (schema.registry = 'http://schemaregistry:8082'); +) FORMAT UPSERT ENCODE JSON (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); statement ok flush; diff --git a/e2e_test/source/basic/schema_registry.slt 
b/e2e_test/source/basic/schema_registry.slt index 76f867b2b1d0e..4673e441e80c6 100644 --- a/e2e_test/source/basic/schema_registry.slt +++ b/e2e_test/source/basic/schema_registry.slt @@ -5,7 +5,7 @@ create source s1 () with ( topic = 'upsert_avro_json-record', properties.bootstrap.server = 'message_queue:29092' ) format plain encode avro ( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', schema.registry.name.strategy = 'no sense', message = 'CPLM.OBJ_ATTRIBUTE_VALUE', ); @@ -17,7 +17,7 @@ create source s1 () with ( topic = 'upsert_avro_json-record', properties.bootstrap.server = 'message_queue:29092' ) format plain encode avro ( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', schema.registry.name.strategy = 'record_name_strategy', message = 'CPLM.OBJ_ATTRIBUTE_VALUE', key.message = 'string' @@ -29,7 +29,7 @@ create source s1 () with ( topic = 'upsert_avro_json-record', properties.bootstrap.server = 'message_queue:29092' ) format plain encode avro ( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', schema.registry.name.strategy = 'record_name_strategy', message = 'CPLM.OBJ_ATTRIBUTE_VALUE', ); @@ -41,7 +41,7 @@ create table t1 () with ( topic = 'upsert_avro_json-topic-record', properties.bootstrap.server = 'message_queue:29092' ) format upsert encode avro ( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', schema.registry.name.strategy = 'topic_record_name_strategy', message = 'CPLM.OBJ_ATTRIBUTE_VALUE' ); @@ -54,7 +54,7 @@ with ( topic = 'upsert_avro_json-topic-record', properties.bootstrap.server = 'message_queue:29092' ) format upsert encode avro ( - schema.registry = 'http://message_queue:8081', + schema.registry = 'http://schemaregistry:8082', schema.registry.name.strategy = 'topic_record_name_strategy', message = 'CPLM.OBJ_ATTRIBUTE_VALUE', key.message = 'string' 
diff --git a/risedev.yml b/risedev.yml index 65f84882c682c..ea1c6f3bc6c83 100644 --- a/risedev.yml +++ b/risedev.yml @@ -861,6 +861,10 @@ profile: user-managed: true address: message_queue port: 29092 + - use: schema-registry + user-managed: true + address: schemaregistry + port: 8082 ci-inline-source-test: config-path: src/config/ci-recovery.toml @@ -1431,9 +1435,8 @@ template: # Listen port of KRaft controller controller-port: 29093 - - # Listen address - listen-address: ${address} + # Listen port for other services in docker (schema-registry) + docker-port: 29094 # The docker image. Can be overridden to use a different version. image: "confluentinc/cp-kafka:7.6.1" @@ -1446,6 +1449,23 @@ template: user-managed: false + schema-registry: + # Id to be picked-up by services + id: schema-registry-${port} + + # Advertise address + address: "127.0.0.1" + + # Listen port of Schema Registry + port: 8081 + + # The docker image. Can be overridden to use a different version. + image: "confluentinc/cp-schema-registry:7.6.1" + + user-managed: false + + provide-kafka: "kafka*" + # Google pubsub emulator service pubsub: id: pubsub-${port} diff --git a/scripts/source/prepare_ci_kafka.sh b/scripts/source/prepare_ci_kafka.sh index e50229a73759f..9f3e2f473ca9b 100755 --- a/scripts/source/prepare_ci_kafka.sh +++ b/scripts/source/prepare_ci_kafka.sh @@ -56,7 +56,7 @@ for filename in $kafka_data_files; do if [[ "$topic" = *bin ]]; then kcat -P -b message_queue:29092 -t "$topic" "$filename" elif [[ "$topic" = *avro_json ]]; then - python3 source/schema_registry_producer.py "message_queue:29092" "http://message_queue:8081" "$filename" "topic" "avro" + python3 source/schema_registry_producer.py "message_queue:29092" "http://schemaregistry:8082" "$filename" "topic" "avro" elif [[ "$topic" = *json_schema ]]; then python3 source/schema_registry_producer.py "message_queue:29092" "http://schemaregistry:8082" "$filename" "topic" "json" else @@ -72,9 +72,9 @@ for i in {0..100}; do echo 
"key$i:{\"a\": $i}" | kcat -P -b message_queue:29092 # write schema with name strategy ## topic: upsert_avro_json-record, key subject: string, value subject: CPLM.OBJ_ATTRIBUTE_VALUE -(python3 source/schema_registry_producer.py "message_queue:29092" "http://message_queue:8081" source/test_data/upsert_avro_json.1 "record" "avro") & +(python3 source/schema_registry_producer.py "message_queue:29092" "http://schemaregistry:8082" source/test_data/upsert_avro_json.1 "record" "avro") & ## topic: upsert_avro_json-topic-record, ## key subject: upsert_avro_json-topic-record-string ## value subject: upsert_avro_json-topic-record-CPLM.OBJ_ATTRIBUTE_VALUE -(python3 source/schema_registry_producer.py "message_queue:29092" "http://message_queue:8081" source/test_data/upsert_avro_json.1 "topic-record" "avro") & +(python3 source/schema_registry_producer.py "message_queue:29092" "http://schemaregistry:8082" source/test_data/upsert_avro_json.1 "topic-record" "avro") & wait diff --git a/src/risedevtool/src/bin/risedev-compose.rs b/src/risedevtool/src/bin/risedev-compose.rs index ec805a840fa71..5ff56916deca6 100644 --- a/src/risedevtool/src/bin/risedev-compose.rs +++ b/src/risedevtool/src/bin/risedev-compose.rs @@ -219,9 +219,10 @@ fn main() -> Result<()> { volumes.insert(c.id.clone(), ComposeVolume::default()); (c.address.clone(), c.compose(&compose_config)?) 
} - ServiceConfig::Redis(_) | ServiceConfig::MySql(_) | ServiceConfig::Postgres(_) => { - return Err(anyhow!("not supported")) - } + ServiceConfig::Redis(_) + | ServiceConfig::MySql(_) + | ServiceConfig::Postgres(_) + | ServiceConfig::SchemaRegistry(_) => return Err(anyhow!("not supported")), }; compose.container_name = service.id().to_string(); if opts.deploy { diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index 5a7ab843ddae2..8dbe155bcd086 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -27,8 +27,8 @@ use risedev::{ generate_risedev_env, preflight_check, CompactorService, ComputeNodeService, ConfigExpander, ConfigureTmuxTask, DummyService, EnsureStopService, ExecuteContext, FrontendService, GrafanaService, KafkaService, MetaNodeService, MinioService, MySqlService, PostgresService, - PrometheusService, PubsubService, RedisService, ServiceConfig, SqliteConfig, Task, - TempoService, RISEDEV_NAME, + PrometheusService, PubsubService, RedisService, SchemaRegistryService, ServiceConfig, + SqliteConfig, Task, TempoService, RISEDEV_NAME, }; use tempfile::tempdir; use thiserror_ext::AsReport; @@ -279,6 +279,18 @@ fn task_main( ctx.pb .set_message(format!("kafka {}:{}", c.address, c.port)); } + ServiceConfig::SchemaRegistry(c) => { + let mut ctx = + ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); + let mut service = SchemaRegistryService::new(c.clone()); + service.execute(&mut ctx)?; + let mut task = + risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; + task.execute(&mut ctx)?; + ctx.pb + .set_message(format!("schema registry http://{}:{}", c.address, c.port)); + } + ServiceConfig::Pubsub(c) => { let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); diff --git a/src/risedevtool/src/config.rs b/src/risedevtool/src/config.rs index 839ebc22486ee..bf768f8e68cd1 100644 --- 
a/src/risedevtool/src/config.rs +++ b/src/risedevtool/src/config.rs @@ -175,6 +175,9 @@ impl ConfigExpander { "redpanda" => ServiceConfig::RedPanda(serde_yaml::from_str(&out_str)?), "mysql" => ServiceConfig::MySql(serde_yaml::from_str(&out_str)?), "postgres" => ServiceConfig::Postgres(serde_yaml::from_str(&out_str)?), + "schema-registry" => { + ServiceConfig::SchemaRegistry(serde_yaml::from_str(&out_str)?) + } other => return Err(anyhow!("unsupported use type: {}", other)), }; Ok(result) diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index a45864f097854..2b6cc367b2e71 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -77,6 +77,15 @@ pub fn generate_risedev_env(services: &Vec) -> String { writeln!(env, r#"RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka',properties.bootstrap.server='{brokers}'""#).unwrap(); writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap(); } + ServiceConfig::SchemaRegistry(c) => { + let address = &c.address; + let port = &c.port; + writeln!( + env, + r#"RISEDEV_SCHEMA_REGISTRY_URL="http://{address}:{port}""#, + ) + .unwrap(); + } ServiceConfig::MySql(c) => { let host = &c.address; let port = &c.port; diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 88c1594fb1153..71461b0f58bcc 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -271,12 +271,16 @@ pub struct KafkaConfig { phantom_use: Option, pub id: String, + /// Advertise address pub address: String, #[serde(with = "string")] pub port: u16, + /// Port for other services in docker. They need to connect to `host.docker.internal`, while the host + /// need to connect to `localhost`. 
+ pub docker_port: u16, + #[serde(with = "string")] pub controller_port: u16, - pub listen_address: String, pub image: String, pub persist_data: bool, @@ -284,6 +288,28 @@ pub struct KafkaConfig { pub user_managed: bool, } + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +#[serde(deny_unknown_fields)] +pub struct SchemaRegistryConfig { + #[serde(rename = "use")] + phantom_use: Option, + + pub id: String, + + pub address: String, + #[serde(with = "string")] + pub port: u16, + + pub provide_kafka: Option>, + + pub image: String, + /// Redpanda supports schema registry natively. You can configure a `user_managed` schema registry + /// to use with redpanda. + pub user_managed: bool, +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] #[serde(deny_unknown_fields)] @@ -380,6 +406,7 @@ pub enum ServiceConfig { Opendal(OpendalConfig), AwsS3(AwsS3Config), Kafka(KafkaConfig), + SchemaRegistry(SchemaRegistryConfig), Pubsub(PubsubConfig), Redis(RedisConfig), RedPanda(RedPandaConfig), @@ -407,10 +434,12 @@ impl ServiceConfig { Self::RedPanda(c) => &c.id, Self::Opendal(c) => &c.id, Self::MySql(c) => &c.id, - ServiceConfig::Postgres(c) => &c.id, + Self::Postgres(c) => &c.id, + Self::SchemaRegistry(c) => &c.id, } } + /// Used to check whether the port is occupied before running the service. 
pub fn port(&self) -> Option { match self { Self::ComputeNode(c) => Some(c.port), @@ -430,7 +459,8 @@ impl ServiceConfig { Self::RedPanda(_c) => None, Self::Opendal(_) => None, Self::MySql(c) => Some(c.port), - ServiceConfig::Postgres(c) => Some(c.port), + Self::Postgres(c) => Some(c.port), + Self::SchemaRegistry(c) => Some(c.port), } } @@ -454,6 +484,7 @@ impl ServiceConfig { Self::Opendal(_c) => false, Self::MySql(c) => c.user_managed, Self::Postgres(c) => c.user_managed, + Self::SchemaRegistry(c) => c.user_managed, } } } diff --git a/src/risedevtool/src/task.rs b/src/risedevtool/src/task.rs index e34cddd908b7f..21b6f20eec5ee 100644 --- a/src/risedevtool/src/task.rs +++ b/src/risedevtool/src/task.rs @@ -29,6 +29,7 @@ mod postgres_service; mod prometheus_service; mod pubsub_service; mod redis_service; +mod schema_registry_service; mod task_configure_minio; mod task_etcd_ready_check; mod task_kafka_ready_check; @@ -68,6 +69,7 @@ pub use self::postgres_service::*; pub use self::prometheus_service::*; pub use self::pubsub_service::*; pub use self::redis_service::*; +pub use self::schema_registry_service::SchemaRegistryService; pub use self::task_configure_minio::*; pub use self::task_etcd_ready_check::*; pub use self::task_kafka_ready_check::*; diff --git a/src/risedevtool/src/task/docker_service.rs b/src/risedevtool/src/task/docker_service.rs index 58ff2b59648c0..b87ee8a6a8aef 100644 --- a/src/risedevtool/src/task/docker_service.rs +++ b/src/risedevtool/src/task/docker_service.rs @@ -100,7 +100,9 @@ where cmd.arg("run") .arg("--rm") .arg("--name") - .arg(format!("risedev-{}", self.id())); + .arg(format!("risedev-{}", self.id())) + .arg("--add-host") + .arg("host.docker.internal:host-gateway"); for (k, v) in self.config.envs() { cmd.arg("-e").arg(format!("{k}={v}")); diff --git a/src/risedevtool/src/task/kafka_service.rs b/src/risedevtool/src/task/kafka_service.rs index 52bdd227a72a4..7c415b6d9749a 100644 --- a/src/risedevtool/src/task/kafka_service.rs +++ 
b/src/risedevtool/src/task/kafka_service.rs @@ -37,15 +37,18 @@ impl DockerServiceConfig for KafkaConfig { ), ( "KAFKA_LISTENERS".to_owned(), - "PLAINTEXT://:9092,CONTROLLER://:9093".to_owned(), + "HOST://:9092,CONTROLLER://:9093,DOCKER://:9094".to_owned(), ), ( "KAFKA_ADVERTISED_LISTENERS".to_owned(), - format!("PLAINTEXT://{}:{}", self.address, self.port), + format!( + "HOST://{}:{},DOCKER://host.docker.internal:{}", + self.address, self.port, self.docker_port + ), ), ( "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(), - "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT".to_owned(), + "HOST:PLAINTEXT,CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT".to_owned(), ), ( "KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(), @@ -55,12 +58,19 @@ impl DockerServiceConfig for KafkaConfig { "KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(), "CONTROLLER".to_owned(), ), + ( + "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(), + "HOST".to_owned(), + ), ("CLUSTER_ID".to_owned(), "RiseDevRiseDevRiseDev1".to_owned()), ] } fn ports(&self) -> Vec<(String, String)> { - vec![(self.port.to_string(), "9092".to_owned())] + vec![ + (self.port.to_string(), "9092".to_owned()), + (self.docker_port.to_string(), "9094".to_owned()), + ] } fn data_path(&self) -> Option { diff --git a/src/risedevtool/src/task/schema_registry_service.rs b/src/risedevtool/src/task/schema_registry_service.rs new file mode 100644 index 0000000000000..5c5eba4fa8f35 --- /dev/null +++ b/src/risedevtool/src/task/schema_registry_service.rs @@ -0,0 +1,65 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::docker_service::{DockerService, DockerServiceConfig};
+use crate::SchemaRegistryConfig;
+
+/// Port the Schema Registry listens on *inside* the container.
+///
+/// The host-facing port (`SchemaRegistryConfig::port`) is published onto this port by
+/// [`DockerServiceConfig::ports`], so the listener must bind to it rather than to the
+/// host-facing port.
+const CONTAINER_PORT: u16 = 8081;
+
+impl DockerServiceConfig for SchemaRegistryConfig {
+    /// Returns the service id, used to name the container.
+    fn id(&self) -> String {
+        self.id.clone()
+    }
+
+    /// Returns whether the user runs the Schema Registry themselves.
+    fn is_user_managed(&self) -> bool {
+        self.user_managed
+    }
+
+    /// Returns the docker image to run.
+    fn image(&self) -> String {
+        self.image.clone()
+    }
+
+    /// Builds the environment variables passed to the Schema Registry container.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `provide_kafka` is unset, or if it does not contain exactly one Kafka.
+    fn envs(&self) -> Vec<(String, String)> {
+        // https://docs.confluent.io/platform/current/installation/docker/config-reference.html#sr-long-configuration
+        // https://docs.confluent.io/platform/current/schema-registry/installation/config.html
+        let kafka = match self.provide_kafka.as_deref() {
+            Some([kafka]) => kafka,
+            Some(kafkas) => panic!(
+                "Schema Registry requires exactly one Kafka, but {} were provided",
+                kafkas.len()
+            ),
+            None => panic!("Kafka is required for Schema Registry"),
+        };
+        vec![
+            ("SCHEMA_REGISTRY_HOST_NAME".to_owned(), self.address.clone()),
+            (
+                // Bind to all interfaces on the container-side port. `self.address` and
+                // `self.port` describe how the *host* reaches the service; binding to them
+                // inside the container (e.g. `127.0.0.1`) would make the published port
+                // unreachable, and would miss the port mapping whenever `port != 8081`.
+                "SCHEMA_REGISTRY_LISTENERS".to_owned(),
+                format!("http://0.0.0.0:{}", CONTAINER_PORT),
+            ),
+            (
+                // The container reaches Kafka through the `DOCKER` listener, which Kafka
+                // advertises as `host.docker.internal:{docker_port}`.
+                "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS".to_owned(),
+                format!("host.docker.internal:{}", kafka.docker_port),
+            ),
+        ]
+    }
+
+    /// Returns the port mapping as `(host port, container port)` pairs, the same shape the
+    /// Kafka service uses.
+    fn ports(&self) -> Vec<(String, String)> {
+        vec![(self.port.to_string(), CONTAINER_PORT.to_string())]
+    }
+
+    /// The Schema Registry persists its state in Kafka, so no data directory is mounted.
+    fn data_path(&self) -> Option<String> {
+        None
+    }
+}
+
+/// Docker-backed Schema Registry service.
+pub type SchemaRegistryService = DockerService<SchemaRegistryConfig>;