Skip to content

Commit

Permalink
feat(standalone): add compactor (#12623)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Oct 4, 2023
1 parent 1546a33 commit 194b606
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 53 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fi
cluster_start() {
if [[ $mode == "standalone" ]]; then
start_standalone "$PREFIX_LOG"/standalone.log &
cargo make ci-start standalone-minio-etcd-compactor
cargo make ci-start standalone-minio-etcd
else
cargo make ci-start "$mode"
fi
Expand Down
10 changes: 9 additions & 1 deletion ci/scripts/standalone-utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ export RW_PREFIX=$PWD/.risingwave
export PREFIX_BIN=$RW_PREFIX/bin
export PREFIX_LOG=$RW_PREFIX/log

# You can fill up this section by consulting
# .risingwave/log/risedev.log, after calling `./risedev d full`.
# It is expected that `minio`, `etcd` will be started after this is called.
start_standalone() {
RUST_BACKTRACE=1 \
"$PREFIX_BIN"/risingwave/standalone \
Expand Down Expand Up @@ -35,7 +38,12 @@ start_standalone() {
--advertise-addr 127.0.0.1:4566 \
--prometheus-listener-addr 127.0.0.1:2222 \
--health-check-listener-addr 127.0.0.1:6786 \
--meta-addr http://127.0.0.1:5690" >"$1" 2>&1
--meta-addr http://127.0.0.1:5690" \
--compactor-opts=" \
--listen-addr 127.0.0.1:6660 \
--prometheus-listener-addr 127.0.0.1:1260 \
--advertise-addr 127.0.0.1:6660 \
--meta-address http://127.0.0.1:5690" >"$1" 2>&1
}

stop_standalone() {
Expand Down
19 changes: 2 additions & 17 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ profile:
- use: frontend
user-managed: true
- use: compactor
user-managed: true
- use: prometheus
- use: grafana
- use: zookeeper
Expand All @@ -145,7 +146,7 @@ profile:
persist-data: true
- use: connector-node

standalone-minio-etcd-compactor:
standalone-minio-etcd:
steps:
- use: minio
- use: etcd
Expand All @@ -156,23 +157,7 @@ profile:
- use: frontend
user-managed: true
- use: compactor

standalone-full-peripherals-without-kafka:
steps:
- use: minio
- use: etcd
- use: meta-node
user-managed: true
- use: compute-node
user-managed: true
- use: frontend
user-managed: true
- use: compactor
- use: prometheus
- use: grafana
- use: zookeeper
persist-data: true
- use: connector-node

hdfs:
steps:
Expand Down
18 changes: 8 additions & 10 deletions src/cmd_all/scripts/e2e-full-standalone-demo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ echo "--- Starting standalone cluster"
./risedev standalone-demo-full >"$LOG_PREFIX"/standalone.log 2>&1 &
STANDALONE_PID=$!

# Wait for rw cluster to finish startup
sleep 10

# Make sure the env file is present
set +e
while [ ! -f "$RW_PREFIX"/config/risedev-env ]; do
echo "Waiting for risedev-env to be configured."
sleep 1
done
set -e
sleep 15

# FIXME: Integrate standalone into risedev, so we can reuse risedev-env functionality here.
cat << EOF > "$RW_PREFIX"/config/risedev-env
RW_META_ADDR="http://0.0.0.0:5690"
RW_FRONTEND_LISTEN_ADDRESS="0.0.0.0"
RW_FRONTEND_PORT="4566"
EOF

echo "--- Setting up table"
./risedev psql -c "
Expand Down
7 changes: 6 additions & 1 deletion src/cmd_all/scripts/standalone-demo-dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,9 @@ cargo run -p risingwave_cmd_all \
--advertise-addr 127.0.0.1:4566 \
--prometheus-listener-addr 127.0.0.1:2222 \
--health-check-listener-addr 127.0.0.1:6786 \
--meta-addr http://127.0.0.1:5690"
--meta-addr http://127.0.0.1:5690" \
--compactor-opts=" \
--listen-addr 127.0.0.1:6660 \
--prometheus-listener-addr 127.0.0.1:1260 \
--advertise-addr 127.0.0.1:6660 \
--meta-address http://127.0.0.1:5690"
7 changes: 6 additions & 1 deletion src/cmd_all/scripts/standalone-demo-full.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ start_standalone() {
--advertise-addr 127.0.0.1:4566 \
--prometheus-listener-addr 127.0.0.1:2222 \
--health-check-listener-addr 127.0.0.1:6786 \
--meta-addr http://127.0.0.1:5690"
--meta-addr http://127.0.0.1:5690" \
--compactor-opts=" \
--listen-addr 127.0.0.1:6660 \
--prometheus-listener-addr 127.0.0.1:1260 \
--advertise-addr 127.0.0.1:6660 \
--meta-address http://127.0.0.1:5690"
}

start_standalone
76 changes: 54 additions & 22 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,45 @@ pub struct StandaloneOpts {
#[clap(short, long, env = "STANDALONE_FRONTEND_OPTS", default_value = "")]
/// Frontend node options
frontend_opts: String,

#[clap(long, env = "STANDALONE_COMPACTOR_OPTS", default_value = "")]
/// Frontend node options
compactor_opts: String,
}

fn parse_opt_args(opts: &StandaloneOpts) -> (Vec<String>, Vec<String>, Vec<String>) {
#[derive(Debug)]
pub struct ParsedStandaloneOpts {
pub meta_opts: Vec<String>,
pub compute_opts: Vec<String>,
pub frontend_opts: Vec<String>,
pub compactor_opts: Vec<String>,
}

fn parse_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
let meta_opts = split(&opts.meta_opts).unwrap();
let compute_opts = split(&opts.compute_opts).unwrap();
let frontend_opts = split(&opts.frontend_opts).unwrap();
(meta_opts, compute_opts, frontend_opts)
let compactor_opts = split(&opts.compactor_opts).unwrap();
ParsedStandaloneOpts {
meta_opts,
compute_opts,
frontend_opts,
compactor_opts,
}
}

fn get_services(opts: &StandaloneOpts) -> Vec<RisingWaveService> {
let (meta_opts, compute_opts, frontend_opts) = parse_opt_args(opts);
let ParsedStandaloneOpts {
meta_opts,
compute_opts,
frontend_opts,
compactor_opts,
} = parse_opt_args(opts);
let services = vec![
RisingWaveService::Meta(osstrs(meta_opts)),
RisingWaveService::Compute(osstrs(compute_opts)),
RisingWaveService::Frontend(osstrs(frontend_opts)),
RisingWaveService::Compactor(osstrs(compactor_opts)),
];
services
}
Expand Down Expand Up @@ -83,8 +107,12 @@ pub async fn standalone(opts: StandaloneOpts) -> Result<()> {
let _frontend_handle =
tokio::spawn(async move { risingwave_frontend::start(opts).await });
}
RisingWaveService::Compactor(_) => {
panic!("Compactor node unsupported in Risingwave standalone mode.");
RisingWaveService::Compactor(mut opts) => {
opts.insert(0, "compactor-node".into());
tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
let opts = risingwave_compactor::CompactorOpts::parse_from(opts);
let _compactor_handle =
tokio::spawn(async move { risingwave_compactor::start(opts).await });
}
RisingWaveService::ConnectorNode(_) => {
panic!("Connector node unsupported in Risingwave standalone mode.");
Expand Down Expand Up @@ -126,28 +154,32 @@ mod test {
compute_opts: "--listen-address 127.0.0.1 --port 8000".into(),
meta_opts: "--data-dir \"some path with spaces\" --port 8001".into(),
frontend_opts: "--some-option".into(),
compactor_opts: "--some-other-option".into(),
};
let actual = parse_opt_args(&opts);
check(
actual,
expect![[r#"
(
[
"--data-dir",
"some path with spaces",
"--port",
"8001",
],
[
"--listen-address",
"127.0.0.1",
"--port",
"8000",
],
[
"--some-option",
],
)"#]],
ParsedStandaloneOpts {
meta_opts: [
"--data-dir",
"some path with spaces",
"--port",
"8001",
],
compute_opts: [
"--listen-address",
"127.0.0.1",
"--port",
"8000",
],
frontend_opts: [
"--some-option",
],
compactor_opts: [
"--some-other-option",
],
}"#]],
);
}
}

0 comments on commit 194b606

Please sign in to comment.