diff --git a/Cargo.lock b/Cargo.lock index c4c882ea9bde3..094e5b4e5a67c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9117,6 +9117,7 @@ dependencies = [ "easy-ext", "either", "futures", + "http 0.2.9", "hyper", "itertools 0.12.0", "lru 0.10.1", @@ -9130,6 +9131,7 @@ dependencies = [ "risingwave_pb", "static_assertions", "thiserror", + "thiserror-ext", "tokio-retry", "tokio-stream", "tower", diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index d472430e34d7d..389731b08d11a 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -261,7 +261,11 @@ mod test { listen_addr: "127.0.0.1:8000", advertise_addr: None, prometheus_listener_addr: "127.0.0.1:1234", - meta_address: "http://127.0.0.1:5690", + meta_address: List( + [ + http://127.0.0.1:5690/, + ], + ), connector_rpc_endpoint: None, connector_rpc_sink_payload_format: None, config_path: "src/config/test.toml", @@ -281,7 +285,11 @@ mod test { listen_addr: "127.0.0.1:4566", advertise_addr: None, port: None, - meta_addr: "http://127.0.0.1:5690", + meta_addr: List( + [ + http://127.0.0.1:5690/, + ], + ), prometheus_listener_addr: "127.0.0.1:1234", health_check_listener_addr: "127.0.0.1:6786", config_path: "src/config/test.toml", diff --git a/src/common/src/util/meta_addr.rs b/src/common/src/util/meta_addr.rs new file mode 100644 index 0000000000000..f579be0a6e3c8 --- /dev/null +++ b/src/common/src/util/meta_addr.rs @@ -0,0 +1,126 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self, Formatter}; +use std::str::FromStr; + +use itertools::Itertools; + +const META_ADDRESS_LOAD_BALANCE_MODE_PREFIX: &str = "load-balance+"; + +/// The strategy for meta client to connect to meta node. +/// +/// Used in the command line argument `--meta-address`. +#[derive(Debug, Eq, PartialEq, Clone)] +pub enum MetaAddressStrategy { + LoadBalance(http::Uri), + List(Vec), +} + +/// Error type for parsing meta address strategy. +#[derive(thiserror::Error, Debug, thiserror_ext::ContextInto)] +pub enum MetaAddressStrategyParseError { + #[error("empty meta addresses")] + Empty, + #[error("there should be only one load balance address")] + MultipleLoadBalance, + #[error("failed to parse meta address `{1}`: {0}")] + UrlParse(#[source] http::uri::InvalidUri, String), +} + +impl FromStr for MetaAddressStrategy { + type Err = MetaAddressStrategyParseError; + + fn from_str(meta_addr: &str) -> Result { + if let Some(addr) = meta_addr.strip_prefix(META_ADDRESS_LOAD_BALANCE_MODE_PREFIX) { + let addr = addr + .split(',') + .exactly_one() + .map_err(|_| MetaAddressStrategyParseError::MultipleLoadBalance)?; + + let uri = addr.parse().into_url_parse(addr)?; + + Ok(Self::LoadBalance(uri)) + } else { + let addrs = meta_addr.split(',').peekable(); + + let uris: Vec<_> = addrs + .map(|addr| addr.parse().into_url_parse(addr)) + .try_collect()?; + + if uris.is_empty() { + return Err(MetaAddressStrategyParseError::Empty); + } + + Ok(Self::List(uris)) + } + } +} + +impl fmt::Display for MetaAddressStrategy { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + MetaAddressStrategy::LoadBalance(addr) => { + write!(f, "{}{}", META_ADDRESS_LOAD_BALANCE_MODE_PREFIX, addr)?; + } + MetaAddressStrategy::List(addrs) => { + write!(f, "{}", addrs.iter().format(","))?; + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_meta_addr() { + let results = vec![ + ( + "load-balance+http://abc", + Some(MetaAddressStrategy::LoadBalance( + "http://abc".parse().unwrap(), + )), + ), + ("load-balance+http://abc,http://def", None), + ("", None), + ( + "http://abc", + Some(MetaAddressStrategy::List(vec!["http://abc" + .parse() + .unwrap()])), + ), + ( + "http://abc,http://def", + Some(MetaAddressStrategy::List(vec![ + "http://abc".parse().unwrap(), + "http://def".parse().unwrap(), + ])), + ), + ]; + for (addr, result) in results { + let parsed_result = addr.parse(); + match result { + None => { + assert!(parsed_result.is_err(), "{parsed_result:?}"); + } + Some(strategy) => { + assert_eq!(strategy, parsed_result.unwrap()); + } + } + } + } +} diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index 8a47ad8b7c121..54522b724adc0 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -25,6 +25,7 @@ mod future_utils; pub mod hash_util; pub mod iter_util; pub mod memcmp_encoding; +pub mod meta_addr; pub mod panic; pub mod pretty_bytes; pub mod prost; diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 9694f1eb474fa..f4b1fa3e87b4d 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -35,6 +35,7 @@ use std::pin::Pin; use clap::{Parser, ValueEnum}; use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig}; +use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use serde::{Deserialize, Serialize}; @@ -71,7 +72,7 @@ pub struct ComputeNodeOpts { pub prometheus_listener_addr: String, #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")] - pub meta_address: String, + pub meta_address: MetaAddressStrategy, /// Endpoint of the connector node #[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")] diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 76c7f251aa334..df2d51cf046d6 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -104,7 +104,7 @@ pub async fn compute_node_serve( // Register to the cluster. We're not ready to serve until activate is called. let (meta_client, system_params) = MetaClient::register_new( - &opts.meta_address, + opts.meta_address, WorkerType::ComputeNode, &advertise_addr, Property { diff --git a/src/ctl/src/common/meta_service.rs b/src/ctl/src/common/meta_service.rs index c437de4000d9f..a28ee4dfc1c51 100644 --- a/src/ctl/src/common/meta_service.rs +++ b/src/ctl/src/common/meta_service.rs @@ -56,7 +56,7 @@ Note: the default value of `RW_META_ADDR` is 'http://127.0.0.1:5690'."; /// Create meta client from options, and register as rise-ctl worker pub async fn create_meta_client(&self) -> Result { let (client, _) = MetaClient::register_new( - &self.meta_addr, + self.meta_addr.parse()?, WorkerType::RiseCtl, &get_new_ctl_identity(), Property::default(), diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 4f78199bfc078..1555814320b27 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -55,6 +55,7 @@ mod scheduler; pub mod session; mod stream_fragmenter; use risingwave_common::config::{MetricLevel, OverrideConfig}; +use risingwave_common::util::meta_addr::MetaAddressStrategy; pub use stream_fragmenter::build_graph; mod utils; pub use utils::{explain_stream_graph, WithOptions}; @@ -101,7 +102,7 @@ pub struct FrontendOpts { /// The address via which we will attempt to connect to a leader meta node. #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")] - pub meta_addr: String, + pub meta_addr: MetaAddressStrategy, #[clap( long, diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index ba786e2bb34a3..6a535d062c77e 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -214,7 +214,7 @@ impl FrontendEnv { // Register in meta by calling `AddWorkerNode` RPC. let (meta_client, system_params_reader) = MetaClient::register_new( - opts.meta_addr.clone().as_str(), + opts.meta_addr, WorkerType::Frontend, &frontend_address, Default::default(), diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml index 72ac5433b9685..450bc894586ef 100644 --- a/src/rpc_client/Cargo.toml +++ b/src/rpc_client/Cargo.toml @@ -19,6 +19,7 @@ async-trait = "0.1" easy-ext = "1" either = "1.9.0" futures = { version = "0.3", default-features = false, features = ["alloc"] } +http = "0.2" hyper = "0.14" itertools = "0.12.0" lru = "0.10.1" @@ -30,6 +31,7 @@ risingwave_hummock_sdk = { workspace = true } risingwave_pb = { workspace = true } static_assertions = "1" thiserror = "1" +thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", diff --git a/src/rpc_client/src/error.rs b/src/rpc_client/src/error.rs index 8b10faab1f349..a544c98a9362e 100644 --- a/src/rpc_client/src/error.rs +++ b/src/rpc_client/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use risingwave_common::error::{ErrorCode, RwError}; +use risingwave_common::util::meta_addr::MetaAddressStrategyParseError; use thiserror::Error; pub type Result = std::result::Result; @@ -28,6 +29,9 @@ pub enum RpcError { #[error(transparent)] GrpcStatus(Box), + #[error(transparent)] + MetaAddressParse(#[from] MetaAddressStrategyParseError), + #[error(transparent)] Internal( #[from] @@ -36,7 +40,8 @@ pub enum RpcError { ), } -static_assertions::const_assert_eq!(std::mem::size_of::(), 16); +// TODO: use `thiserror_ext::Box` +static_assertions::const_assert_eq!(std::mem::size_of::(), 32); impl From for RpcError { fn from(e: tonic::transport::Error) -> Self { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 8ddb2caad18b9..07c5fa4d5d8e1 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -13,17 +13,16 @@ // limitations under the License. use std::collections::HashMap; -use std::fmt::{Debug, Display, Formatter}; +use std::fmt::{Debug, Display}; use std::num::NonZeroUsize; use std::sync::Arc; +use std::thread; use std::time::{Duration, SystemTime}; -use std::{fmt, thread}; use anyhow::anyhow; use async_trait::async_trait; use either::Either; use futures::stream::BoxStream; -use itertools::Itertools; use lru::LruCache; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; @@ -32,6 +31,7 @@ use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::telemetry::report::TelemetryInfoFetcher; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::RW_VERSION; @@ -115,8 +115,6 @@ pub struct MetaClient { } impl MetaClient { - const META_ADDRESS_LOAD_BALANCE_MODE_PREFIX: &'static str = "load-balance+"; - pub fn worker_id(&self) -> u32 { self.worker_id } @@ -183,53 +181,14 @@ impl MetaClient { Ok(resp.version) } - pub(crate) fn parse_meta_addr(meta_addr: &str) -> Result { - if meta_addr.starts_with(Self::META_ADDRESS_LOAD_BALANCE_MODE_PREFIX) { - let addr = meta_addr - .strip_prefix(Self::META_ADDRESS_LOAD_BALANCE_MODE_PREFIX) - .unwrap(); - - let addr = addr.split(',').exactly_one().map_err(|_| { - RpcError::Internal(anyhow!( - "meta address {} in load-balance mode should be exactly one", - addr - )) - })?; - - let _url = url::Url::parse(addr).map_err(|e| { - RpcError::Internal(anyhow!("could not parse meta address {}, {}", addr, e)) - })?; - - Ok(MetaAddressStrategy::LoadBalance(addr.to_string())) - } else { - let addrs: Vec<_> = meta_addr.split(',').map(str::to_string).collect(); - - if addrs.is_empty() { - return Err(RpcError::Internal(anyhow!( - "empty meta addresses {:?}", - addrs - ))); - } - - for addr in &addrs { - let _url = url::Url::parse(addr).map_err(|e| { - RpcError::Internal(anyhow!("could not parse meta address {}, {}", addr, e)) - })?; - } - - Ok(MetaAddressStrategy::List(addrs)) - } - } - /// Register the current node to the cluster and set the corresponding worker id. pub async fn register_new( - meta_addr: &str, + addr_strategy: MetaAddressStrategy, worker_type: WorkerType, addr: &HostAddr, property: Property, meta_config: &MetaConfig, ) -> Result<(Self, SystemParamsReader)> { - let addr_strategy = Self::parse_meta_addr(meta_addr)?; tracing::info!("register meta client using strategy: {}", addr_strategy); // Retry until reaching `max_heartbeat_interval_secs` @@ -1546,44 +1505,26 @@ struct GrpcMetaClient { core: Arc>, } -#[derive(Debug, Eq, PartialEq)] -pub enum MetaAddressStrategy { - LoadBalance(String), - List(Vec), -} - -impl fmt::Display for MetaAddressStrategy { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - MetaAddressStrategy::LoadBalance(addr) => { - write!(f, "LoadBalance({})", addr)?; - } - MetaAddressStrategy::List(addrs) => { - write!(f, "List({:?})", addrs)?; - } - } - Ok(()) - } -} - type MetaMemberClient = MetaMemberServiceClient; struct MetaMemberGroup { - members: LruCache>, + members: LruCache>, } struct MetaMemberManagement { core_ref: Arc>, members: Either, - current_leader: String, + current_leader: http::Uri, meta_config: MetaConfig, } impl MetaMemberManagement { const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5); - fn host_address_to_url(addr: HostAddress) -> String { + fn host_address_to_uri(addr: HostAddress) -> http::Uri { format!("http://{}:{}", addr.host, addr.port) + .parse() + .unwrap() } async fn recreate_core(&self, channel: Channel) { @@ -1606,7 +1547,7 @@ impl MetaMemberManagement { match client { Some(cached_client) => cached_client.to_owned(), None => { - let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone())?; + let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone()); let channel = GrpcMetaClient::connect_to_endpoint(endpoint).await?; let new_client: MetaMemberClient = MetaMemberServiceClient::new(channel); @@ -1642,7 +1583,7 @@ impl MetaMemberManagement { leader = Some(member.clone()); } - let addr = Self::host_address_to_url(member.address.unwrap()); + let addr = Self::host_address_to_uri(member.address.unwrap()); // We don't clean any expired addrs here to deal with some extreme situations. if !member_group.members.contains(&addr) { tracing::info!("new meta member joined: {}", addr); @@ -1655,7 +1596,7 @@ impl MetaMemberManagement { }; if let Some(leader) = leader_addr { - let discovered_leader = Self::host_address_to_url(leader.address.unwrap()); + let discovered_leader = Self::host_address_to_uri(leader.address.unwrap()); if discovered_leader != self.current_leader { tracing::info!("new meta leader {} discovered", discovered_leader); @@ -1666,7 +1607,7 @@ impl MetaMemberManagement { ); let channel = tokio_retry::Retry::spawn(retry_strategy, || async { - let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone())?; + let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone()); GrpcMetaClient::connect_to_endpoint(endpoint).await }) .await?; @@ -1692,7 +1633,7 @@ impl GrpcMetaClient { fn start_meta_member_monitor( &self, - init_leader_addr: String, + init_leader_addr: http::Uri, members: Either, force_refresh_receiver: Receiver>>, meta_config: MetaConfig, @@ -1798,16 +1739,17 @@ impl GrpcMetaClient { Ok(client) } - fn addr_to_endpoint(addr: String) -> Result { - let endpoint = Endpoint::from_shared(addr)?; - Ok(endpoint.initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)) + fn addr_to_endpoint(addr: http::Uri) -> Endpoint { + Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE) } - pub(crate) async fn try_build_rpc_channel(addrs: Vec) -> Result<(Channel, String)> { + pub(crate) async fn try_build_rpc_channel( + addrs: impl IntoIterator, + ) -> Result<(Channel, http::Uri)> { let endpoints: Vec<_> = addrs .into_iter() - .map(|addr| Self::addr_to_endpoint(addr.clone()).map(|endpoint| (endpoint, addr))) - .try_collect()?; + .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr)) + .collect(); let endpoints = endpoints.clone(); @@ -2001,41 +1943,3 @@ impl GrpcMetaClient { impl GrpcMetaClient { for_all_meta_rpc! { meta_rpc_client_method_impl } } - -#[cfg(test)] -mod tests { - use crate::meta_client::MetaAddressStrategy; - use crate::MetaClient; - - #[test] - fn test_parse_meta_addr() { - let results = vec![ - ( - "load-balance+http://abc", - Some(MetaAddressStrategy::LoadBalance("http://abc".to_string())), - ), - ("load-balance+http://abc,http://def", None), - ("load-balance+http://abc:xxx", None), - ("", None), - ( - "http://abc,http://def", - Some(MetaAddressStrategy::List(vec![ - "http://abc".to_string(), - "http://def".to_string(), - ])), - ), - ("http://abc:xx,http://def", None), - ]; - for (addr, result) in results { - let parsed_result = MetaClient::parse_meta_addr(addr); - match result { - None => { - assert!(parsed_result.is_err()); - } - Some(strategy) => { - assert_eq!(strategy, parsed_result.unwrap()) - } - } - } - } -} diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 83b288b08a34c..beb00d6728d69 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -21,6 +21,7 @@ use clap::Parser; use risingwave_common::config::{ AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig, }; +use risingwave_common::util::meta_addr::MetaAddressStrategy; use crate::server::{compactor_serve, shared_compactor_serve}; @@ -56,7 +57,7 @@ pub struct CompactorOpts { pub prometheus_listener_addr: String, #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")] - pub meta_address: String, + pub meta_address: MetaAddressStrategy, #[clap(long, env = "RW_COMPACTION_WORKER_THREADS_NUMBER")] pub compaction_worker_threads_number: Option, diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 274d793ccf63a..39b48514171ba 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -184,7 +184,7 @@ pub async fn compactor_serve( // Register to the cluster. let (meta_client, system_params_reader) = MetaClient::register_new( - &opts.meta_address, + opts.meta_address, WorkerType::Compactor, &advertise_addr, Default::default(), diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 813e773fd17bf..0a59d51512f6c 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -237,7 +237,7 @@ async fn init_metadata_for_replay( tracing::info!("Ctrl+C received, now exiting"); std::process::exit(0); }, - ret = MetaClient::register_new(cluster_meta_endpoint, WorkerType::RiseCtl, advertise_addr, Default::default(), &meta_config) => { + ret = MetaClient::register_new(cluster_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), &meta_config) => { (meta_client, _) = ret.unwrap(); }, } @@ -248,7 +248,7 @@ async fn init_metadata_for_replay( let tables = meta_client.risectl_list_state_tables().await?; let (new_meta_client, _) = MetaClient::register_new( - new_meta_endpoint, + new_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), @@ -280,7 +280,7 @@ async fn pull_version_deltas( // Register to the cluster. // We reuse the RiseCtl worker type here let (meta_client, _) = MetaClient::register_new( - cluster_meta_endpoint, + cluster_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), @@ -329,7 +329,7 @@ async fn start_replay( // Register to the cluster. // We reuse the RiseCtl worker type here let (meta_client, system_params) = MetaClient::register_new( - &opts.meta_address, + opts.meta_address.parse()?, WorkerType::RiseCtl, &advertise_addr, Default::default(),