Skip to content

Commit

Permalink
refactor: move meta addr strategy to common (#14270)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Dec 29, 2023
1 parent 3ca10ce commit 375aa80
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 131 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ mod test {
listen_addr: "127.0.0.1:8000",
advertise_addr: None,
prometheus_listener_addr: "127.0.0.1:1234",
meta_address: "http://127.0.0.1:5690",
meta_address: List(
[
http://127.0.0.1:5690/,
],
),
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "src/config/test.toml",
Expand All @@ -281,7 +285,11 @@ mod test {
listen_addr: "127.0.0.1:4566",
advertise_addr: None,
port: None,
meta_addr: "http://127.0.0.1:5690",
meta_addr: List(
[
http://127.0.0.1:5690/,
],
),
prometheus_listener_addr: "127.0.0.1:1234",
health_check_listener_addr: "127.0.0.1:6786",
config_path: "src/config/test.toml",
Expand Down
126 changes: 126 additions & 0 deletions src/common/src/util/meta_addr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self, Formatter};
use std::str::FromStr;

use itertools::Itertools;

const META_ADDRESS_LOAD_BALANCE_MODE_PREFIX: &str = "load-balance+";

/// The strategy for meta client to connect to meta node.
///
/// Used in the command line argument `--meta-address`.
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum MetaAddressStrategy {
LoadBalance(http::Uri),
List(Vec<http::Uri>),
}

/// Error type for parsing meta address strategy.
#[derive(thiserror::Error, Debug, thiserror_ext::ContextInto)]
pub enum MetaAddressStrategyParseError {
#[error("empty meta addresses")]
Empty,
#[error("there should be only one load balance address")]
MultipleLoadBalance,
#[error("failed to parse meta address `{1}`: {0}")]
UrlParse(#[source] http::uri::InvalidUri, String),
}

impl FromStr for MetaAddressStrategy {
type Err = MetaAddressStrategyParseError;

fn from_str(meta_addr: &str) -> Result<Self, Self::Err> {
if let Some(addr) = meta_addr.strip_prefix(META_ADDRESS_LOAD_BALANCE_MODE_PREFIX) {
let addr = addr
.split(',')
.exactly_one()
.map_err(|_| MetaAddressStrategyParseError::MultipleLoadBalance)?;

let uri = addr.parse().into_url_parse(addr)?;

Ok(Self::LoadBalance(uri))
} else {
let addrs = meta_addr.split(',').peekable();

let uris: Vec<_> = addrs
.map(|addr| addr.parse().into_url_parse(addr))
.try_collect()?;

if uris.is_empty() {
return Err(MetaAddressStrategyParseError::Empty);
}

Ok(Self::List(uris))
}
}
}

impl fmt::Display for MetaAddressStrategy {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
MetaAddressStrategy::LoadBalance(addr) => {
write!(f, "{}{}", META_ADDRESS_LOAD_BALANCE_MODE_PREFIX, addr)?;
}
MetaAddressStrategy::List(addrs) => {
write!(f, "{}", addrs.iter().format(","))?;
}
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_meta_addr() {
let results = vec![
(
"load-balance+http://abc",
Some(MetaAddressStrategy::LoadBalance(
"http://abc".parse().unwrap(),
)),
),
("load-balance+http://abc,http://def", None),
("", None),
(
"http://abc",
Some(MetaAddressStrategy::List(vec!["http://abc"
.parse()
.unwrap()])),
),
(
"http://abc,http://def",
Some(MetaAddressStrategy::List(vec![
"http://abc".parse().unwrap(),
"http://def".parse().unwrap(),
])),
),
];
for (addr, result) in results {
let parsed_result = addr.parse();
match result {
None => {
assert!(parsed_result.is_err(), "{parsed_result:?}");
}
Some(strategy) => {
assert_eq!(strategy, parsed_result.unwrap());
}
}
}
}
}
1 change: 1 addition & 0 deletions src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod future_utils;
pub mod hash_util;
pub mod iter_util;
pub mod memcmp_encoding;
pub mod meta_addr;
pub mod panic;
pub mod pretty_bytes;
pub mod prost;
Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use std::pin::Pin;

use clap::{Parser, ValueEnum};
use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -71,7 +72,7 @@ pub struct ComputeNodeOpts {
pub prometheus_listener_addr: String,

#[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
pub meta_address: String,
pub meta_address: MetaAddressStrategy,

/// Endpoint of the connector node
#[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")]
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub async fn compute_node_serve(

// Register to the cluster. We're not ready to serve until activate is called.
let (meta_client, system_params) = MetaClient::register_new(
&opts.meta_address,
opts.meta_address,
WorkerType::ComputeNode,
&advertise_addr,
Property {
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/common/meta_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Note: the default value of `RW_META_ADDR` is 'http://127.0.0.1:5690'.";
/// Create meta client from options, and register as rise-ctl worker
pub async fn create_meta_client(&self) -> Result<MetaClient> {
let (client, _) = MetaClient::register_new(
&self.meta_addr,
self.meta_addr.parse()?,
WorkerType::RiseCtl,
&get_new_ctl_identity(),
Property::default(),
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ mod scheduler;
pub mod session;
mod stream_fragmenter;
use risingwave_common::config::{MetricLevel, OverrideConfig};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
pub use stream_fragmenter::build_graph;
mod utils;
pub use utils::{explain_stream_graph, WithOptions};
Expand Down Expand Up @@ -101,7 +102,7 @@ pub struct FrontendOpts {

/// The address via which we will attempt to connect to a leader meta node.
#[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
pub meta_addr: String,
pub meta_addr: MetaAddressStrategy,

#[clap(
long,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl FrontendEnv {

// Register in meta by calling `AddWorkerNode` RPC.
let (meta_client, system_params_reader) = MetaClient::register_new(
opts.meta_addr.clone().as_str(),
opts.meta_addr,
WorkerType::Frontend,
&frontend_address,
Default::default(),
Expand Down
2 changes: 2 additions & 0 deletions src/rpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async-trait = "0.1"
easy-ext = "1"
either = "1.9.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
http = "0.2"
hyper = "0.14"
itertools = "0.12.0"
lru = "0.10.1"
Expand All @@ -30,6 +31,7 @@ risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
static_assertions = "1"
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
Expand Down
7 changes: 6 additions & 1 deletion src/rpc_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::util::meta_addr::MetaAddressStrategyParseError;
use thiserror::Error;

pub type Result<T, E = RpcError> = std::result::Result<T, E>;
Expand All @@ -28,6 +29,9 @@ pub enum RpcError {
#[error(transparent)]
GrpcStatus(Box<TonicStatusWrapper>),

#[error(transparent)]
MetaAddressParse(#[from] MetaAddressStrategyParseError),

#[error(transparent)]
Internal(
#[from]
Expand All @@ -36,7 +40,8 @@ pub enum RpcError {
),
}

static_assertions::const_assert_eq!(std::mem::size_of::<RpcError>(), 16);
// TODO: use `thiserror_ext::Box`
static_assertions::const_assert_eq!(std::mem::size_of::<RpcError>(), 32);

impl From<tonic::transport::Error> for RpcError {
fn from(e: tonic::transport::Error) -> Self {
Expand Down
Loading

0 comments on commit 375aa80

Please sign in to comment.