From 194b6066c1c399c8148b86f3b1af0e6b3b82de9f Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Wed, 4 Oct 2023 16:23:05 +0800 Subject: [PATCH] feat(standalone): add compactor (#12623) --- ci/scripts/run-e2e-test.sh | 2 +- ci/scripts/standalone-utils.sh | 10 ++- risedev.yml | 19 +---- .../scripts/e2e-full-standalone-demo.sh | 18 ++--- src/cmd_all/scripts/standalone-demo-dev.sh | 7 +- src/cmd_all/scripts/standalone-demo-full.sh | 7 +- src/cmd_all/src/standalone.rs | 76 +++++++++++++------ 7 files changed, 86 insertions(+), 53 deletions(-) diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index 47b2f8adddcb..76306a97ae6c 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -29,7 +29,7 @@ fi cluster_start() { if [[ $mode == "standalone" ]]; then start_standalone "$PREFIX_LOG"/standalone.log & - cargo make ci-start standalone-minio-etcd-compactor + cargo make ci-start standalone-minio-etcd else cargo make ci-start "$mode" fi diff --git a/ci/scripts/standalone-utils.sh b/ci/scripts/standalone-utils.sh index ca2f8c788f65..2b13242f33fb 100755 --- a/ci/scripts/standalone-utils.sh +++ b/ci/scripts/standalone-utils.sh @@ -6,6 +6,9 @@ export RW_PREFIX=$PWD/.risingwave export PREFIX_BIN=$RW_PREFIX/bin export PREFIX_LOG=$RW_PREFIX/log +# You can fill up this section by consulting +# .risingwave/log/risedev.log, after calling `./risedev d full`. +# It is expected that `minio`, `etcd` will be started after this is called. start_standalone() { RUST_BACKTRACE=1 \ "$PREFIX_BIN"/risingwave/standalone \ @@ -35,7 +38,12 @@ start_standalone() { --advertise-addr 127.0.0.1:4566 \ --prometheus-listener-addr 127.0.0.1:2222 \ --health-check-listener-addr 127.0.0.1:6786 \ - --meta-addr http://127.0.0.1:5690" >"$1" 2>&1 + --meta-addr http://127.0.0.1:5690" \ + --compactor-opts=" \ + --listen-addr 127.0.0.1:6660 \ + --prometheus-listener-addr 127.0.0.1:1260 \ + --advertise-addr 127.0.0.1:6660 \ + --meta-address http://127.0.0.1:5690" >"$1" 2>&1 } stop_standalone() { diff --git a/risedev.yml b/risedev.yml index 2f0a793b9121..381ba957c3b2 100644 --- a/risedev.yml +++ b/risedev.yml @@ -137,6 +137,7 @@ profile: - use: frontend user-managed: true - use: compactor + user-managed: true - use: prometheus - use: grafana - use: zookeeper @@ -145,7 +146,7 @@ profile: persist-data: true - use: connector-node - standalone-minio-etcd-compactor: + standalone-minio-etcd: steps: - use: minio - use: etcd @@ -156,23 +157,7 @@ profile: - use: frontend user-managed: true - use: compactor - - standalone-full-peripherals-without-kafka: - steps: - - use: minio - - use: etcd - - use: meta-node - user-managed: true - - use: compute-node user-managed: true - - use: frontend - user-managed: true - - use: compactor - - use: prometheus - - use: grafana - - use: zookeeper - persist-data: true - - use: connector-node hdfs: steps: diff --git a/src/cmd_all/scripts/e2e-full-standalone-demo.sh b/src/cmd_all/scripts/e2e-full-standalone-demo.sh index 95e903745e9f..6c8c01740731 100755 --- a/src/cmd_all/scripts/e2e-full-standalone-demo.sh +++ b/src/cmd_all/scripts/e2e-full-standalone-demo.sh @@ -62,16 +62,14 @@ echo "--- Starting standalone cluster" ./risedev standalone-demo-full >"$LOG_PREFIX"/standalone.log 2>&1 & STANDALONE_PID=$! -# Wait for rw cluster to finish startup -sleep 10 - -# Make sure the env file is present -set +e -while [ ! -f "$RW_PREFIX"/config/risedev-env ]; do - echo "Waiting for risedev-env to be configured." - sleep 1 -done -set -e +sleep 15 + +# FIXME: Integrate standalone into risedev, so we can reuse risedev-env functionality here. +cat << EOF > "$RW_PREFIX"/config/risedev-env +RW_META_ADDR="http://0.0.0.0:5690" +RW_FRONTEND_LISTEN_ADDRESS="0.0.0.0" +RW_FRONTEND_PORT="4566" +EOF echo "--- Setting up table" ./risedev psql -c " diff --git a/src/cmd_all/scripts/standalone-demo-dev.sh b/src/cmd_all/scripts/standalone-demo-dev.sh index 1819f4065da5..87c1e0ab9b61 100755 --- a/src/cmd_all/scripts/standalone-demo-dev.sh +++ b/src/cmd_all/scripts/standalone-demo-dev.sh @@ -34,4 +34,9 @@ cargo run -p risingwave_cmd_all \ --advertise-addr 127.0.0.1:4566 \ --prometheus-listener-addr 127.0.0.1:2222 \ --health-check-listener-addr 127.0.0.1:6786 \ - --meta-addr http://127.0.0.1:5690" \ No newline at end of file + --meta-addr http://127.0.0.1:5690" \ + --compactor-opts=" \ + --listen-addr 127.0.0.1:6660 \ + --prometheus-listener-addr 127.0.0.1:1260 \ + --advertise-addr 127.0.0.1:6660 \ + --meta-address http://127.0.0.1:5690" \ No newline at end of file diff --git a/src/cmd_all/scripts/standalone-demo-full.sh b/src/cmd_all/scripts/standalone-demo-full.sh index 4fa3fe183251..46ca69b98259 100755 --- a/src/cmd_all/scripts/standalone-demo-full.sh +++ b/src/cmd_all/scripts/standalone-demo-full.sh @@ -41,7 +41,12 @@ start_standalone() { --advertise-addr 127.0.0.1:4566 \ --prometheus-listener-addr 127.0.0.1:2222 \ --health-check-listener-addr 127.0.0.1:6786 \ - --meta-addr http://127.0.0.1:5690" + --meta-addr http://127.0.0.1:5690" \ + --compactor-opts=" \ + --listen-addr 127.0.0.1:6660 \ + --prometheus-listener-addr 127.0.0.1:1260 \ + --advertise-addr 127.0.0.1:6660 \ + --meta-address http://127.0.0.1:5690" } start_standalone \ No newline at end of file diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index bd0db110e2f8..ca4542d5d0c5 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -32,21 +32,45 @@ pub struct StandaloneOpts { #[clap(short, long, env = "STANDALONE_FRONTEND_OPTS", default_value = "")] /// Frontend node options frontend_opts: String, + + #[clap(long, env = "STANDALONE_COMPACTOR_OPTS", default_value = "")] + /// Frontend node options + compactor_opts: String, } -fn parse_opt_args(opts: &StandaloneOpts) -> (Vec, Vec, Vec) { +#[derive(Debug)] +pub struct ParsedStandaloneOpts { + pub meta_opts: Vec, + pub compute_opts: Vec, + pub frontend_opts: Vec, + pub compactor_opts: Vec, +} + +fn parse_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts { let meta_opts = split(&opts.meta_opts).unwrap(); let compute_opts = split(&opts.compute_opts).unwrap(); let frontend_opts = split(&opts.frontend_opts).unwrap(); - (meta_opts, compute_opts, frontend_opts) + let compactor_opts = split(&opts.compactor_opts).unwrap(); + ParsedStandaloneOpts { + meta_opts, + compute_opts, + frontend_opts, + compactor_opts, + } } fn get_services(opts: &StandaloneOpts) -> Vec { - let (meta_opts, compute_opts, frontend_opts) = parse_opt_args(opts); + let ParsedStandaloneOpts { + meta_opts, + compute_opts, + frontend_opts, + compactor_opts, + } = parse_opt_args(opts); let services = vec![ RisingWaveService::Meta(osstrs(meta_opts)), RisingWaveService::Compute(osstrs(compute_opts)), RisingWaveService::Frontend(osstrs(frontend_opts)), + RisingWaveService::Compactor(osstrs(compactor_opts)), ]; services } @@ -83,8 +107,12 @@ pub async fn standalone(opts: StandaloneOpts) -> Result<()> { let _frontend_handle = tokio::spawn(async move { risingwave_frontend::start(opts).await }); } - RisingWaveService::Compactor(_) => { - panic!("Compactor node unsupported in Risingwave standalone mode."); + RisingWaveService::Compactor(mut opts) => { + opts.insert(0, "compactor-node".into()); + tracing::info!("starting compactor-node thread with cli args: {:?}", opts); + let opts = risingwave_compactor::CompactorOpts::parse_from(opts); + let _compactor_handle = + tokio::spawn(async move { risingwave_compactor::start(opts).await }); } RisingWaveService::ConnectorNode(_) => { panic!("Connector node unsupported in Risingwave standalone mode."); @@ -126,28 +154,32 @@ mod test { compute_opts: "--listen-address 127.0.0.1 --port 8000".into(), meta_opts: "--data-dir \"some path with spaces\" --port 8001".into(), frontend_opts: "--some-option".into(), + compactor_opts: "--some-other-option".into(), }; let actual = parse_opt_args(&opts); check( actual, expect![[r#" - ( - [ - "--data-dir", - "some path with spaces", - "--port", - "8001", - ], - [ - "--listen-address", - "127.0.0.1", - "--port", - "8000", - ], - [ - "--some-option", - ], - )"#]], + ParsedStandaloneOpts { + meta_opts: [ + "--data-dir", + "some path with spaces", + "--port", + "8001", + ], + compute_opts: [ + "--listen-address", + "127.0.0.1", + "--port", + "8000", + ], + frontend_opts: [ + "--some-option", + ], + compactor_opts: [ + "--some-other-option", + ], + }"#]], ); } }