Skip to content

Commit

Permalink
feat(standalone): support optionally including components (#12626)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Oct 5, 2023
1 parent ffa0758 commit e365f2f
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 52 deletions.
57 changes: 54 additions & 3 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,25 @@ fi

cluster_start() {
if [[ $mode == "standalone" ]]; then
mkdir -p "$PREFIX_LOG"
cargo make clean-data
cargo make pre-start-dev
start_standalone "$PREFIX_LOG"/standalone.log &
cargo make ci-start standalone-minio-etcd
cargo make dev standalone-minio-etcd
else
cargo make ci-start "$mode"
fi
}

cluster_stop() {
if [[ $mode == "standalone" ]]; then
if [[ $mode == "standalone" ]]
then
stop_standalone
# Don't check standalone logs, they will exceed the limit.
cargo make kill
else
cargo make ci-kill
fi
cargo make ci-kill
}

download_and_prepare_rw "$profile" common
Expand Down Expand Up @@ -180,6 +187,13 @@ if [[ "$mode" == "standalone" ]]; then
run_sql() {
psql -h localhost -p 4566 -d dev -U root -c "$@"
}
compactor_is_online() {
set +e
grep -q "risingwave_cmd_all::standalone: starting compactor-node thread" "${PREFIX_LOG}/standalone.log"
local EXIT_CODE=$?
set -e
return $EXIT_CODE
}

echo "--- e2e, standalone, cluster-persistence-test"
cluster_start
Expand Down Expand Up @@ -210,6 +224,43 @@ if [[ "$mode" == "standalone" ]]; then
echo "--- Kill cluster"
cluster_stop

wait

# Test that we can optionally include nodes in standalone mode.
echo "--- e2e, standalone, cluster-opts-test"

echo "test standalone without compactor"
mkdir -p "$PREFIX_LOG"
cargo make clean-data
cargo make pre-start-dev
start_standalone_without_compactor "$PREFIX_LOG"/standalone.log &
cargo make dev standalone-minio-etcd-compactor
wait_standalone
if compactor_is_online
then
echo "ERROR: Compactor should not be online."
exit 1
fi
cluster_stop
echo "test standalone without compactor [TEST PASSED]"

wait

echo "test standalone with compactor"
mkdir -p "$PREFIX_LOG"
cargo make clean-data
cargo make pre-start-dev
start_standalone "$PREFIX_LOG"/standalone.log &
cargo make dev standalone-minio-etcd
wait_standalone
if ! compactor_is_online
then
echo "ERROR: Compactor should be online."
exit 1
fi
cluster_stop
echo "test standalone with compactor [TEST PASSED]"

# Make sure any remaining background task exits.
wait
fi
38 changes: 36 additions & 2 deletions ci/scripts/standalone-utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,43 @@ export RW_PREFIX=$PWD/.risingwave
export PREFIX_BIN=$RW_PREFIX/bin
export PREFIX_LOG=$RW_PREFIX/log

# NOTE(kwannoel): Compared to start_standalone below, we omitted the compactor-opts,
# so it should not start.
start_standalone_without_compactor() {
RUST_BACKTRACE=1 \
"$PREFIX_BIN"/risingwave/standalone \
--meta-opts=" \
--listen-addr 127.0.0.1:5690 \
--advertise-addr 127.0.0.1:5690 \
--dashboard-host 127.0.0.1:5691 \
--prometheus-host 127.0.0.1:1250 \
--connector-rpc-endpoint 127.0.0.1:50051 \
--backend etcd \
--etcd-endpoints 127.0.0.1:2388 \
--state-store hummock+minio://hummockadmin:[email protected]:9301/hummock001 \
--data-directory hummock_001 \
--dashboard-ui-path $RW_PREFIX/ui" \
--compute-opts=" \
--listen-addr 127.0.0.1:5688 \
--prometheus-listener-addr 127.0.0.1:1222 \
--advertise-addr 127.0.0.1:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 127.0.0.1:50051 \
--parallelism 4 \
--total-memory-bytes 8589934592 \
--role both \
--meta-address http://127.0.0.1:5690" \
--frontend-opts=" \
--listen-addr 127.0.0.1:4566 \
--advertise-addr 127.0.0.1:4566 \
--prometheus-listener-addr 127.0.0.1:2222 \
--health-check-listener-addr 127.0.0.1:6786 \
--meta-addr http://127.0.0.1:5690" >"$1" 2>&1
}

# You can fill up this section by consulting
# .risingwave/log/risedev.log, after calling `./risedev d full`.
# It is expected that `minio`, `etcd` will be started after this is called.
# .risingwave/log/risedev.log, after calling ./risedev d full.
# It is expected that minio, etcd will be started after this is called.
start_standalone() {
RUST_BACKTRACE=1 \
"$PREFIX_BIN"/risingwave/standalone \
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
timeout_in_minutes: 30
retry: *auto-retry

- label: "fuzz test"
Expand Down
12 changes: 12 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ profile:
- use: compactor
user-managed: true

standalone-minio-etcd-compactor:
steps:
- use: minio
- use: etcd
- use: meta-node
user-managed: true
- use: compute-node
user-managed: true
- use: frontend
user-managed: true
- use: compactor

hdfs:
steps:
# - use: etcd
Expand Down
1 change: 0 additions & 1 deletion src/cmd_all/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub enum RisingWaveService {
Compute(Vec<OsString>),
Meta(Vec<OsString>),
Frontend(Vec<OsString>),
#[allow(dead_code)]
Compactor(Vec<OsString>),
ConnectorNode(Vec<OsString>),
}
Expand Down
111 changes: 66 additions & 45 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,38 @@ use tokio::signal;

use crate::common::{osstrs, RisingWaveService};

#[derive(Debug, Clone, Parser)]
#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
pub struct StandaloneOpts {
/// Compute node options
#[clap(short, long, env = "STANDALONE_COMPUTE_OPTS", default_value = "")]
compute_opts: String,
#[clap(short, long, env = "STANDALONE_COMPUTE_OPTS")]
compute_opts: Option<String>,

#[clap(short, long, env = "STANDALONE_META_OPTS", default_value = "")]
#[clap(short, long, env = "STANDALONE_META_OPTS")]
/// Meta node options
meta_opts: String,
meta_opts: Option<String>,

#[clap(short, long, env = "STANDALONE_FRONTEND_OPTS", default_value = "")]
#[clap(short, long, env = "STANDALONE_FRONTEND_OPTS")]
/// Frontend node options
frontend_opts: String,
frontend_opts: Option<String>,

#[clap(long, env = "STANDALONE_COMPACTOR_OPTS", default_value = "")]
#[clap(long, env = "STANDALONE_COMPACTOR_OPTS")]
/// Frontend node options
compactor_opts: String,
compactor_opts: Option<String>,
}

#[derive(Debug)]
pub struct ParsedStandaloneOpts {
pub meta_opts: Vec<String>,
pub compute_opts: Vec<String>,
pub frontend_opts: Vec<String>,
pub compactor_opts: Vec<String>,
pub meta_opts: Option<Vec<String>>,
pub compute_opts: Option<Vec<String>>,
pub frontend_opts: Option<Vec<String>>,
pub compactor_opts: Option<Vec<String>>,
}

fn parse_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
let meta_opts = split(&opts.meta_opts).unwrap();
let compute_opts = split(&opts.compute_opts).unwrap();
let frontend_opts = split(&opts.frontend_opts).unwrap();
let compactor_opts = split(&opts.compactor_opts).unwrap();
let meta_opts = opts.meta_opts.as_ref().map(|s| split(s).unwrap());
let compute_opts = opts.compute_opts.as_ref().map(|s| split(s).unwrap());
let frontend_opts = opts.frontend_opts.as_ref().map(|s| split(s).unwrap());
let compactor_opts = opts.compactor_opts.as_ref().map(|s| split(s).unwrap());
ParsedStandaloneOpts {
meta_opts,
compute_opts,
Expand All @@ -66,12 +66,19 @@ fn get_services(opts: &StandaloneOpts) -> Vec<RisingWaveService> {
frontend_opts,
compactor_opts,
} = parse_opt_args(opts);
let services = vec![
RisingWaveService::Meta(osstrs(meta_opts)),
RisingWaveService::Compute(osstrs(compute_opts)),
RisingWaveService::Frontend(osstrs(frontend_opts)),
RisingWaveService::Compactor(osstrs(compactor_opts)),
];
let mut services = vec![];
if let Some(meta_opts) = meta_opts {
services.push(RisingWaveService::Meta(osstrs(meta_opts)));
}
if let Some(compute_opts) = compute_opts {
services.push(RisingWaveService::Compute(osstrs(compute_opts)));
}
if let Some(frontend_opts) = frontend_opts {
services.push(RisingWaveService::Frontend(osstrs(frontend_opts)));
}
if let Some(compactor_opts) = compactor_opts {
services.push(RisingWaveService::Compactor(osstrs(compactor_opts)));
}
services
}

Expand Down Expand Up @@ -150,35 +157,49 @@ mod test {

#[test]
fn test_parse_opt_args() {
// Test parsing into standalone-level opts.
let raw_opts = "
--compute-opts=--listen-address 127.0.0.1 --port 8000
--meta-opts=--data-dir \"some path with spaces\" --port 8001
--frontend-opts=--some-option
";
let actual = StandaloneOpts::parse_from(raw_opts.lines());
let opts = StandaloneOpts {
compute_opts: "--listen-address 127.0.0.1 --port 8000".into(),
meta_opts: "--data-dir \"some path with spaces\" --port 8001".into(),
frontend_opts: "--some-option".into(),
compactor_opts: "--some-other-option".into(),
compute_opts: Some("--listen-address 127.0.0.1 --port 8000".into()),
meta_opts: Some("--data-dir \"some path with spaces\" --port 8001".into()),
frontend_opts: Some("--some-option".into()),
compactor_opts: None,
};
assert_eq!(actual, opts);

// Test parsing into node-level opts.
let actual = parse_opt_args(&opts);
check(
actual,
expect![[r#"
ParsedStandaloneOpts {
meta_opts: [
"--data-dir",
"some path with spaces",
"--port",
"8001",
],
compute_opts: [
"--listen-address",
"127.0.0.1",
"--port",
"8000",
],
frontend_opts: [
"--some-option",
],
compactor_opts: [
"--some-other-option",
],
meta_opts: Some(
[
"--data-dir",
"some path with spaces",
"--port",
"8001",
],
),
compute_opts: Some(
[
"--listen-address",
"127.0.0.1",
"--port",
"8000",
],
),
frontend_opts: Some(
[
"--some-option",
],
),
compactor_opts: None,
}"#]],
);
}
Expand Down

0 comments on commit e365f2f

Please sign in to comment.