From 5502d437e66e76c4e3fb8d6e0bac28dc00b2a83f Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 26 Mar 2024 13:23:31 +0000 Subject: [PATCH 1/2] fix(source): parse message with empty key and payload (#15678) (#15884) Co-authored-by: StrikeW Co-authored-by: tabVersion --- ci/scripts/e2e-source-test.sh | 10 +++ e2e_test/source/cdc/cdc.create_source_job.slt | 15 ++++ .../source/SourceValidateHandler.java | 5 ++ src/connector/src/parser/mod.rs | 80 ++++++++++++++++++- src/stream/src/executor/source/mod.rs | 4 +- 5 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 e2e_test/source/cdc/cdc.create_source_job.slt diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index aa6da360fbe52..758144e221c3d 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -86,10 +86,20 @@ echo "--- mysql & postgres cdc validate test" sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt' sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt' +echo "--- cdc share source test" # cdc share stream test cases export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt' +# create a share source and check whether heartbeat message is received +sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.create_source_job.slt' +table_id=`psql -U root -h localhost -p 4566 -d dev -t -c "select id from rw_internal_tables where name like '%mysql_source%';" | xargs`; +table_count=`psql -U root -h localhost -p 4566 -d dev -t -c "select count(*) from rw_table(${table_id}, public);" | xargs`; +if [ $table_count -eq 0 ]; then + echo "ERROR: internal table of cdc share source is empty!" + exit 1 +fi + echo "--- mysql & postgres load and check" sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.load.slt' # wait for cdc loading diff --git a/e2e_test/source/cdc/cdc.create_source_job.slt b/e2e_test/source/cdc/cdc.create_source_job.slt new file mode 100644 index 0000000000000..1f2300087110b --- /dev/null +++ b/e2e_test/source/cdc/cdc.create_source_job.slt @@ -0,0 +1,15 @@ +control substitution on + +# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON` +statement ok +create source mysql_source with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'rwcdc', + password = '${MYSQL_PWD:}', + database.name = 'mytest', + server.id = '5001' +); + +sleep 2s diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index b192c7b253ddc..309ab8db7af4e 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -95,6 +95,11 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re boolean isCdcSourceJob = request.getIsSourceJob(); boolean isBackfillTable = request.getIsBackfillTable(); + LOG.info( + "source_id: {}, is_cdc_source_job: {}, is_backfill_table: {}", + request.getSourceId(), + isCdcSourceJob, + isBackfillTable); TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema()); switch (request.getSourceType()) { diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 4097c86a2821b..c4cae0036fda9 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -80,6 +80,7 @@ mod upsert_parser; mod util; pub use debezium::DEBEZIUM_IGNORE_KEY; +use risingwave_common::buffer::BitmapBuilder; pub use unified::{AccessError, AccessResult}; /// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`]. @@ -141,6 +142,63 @@ impl SourceStreamChunkBuilder { } } +/// A builder for building a [`StreamChunk`] that contains only heartbeat rows. +/// Some connectors may emit heartbeat messages to the downstream, and the cdc source +/// rely on the heartbeat messages to keep the source offset up-to-date with upstream. +pub struct HeartbeatChunkBuilder { + builder: SourceStreamChunkBuilder, +} + +impl HeartbeatChunkBuilder { + fn with_capacity(descs: Vec, cap: usize) -> Self { + let builders = descs + .iter() + .map(|desc| desc.data_type.create_array_builder(cap)) + .collect(); + + Self { + builder: SourceStreamChunkBuilder { + descs, + builders, + op_builder: Vec::with_capacity(cap), + }, + } + } + + fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> { + self.builder.row_writer() + } + + /// Consumes the builder and returns a [`StreamChunk`] with all rows marked as invisible + fn finish(self) -> StreamChunk { + // heartbeat chunk should be invisible + let builder = self.builder; + let visibility = BitmapBuilder::zeroed(builder.op_builder.len()); + StreamChunk::with_visibility( + builder.op_builder, + builder + .builders + .into_iter() + .map(|builder| builder.finish().into()) + .collect(), + visibility.finish(), + ) + } + + /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for + /// the builders of the next [`StreamChunk`]. + #[must_use] + fn take(&mut self, next_cap: usize) -> StreamChunk { + let descs = std::mem::take(&mut self.builder.descs); + let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap)); + builder.finish() + } + + fn is_empty(&self) -> bool { + self.builder.is_empty() + } +} + /// `SourceStreamChunkRowWriter` is responsible to write one or more records to the [`StreamChunk`], /// where each contains either one row (Insert/Delete) or two rows (Update) that can be written atomically. /// @@ -560,6 +618,10 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { self.parse_one(key, payload, writer) .map_ok(|_| ParseResult::Rows) } + + fn emit_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) { + _ = writer.insert(|_column| Ok(None)); + } } #[try_stream(ok = Vec, error = ConnectorError)] @@ -616,6 +678,7 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096; async fn into_chunk_stream(mut parser: P, data_stream: BoxSourceStream) { let columns = parser.columns().to_vec(); + let mut heartbeat_builder = HeartbeatChunkBuilder::with_capacity(columns.clone(), 0); let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); struct Transaction { @@ -657,7 +720,15 @@ async fn into_chunk_stream(mut parser: P, data_stream let process_time_ms = chrono::Utc::now().timestamp_millis(); for (i, msg) in batch.into_iter().enumerate() { if msg.key.is_none() && msg.payload.is_none() { - tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message"); + tracing::debug!( + offset = msg.offset, + "got a empty message, could be a heartbeat" + ); + parser.emit_empty_row(heartbeat_builder.row_writer().with_meta(MessageMeta { + meta: &msg.meta, + split_id: &msg.split_id, + offset: &msg.offset, + })); continue; } @@ -750,6 +821,13 @@ async fn into_chunk_stream(mut parser: P, data_stream } } + // emit heartbeat for each message batch + // we must emit heartbeat chunk before the data chunk, + // otherwise the source offset could be backward due to the heartbeat + if !heartbeat_builder.is_empty() { + yield heartbeat_builder.take(0); + } + // If we are not in a transaction, we should yield the chunk now. if current_transaction.is_none() { yield_asap = false; diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index 08f07ca192db6..eae70cfe657a5 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -60,7 +60,9 @@ pub fn get_split_offset_mapping_from_chunk( offset_idx: usize, ) -> Option> { let mut split_offset_mapping = HashMap::new(); - for (_, row) in chunk.rows() { + // All rows (including those visible or invisible) will be used to update the source offset. + for i in 0..chunk.capacity() { + let (_, row, _) = chunk.row_at(i); let split_id = row.datum_at(split_idx).unwrap().into_utf8().into(); let offset = row.datum_at(offset_idx).unwrap().into_utf8(); split_offset_mapping.insert(split_id, offset.to_string()); From a34347ee84547e18c6b6475ceb7811fad8ab93db Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 26 Mar 2024 16:08:26 +0000 Subject: [PATCH 2/2] refactor(cmd_all): refactor playground to use `single_node --in-memory` internally (#15897) (#15929) Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com> --- src/cmd_all/src/bin/risingwave.rs | 20 ++- src/cmd_all/src/common.rs | 20 --- src/cmd_all/src/lib.rs | 2 - src/cmd_all/src/playground.rs | 260 ------------------------------ src/cmd_all/src/single_node.rs | 9 ++ src/cmd_all/src/standalone.rs | 16 +- 6 files changed, 33 insertions(+), 294 deletions(-) delete mode 100644 src/cmd_all/src/playground.rs diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index c7be6f9ac25cd..4fc326f1089a3 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -20,7 +20,7 @@ use anyhow::Result; use clap::error::ErrorKind; use clap::{command, ArgMatches, Args, Command, FromArgMatches}; use risingwave_cmd::{compactor, compute, ctl, frontend, meta}; -use risingwave_cmd_all::{PlaygroundOpts, SingleNodeOpts, StandaloneOpts}; +use risingwave_cmd_all::{SingleNodeOpts, StandaloneOpts}; use risingwave_common::git_sha; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; @@ -123,7 +123,7 @@ impl Component { Self::Frontend => frontend(parse_opts(matches)), Self::Compactor => compactor(parse_opts(matches)), Self::Ctl => ctl(parse_opts(matches)), - Self::Playground => playground(parse_opts(matches)), + Self::Playground => single_node(SingleNodeOpts::new_for_playground()), Self::Standalone => standalone(parse_opts(matches)), Self::SingleNode => single_node(parse_opts(matches)), } @@ -151,7 +151,7 @@ impl Component { Component::Frontend => FrontendOpts::augment_args(cmd), Component::Compactor => CompactorOpts::augment_args(cmd), Component::Ctl => CtlOpts::augment_args(cmd), - Component::Playground => PlaygroundOpts::augment_args(cmd), + Component::Playground => cmd, Component::Standalone => StandaloneOpts::augment_args(cmd), Component::SingleNode => SingleNodeOpts::augment_args(cmd), } @@ -161,8 +161,14 @@ impl Component { fn commands() -> Vec { Self::iter() .map(|c| { + let is_playground = matches!(c, Component::Playground); let name: &'static str = c.into(); let command = Command::new(name).visible_aliases(c.aliases()); + let command = if is_playground { + command.hide(true) + } else { + command + }; c.augment_args(command) }) .collect() @@ -221,14 +227,6 @@ fn main() -> Result<()> { Ok(()) } -fn playground(opts: PlaygroundOpts) { - let settings = risingwave_rt::LoggerSettings::from_opts(&opts) - .with_target("risingwave_storage", Level::WARN) - .with_thread_name(true); - risingwave_rt::init_risingwave_logger(settings); - risingwave_rt::main_okk(risingwave_cmd_all::playground(opts)).unwrap(); -} - fn standalone(opts: StandaloneOpts) { let opts = risingwave_cmd_all::parse_standalone_opt_args(&opts); let settings = risingwave_rt::LoggerSettings::from_opts(&opts) diff --git a/src/cmd_all/src/common.rs b/src/cmd_all/src/common.rs index 68e9090578ab0..cfc55c0d17428 100644 --- a/src/cmd_all/src/common.rs +++ b/src/cmd_all/src/common.rs @@ -17,23 +17,3 @@ use std::ffi::OsString; pub fn osstrs + AsRef>(s: impl AsRef<[T]>) -> Vec { s.as_ref().iter().map(OsString::from).collect() } - -pub enum RisingWaveService { - Compute(Vec), - Meta(Vec), - Frontend(Vec), - #[allow(dead_code)] - Compactor(Vec), -} - -impl RisingWaveService { - /// Extend additional arguments to the service. - pub fn extend_args(&mut self, args: &[&str]) { - match self { - RisingWaveService::Compute(args0) - | RisingWaveService::Meta(args0) - | RisingWaveService::Frontend(args0) - | RisingWaveService::Compactor(args0) => args0.extend(args.iter().map(|s| s.into())), - } - } -} diff --git a/src/cmd_all/src/lib.rs b/src/cmd_all/src/lib.rs index 54ee3243bc662..a872d7de12b39 100644 --- a/src/cmd_all/src/lib.rs +++ b/src/cmd_all/src/lib.rs @@ -15,12 +15,10 @@ #![feature(lazy_cell)] mod common; -pub mod playground; mod standalone; pub mod single_node; -pub use playground::*; pub use single_node::*; pub use standalone::*; diff --git a/src/cmd_all/src/playground.rs b/src/cmd_all/src/playground.rs deleted file mode 100644 index 1b03048d5d6e0..0000000000000 --- a/src/cmd_all/src/playground.rs +++ /dev/null @@ -1,260 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::env; -use std::ffi::OsString; -use std::io::Write; -use std::path::Path; -use std::sync::LazyLock; - -use anyhow::Result; -use clap::Parser; -use risingwave_common::util::meta_addr::MetaAddressStrategy; -use tempfile::TempPath; -use tokio::signal; - -use crate::common::{osstrs as common_osstrs, RisingWaveService}; - -const IDLE_EXIT_SECONDS: u64 = 1800; - -/// Embed the config file and create a temporary file at runtime. -static CONFIG_PATH_WITH_IDLE_EXIT: LazyLock = LazyLock::new(|| { - let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file"); - write!( - file, - "[meta] -disable_recovery = true -dangerous_max_idle_secs = {IDLE_EXIT_SECONDS} -max_heartbeat_interval_secs = 600", - ) - .expect("failed to write config file"); - file.into_temp_path() -}); - -fn osstrs(s: [&str; N]) -> Vec { - common_osstrs(s) -} - -fn get_services(profile: &str) -> (Vec, bool) { - let mut services = match profile { - "playground" => vec![ - RisingWaveService::Meta(osstrs([ - "--dashboard-host", - "0.0.0.0:5691", - "--state-store", - "hummock+memory", - "--data-directory", - "hummock_001", - "--advertise-addr", - "127.0.0.1:5690", - ])), - RisingWaveService::Compute(osstrs([])), - RisingWaveService::Frontend(osstrs([])), - ], - "playground-3cn" => vec![ - RisingWaveService::Meta(osstrs([ - "--dashboard-host", - "0.0.0.0:5691", - "--advertise-addr", - "127.0.0.1:5690", - "--state-store", - "hummock+memory-shared", - "--data-directory", - "hummock_001", - ])), - RisingWaveService::Compute(osstrs([ - "--listen-addr", - "127.0.0.1:5687", - "--parallelism", - "4", - ])), - RisingWaveService::Compute(osstrs([ - "--listen-addr", - "127.0.0.1:5688", - "--parallelism", - "4", - ])), - RisingWaveService::Compute(osstrs([ - "--listen-addr", - "127.0.0.1:5689", - "--parallelism", - "4", - ])), - RisingWaveService::Frontend(osstrs([])), - ], - "online-docker-playground" | "docker-playground" => vec![ - RisingWaveService::Meta(osstrs([ - "--listen-addr", - "0.0.0.0:5690", - "--advertise-addr", - "127.0.0.1:5690", - "--dashboard-host", - "0.0.0.0:5691", - "--state-store", - "hummock+memory", - "--data-directory", - "hummock_001", - ])), - RisingWaveService::Compute(osstrs([ - "--listen-addr", - "0.0.0.0:5688", - "--advertise-addr", - "127.0.0.1:5688", - ])), - RisingWaveService::Frontend(osstrs([ - "--listen-addr", - "0.0.0.0:4566", - "--advertise-addr", - "127.0.0.1:4566", - ])), - ], - _ => { - tracing::warn!("Unknown playground profile. All components will be started using the default command line options."); - return get_services("playground"); - } - }; - let idle_exit = profile != "docker-playground"; - if idle_exit { - services.iter_mut().for_each(|s| { - s.extend_args(&[ - "--config-path", - &CONFIG_PATH_WITH_IDLE_EXIT.as_os_str().to_string_lossy(), - ]) - }) - } - (services, idle_exit) -} - -#[derive(Debug, Clone, Parser)] -#[command(about = "The quick way to start an in-memory RisingWave cluster for playing around")] -pub struct PlaygroundOpts { - /// The profile to use. - #[clap(short, long, env = "PLAYGROUND_PROFILE", default_value = "playground")] - profile: String, -} - -impl risingwave_common::opts::Opts for PlaygroundOpts { - fn name() -> &'static str { - "playground" - } - - fn meta_addr(&self) -> MetaAddressStrategy { - "http://0.0.0.0:5690".parse().unwrap() // hard-coded - } -} - -pub async fn playground(opts: PlaygroundOpts) -> Result<()> { - let profile = opts.profile; - - tracing::info!("launching playground with profile `{}`", profile); - - let (services, idle_exit) = get_services(&profile); - - for service in services { - match service { - RisingWaveService::Meta(mut opts) => { - opts.insert(0, "meta-node".into()); - tracing::info!("starting meta-node thread with cli args: {:?}", opts); - let opts = risingwave_meta_node::MetaNodeOpts::parse_from(opts); - let _meta_handle = tokio::spawn(async move { - risingwave_meta_node::start(opts).await; - tracing::warn!("meta is stopped, shutdown all nodes"); - // As a playground, it's fine to just kill everything. - if idle_exit { - eprintln!("{}", - console::style(format_args!( - "RisingWave playground exited after being idle for {IDLE_EXIT_SECONDS} seconds. Bye!" - )).bold()); - } - std::process::exit(0); - }); - // wait for the service to be ready - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - RisingWaveService::Compute(mut opts) => { - opts.insert(0, "compute-node".into()); - tracing::info!("starting compute-node thread with cli args: {:?}", opts); - let opts = risingwave_compute::ComputeNodeOpts::parse_from(opts); - let _compute_handle = - tokio::spawn(async move { risingwave_compute::start(opts).await }); - } - RisingWaveService::Frontend(mut opts) => { - opts.insert(0, "frontend-node".into()); - tracing::info!("starting frontend-node thread with cli args: {:?}", opts); - let opts = risingwave_frontend::FrontendOpts::parse_from(opts); - let _frontend_handle = - tokio::spawn(async move { risingwave_frontend::start(opts).await }); - } - RisingWaveService::Compactor(mut opts) => { - opts.insert(0, "compactor".into()); - tracing::info!("starting compactor thread with cli args: {:?}", opts); - let opts = risingwave_compactor::CompactorOpts::parse_from(opts); - let _compactor_handle = - tokio::spawn(async move { risingwave_compactor::start(opts).await }); - } - } - } - - // wait for log messages to be flushed - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - eprintln!("-------------------------------"); - eprintln!("RisingWave playground is ready."); - eprint!( - "* {} RisingWave playground SHOULD NEVER be used in benchmarks and production environment!!!\n It is fully in-memory", - console::style("WARNING:").red().bold(), - ); - if idle_exit { - eprintln!( - " and will be automatically stopped after being idle for {}.", - console::style(format_args!("{IDLE_EXIT_SECONDS}s")).dim() - ); - } else { - eprintln!(); - } - let psql_cmd = if let Ok("1") = env::var("RISEDEV").as_deref() { - // Started with `./risedev playground`. - eprintln!( - "* Use {} instead if you want to start a full cluster.", - console::style("./risedev d").blue().bold() - ); - - // This is a specialization of `generate_risedev_env` in - // `src/risedevtool/src/risedev_env.rs`. - let risedev_env = r#" - RW_META_ADDR="http://0.0.0.0:5690" - RW_FRONTEND_LISTEN_ADDRESS="0.0.0.0" - RW_FRONTEND_PORT="4566" - "#; - std::fs::write( - Path::new(&env::var("PREFIX_CONFIG")?).join("risedev-env"), - risedev_env, - )?; - - "./risedev psql" - } else { - "psql -h localhost -p 4566 -d dev -U root" - }; - eprintln!( - "* Run {} in a different terminal to start Postgres interactive shell.", - console::style(psql_cmd).blue().bold() - ); - eprintln!("-------------------------------"); - - // TODO: should we join all handles? - // Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now. - signal::ctrl_c().await.unwrap(); - tracing::info!("Ctrl+C received, now exiting"); - - Ok(()) -} diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 64b6d4b2fb94a..62c0e44ac83e7 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -53,6 +53,15 @@ pub struct SingleNodeOpts { node_opts: NodeSpecificOpts, } +impl SingleNodeOpts { + pub fn new_for_playground() -> Self { + let empty_args = vec![] as Vec; + let mut opts = SingleNodeOpts::parse_from(empty_args); + opts.in_memory = true; + opts + } +} + /// # Node-Specific Options /// /// ## Which node-specific options should be here? diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 28e2bc0ce9387..a4e02a8097644 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -14,6 +14,7 @@ use anyhow::Result; use clap::Parser; +use risingwave_common::config::MetaBackend; use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; @@ -179,7 +180,9 @@ pub async fn standalone( ) -> Result<()> { tracing::info!("launching Risingwave in standalone mode"); + let mut is_in_memory = false; if let Some(opts) = meta_opts { + is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem)); tracing::info!("starting meta-node thread with cli args: {:?}", opts); let _meta_handle = tokio::spawn(async move { @@ -204,9 +207,20 @@ pub async fn standalone( } // wait for log messages to be flushed - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_millis(5000)).await; eprintln!("-------------------------------"); eprintln!("RisingWave standalone mode is ready."); + if is_in_memory { + eprintln!( + "{}", + console::style( + "WARNING: You are using RisingWave's in-memory mode. +It SHOULD NEVER be used in benchmarks and production environment!!!" + ) + .red() + .bold() + ); + } // TODO: should we join all handles? // Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now.