diff --git a/Cargo.lock b/Cargo.lock index 19854624d08ba..247852213c422 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4036,12 +4036,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "iter-chunks" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7abddfc4e19bc38f3922e41b341fedb4e1470e922f024c4e5ae5922f56c7593" - [[package]] name = "itertools" version = "0.10.5" @@ -6893,7 +6887,6 @@ dependencies = [ "anyhow", "assert_matches", "async-recursion", - "async-stream", "async-trait", "criterion", "either", @@ -6927,7 +6920,6 @@ dependencies = [ "tokio-metrics", "tokio-stream", "tracing", - "uuid", "workspace-hack", ] @@ -6977,7 +6969,7 @@ dependencies = [ "risingwave_ctl", "risingwave_expr_impl", "risingwave_frontend", - "risingwave_meta", + "risingwave_meta_node", "risingwave_rt", "task_stats_alloc", "tikv-jemallocator", @@ -7003,7 +6995,7 @@ dependencies = [ "risingwave_ctl", "risingwave_expr_impl", "risingwave_frontend", - "risingwave_meta", + "risingwave_meta_node", "risingwave_rt", "shell-words", "strum 0.25.0", @@ -7172,6 +7164,7 @@ dependencies = [ "risingwave_hummock_sdk", "risingwave_hummock_test", "risingwave_meta", + "risingwave_meta_node", "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", @@ -7185,14 +7178,12 @@ dependencies = [ name = "risingwave_compactor" version = "1.3.0-alpha" dependencies = [ - "anyhow", "async-trait", "await-tree", "clap", "madsim-tokio", "madsim-tonic", "parking_lot 0.12.1", - "prometheus", "risingwave_common", "risingwave_common_heap_profiling", "risingwave_common_service", @@ -7201,7 +7192,6 @@ dependencies = [ "risingwave_rpc_client", "risingwave_storage", "serde", - "serde_json", "tracing", "workspace-hack", ] @@ -7266,7 +7256,6 @@ dependencies = [ "aws-smithy-http", "aws-types", "base64 0.21.4", - "bincode 1.3.3", "byteorder", "bytes", "chrono", @@ -7290,15 +7279,12 @@ dependencies = [ "jsonschema-transpiler", "madsim-rdkafka", "madsim-tokio", - "madsim-tonic", "maplit", "moka", "mysql_async", "mysql_common", "nexmark", - "nkeys", "num-bigint", - "opendal 0.39.0", "parking_lot 0.12.1", "paste", "prometheus", @@ -7493,7 +7479,6 @@ dependencies = [ "madsim-tonic", "maplit", "md5", - "more-asserts", "num-integer", "parking_lot 0.12.1", "parse-display", @@ -7672,17 +7657,14 @@ dependencies = [ "num-integer", "num-traits", "parking_lot 0.12.1", - "parse-display", "prometheus", "prometheus-http-query", "prost 0.12.1", "rand", - "regex", "reqwest", "risingwave_backup", "risingwave_common", "risingwave_common_heap_profiling", - "risingwave_common_service", "risingwave_connector", "risingwave_hummock_sdk", "risingwave_object_store", @@ -7695,9 +7677,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "static_assertions", "sync-point", - "tempfile", "thiserror", "tokio-retry", "tokio-stream", @@ -7709,6 +7689,56 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "risingwave_meta_node" +version = "1.3.0-alpha" +dependencies = [ + "anyhow", + "clap", + "either", + "futures", + "itertools 0.11.0", + "madsim-etcd-client", + "madsim-tokio", + "madsim-tonic", + "model_migration", + "prometheus-http-query", + "regex", + "risingwave_common", + "risingwave_common_heap_profiling", + "risingwave_common_service", + "risingwave_meta", + "risingwave_meta_service", + "risingwave_pb", + "risingwave_rpc_client", + "sea-orm", + "tracing", + "workspace-hack", +] + +[[package]] +name = "risingwave_meta_service" +version = "1.3.0-alpha" +dependencies = [ + "anyhow", + "async-trait", + "either", + "futures", + "itertools 0.11.0", + "madsim-tokio", + "madsim-tonic", + "regex", + "risingwave_common", + "risingwave_connector", + "risingwave_meta", + "risingwave_pb", + "sea-orm", + "sync-point", + "tokio-stream", + "tracing", + "workspace-hack", +] + [[package]] name = "risingwave_object_store" version = "1.3.0-alpha" @@ -7824,7 +7854,6 @@ name = "risingwave_rt" version = "1.3.0-alpha" dependencies = [ "await-tree", - "chrono", "console", "console-subscriber", "either", @@ -7836,7 +7865,6 @@ dependencies = [ "opentelemetry-semantic-conventions", "parking_lot 0.12.1", "pprof", - "prometheus", "risingwave_common", "risingwave_variables", "rlimit", @@ -7878,7 +7906,7 @@ dependencies = [ "risingwave_e2e_extended_mode_test", "risingwave_expr_impl", "risingwave_frontend", - "risingwave_meta", + "risingwave_meta_node", "risingwave_pb", "risingwave_rpc_client", "risingwave_sqlparser", @@ -7902,7 +7930,6 @@ dependencies = [ "anyhow", "assert_matches", "criterion", - "easy-ext", "futures", "futures-async-stream", "itertools 0.11.0", @@ -7996,7 +8023,6 @@ dependencies = [ name = "risingwave_storage" version = "1.3.0-alpha" dependencies = [ - "anyhow", "arc-swap", "async-trait", "auto_enums", @@ -8069,7 +8095,6 @@ dependencies = [ "await-tree", "bytes", "criterion", - "dyn-clone", "educe", "either", "enum-as-inner", @@ -8078,7 +8103,6 @@ dependencies = [ "futures-async-stream", "governor", "hytra", - "iter-chunks", "itertools 0.11.0", "local_stats_alloc", "lru 0.7.6", @@ -8087,9 +8111,7 @@ dependencies = [ "maplit", "memcomparable", "multimap 0.9.0", - "num-traits", "parking_lot 0.12.1", - "parse-display", "pin-project", "prometheus", "prost 0.12.1", @@ -8108,7 +8130,6 @@ dependencies = [ "serde_json", "serde_yaml", "smallvec", - "spin 0.9.8", "static_assertions", "task_stats_alloc", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index a975a0186e712..74a097d4eb9d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "src/cmd_all", "src/common", "src/common/common_service", + "src/common/heap_profiling", "src/compute", "src/connector", "src/ctl", @@ -18,6 +19,8 @@ members = [ "src/java_binding", "src/jni_core", "src/meta", + "src/meta/node", + "src/meta/service", "src/meta/src/model_v2/migration", "src/object_store", "src/prost", @@ -139,6 +142,8 @@ risingwave_hummock_sdk = { path = "./src/storage/hummock_sdk" } risingwave_hummock_test = { path = "./src/storage/hummock_test" } risingwave_hummock_trace = { path = "./src/storage/hummock_trace" } risingwave_meta = { path = "./src/meta" } +risingwave_meta_service = { path = "./src/meta/service" } +risingwave_meta_node = { path = "./src/meta/node" } risingwave_object_store = { path = "./src/object_store" } risingwave_pb = { path = "./src/prost" } risingwave_rpc_client = { path = "./src/rpc_client" } diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 17a19d4771f60..fef154450a563 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -17,7 +17,6 @@ normal = ["workspace-hack"] anyhow = "1" assert_matches = "1" async-recursion = "1" -async-stream = "0.3.5" async-trait = "0.1" either = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } @@ -53,7 +52,6 @@ tokio-metrics = "0.3.0" tokio-stream = "0.1" tonic = { workspace = true } tracing = "0.1" -uuid = "1" [target.'cfg(enable_task_local_alloc)'.dependencies] task_stats_alloc = { path = "../utils/task_stats_alloc" } diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 17fc6cfab2bc1..9104c96c951f5 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -15,7 +15,6 @@ #![expect(dead_code)] #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] -#![feature(binary_heap_drain_sorted)] #![feature(exact_size_is_empty)] #![feature(type_alias_impl_trait)] #![cfg_attr(coverage, feature(no_coverage))] diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 78dc7040f2ada..8ba72d6a24af4 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -27,7 +27,7 @@ risingwave_compute = { workspace = true } risingwave_ctl = { workspace = true } risingwave_expr_impl = { workspace = true } risingwave_frontend = { workspace = true } -risingwave_meta = { workspace = true } +risingwave_meta_node = { workspace = true } risingwave_rt = { workspace = true } tikv-jemallocator = { workspace = true, features = [ "unprefixed_malloc_on_supported_platforms", diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 7dd34a8364c9a..8d3629eaf657f 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -16,7 +16,7 @@ use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; use risingwave_ctl::CliOpts as CtlOpts; use risingwave_frontend::FrontendOpts; -use risingwave_meta::MetaNodeOpts; +use risingwave_meta_node::MetaNodeOpts; use risingwave_rt::{init_risingwave_logger, main_okk, LoggerSettings}; /// Define the `main` function for a component. @@ -48,7 +48,7 @@ pub fn compute(opts: ComputeNodeOpts) { pub fn meta(opts: MetaNodeOpts) { init_risingwave_logger(LoggerSettings::new("meta")); - main_okk(risingwave_meta::start(opts)); + main_okk(risingwave_meta_node::start(opts)); } pub fn frontend(opts: FrontendOpts) { diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml index 922fd3b5812bc..9a4b34c094196 100644 --- a/src/cmd_all/Cargo.toml +++ b/src/cmd_all/Cargo.toml @@ -31,7 +31,7 @@ risingwave_compute = { workspace = true } risingwave_ctl = { workspace = true } risingwave_expr_impl = { workspace = true } risingwave_frontend = { workspace = true } -risingwave_meta = { workspace = true } +risingwave_meta_node = { workspace = true } risingwave_rt = { workspace = true } shell-words = "1.1.0" strum = "0.25" diff --git a/src/cmd_all/src/README.md b/src/cmd_all/src/README.md index fbbae4439f97f..0284817b99a92 100644 --- a/src/cmd_all/src/README.md +++ b/src/cmd_all/src/README.md @@ -26,5 +26,5 @@ You may run and reference the [demo script](../scripts/e2e-full-standalone-demo. Standalone mode simply passes the options to the corresponding node, and starts them in the same process. -For example `--meta-opts` is parsed, and then Meta Node's entrypoint, `risingwave_meta::start`, is called with the parsed options. +For example `--meta-opts` is parsed, and then Meta Node's entrypoint, `risingwave_meta_node::start`, is called with the parsed options. If any option is missing, the corresponding node will not be started. \ No newline at end of file diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 1d92ca768f88d..3e9088e16b9e2 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -25,7 +25,7 @@ use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; use risingwave_ctl::CliOpts as CtlOpts; use risingwave_frontend::FrontendOpts; -use risingwave_meta::MetaNodeOpts; +use risingwave_meta_node::MetaNodeOpts; use strum::IntoEnumIterator; use strum_macros::{Display, EnumIter, EnumString, IntoStaticStr}; use tracing::Level; diff --git a/src/cmd_all/src/playground.rs b/src/cmd_all/src/playground.rs index b1fd1a30ef461..76ca89be17c76 100644 --- a/src/cmd_all/src/playground.rs +++ b/src/cmd_all/src/playground.rs @@ -159,9 +159,9 @@ pub async fn playground(opts: PlaygroundOpts) -> Result<()> { RisingWaveService::Meta(mut opts) => { opts.insert(0, "meta-node".into()); tracing::info!("starting meta-node thread with cli args: {:?}", opts); - let opts = risingwave_meta::MetaNodeOpts::parse_from(opts); + let opts = risingwave_meta_node::MetaNodeOpts::parse_from(opts); let _meta_handle = tokio::spawn(async move { - risingwave_meta::start(opts).await; + risingwave_meta_node::start(opts).await; tracing::warn!("meta is stopped, shutdown all nodes"); // As a playground, it's fine to just kill everything. if idle_exit { diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index f7c8068cf33b9..8ebe2c7112c49 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -17,7 +17,7 @@ use clap::Parser; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; use risingwave_frontend::FrontendOpts; -use risingwave_meta::MetaNodeOpts; +use risingwave_meta_node::MetaNodeOpts; use shell_words::split; use tokio::signal; @@ -142,7 +142,7 @@ pub async fn standalone(opts: StandaloneOpts) -> Result<()> { tracing::info!("starting meta-node thread with cli args: {:?}", opts); let _meta_handle = tokio::spawn(async move { - risingwave_meta::start(opts).await; + risingwave_meta_node::start(opts).await; tracing::warn!("meta is stopped, shutdown all nodes"); }); // wait for the service to be ready diff --git a/src/common/heap_profiling/Cargo.toml b/src/common/heap_profiling/Cargo.toml index 6c1b9957555bd..c7123eaac5817 100644 --- a/src/common/heap_profiling/Cargo.toml +++ b/src/common/heap_profiling/Cargo.toml @@ -15,16 +15,16 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] -tikv-jemalloc-ctl = { workspace = true } -risingwave_common = {workspace =true} -tokio = { version = "0.2", package = "madsim-tokio" } -tracing = "0.1" +anyhow = "1" chrono = { version = "0.4", default-features = false, features = [ "clock", "std", ] } -anyhow = "1" parking_lot = "0.12" +risingwave_common = { workspace = true } +tikv-jemalloc-ctl = { workspace = true } +tokio = { version = "0.2", package = "madsim-tokio" } +tracing = "0.1" [lints] workspace = true diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 228fb6200b667..2a3575d8dae78 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -18,7 +18,6 @@ )] #![feature(extract_if)] #![feature(trait_alias)] -#![feature(binary_heap_drain_sorted)] #![feature(is_sorted)] #![feature(type_alias_impl_trait)] #![feature(test)] diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index bdd84ae402746..65bf59eedf19e 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(trait_alias)] -#![feature(binary_heap_drain_sorted)] #![feature(generators)] #![feature(type_alias_impl_trait)] #![feature(let_chains)] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index f0890080c8c5d..abb7486de3091 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -34,7 +34,6 @@ aws-sdk-s3 = { workspace = true } aws-smithy-http = { workspace = true } aws-types = { workspace = true } base64 = "0.21" -bincode = "1" byteorder = "1" bytes = { version = "1", features = ["serde"] } chrono = { version = "0.4", default-features = false, features = [ @@ -65,7 +64,7 @@ icelake = { workspace = true } indexmap = { version = "1.9.3", features = ["serde"] } itertools = "0.11" jni = { version = "0.21.1", features = ["invocation"] } -jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } +jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } maplit = "1.0.2" moka = { version = "0.12", features = ["future"] } mysql_async = { version = "0.32", default-features = false, features = [ @@ -75,9 +74,7 @@ mysql_common = { version = "0.30", default-features = false, features = [ "chrono", ] } nexmark = { version = "0.2", features = ["serde"] } -nkeys = "0.3.2" num-bigint = "0.4" -opendal = "0.39" parking_lot = "0.12" paste = "1" prometheus = { version = "0.13", features = ["process"] } @@ -127,7 +124,6 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ tokio-retry = "0.3" tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec", "io"] } -tonic = { workspace = true } tonic_0_9 = { package = "tonic", version = "0.9" } tracing = "0.1" tracing-futures = { version = "0.2", features = ["futures-03"] } diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 4dd1691b00f89..8201e10731a28 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -19,7 +19,6 @@ #![feature(stmt_expr_attributes)] #![feature(box_patterns)] #![feature(trait_alias)] -#![feature(binary_heap_drain_sorted)] #![feature(lint_reasons)] #![feature(lazy_cell)] #![feature(result_option_inspect)] diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 7ff97bdd375de..37f9f6326faea 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -36,7 +36,6 @@ iana-time-zone = "0.1" itertools = "0.11" maplit = "1" md5 = "0.7.0" -more-asserts = "0.3" num-integer = "0.1" parking_lot = "0.12" parse-display = "0.8" diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 5bdcdfd633129..67e9a95026cc7 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -40,17 +40,14 @@ model_migration = { path = "src/model_v2/migration" } num-integer = "0.1" num-traits = "0.2" parking_lot = { version = "0.12", features = ["arc_lock"] } -parse-display = "0.8" prometheus = "0.13" prometheus-http-query = "0.7" prost = { workspace = true } rand = "0.8" -regex = "1" reqwest = "0.11" risingwave_backup = { workspace = true } risingwave_common = { workspace = true } risingwave_common_heap_profiling = { workspace = true } -risingwave_common_service = { workspace = true } risingwave_connector = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_object_store = { workspace = true } @@ -102,8 +99,6 @@ assert_matches = "1" maplit = "1.0.2" rand = "0.8" risingwave_test_runner = { workspace = true } -static_assertions = "1" -tempfile = "3" [features] test = [] diff --git a/src/meta/README.md b/src/meta/README.md new file mode 100644 index 0000000000000..3782d765532a4 --- /dev/null +++ b/src/meta/README.md @@ -0,0 +1,9 @@ +## Organization of the meta crates + +We split the meta module into smaller crates in order to speed up compilation. + +- `meta/node` is the final meta node server +- `meta/service` is tonic grpc service implementations. We may further split this into parallel sub-crates. +- The remaining part `meta/src` is the implementation details imported by services. In the future, we can also try to re-organize this into smaller units. + +Refer to [#12924](https://github.com/risingwavelabs/risingwave/pull/12924) for more details. diff --git a/src/meta/node/Cargo.toml b/src/meta/node/Cargo.toml new file mode 100644 index 0000000000000..8c2a5aeadbe41 --- /dev/null +++ b/src/meta/node/Cargo.toml @@ -0,0 +1,57 @@ +[package] +name = "risingwave_meta_node" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[dependencies] +anyhow = "1" +clap = { version = "4", features = ["derive", "env"] } +either = "1" +etcd-client = { workspace = true } +futures = { version = "0.3", default-features = false, features = ["alloc"] } +itertools = "0.11" +model_migration = { path = "../src/model_v2/migration" } +prometheus-http-query = "0.7" +regex = "1" +risingwave_common = { workspace = true } +risingwave_common_heap_profiling = { workspace = true } +risingwave_common_service = { workspace = true } +risingwave_meta = { workspace = true } +risingwave_meta_service = { workspace = true } +risingwave_pb = { workspace = true } +risingwave_rpc_client = { workspace = true } +sea-orm = { version = "0.12.0", features = [ + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "runtime-tokio-native-tls", + "macros", +] } +tokio = { version = "0.2", package = "madsim-tokio", features = [ + "rt", + "rt-multi-thread", + "sync", + "macros", + "time", + "signal", +] } +tonic = { workspace = true } +tracing = "0.1" + +[target.'cfg(not(madsim))'.dependencies] +workspace-hack = { path = "../../workspace-hack" } + +[dev-dependencies] + +[lints] +workspace = true diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs new file mode 100644 index 0000000000000..55c7b27b0c80a --- /dev/null +++ b/src/meta/node/src/lib.rs @@ -0,0 +1,344 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![feature(lint_reasons)] +#![feature(let_chains)] +#![cfg_attr(coverage, feature(no_coverage))] + +mod server; +use std::time::Duration; + +use clap::Parser; +pub use error::{MetaError, MetaResult}; +use risingwave_common::config::OverrideConfig; +use risingwave_common::util::resource_util; +use risingwave_common::{GIT_SHA, RW_VERSION}; +use risingwave_common_heap_profiling::HeapProfiler; +use risingwave_meta::*; +use risingwave_meta_service::*; +pub use rpc::{ElectionClient, ElectionMember, EtcdElectionClient}; +use server::{rpc_serve, MetaStoreSqlBackend}; + +use crate::manager::MetaOpts; + +#[derive(Debug, Clone, Parser, OverrideConfig)] +#[command(version, about = "The central metadata management service")] +pub struct MetaNodeOpts { + #[clap(long, env = "RW_VPC_ID")] + vpc_id: Option, + + #[clap(long, env = "RW_VPC_SECURITY_GROUP_ID")] + security_group_id: Option, + + #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")] + listen_addr: String, + + /// The address for contacting this instance of the service. + /// This would be synonymous with the service's "public address" + /// or "identifying address". + /// It will serve as a unique identifier in cluster + /// membership and leader election. Must be specified for etcd backend. + #[clap(long, env = "RW_ADVERTISE_ADDR")] + advertise_addr: String, + + #[clap(long, env = "RW_DASHBOARD_HOST")] + dashboard_host: Option, + + #[clap(long, env = "RW_PROMETHEUS_HOST")] + prometheus_host: Option, + + #[clap(long, env = "RW_ETCD_ENDPOINTS", default_value_t = String::from(""))] + etcd_endpoints: String, + + /// Enable authentication with etcd. By default disabled. + #[clap(long, env = "RW_ETCD_AUTH")] + etcd_auth: bool, + + /// Username of etcd, required when --etcd-auth is enabled. + #[clap(long, env = "RW_ETCD_USERNAME", default_value = "")] + etcd_username: String, + + /// Password of etcd, required when --etcd-auth is enabled. + #[clap(long, env = "RW_ETCD_PASSWORD", default_value = "")] + etcd_password: String, + + /// Endpoint of the SQL service, make it non-option when SQL service is required. + #[clap(long, env = "RW_SQL_ENDPOINT")] + sql_endpoint: Option, + + #[clap(long, env = "RW_DASHBOARD_UI_PATH")] + dashboard_ui_path: Option, + + /// For dashboard service to fetch cluster info. + #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")] + prometheus_endpoint: Option, + + /// Endpoint of the connector node, there will be a sidecar connector node + /// colocated with Meta node in the cloud environment + #[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")] + pub connector_rpc_endpoint: Option, + + /// Default tag for the endpoint created when creating a privatelink connection. + /// Will be appended to the tags specified in the `tags` field in with clause in `create + /// connection`. + #[clap(long, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")] + pub privatelink_endpoint_default_tags: Option, + + /// The path of `risingwave.toml` configuration file. + /// + /// If empty, default configuration values will be used. + #[clap(long, env = "RW_CONFIG_PATH", default_value = "")] + pub config_path: String, + + #[clap(long, env = "RW_BACKEND", value_enum)] + #[override_opts(path = meta.backend)] + backend: Option, + + /// The interval of periodic barrier. + #[clap(long, env = "RW_BARRIER_INTERVAL_MS")] + #[override_opts(path = system.barrier_interval_ms)] + barrier_interval_ms: Option, + + /// Target size of the Sstable. + #[clap(long, env = "RW_SSTABLE_SIZE_MB")] + #[override_opts(path = system.sstable_size_mb)] + sstable_size_mb: Option, + + /// Size of each block in bytes in SST. + #[clap(long, env = "RW_BLOCK_SIZE_KB")] + #[override_opts(path = system.block_size_kb)] + block_size_kb: Option, + + /// False positive probability of bloom filter. + #[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")] + #[override_opts(path = system.bloom_false_positive)] + bloom_false_positive: Option, + + /// State store url + #[clap(long, env = "RW_STATE_STORE")] + #[override_opts(path = system.state_store)] + state_store: Option, + + /// Remote directory for storing data and metadata objects. + #[clap(long, env = "RW_DATA_DIRECTORY")] + #[override_opts(path = system.data_directory)] + data_directory: Option, + + /// Whether config object storage bucket lifecycle to purge stale data. + #[clap(long, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")] + #[override_opts(path = meta.do_not_config_object_storage_lifecycle)] + do_not_config_object_storage_lifecycle: Option, + + /// Remote storage url for storing snapshots. + #[clap(long, env = "RW_BACKUP_STORAGE_URL")] + #[override_opts(path = system.backup_storage_url)] + backup_storage_url: Option, + + /// Remote directory for storing snapshots. + #[clap(long, env = "RW_BACKUP_STORAGE_DIRECTORY")] + #[override_opts(path = system.backup_storage_directory)] + backup_storage_directory: Option, + + #[clap(long, env = "RW_OBJECT_STORE_STREAMING_READ_TIMEOUT_MS", value_enum)] + #[override_opts(path = storage.object_store_streaming_read_timeout_ms)] + pub object_store_streaming_read_timeout_ms: Option, + #[clap(long, env = "RW_OBJECT_STORE_STREAMING_UPLOAD_TIMEOUT_MS", value_enum)] + #[override_opts(path = storage.object_store_streaming_upload_timeout_ms)] + pub object_store_streaming_upload_timeout_ms: Option, + #[clap(long, env = "RW_OBJECT_STORE_UPLOAD_TIMEOUT_MS", value_enum)] + #[override_opts(path = storage.object_store_upload_timeout_ms)] + pub object_store_upload_timeout_ms: Option, + #[clap(long, env = "RW_OBJECT_STORE_READ_TIMEOUT_MS", value_enum)] + #[override_opts(path = storage.object_store_read_timeout_ms)] + pub object_store_read_timeout_ms: Option, + + /// Enable heap profile dump when memory usage is high. + #[clap(long, env = "RW_HEAP_PROFILING_DIR")] + #[override_opts(path = server.heap_profiling.dir)] + pub heap_profiling_dir: Option, +} + +use std::future::Future; +use std::pin::Pin; + +use risingwave_common::config::{load_config, MetaBackend, RwConfig}; +use tracing::info; + +/// Start meta node +pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { + // WARNING: don't change the function signature. Making it `async fn` will cause + // slow compile in release mode. + Box::pin(async move { + info!("Starting meta node"); + info!("> options: {:?}", opts); + let config = load_config(&opts.config_path, &opts); + info!("> config: {:?}", config); + info!("> version: {} ({})", RW_VERSION, GIT_SHA); + let listen_addr = opts.listen_addr.parse().unwrap(); + let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap()); + let prometheus_addr = opts.prometheus_host.map(|x| x.parse().unwrap()); + let backend = match config.meta.backend { + MetaBackend::Etcd => MetaStoreBackend::Etcd { + endpoints: opts + .etcd_endpoints + .split(',') + .map(|x| x.to_string()) + .collect(), + credentials: match opts.etcd_auth { + true => Some((opts.etcd_username, opts.etcd_password)), + false => None, + }, + }, + MetaBackend::Mem => MetaStoreBackend::Mem, + }; + let sql_backend = opts + .sql_endpoint + .map(|endpoint| MetaStoreSqlBackend { endpoint }); + + validate_config(&config); + + let total_memory_bytes = resource_util::memory::system_memory_available_bytes(); + let heap_profiler = + HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone()); + // Run a background heap profiler + heap_profiler.start(); + + let max_heartbeat_interval = + Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64); + let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000; + let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums; + let privatelink_endpoint_default_tags = + opts.privatelink_endpoint_default_tags.map(|tags| { + tags.split(',') + .map(|s| { + let key_val = s.split_once('=').unwrap(); + (key_val.0.to_string(), key_val.1.to_string()) + }) + .collect() + }); + + let add_info = AddressInfo { + advertise_addr: opts.advertise_addr, + listen_addr, + prometheus_addr, + dashboard_addr, + ui_path: opts.dashboard_ui_path, + }; + + let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve( + add_info, + backend, + sql_backend, + max_heartbeat_interval, + config.meta.meta_leader_lease_secs, + MetaOpts { + enable_recovery: !config.meta.disable_recovery, + in_flight_barrier_nums, + max_idle_ms, + compaction_deterministic_test: config.meta.enable_compaction_deterministic, + default_parallelism: config.meta.default_parallelism, + vacuum_interval_sec: config.meta.vacuum_interval_sec, + vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms, + hummock_version_checkpoint_interval_sec: config + .meta + .hummock_version_checkpoint_interval_sec, + min_delta_log_num_for_hummock_version_checkpoint: config + .meta + .min_delta_log_num_for_hummock_version_checkpoint, + min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec, + full_gc_interval_sec: config.meta.full_gc_interval_sec, + collect_gc_watermark_spin_interval_sec: config + .meta + .collect_gc_watermark_spin_interval_sec, + enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check, + periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec, + node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec, + prometheus_endpoint: opts.prometheus_endpoint, + vpc_id: opts.vpc_id, + security_group_id: opts.security_group_id, + connector_rpc_endpoint: opts.connector_rpc_endpoint, + privatelink_endpoint_default_tags, + periodic_space_reclaim_compaction_interval_sec: config + .meta + .periodic_space_reclaim_compaction_interval_sec, + telemetry_enabled: config.server.telemetry_enabled, + periodic_ttl_reclaim_compaction_interval_sec: config + .meta + .periodic_ttl_reclaim_compaction_interval_sec, + periodic_tombstone_reclaim_compaction_interval_sec: config + .meta + .periodic_tombstone_reclaim_compaction_interval_sec, + periodic_split_compact_group_interval_sec: config + .meta + .periodic_split_compact_group_interval_sec, + split_group_size_limit: config.meta.split_group_size_limit, + min_table_split_size: config.meta.move_table_size_limit, + table_write_throughput_threshold: config.meta.table_write_throughput_threshold, + min_table_split_write_throughput: config.meta.min_table_split_write_throughput, + partition_vnode_count: config.meta.partition_vnode_count, + do_not_config_object_storage_lifecycle: config + .meta + .do_not_config_object_storage_lifecycle, + compaction_task_max_heartbeat_interval_secs: config + .meta + .compaction_task_max_heartbeat_interval_secs, + compaction_config: Some(config.meta.compaction_config), + }, + config.system.into_init_system_params(), + ) + .await + .unwrap(); + + tracing::info!("Meta server listening at {}", listen_addr); + + match leader_lost_handle { + None => { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::info!("receive ctrl+c"); + shutdown_send.send(()).unwrap(); + join_handle.await.unwrap() + } + res = &mut join_handle => res.unwrap(), + }; + } + Some(mut handle) => { + tokio::select! { + _ = &mut handle => { + tracing::info!("receive leader lost signal"); + // When we lose leadership, we will exit as soon as possible. + } + _ = tokio::signal::ctrl_c() => { + tracing::info!("receive ctrl+c"); + shutdown_send.send(()).unwrap(); + join_handle.await.unwrap(); + handle.abort(); + } + res = &mut join_handle => { + res.unwrap(); + handle.abort(); + }, + }; + } + }; + }) +} + +fn validate_config(config: &RwConfig) { + if config.meta.meta_leader_lease_secs <= 2 { + let error_msg = "meta leader lease secs should be larger than 2"; + tracing::error!(error_msg); + panic!("{}", error_msg); + } +} diff --git a/src/meta/src/rpc/server.rs b/src/meta/node/src/server.rs similarity index 92% rename from src/meta/src/rpc/server.rs rename to src/meta/node/src/server.rs index 1a15defe43bf5..d5cbfa3e3b26a 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/node/src/server.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -27,6 +26,24 @@ use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_common_service::tracing::TracingExtractLayer; +use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; +use risingwave_meta::rpc::ElectionClientRef; +use risingwave_meta_service::backup_service::BackupServiceImpl; +use risingwave_meta_service::cloud_service::CloudServiceImpl; +use risingwave_meta_service::cluster_service::ClusterServiceImpl; +use risingwave_meta_service::ddl_service::DdlServiceImpl; +use risingwave_meta_service::health_service::HealthServiceImpl; +use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl; +use risingwave_meta_service::hummock_service::HummockServiceImpl; +use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl; +use risingwave_meta_service::notification_service::NotificationServiceImpl; +use risingwave_meta_service::scale_service::ScaleServiceImpl; +use risingwave_meta_service::serving_service::ServingServiceImpl; +use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl; +use risingwave_meta_service::stream_service::StreamServiceImpl; +use risingwave_meta_service::system_params_service::SystemParamsServiceImpl; +use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl; +use risingwave_meta_service::user_service::UserServiceImpl; use risingwave_pb::backup_service::backup_service_server::BackupServiceServer; use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer; use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer; @@ -50,12 +67,6 @@ use tokio::sync::watch; use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}; use tokio::task::JoinHandle; -use super::intercept::MetricsMiddlewareLayer; -use super::service::health_service::HealthServiceImpl; -use super::service::notification_service::NotificationServiceImpl; -use super::service::scale_service::ScaleServiceImpl; -use super::service::serving_service::ServingServiceImpl; -use super::DdlServiceImpl; use crate::backup_restore::BackupManager; use crate::barrier::{BarrierScheduler, GlobalBarrierManager}; use crate::controller::system_param::SystemParamsController; @@ -68,21 +79,9 @@ use crate::manager::{ }; use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::election::etcd::EtcdElectionClient; -use crate::rpc::election::ElectionClient; use crate::rpc::metrics::{ start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS, }; -use crate::rpc::service::backup_service::BackupServiceImpl; -use crate::rpc::service::cloud_service::CloudServiceImpl; -use crate::rpc::service::cluster_service::ClusterServiceImpl; -use crate::rpc::service::heartbeat_service::HeartbeatServiceImpl; -use crate::rpc::service::hummock_service::HummockServiceImpl; -use crate::rpc::service::meta_member_service::MetaMemberServiceImpl; -use crate::rpc::service::sink_coordination_service::SinkCoordinationServiceImpl; -use crate::rpc::service::stream_service::StreamServiceImpl; -use crate::rpc::service::system_params_service::SystemParamsServiceImpl; -use crate::rpc::service::telemetry_service::TelemetryInfoServiceImpl; -use crate::rpc::service::user_service::UserServiceImpl; use crate::serving::ServingVnodeMapping; use crate::storage::{ EtcdMetaStore, MemStore, MetaStore, MetaStoreBoxExt, MetaStoreRef, @@ -91,43 +90,13 @@ use crate::storage::{ use crate::stream::{GlobalStreamManager, SourceManager}; use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher}; use crate::{hummock, serving, MetaError, MetaResult}; - -#[derive(Debug)] -pub enum MetaStoreBackend { - Etcd { - endpoints: Vec, - credentials: Option<(String, String)>, - }, - Mem, -} - #[derive(Debug)] pub struct MetaStoreSqlBackend { pub(crate) endpoint: String, } -#[derive(Clone)] -pub struct AddressInfo { - pub advertise_addr: String, - pub listen_addr: SocketAddr, - pub prometheus_addr: Option, - pub dashboard_addr: Option, - pub ui_path: Option, -} - -impl Default for AddressInfo { - fn default() -> Self { - Self { - advertise_addr: "".to_string(), - listen_addr: SocketAddr::V4("127.0.0.1:0000".parse().unwrap()), - prometheus_addr: None, - dashboard_addr: None, - ui_path: None, - } - } -} - -pub type ElectionClientRef = Arc; +use risingwave_meta::MetaStoreBackend; +use risingwave_meta_service::AddressInfo; pub async fn rpc_serve( address_info: AddressInfo, @@ -636,25 +605,19 @@ pub async fn start_service_as_election_leader( // compaction_scheduler, &env.opts, )); - sub_tasks.push( - start_worker_info_monitor( - cluster_manager.clone(), - election_client.clone(), - Duration::from_secs(env.opts.node_num_monitor_interval_sec), - meta_metrics.clone(), - ) - .await, - ); - sub_tasks.push( - start_fragment_info_monitor( - cluster_manager.clone(), - catalog_manager, - fragment_manager.clone(), - hummock_manager.clone(), - meta_metrics.clone(), - ) - .await, - ); + sub_tasks.push(start_worker_info_monitor( + cluster_manager.clone(), + election_client.clone(), + Duration::from_secs(env.opts.node_num_monitor_interval_sec), + meta_metrics.clone(), + )); + sub_tasks.push(start_fragment_info_monitor( + cluster_manager.clone(), + catalog_manager, + fragment_manager.clone(), + hummock_manager.clone(), + meta_metrics.clone(), + )); if let Some(system_params_ctl) = system_params_controller { sub_tasks.push(SystemParamsController::start_params_notifier( system_params_ctl, diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml new file mode 100644 index 0000000000000..1760ccd56a85a --- /dev/null +++ b/src/meta/service/Cargo.toml @@ -0,0 +1,53 @@ +[package] +name = "risingwave_meta_service" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[dependencies] +anyhow = "1" +async-trait = "0.1" +either = "1" +futures = { version = "0.3", default-features = false, features = ["alloc"] } +itertools = "0.11" +regex = "1" +risingwave_common = { workspace = true } +risingwave_connector = { workspace = true } +risingwave_meta = { workspace = true } +risingwave_pb = { workspace = true } +sea-orm = { version = "0.12.0", features = [ + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "runtime-tokio-native-tls", + "macros", +] } +sync-point = { path = "../../utils/sync-point" } +tokio = { version = "0.2", package = "madsim-tokio", features = [ + "rt", + "rt-multi-thread", + "sync", + "macros", + "time", + "signal", +] } +tokio-stream = { version = "0.1", features = ["net"] } +tonic = { workspace = true } +tracing = "0.1" + +[target.'cfg(not(madsim))'.dependencies] +workspace-hack = { path = "../../workspace-hack" } + +[dev-dependencies] + +[lints] +workspace = true diff --git a/src/meta/src/rpc/service/backup_service.rs b/src/meta/service/src/backup_service.rs similarity index 100% rename from src/meta/src/rpc/service/backup_service.rs rename to src/meta/service/src/backup_service.rs diff --git a/src/meta/src/rpc/service/cloud_service.rs b/src/meta/service/src/cloud_service.rs similarity index 100% rename from src/meta/src/rpc/service/cloud_service.rs rename to src/meta/service/src/cloud_service.rs diff --git a/src/meta/src/rpc/service/cluster_service.rs b/src/meta/service/src/cluster_service.rs similarity index 100% rename from src/meta/src/rpc/service/cluster_service.rs rename to src/meta/service/src/cluster_service.rs diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/service/src/ddl_service.rs similarity index 100% rename from src/meta/src/rpc/service/ddl_service.rs rename to src/meta/service/src/ddl_service.rs diff --git a/src/meta/src/rpc/service/health_service.rs b/src/meta/service/src/health_service.rs similarity index 93% rename from src/meta/src/rpc/service/health_service.rs rename to src/meta/service/src/health_service.rs index bdb01c1ef0760..338091a72de38 100644 --- a/src/meta/src/rpc/service/health_service.rs +++ b/src/meta/service/src/health_service.rs @@ -19,6 +19,12 @@ use tonic::{Request, Response, Status}; pub struct HealthServiceImpl {} +impl Default for HealthServiceImpl { + fn default() -> Self { + Self::new() + } +} + impl HealthServiceImpl { pub fn new() -> Self { Self {} diff --git a/src/meta/src/rpc/service/heartbeat_service.rs b/src/meta/service/src/heartbeat_service.rs similarity index 100% rename from src/meta/src/rpc/service/heartbeat_service.rs rename to src/meta/service/src/heartbeat_service.rs diff --git a/src/meta/src/rpc/service/hummock_service.rs b/src/meta/service/src/hummock_service.rs similarity index 99% rename from src/meta/src/rpc/service/hummock_service.rs rename to src/meta/service/src/hummock_service.rs index 3ae90421d2d87..74dc37b82d21e 100644 --- a/src/meta/src/rpc/service/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -27,7 +27,7 @@ use tonic::{Request, Response, Status, Streaming}; use crate::hummock::compaction::selector::ManualCompactionOption; use crate::hummock::{HummockManagerRef, VacuumManagerRef}; use crate::manager::FragmentManagerRef; -use crate::rpc::service::RwReceiverStream; +use crate::RwReceiverStream; pub struct HummockServiceImpl { hummock_manager: HummockManagerRef, vacuum_manager: VacuumManagerRef, diff --git a/src/meta/src/rpc/service/mod.rs b/src/meta/service/src/lib.rs similarity index 71% rename from src/meta/src/rpc/service/mod.rs rename to src/meta/service/src/lib.rs index 4484a8ca68a88..0d473a6ed031f 100644 --- a/src/meta/src/rpc/service/mod.rs +++ b/src/meta/service/src/lib.rs @@ -12,6 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(lint_reasons)] +#![feature(let_chains)] +#![feature(lazy_cell)] +#![feature(impl_trait_in_assoc_type)] +#![cfg_attr(coverage, feature(no_coverage))] + +use risingwave_meta::*; + pub mod backup_service; pub mod cloud_service; pub mod cluster_service; @@ -59,3 +67,25 @@ impl Stream for RwReceiverStream { .map(|opt| opt.map(|res| res.map_err(Into::into))) } } + +use std::net::SocketAddr; + +#[derive(Clone)] +pub struct AddressInfo { + pub advertise_addr: String, + pub listen_addr: SocketAddr, + pub prometheus_addr: Option, + pub dashboard_addr: Option, + pub ui_path: Option, +} +impl Default for AddressInfo { + fn default() -> Self { + Self { + advertise_addr: "".to_string(), + listen_addr: SocketAddr::V4("127.0.0.1:0000".parse().unwrap()), + prometheus_addr: None, + dashboard_addr: None, + ui_path: None, + } + } +} diff --git a/src/meta/src/rpc/service/meta_member_service.rs b/src/meta/service/src/meta_member_service.rs similarity index 97% rename from src/meta/src/rpc/service/meta_member_service.rs rename to src/meta/service/src/meta_member_service.rs index 6fb138b535410..25c4c7ad4cc84 100644 --- a/src/meta/src/rpc/service/meta_member_service.rs +++ b/src/meta/service/src/meta_member_service.rs @@ -14,13 +14,13 @@ use either::Either; use risingwave_common::util::addr::HostAddr; +use risingwave_meta::rpc::ElectionClientRef; use risingwave_pb::common::HostAddress; use risingwave_pb::meta::meta_member_service_server::MetaMemberService; use risingwave_pb::meta::{MembersRequest, MembersResponse, MetaMember}; use tonic::{Request, Response, Status}; -use crate::rpc::server::{AddressInfo, ElectionClientRef}; - +use crate::AddressInfo; #[derive(Clone)] pub struct MetaMemberServiceImpl { election_client_or_self: Either, diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/service/src/notification_service.rs similarity index 100% rename from src/meta/src/rpc/service/notification_service.rs rename to src/meta/service/src/notification_service.rs diff --git a/src/meta/src/rpc/service/scale_service.rs b/src/meta/service/src/scale_service.rs similarity index 100% rename from src/meta/src/rpc/service/scale_service.rs rename to src/meta/service/src/scale_service.rs diff --git a/src/meta/src/rpc/service/serving_service.rs b/src/meta/service/src/serving_service.rs similarity index 100% rename from src/meta/src/rpc/service/serving_service.rs rename to src/meta/service/src/serving_service.rs diff --git a/src/meta/src/rpc/service/sink_coordination_service.rs b/src/meta/service/src/sink_coordination_service.rs similarity index 92% rename from src/meta/src/rpc/service/sink_coordination_service.rs rename to src/meta/service/src/sink_coordination_service.rs index f7d56af9c063f..72c4cb2ff9af4 100644 --- a/src/meta/src/rpc/service/sink_coordination_service.rs +++ b/src/meta/service/src/sink_coordination_service.rs @@ -20,12 +20,12 @@ use tonic::{Request, Response, Status, Streaming}; use crate::manager::sink_coordination::SinkCoordinatorManager; #[derive(Clone)] -pub(crate) struct SinkCoordinationServiceImpl { +pub struct SinkCoordinationServiceImpl { sink_manager: SinkCoordinatorManager, } impl SinkCoordinationServiceImpl { - pub(crate) fn new(sink_manager: SinkCoordinatorManager) -> Self { + pub fn new(sink_manager: SinkCoordinatorManager) -> Self { Self { sink_manager } } } diff --git a/src/meta/src/rpc/service/stream_service.rs b/src/meta/service/src/stream_service.rs similarity index 100% rename from src/meta/src/rpc/service/stream_service.rs rename to src/meta/service/src/stream_service.rs diff --git a/src/meta/src/rpc/service/system_params_service.rs b/src/meta/service/src/system_params_service.rs similarity index 100% rename from src/meta/src/rpc/service/system_params_service.rs rename to src/meta/service/src/system_params_service.rs diff --git a/src/meta/src/rpc/service/telemetry_service.rs b/src/meta/service/src/telemetry_service.rs similarity index 100% rename from src/meta/src/rpc/service/telemetry_service.rs rename to src/meta/service/src/telemetry_service.rs diff --git a/src/meta/src/rpc/service/user_service.rs b/src/meta/service/src/user_service.rs similarity index 100% rename from src/meta/src/rpc/service/user_service.rs rename to src/meta/service/src/user_service.rs diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 262da65f19b0c..8924992c2e18e 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -139,7 +139,7 @@ pub struct GlobalBarrierManager { cluster_manager: ClusterManagerRef, - pub(crate) catalog_manager: CatalogManagerRef, + pub catalog_manager: CatalogManagerRef, fragment_manager: FragmentManagerRef, @@ -151,7 +151,7 @@ pub struct GlobalBarrierManager { metrics: Arc, - pub(crate) env: MetaSrvEnv, + pub env: MetaSrvEnv, tracker: Mutex, } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index bce901cd6f459..b3b57f2be58f1 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -107,7 +107,7 @@ impl GlobalBarrierManager { /// the cluster or `risectl` command. Used for debugging purpose. /// /// Returns the new state of the barrier manager after recovery. - pub(crate) async fn recovery( + pub async fn recovery( &self, prev_epoch: TracedEpoch, paused_reason: Option, diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 7c9fefd15606b..c4718d97d40f6 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -393,7 +393,7 @@ impl ScheduledBarriers { } /// Make the `checkpoint` of the next barrier must be true - pub(crate) fn force_checkpoint_in_next_barrier(&self) { + pub fn force_checkpoint_in_next_barrier(&self) { self.inner.force_checkpoint.store(true, Ordering::Relaxed) } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 74f01497cc048..3fcb6a7d004ae 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -37,7 +37,7 @@ impl From for MetaError { #[derive(Clone)] pub struct SqlMetaStore { - pub(crate) conn: DatabaseConnection, + pub conn: DatabaseConnection, } impl SqlMetaStore { diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 246b00af771af..a056414034243 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -43,8 +43,8 @@ use crate::hummock::level_handler::LevelHandler; use crate::hummock::model::CompactionGroup; pub struct CompactStatus { - pub(crate) compaction_group_id: CompactionGroupId, - pub(crate) level_handlers: Vec, + pub compaction_group_id: CompactionGroupId, + pub level_handlers: Vec, } impl Debug for CompactStatus { diff --git a/src/meta/src/hummock/compaction/selector/mod.rs b/src/meta/src/hummock/compaction/selector/mod.rs index 1bc49afc55653..48d237eb5cf2f 100644 --- a/src/meta/src/hummock/compaction/selector/mod.rs +++ b/src/meta/src/hummock/compaction/selector/mod.rs @@ -335,10 +335,7 @@ pub mod tests { l0 } - pub(crate) fn assert_compaction_task( - compact_task: &CompactionTask, - level_handlers: &[LevelHandler], - ) { + pub fn assert_compaction_task(compact_task: &CompactionTask, level_handlers: &[LevelHandler]) { for i in &compact_task.input.input_levels { for t in &i.table_infos { assert!(level_handlers[i.level_idx as usize].is_pending_compact(&t.sst_id)); diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index 4e6bb094d5a59..6aa64292b9db1 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -36,7 +36,7 @@ const HUMMOCK_INIT_FLAG_KEY: &[u8] = b"hummock_init_flag"; impl HummockManager { /// # Panics /// if checkpoint is not found. - pub(crate) async fn read_checkpoint(&self) -> Result { + pub async fn read_checkpoint(&self) -> Result { use prost::Message; let data = match self .object_store @@ -173,23 +173,23 @@ impl HummockManager { .map_err(Into::into) } - pub(crate) fn pause_version_checkpoint(&self) { + pub fn pause_version_checkpoint(&self) { self.pause_version_checkpoint.store(true, Ordering::Relaxed); tracing::info!("hummock version checkpoint is paused."); } - pub(crate) fn resume_version_checkpoint(&self) { + pub fn resume_version_checkpoint(&self) { self.pause_version_checkpoint .store(false, Ordering::Relaxed); tracing::info!("hummock version checkpoint is resumed."); } - pub(crate) fn is_version_checkpoint_paused(&self) -> bool { + pub fn is_version_checkpoint_paused(&self) -> bool { self.pause_version_checkpoint.load(Ordering::Relaxed) } #[named] - pub(crate) async fn get_checkpoint_version(&self) -> HummockVersion { + pub async fn get_checkpoint_version(&self) -> HummockVersion { let versioning_guard = read_lock!(self, versioning).await; versioning_guard .checkpoint diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 21751bb968421..b069a31ce5bd3 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -112,7 +112,7 @@ impl HummockManager { Ok(invalid_context_ids) } - pub(crate) async fn commit_epoch_sanity_check( + pub async fn commit_epoch_sanity_check( &self, epoch: HummockEpoch, sstables: &Vec, diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 3a99dd898d1c9..d07337ca47e2b 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -253,7 +253,7 @@ pub enum CompactionResumeTrigger { } impl HummockManager { - pub(crate) async fn new( + pub async fn new( env: MetaSrvEnv, cluster_manager: ClusterManagerRef, fragment_manager: FragmentManagerRef, diff --git a/src/meta/src/hummock/manager/worker.rs b/src/meta/src/hummock/manager/worker.rs index 9f9b0fd911bd4..bc2103635b59f 100644 --- a/src/meta/src/hummock/manager/worker.rs +++ b/src/meta/src/hummock/manager/worker.rs @@ -34,7 +34,7 @@ pub enum HummockManagerEvent { } impl HummockManager { - pub(crate) async fn start_worker( + pub async fn start_worker( self: &HummockManagerRef, mut receiver: HummockManagerEventReceiver, ) -> JoinHandle<()> { diff --git a/src/meta/src/hummock/model/compaction_group_config.rs b/src/meta/src/hummock/model/compaction_group_config.rs index 8331abac62017..fa1bd1f88b3bd 100644 --- a/src/meta/src/hummock/model/compaction_group_config.rs +++ b/src/meta/src/hummock/model/compaction_group_config.rs @@ -23,8 +23,8 @@ use crate::model::{MetadataModel, MetadataModelResult}; #[derive(Debug, Clone, PartialEq)] pub struct CompactionGroup { - pub(crate) group_id: CompactionGroupId, - pub(crate) compaction_config: Arc, + pub group_id: CompactionGroupId, + pub compaction_config: Arc, } impl CompactionGroup { diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index a5c52d5e3e0f3..afe66d27ad8e8 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -14,12 +14,10 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] -#![feature(binary_heap_drain_sorted)] #![feature(type_alias_impl_trait)] -#![feature(extract_if)] -#![feature(custom_test_frameworks)] #![feature(lint_reasons)] #![feature(map_try_insert)] +#![feature(extract_if)] #![feature(hash_extract_if)] #![feature(btree_extract_if)] #![feature(result_option_inspect)] @@ -29,6 +27,7 @@ #![feature(assert_matches)] #![feature(try_blocks)] #![cfg_attr(coverage, feature(no_coverage))] +#![feature(custom_test_frameworks)] #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] #![feature(is_sorted)] #![feature(impl_trait_in_assoc_type)] @@ -36,341 +35,31 @@ #![feature(async_fn_in_trait)] pub mod backup_restore; -mod barrier; +pub mod barrier; pub mod controller; #[cfg(not(madsim))] // no need in simulation test -mod dashboard; -mod error; +pub mod dashboard; +pub mod error; pub mod hummock; pub mod manager; pub mod model; pub mod model_v2; -mod rpc; -pub(crate) mod serving; +pub mod rpc; +pub mod serving; pub mod storage; -mod stream; -pub(crate) mod telemetry; - -use std::time::Duration; +pub mod stream; +pub mod telemetry; -use clap::Parser; pub use error::{MetaError, MetaResult}; -use risingwave_common::config::OverrideConfig; -use risingwave_common::util::resource_util; -use risingwave_common::{GIT_SHA, RW_VERSION}; -use risingwave_common_heap_profiling::HeapProfiler; pub use rpc::{ElectionClient, ElectionMember, EtcdElectionClient}; use crate::manager::MetaOpts; -use crate::rpc::server::{rpc_serve, AddressInfo, MetaStoreBackend, MetaStoreSqlBackend}; - -#[derive(Debug, Clone, Parser, OverrideConfig)] -#[command(version, about = "The central metadata management service")] -pub struct MetaNodeOpts { - #[clap(long, env = "RW_VPC_ID")] - vpc_id: Option, - - #[clap(long, env = "RW_VPC_SECURITY_GROUP_ID")] - security_group_id: Option, - - #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")] - listen_addr: String, - - /// The address for contacting this instance of the service. - /// This would be synonymous with the service's "public address" - /// or "identifying address". - /// It will serve as a unique identifier in cluster - /// membership and leader election. Must be specified for etcd backend. - #[clap(long, env = "RW_ADVERTISE_ADDR")] - advertise_addr: String, - - #[clap(long, env = "RW_DASHBOARD_HOST")] - dashboard_host: Option, - - #[clap(long, env = "RW_PROMETHEUS_HOST")] - prometheus_host: Option, - - #[clap(long, env = "RW_ETCD_ENDPOINTS", default_value_t = String::from(""))] - etcd_endpoints: String, - - /// Enable authentication with etcd. By default disabled. - #[clap(long, env = "RW_ETCD_AUTH")] - etcd_auth: bool, - - /// Username of etcd, required when --etcd-auth is enabled. - #[clap(long, env = "RW_ETCD_USERNAME", default_value = "")] - etcd_username: String, - - /// Password of etcd, required when --etcd-auth is enabled. - #[clap(long, env = "RW_ETCD_PASSWORD", default_value = "")] - etcd_password: String, - - /// Endpoint of the SQL service, make it non-option when SQL service is required. - #[clap(long, env = "RW_SQL_ENDPOINT")] - sql_endpoint: Option, - - #[clap(long, env = "RW_DASHBOARD_UI_PATH")] - dashboard_ui_path: Option, - - /// For dashboard service to fetch cluster info. - #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")] - prometheus_endpoint: Option, - - /// Endpoint of the connector node, there will be a sidecar connector node - /// colocated with Meta node in the cloud environment - #[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")] - pub connector_rpc_endpoint: Option, - - /// Default tag for the endpoint created when creating a privatelink connection. - /// Will be appended to the tags specified in the `tags` field in with clause in `create - /// connection`. - #[clap(long, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")] - pub privatelink_endpoint_default_tags: Option, - - /// The path of `risingwave.toml` configuration file. - /// - /// If empty, default configuration values will be used. - #[clap(long, env = "RW_CONFIG_PATH", default_value = "")] - pub config_path: String, - - #[clap(long, env = "RW_BACKEND", value_enum)] - #[override_opts(path = meta.backend)] - backend: Option, - - /// The interval of periodic barrier. - #[clap(long, env = "RW_BARRIER_INTERVAL_MS")] - #[override_opts(path = system.barrier_interval_ms)] - barrier_interval_ms: Option, - - /// Target size of the Sstable. - #[clap(long, env = "RW_SSTABLE_SIZE_MB")] - #[override_opts(path = system.sstable_size_mb)] - sstable_size_mb: Option, - - /// Size of each block in bytes in SST. - #[clap(long, env = "RW_BLOCK_SIZE_KB")] - #[override_opts(path = system.block_size_kb)] - block_size_kb: Option, - - /// False positive probability of bloom filter. - #[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")] - #[override_opts(path = system.bloom_false_positive)] - bloom_false_positive: Option, - - /// State store url - #[clap(long, env = "RW_STATE_STORE")] - #[override_opts(path = system.state_store)] - state_store: Option, - - /// Remote directory for storing data and metadata objects. - #[clap(long, env = "RW_DATA_DIRECTORY")] - #[override_opts(path = system.data_directory)] - data_directory: Option, - - /// Whether config object storage bucket lifecycle to purge stale data. - #[clap(long, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")] - #[override_opts(path = meta.do_not_config_object_storage_lifecycle)] - do_not_config_object_storage_lifecycle: Option, - - /// Remote storage url for storing snapshots. - #[clap(long, env = "RW_BACKUP_STORAGE_URL")] - #[override_opts(path = system.backup_storage_url)] - backup_storage_url: Option, - - /// Remote directory for storing snapshots. - #[clap(long, env = "RW_BACKUP_STORAGE_DIRECTORY")] - #[override_opts(path = system.backup_storage_directory)] - backup_storage_directory: Option, - - #[clap(long, env = "RW_OBJECT_STORE_STREAMING_READ_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_streaming_read_timeout_ms)] - pub object_store_streaming_read_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_STREAMING_UPLOAD_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_streaming_upload_timeout_ms)] - pub object_store_streaming_upload_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_UPLOAD_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_upload_timeout_ms)] - pub object_store_upload_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_READ_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_read_timeout_ms)] - pub object_store_read_timeout_ms: Option, - - /// Enable heap profile dump when memory usage is high. - #[clap(long, env = "RW_HEAP_PROFILING_DIR")] - #[override_opts(path = server.heap_profiling.dir)] - pub heap_profiling_dir: Option, -} - -use std::future::Future; -use std::pin::Pin; - -use risingwave_common::config::{load_config, MetaBackend, RwConfig}; -use tracing::info; - -/// Start meta node -pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { - // WARNING: don't change the function signature. Making it `async fn` will cause - // slow compile in release mode. - Box::pin(async move { - info!("Starting meta node"); - info!("> options: {:?}", opts); - let config = load_config(&opts.config_path, &opts); - info!("> config: {:?}", config); - info!("> version: {} ({})", RW_VERSION, GIT_SHA); - let listen_addr = opts.listen_addr.parse().unwrap(); - let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap()); - let prometheus_addr = opts.prometheus_host.map(|x| x.parse().unwrap()); - let backend = match config.meta.backend { - MetaBackend::Etcd => MetaStoreBackend::Etcd { - endpoints: opts - .etcd_endpoints - .split(',') - .map(|x| x.to_string()) - .collect(), - credentials: match opts.etcd_auth { - true => Some((opts.etcd_username, opts.etcd_password)), - false => None, - }, - }, - MetaBackend::Mem => MetaStoreBackend::Mem, - }; - let sql_backend = opts - .sql_endpoint - .map(|endpoint| MetaStoreSqlBackend { endpoint }); - - validate_config(&config); - - let total_memory_bytes = resource_util::memory::system_memory_available_bytes(); - let heap_profiler = - HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone()); - // Run a background heap profiler - heap_profiler.start(); - - let max_heartbeat_interval = - Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64); - let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000; - let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums; - let privatelink_endpoint_default_tags = - opts.privatelink_endpoint_default_tags.map(|tags| { - tags.split(',') - .map(|s| { - let key_val = s.split_once('=').unwrap(); - (key_val.0.to_string(), key_val.1.to_string()) - }) - .collect() - }); - - let add_info = AddressInfo { - advertise_addr: opts.advertise_addr, - listen_addr, - prometheus_addr, - dashboard_addr, - ui_path: opts.dashboard_ui_path, - }; - - let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve( - add_info, - backend, - sql_backend, - max_heartbeat_interval, - config.meta.meta_leader_lease_secs, - MetaOpts { - enable_recovery: !config.meta.disable_recovery, - in_flight_barrier_nums, - max_idle_ms, - compaction_deterministic_test: config.meta.enable_compaction_deterministic, - default_parallelism: config.meta.default_parallelism, - vacuum_interval_sec: config.meta.vacuum_interval_sec, - vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms, - hummock_version_checkpoint_interval_sec: config - .meta - .hummock_version_checkpoint_interval_sec, - min_delta_log_num_for_hummock_version_checkpoint: config - .meta - .min_delta_log_num_for_hummock_version_checkpoint, - min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec, - full_gc_interval_sec: config.meta.full_gc_interval_sec, - collect_gc_watermark_spin_interval_sec: config - .meta - .collect_gc_watermark_spin_interval_sec, - enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check, - periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec, - node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec, - prometheus_endpoint: opts.prometheus_endpoint, - vpc_id: opts.vpc_id, - security_group_id: opts.security_group_id, - connector_rpc_endpoint: opts.connector_rpc_endpoint, - privatelink_endpoint_default_tags, - periodic_space_reclaim_compaction_interval_sec: config - .meta - .periodic_space_reclaim_compaction_interval_sec, - telemetry_enabled: config.server.telemetry_enabled, - periodic_ttl_reclaim_compaction_interval_sec: config - .meta - .periodic_ttl_reclaim_compaction_interval_sec, - periodic_tombstone_reclaim_compaction_interval_sec: config - .meta - .periodic_tombstone_reclaim_compaction_interval_sec, - periodic_split_compact_group_interval_sec: config - .meta - .periodic_split_compact_group_interval_sec, - split_group_size_limit: config.meta.split_group_size_limit, - min_table_split_size: config.meta.move_table_size_limit, - table_write_throughput_threshold: config.meta.table_write_throughput_threshold, - min_table_split_write_throughput: config.meta.min_table_split_write_throughput, - partition_vnode_count: config.meta.partition_vnode_count, - do_not_config_object_storage_lifecycle: config - .meta - .do_not_config_object_storage_lifecycle, - compaction_task_max_heartbeat_interval_secs: config - .meta - .compaction_task_max_heartbeat_interval_secs, - compaction_config: Some(config.meta.compaction_config), - }, - config.system.into_init_system_params(), - ) - .await - .unwrap(); - - tracing::info!("Meta server listening at {}", listen_addr); - - match leader_lost_handle { - None => { - tokio::select! { - _ = tokio::signal::ctrl_c() => { - tracing::info!("receive ctrl+c"); - shutdown_send.send(()).unwrap(); - join_handle.await.unwrap() - } - res = &mut join_handle => res.unwrap(), - }; - } - Some(mut handle) => { - tokio::select! { - _ = &mut handle => { - tracing::info!("receive leader lost signal"); - // When we lose leadership, we will exit as soon as possible. - } - _ = tokio::signal::ctrl_c() => { - tracing::info!("receive ctrl+c"); - shutdown_send.send(()).unwrap(); - join_handle.await.unwrap(); - handle.abort(); - } - res = &mut join_handle => { - res.unwrap(); - handle.abort(); - }, - }; - } - }; - }) -} -fn validate_config(config: &RwConfig) { - if config.meta.meta_leader_lease_secs <= 2 { - let error_msg = "meta leader lease secs should be larger than 2"; - tracing::error!(error_msg); - panic!("{}", error_msg); - } +#[derive(Debug)] +pub enum MetaStoreBackend { + Etcd { + endpoints: Vec, + credentials: Option<(String, String)>, + }, + Mem, } diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs index 760c24e0df7bc..35642ed0ec143 100644 --- a/src/meta/src/manager/mod.rs +++ b/src/meta/src/manager/mod.rs @@ -18,20 +18,17 @@ mod env; mod id; mod idle; mod notification; -pub(crate) mod sink_coordination; +pub mod sink_coordination; mod streaming_job; mod system_param; -pub(crate) use catalog::*; -pub use cluster::WorkerKey; -pub(crate) use cluster::*; -pub use env::MetaSrvEnv; -pub(crate) use env::*; -pub(crate) use id::*; -pub(crate) use idle::*; -pub(crate) use notification::*; -pub use notification::{LocalNotification, MessageStatus, NotificationManagerRef}; -pub(crate) use streaming_job::*; -pub(crate) use system_param::*; +pub use catalog::*; +pub use cluster::{WorkerKey, *}; +pub use env::{MetaSrvEnv, *}; +pub use id::*; +pub use idle::*; +pub use notification::{LocalNotification, MessageStatus, NotificationManagerRef, *}; +pub use streaming_job::*; +pub use system_param::*; pub use super::model_v2::prelude; diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index 0185fdc4f4630..79f4f5b753aa2 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -46,7 +46,7 @@ macro_rules! send_await_with_err_check { }; } -pub(crate) struct CoordinatorWorker { +pub struct CoordinatorWorker { param: SinkParam, request_streams: Vec, response_senders: Vec, @@ -54,7 +54,7 @@ pub(crate) struct CoordinatorWorker { } impl CoordinatorWorker { - pub(crate) async fn run( + pub async fn run( first_writer_request: NewSinkWriterRequest, request_rx: UnboundedReceiver, ) { @@ -91,7 +91,7 @@ impl CoordinatorWorker { }); } - pub(crate) async fn execute_coordinator( + pub async fn execute_coordinator( first_writer_request: NewSinkWriterRequest, request_rx: UnboundedReceiver, coordinator: impl SinkCommitCoordinator, diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index a7d15bd452f34..720a698fa8e72 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -69,7 +69,7 @@ pub struct SinkCoordinatorManager { } impl SinkCoordinatorManager { - pub(crate) fn start_worker() -> (Self, (JoinHandle<()>, Sender<()>)) { + pub fn start_worker() -> (Self, (JoinHandle<()>, Sender<()>)) { Self::start_worker_with_spawn_worker(|writer_request, manager_request_stream| { tokio::spawn(CoordinatorWorker::run( writer_request, @@ -91,7 +91,7 @@ impl SinkCoordinatorManager { ) } - pub(crate) async fn handle_new_request( + pub async fn handle_new_request( &self, mut request_stream: SinkWriterRequestStream, ) -> Result>, Status> { @@ -143,11 +143,11 @@ impl SinkCoordinatorManager { info!("successfully stop coordinator: {:?}", sink_id); } - pub(crate) async fn reset(&self) { + pub async fn reset(&self) { self.stop_coordinator(None).await; } - pub(crate) async fn stop_sink_coordinator(&self, sink_id: SinkId) { + pub async fn stop_sink_coordinator(&self, sink_id: SinkId) { self.stop_coordinator(Some(sink_id)).await; } } diff --git a/src/meta/src/manager/sink_coordination/mod.rs b/src/meta/src/manager/sink_coordination/mod.rs index 30786c8721e97..fe861e2175343 100644 --- a/src/meta/src/manager/sink_coordination/mod.rs +++ b/src/meta/src/manager/sink_coordination/mod.rs @@ -16,19 +16,19 @@ mod coordinator_worker; mod manager; use futures::stream::BoxStream; -pub(crate) use manager::SinkCoordinatorManager; +pub use manager::SinkCoordinatorManager; use risingwave_common::buffer::Bitmap; use risingwave_connector::sink::SinkParam; use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse}; use tokio::sync::mpsc::Sender; use tonic::Status; -pub(crate) type SinkWriterRequestStream = BoxStream<'static, Result>; -pub(crate) type SinkCoordinatorResponseSender = Sender>; +pub type SinkWriterRequestStream = BoxStream<'static, Result>; +pub type SinkCoordinatorResponseSender = Sender>; -pub(crate) struct NewSinkWriterRequest { - pub(crate) request_stream: SinkWriterRequestStream, - pub(crate) response_tx: SinkCoordinatorResponseSender, - pub(crate) param: SinkParam, - pub(crate) vnode_bitmap: Bitmap, +pub struct NewSinkWriterRequest { + pub request_stream: SinkWriterRequestStream, + pub response_tx: SinkCoordinatorResponseSender, + pub param: SinkParam, + pub vnode_bitmap: Bitmap, } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 6b3e71fe20092..611de4120a787 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -31,7 +31,7 @@ pub enum StreamingJob { } impl StreamingJob { - pub(crate) fn mark_created(&mut self) { + pub fn mark_created(&mut self) { let created_at_epoch = Some(Epoch::now().0); match self { StreamingJob::MaterializedView(table) => table.created_at_epoch = created_at_epoch, @@ -48,7 +48,7 @@ impl StreamingJob { } } - pub(crate) fn mark_initialized(&mut self) { + pub fn mark_initialized(&mut self) { let initialized_at_epoch = Some(Epoch::now().0); match self { StreamingJob::MaterializedView(table) => { diff --git a/src/meta/src/model/cluster.rs b/src/meta/src/model/cluster.rs index 882f48b6dc8c4..3d654a1d6b8c9 100644 --- a/src/meta/src/model/cluster.rs +++ b/src/meta/src/model/cluster.rs @@ -128,8 +128,14 @@ const CLUSTER_ID_KEY: &[u8] = "cluster_id".as_bytes(); #[derive(Clone, Debug)] pub struct ClusterId(String); +impl Default for ClusterId { + fn default() -> Self { + Self::new() + } +} + impl ClusterId { - pub(crate) fn new() -> Self { + pub fn new() -> Self { Self(Uuid::new_v4().to_string()) } @@ -139,15 +145,13 @@ impl ClusterId { )) } - pub(crate) async fn from_meta_store( + pub async fn from_meta_store( meta_store: &S, ) -> MetadataModelResult> { Self::from_snapshot::(&meta_store.snapshot().await).await } - pub(crate) async fn from_snapshot( - s: &S::Snapshot, - ) -> MetadataModelResult> { + pub async fn from_snapshot(s: &S::Snapshot) -> MetadataModelResult> { match s.get_cf(CLUSTER_ID_CF_NAME, CLUSTER_ID_KEY).await { Ok(bytes) => Ok(Some(Self::from_bytes(bytes)?)), Err(e) => match e { @@ -157,10 +161,7 @@ impl ClusterId { } } - pub(crate) async fn put_at_meta_store( - &self, - meta_store: &S, - ) -> MetadataModelResult<()> { + pub async fn put_at_meta_store(&self, meta_store: &S) -> MetadataModelResult<()> { Ok(meta_store .put_cf( CLUSTER_ID_CF_NAME, diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 5dd8f53e249b0..10fe5abe8aeaa 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -48,22 +48,22 @@ pub struct TableFragments { state: State, /// The table fragments. - pub(crate) fragments: BTreeMap, + pub fragments: BTreeMap, /// The status of actors - pub(crate) actor_status: BTreeMap, + pub actor_status: BTreeMap, /// The splits of actors - pub(crate) actor_splits: HashMap>, + pub actor_splits: HashMap>, /// The environment associated with this stream plan and its fragments - pub(crate) env: StreamEnvironment, + pub env: StreamEnvironment, } #[derive(Debug, Clone, Default)] pub struct StreamEnvironment { /// The timezone used to interpret timestamps and dates for conversion - pub(crate) timezone: Option, + pub timezone: Option, } impl StreamEnvironment { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 3ad5df55a997c..722cce94b866a 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -178,7 +178,7 @@ impl CreatingStreamingJobPermit { } impl DdlController { - pub(crate) async fn new( + pub async fn new( env: MetaSrvEnv, catalog_manager: CatalogManagerRef, stream_manager: GlobalStreamManagerRef, @@ -217,7 +217,7 @@ impl DdlController { /// has been interrupted during executing, the request will be cancelled by tonic. Since we have /// a lot of logic for revert, status management, notification and so on, ensuring consistency /// would be a huge hassle and pain if we don't spawn here. - pub(crate) async fn run_command(&self, command: DdlCommand) -> MetaResult { + pub async fn run_command(&self, command: DdlCommand) -> MetaResult { self.check_barrier_manager_status().await?; let ctrl = self.clone(); let fut = async move { @@ -263,7 +263,7 @@ impl DdlController { tokio::spawn(fut).await.unwrap() } - pub(crate) async fn get_ddl_progress(&self) -> Vec { + pub async fn get_ddl_progress(&self) -> Vec { self.barrier_manager.get_ddl_progress().await } diff --git a/src/meta/src/rpc/election/sql.rs b/src/meta/src/rpc/election/sql.rs index b6bd02b179c26..af8081f170089 100644 --- a/src/meta/src/rpc/election/sql.rs +++ b/src/meta/src/rpc/election/sql.rs @@ -30,13 +30,13 @@ pub struct SqlBackendElectionClient { } #[derive(sqlx::FromRow, Debug)] -pub(crate) struct ElectionRow { +pub struct ElectionRow { service: String, id: String, } #[async_trait::async_trait] -pub(crate) trait SqlDriver: Send + Sync + 'static { +pub trait SqlDriver: Send + Sync + 'static { async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()>; async fn try_campaign(&self, service_name: &str, id: &str, ttl: i64) @@ -48,7 +48,7 @@ pub(crate) trait SqlDriver: Send + Sync + 'static { async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()>; } -pub(crate) trait SqlDriverCommon { +pub trait SqlDriverCommon { const ELECTION_LEADER_TABLE_NAME: &'static str = "election_leader"; const ELECTION_MEMBER_TABLE_NAME: &'static str = "election_members"; diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index f11bbe7f41138..3183007753cbd 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -37,7 +37,7 @@ use tokio::task::JoinHandle; use crate::hummock::HummockManagerRef; use crate::manager::{CatalogManagerRef, ClusterManagerRef, FragmentManagerRef}; -use crate::rpc::server::ElectionClientRef; +use crate::rpc::ElectionClientRef; #[derive(Clone)] pub struct MetaMetrics { @@ -690,7 +690,7 @@ impl Default for MetaMetrics { } } -pub async fn start_worker_info_monitor( +pub fn start_worker_info_monitor( cluster_manager: ClusterManagerRef, election_client: Option, interval: Duration, @@ -738,7 +738,7 @@ pub async fn start_worker_info_monitor( (join_handle, shutdown_tx) } -pub async fn start_fragment_info_monitor( +pub fn start_fragment_info_monitor( cluster_manager: ClusterManagerRef, catalog_manager: CatalogManagerRef, fragment_manager: FragmentManagerRef, diff --git a/src/meta/src/rpc/mod.rs b/src/meta/src/rpc/mod.rs index 36380c4d2dafb..99f1b51eaafce 100644 --- a/src/meta/src/rpc/mod.rs +++ b/src/meta/src/rpc/mod.rs @@ -12,19 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod cloud_provider; +pub mod cloud_provider; pub mod ddl_controller; pub mod election; -mod intercept; +pub mod intercept; pub mod metrics; -pub mod server; -pub mod service; + +pub type ElectionClientRef = std::sync::Arc; pub use election::etcd::EtcdElectionClient; pub use election::{ElectionClient, ElectionMember}; -pub use service::cluster_service::ClusterServiceImpl; -pub use service::ddl_service::DdlServiceImpl; -pub use service::heartbeat_service::HeartbeatServiceImpl; -pub use service::hummock_service::HummockServiceImpl; -pub use service::notification_service::NotificationServiceImpl; -pub use service::stream_service::StreamServiceImpl; diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index f6d1a5b1aa714..521a8b9ad1c0d 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -103,7 +103,7 @@ fn to_deleted_fragment_parallel_unit_mapping( .collect() } -pub(crate) async fn on_meta_start( +pub async fn on_meta_start( notification_manager: NotificationManagerRef, cluster_manager: ClusterManagerRef, fragment_manager: FragmentManagerRef, @@ -126,7 +126,7 @@ pub(crate) async fn on_meta_start( ); } -pub(crate) async fn start_serving_vnode_mapping_worker( +pub async fn start_serving_vnode_mapping_worker( notification_manager: NotificationManagerRef, cluster_manager: ClusterManagerRef, fragment_manager: FragmentManagerRef, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index a125d61d91703..afe6186165e22 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -97,7 +97,7 @@ pub struct RescheduleOptions { pub resolve_no_shuffle_upstream: bool, } -pub(crate) struct RescheduleContext { +pub struct RescheduleContext { /// Index used to map `ParallelUnitId` to `WorkerId` parallel_unit_id_to_worker_id: BTreeMap, /// Meta information for all Actors @@ -171,7 +171,7 @@ impl RescheduleContext { /// assert to fail and should be skipped from the upper level. /// /// The return value is the bitmap distribution after scaling, which covers all virtual node indexes -pub(crate) fn rebalance_actor_vnode( +pub fn rebalance_actor_vnode( actors: &[StreamActor], actors_to_remove: &BTreeSet, actors_to_create: &BTreeSet, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index fa3159f6657ec..1cd666e5d7160 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -47,7 +47,7 @@ pub type SourceManagerRef = Arc; pub type SplitAssignment = HashMap>>; pub struct SourceManager { - pub(crate) paused: Mutex<()>, + pub paused: Mutex<()>, env: MetaSrvEnv, barrier_scheduler: BarrierScheduler, core: Mutex, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 46d6c5cb1ea22..44198f65635cb 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -159,26 +159,26 @@ pub struct ReplaceTableContext { /// `GlobalStreamManager` manages all the streams in the system. pub struct GlobalStreamManager { - pub(crate) env: MetaSrvEnv, + pub env: MetaSrvEnv, /// Manages definition and status of fragments and actors pub(super) fragment_manager: FragmentManagerRef, /// Broadcasts and collect barriers - pub(crate) barrier_scheduler: BarrierScheduler, + pub barrier_scheduler: BarrierScheduler, /// Maintains information of the cluster - pub(crate) cluster_manager: ClusterManagerRef, + pub cluster_manager: ClusterManagerRef, /// Maintains streaming sources from external system like kafka - pub(crate) source_manager: SourceManagerRef, + pub source_manager: SourceManagerRef, /// Creating streaming job info. creating_job_info: CreatingStreamingJobInfoRef, hummock_manager: HummockManagerRef, - pub(crate) reschedule_lock: RwLock<()>, + pub reschedule_lock: RwLock<()>, } impl GlobalStreamManager { diff --git a/src/meta/src/telemetry.rs b/src/meta/src/telemetry.rs index 774b3cdda8146..fbbc89c2ff0ec 100644 --- a/src/meta/src/telemetry.rs +++ b/src/meta/src/telemetry.rs @@ -35,7 +35,7 @@ struct NodeCount { } #[derive(Debug, Serialize, Deserialize)] -pub(crate) struct MetaTelemetryReport { +pub struct MetaTelemetryReport { #[serde(flatten)] base: TelemetryReportBase, node_count: NodeCount, @@ -45,12 +45,12 @@ pub(crate) struct MetaTelemetryReport { impl TelemetryReport for MetaTelemetryReport {} -pub(crate) struct MetaTelemetryInfoFetcher { +pub struct MetaTelemetryInfoFetcher { tracking_id: ClusterId, } impl MetaTelemetryInfoFetcher { - pub(crate) fn new(tracking_id: ClusterId) -> Self { + pub fn new(tracking_id: ClusterId) -> Self { Self { tracking_id } } } @@ -63,13 +63,13 @@ impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher { } #[derive(Clone)] -pub(crate) struct MetaReportCreator { +pub struct MetaReportCreator { cluster_mgr: Arc, meta_backend: MetaBackend, } impl MetaReportCreator { - pub(crate) fn new(cluster_mgr: Arc, meta_backend: MetaBackend) -> Self { + pub fn new(cluster_mgr: Arc, meta_backend: MetaBackend) -> Self { Self { cluster_mgr, meta_backend, @@ -79,6 +79,7 @@ impl MetaReportCreator { #[async_trait::async_trait] impl TelemetryReportCreator for MetaReportCreator { + #[expect(refining_impl_trait)] async fn create_report( &self, tracking_id: String, diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 6bbcbd2ebf3e2..3e744bb61608d 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -16,7 +16,6 @@ //! response gRPC message structs. #![feature(trait_alias)] -#![feature(binary_heap_drain_sorted)] #![feature(result_option_inspect)] #![feature(type_alias_impl_trait)] #![feature(associated_type_defaults)] diff --git a/src/source/Cargo.toml b/src/source/Cargo.toml index bf60bc45f7395..aedb0b9158908 100644 --- a/src/source/Cargo.toml +++ b/src/source/Cargo.toml @@ -15,7 +15,6 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" -easy-ext = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } itertools = "0.11" diff --git a/src/source/src/lib.rs b/src/source/src/lib.rs index 1a4f5d5f47280..1a32888cdf651 100644 --- a/src/source/src/lib.rs +++ b/src/source/src/lib.rs @@ -14,7 +14,6 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] -#![feature(binary_heap_drain_sorted)] #![feature(lint_reasons)] #![feature(result_option_inspect)] #![feature(generators)] diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 0797c462d61e5..f1022ab2fd935 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -14,7 +14,6 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] -anyhow = "1" arc-swap = "1" async-trait = "0.1" auto_enums = { version = "0.8", features = ["futures03"] } diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 330dfbc4de44c..3e0549db188a2 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -14,7 +14,6 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] -#![feature(binary_heap_drain_sorted)] #![feature(type_alias_impl_trait)] #![feature(extract_if)] #![feature(custom_test_frameworks)] diff --git a/src/storage/compactor/Cargo.toml b/src/storage/compactor/Cargo.toml index 54d48a88923a6..e6e985b2ba424 100644 --- a/src/storage/compactor/Cargo.toml +++ b/src/storage/compactor/Cargo.toml @@ -15,12 +15,10 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] -anyhow = "1" async-trait = "0.1" await-tree = { workspace = true } clap = { version = "4", features = ["derive"] } parking_lot = "0.12" -prometheus = { version = "0.13" } risingwave_common = { workspace = true } risingwave_common_heap_profiling = { workspace = true } risingwave_common_service = { workspace = true } @@ -29,7 +27,6 @@ risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_storage = { workspace = true } serde = { version = "1", features = ["derive"] } -serde_json = "1" tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs", "rt", diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 72b925170a6ef..c5ffe656ab893 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -14,7 +14,6 @@ #![feature(allocator_api)] #![feature(arc_unwrap_or_clone)] -#![feature(binary_heap_drain_sorted)] #![feature(bound_as_ref)] #![feature(bound_map)] #![feature(custom_test_frameworks)] diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 9039f34d89a5d..9e9e77b92ceec 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -21,7 +21,6 @@ async-stream = "0.3" async-trait = "0.1" await-tree = { workspace = true } bytes = "1" -dyn-clone = "1" educe = "0.4" either = "1" enum-as-inner = "0.6" @@ -33,16 +32,13 @@ governor = { version = "0.6", default-features = false, features = [ "jitter", ] } hytra = "0.1.2" -iter-chunks = "0.1" itertools = "0.11" local_stats_alloc = { path = "../utils/local_stats_alloc" } lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "cb2d7c7" } maplit = "1.0.2" memcomparable = "0.2" multimap = "0.9" -num-traits = "0.2" parking_lot = "0.12" -parse-display = "0.8" pin-project = "1" prometheus = { version = "0.13", features = ["process"] } prost = { workspace = true } @@ -57,7 +53,6 @@ risingwave_source = { workspace = true } risingwave_storage = { workspace = true } serde_json = "1" smallvec = "1" -spin = "0.9" static_assertions = "1" thiserror = "1" tokio = { version = "0.2", package = "madsim-tokio", features = [ diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index afa1d3faeee22..5a68b1b712b26 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -18,7 +18,6 @@ #![feature(type_alias_impl_trait)] #![feature(more_qualified_paths)] #![feature(lint_reasons)] -#![feature(binary_heap_drain_sorted)] #![feature(let_chains)] #![feature(hash_extract_if)] #![feature(extract_if)] diff --git a/src/tests/compaction_test/Cargo.toml b/src/tests/compaction_test/Cargo.toml index dd3e5d0a53699..87ad5946b26d5 100644 --- a/src/tests/compaction_test/Cargo.toml +++ b/src/tests/compaction_test/Cargo.toml @@ -27,6 +27,7 @@ risingwave_compactor = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_hummock_test = { workspace = true } risingwave_meta = { workspace = true } +risingwave_meta_node = { workspace = true } risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 3e2f993cf9613..cf3e35b48c692 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -129,7 +129,7 @@ pub async fn compaction_test_main( } pub async fn start_meta_node(listen_addr: String, state_store: String, config_path: String) { - let meta_opts = risingwave_meta::MetaNodeOpts::parse_from([ + let meta_opts = risingwave_meta_node::MetaNodeOpts::parse_from([ "meta-node", "--listen-addr", &listen_addr, @@ -154,7 +154,7 @@ pub async fn start_meta_node(listen_addr: String, state_store: String, config_pa "enable_compaction_deterministic should be set" ); - risingwave_meta::start(meta_opts).await + risingwave_meta_node::start(meta_opts).await } async fn start_compactor_node( diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index c79c324ba51ec..b81be65edae42 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -37,7 +37,7 @@ risingwave_ctl = { workspace = true } risingwave_e2e_extended_mode_test = { path = "../e2e_extended_mode" } risingwave_expr_impl = { workspace = true } risingwave_frontend = { workspace = true } -risingwave_meta = { workspace = true } +risingwave_meta_node = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_sqlparser = { workspace = true } diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 4eb60e7af14dc..6cc6168513cd4 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -255,7 +255,7 @@ impl Cluster { // meta node for i in 1..=conf.meta_nodes { - let opts = risingwave_meta::MetaNodeOpts::parse_from([ + let opts = risingwave_meta_node::MetaNodeOpts::parse_from([ "meta-node", "--config-path", conf.config_path.as_str(), @@ -276,7 +276,7 @@ impl Cluster { .create_node() .name(format!("meta-{i}")) .ip([192, 168, 1, i as u8].into()) - .init(move || risingwave_meta::start(opts.clone())) + .init(move || risingwave_meta_node::start(opts.clone())) .build(); } diff --git a/src/utils/runtime/Cargo.toml b/src/utils/runtime/Cargo.toml index a977f35e67dd2..8bd4e49d808a9 100644 --- a/src/utils/runtime/Cargo.toml +++ b/src/utils/runtime/Cargo.toml @@ -16,10 +16,6 @@ normal = ["workspace-hack"] [dependencies] await-tree = { workspace = true } -chrono = { version = "0.4", default-features = false, features = [ - "clock", - "std", -] } console = "0.15" console-subscriber = "0.2.0" either = "1" @@ -29,7 +25,6 @@ opentelemetry-otlp = { version = "0.13" } opentelemetry-semantic-conventions = "0.12" parking_lot = { version = "0.12", features = ["deadlock_detection"] } pprof = { version = "0.13", features = ["flamegraph"] } -prometheus = { version = "0.13" } risingwave_common = { workspace = true } risingwave_variables = { workspace = true } rlimit = "0.10"