Skip to content

Commit

Permalink
feat(risedev): deprecate zookeeper & use kraft mode for kafka service (
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao authored May 7, 2024
1 parent 48110e1 commit c12bf3c
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 278 deletions.
18 changes: 1 addition & 17 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -697,32 +697,19 @@ wait_kafka_exit() {
done
}
wait_zookeeper_exit() {
# Follow zookeeper-server-stop.sh
while [[ -n "$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')" ]]; do
echo "Waiting for zookeeper to exit"
sleep 1
done
}
kill_kafka() {
${PREFIX_BIN}/kafka/bin/kafka-server-stop.sh
wait_kafka_exit
}
kill_zookeeper() {
${PREFIX_BIN}/kafka/bin/zookeeper-server-stop.sh
wait_zookeeper_exit
}
if ! ${TMUX} ls &>/dev/null ; then
echo "No risedev cluster to kill. Exiting..."
exit 0
fi
# Kill other components with Ctrl+C/Ctrl+D
${TMUX} list-windows -F "#{window_name} #{pane_id}" \
| grep --invert-match --extended-regexp '(kafka|zookeeper)' \
| grep --invert-match --extended-regexp '(kafka)' \
| awk '{ print $2 }' \
| xargs -I {} ${TMUX} send-keys -t {} C-c C-d
Expand All @@ -738,9 +725,6 @@ if [[ -n $(${TMUX} list-windows | grep kafka) ]];
then
echo "kill kafka"
kill_kafka || true
echo "kill zookeeper"
kill_zookeeper || true
fi
${TMUX} kill-server
Expand Down
8 changes: 1 addition & 7 deletions docs/developer-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,10 @@ For example, you can modify the default section to:
- use: frontend
- use: prometheus
- use: grafana
- use: zookeeper
persist-data: true
- use: kafka
persist-data: true
```
> [!NOTE]
>
> The Kafka service depends on the ZooKeeper service. If you want to enable the Kafka component, enable the ZooKeeper component first.
Now you can run `./risedev d` to start a new dev cluster. The new dev cluster will contain components as configured in the yaml file. RiseDev will automatically configure the components to use the available storage service and to monitor the target.

You may also add multiple compute nodes in the cluster. The `ci-3cn-1fe` config is an example.
Expand Down Expand Up @@ -566,4 +560,4 @@ Instructions about submitting PRs are included in the [contribution guidelines](
These correspond to its `depends` field in `pull-request.yml` and `main-cron.yml` .
2. Add `ci/run-e2e-test` to run the step as well.
3. Add `ci/run-main-cron` to run `main-cron` workflow in your pull request,
4. Add `ci/main-cron/skip-ci` to skip all other steps which were not selected with `ci/run-xxx`.
4. Add `ci/main-cron/skip-ci` to skip all other steps which were not selected with `ci/run-xxx`.
42 changes: 6 additions & 36 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ profile:
# - use: compactor

# If you want to create source from Kafka, uncomment the following lines
# Note that kafka depends on zookeeper, so zookeeper must be started beforehand.
# - use: zookeeper
# persist-data: true
# - use: kafka
# persist-data: true

Expand Down Expand Up @@ -113,8 +110,6 @@ profile:
- use: compactor
- use: prometheus
- use: grafana
- use: zookeeper
persist-data: true
- use: kafka
persist-data: true

Expand All @@ -132,8 +127,6 @@ profile:
user-managed: true
- use: prometheus
- use: grafana
- use: zookeeper
persist-data: true
- use: kafka
persist-data: true

Expand Down Expand Up @@ -268,8 +261,6 @@ profile:
remote-write-region: "ap-southeast-1"
remote-write-url: "https://aps-workspaces.ap-southeast-1.amazonaws.com/workspaces/ws-f3841dad-6a5c-420f-8f62-8f66487f512a/api/v1/remote_write"
- use: grafana
- use: zookeeper
persist-data: true
- use: kafka
persist-data: true

Expand Down Expand Up @@ -407,10 +398,8 @@ profile:
- use: compactor
- use: prometheus
- use: grafana
# Do not use kafka and zookeeper here, we will spawn it separately,
# Do not use kafka here, we will spawn it separately,
# so we don't have to re-generate data each time.
# - use: zookeeper
# persist-data: true
# RW will still be ale to talk to it.
# - use: kafka
# port: 9092
Expand Down Expand Up @@ -916,8 +905,6 @@ profile:
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: zookeeper
persist-data: true
- use: kafka
persist-data: true

Expand Down Expand Up @@ -1375,17 +1362,17 @@ template:
# Listen port of Kafka
port: 29092

# Listen port of KRaft controller
controller-port: 29093

# Listen address
listen-address: ${address}

# ZooKeeper used by this Kafka instance
provide-zookeeper: "zookeeper*"

# If set to true, data will be persisted at data/{id}.
persist-data: true

# Kafka broker id. If there are multiple instances of Kafka, we will need to set.
broker-id: 0
# Kafka node id. If there are multiple instances of Kafka, we will need to set.
node-id: 0

user-managed: false

Expand All @@ -1399,23 +1386,6 @@ template:

persist-data: true

# Apache ZooKeeper service
zookeeper:
# Id to be picked-up by services
id: zookeeper-${port}

# Advertise address of ZooKeeper
address: "127.0.0.1"

# Listen address
listen-address: ${address}

# Listen port of ZooKeeper
port: 2181

# If set to true, data will be persisted at data/{id}.
persist-data: true

# Only supported in RiseDev compose
redpanda:
# Id to be picked-up by services
Expand Down
3 changes: 0 additions & 3 deletions src/risedevtool/src/bin/risedev-compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,6 @@ fn main() -> Result<()> {
ServiceConfig::Pubsub(_) => {
return Err(anyhow!("not supported, please use redpanda instead"))
}
ServiceConfig::ZooKeeper(_) => {
return Err(anyhow!("not supported, please use redpanda instead"))
}
ServiceConfig::Opendal(_) => continue,
ServiceConfig::AwsS3(_) => continue,
ServiceConfig::RedPanda(c) => {
Expand Down
14 changes: 1 addition & 13 deletions src/risedevtool/src/bin/risedev-dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use risedev::{
generate_risedev_env, preflight_check, CompactorService, ComputeNodeService, ConfigExpander,
ConfigureTmuxTask, DummyService, EnsureStopService, ExecuteContext, FrontendService,
GrafanaService, KafkaService, MetaNodeService, MinioService, MySqlService, PrometheusService,
PubsubService, RedisService, ServiceConfig, SqliteConfig, Task, TempoService, ZooKeeperService,
RISEDEV_NAME,
PubsubService, RedisService, ServiceConfig, SqliteConfig, Task, TempoService, RISEDEV_NAME,
};
use tempfile::tempdir;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -272,17 +271,6 @@ fn task_main(
ctx.pb
.set_message(format!("using Opendal, namenode = {}", c.namenode));
}
ServiceConfig::ZooKeeper(c) => {
let mut ctx =
ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
let mut service = ZooKeeperService::new(c.clone())?;
service.execute(&mut ctx)?;
let mut task =
risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, false)?;
task.execute(&mut ctx)?;
ctx.pb
.set_message(format!("zookeeper {}:{}", c.address, c.port));
}
ServiceConfig::Kafka(c) => {
let mut ctx =
ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
Expand Down
1 change: 0 additions & 1 deletion src/risedevtool/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ impl ConfigExpander {
"kafka" => ServiceConfig::Kafka(serde_yaml::from_str(&out_str)?),
"pubsub" => ServiceConfig::Pubsub(serde_yaml::from_str(&out_str)?),
"redis" => ServiceConfig::Redis(serde_yaml::from_str(&out_str)?),
"zookeeper" => ServiceConfig::ZooKeeper(serde_yaml::from_str(&out_str)?),
"redpanda" => ServiceConfig::RedPanda(serde_yaml::from_str(&out_str)?),
"mysql" => ServiceConfig::MySql(serde_yaml::from_str(&out_str)?),
other => return Err(anyhow!("unsupported use type: {}", other)),
Expand Down
2 changes: 0 additions & 2 deletions src/risedevtool/src/config_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ mod prometheus_gen;
pub use prometheus_gen::*;
mod grafana_gen;
pub use grafana_gen::*;
mod zookeeper_gen;
pub use zookeeper_gen::*;
mod kafka_gen;
pub use kafka_gen::*;
mod tempo_gen;
Expand Down
44 changes: 17 additions & 27 deletions src/risedevtool/src/config_gen/kafka_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;

use crate::KafkaConfig;

pub struct KafkaGen;
Expand All @@ -23,15 +21,10 @@ impl KafkaGen {
let kafka_listen_host = &config.listen_address;
let kafka_advertise_host = &config.address;
let kafka_port = &config.port;
let zookeeper_hosts = config
.provide_zookeeper
.as_ref()
.unwrap()
.iter()
.map(|node| format!("{}:{}", node.address, node.port))
.join(",");
let kafka_broker_id = config.broker_id;
let controller_port = &config.controller_port;
let kafka_node_id = config.node_id;

// https://github.com/apache/kafka/blob/trunk/config/kraft/server.properties
format!(
r#"# --- THIS FILE IS AUTO GENERATED BY RISEDEV ---
Expand All @@ -54,8 +47,14 @@ impl KafkaGen {
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id={kafka_broker_id}
# The role of this server. Setting this puts us in KRaft mode
process.roles=controller,broker
# The node id associated with this instance's roles
node.id={kafka_node_id}
# The connect string for the controller quorum
controller.quorum.voters={kafka_node_id}@{kafka_advertise_host}:{controller_port}
############################# Socket Server Settings #############################
Expand All @@ -65,15 +64,19 @@ broker.id={kafka_broker_id}
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://{kafka_listen_host}:{kafka_port}
listeners=PLAINTEXT://{kafka_listen_host}:{kafka_port},CONTROLLER://{kafka_listen_host}:{controller_port}
# A comma-separated list of the names of the listeners used by the controller.
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://{kafka_advertise_host}:{kafka_port}
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
Expand Down Expand Up @@ -149,19 +152,6 @@ log.segment.bytes=1073741824
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect={zookeeper_hosts}
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
Expand Down
59 changes: 0 additions & 59 deletions src/risedevtool/src/config_gen/zookeeper_gen.rs

This file was deleted.

Loading

0 comments on commit c12bf3c

Please sign in to comment.