Skip to content

Commit

Permalink
use meta address strategy
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Dec 29, 2023
1 parent ecd8912 commit 5319511
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 18 deletions.
5 changes: 3 additions & 2 deletions src/cmd_all/src/playground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::LazyLock;

use anyhow::Result;
use clap::Parser;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use tempfile::TempPath;
use tokio::signal;

Expand Down Expand Up @@ -148,8 +149,8 @@ impl risingwave_common::opts::Opts for PlaygroundOpts {
"playground"
}

fn meta_addr(&self) -> &str {
"0.0.0.0:5690" // hard-coded
fn meta_addr(&self) -> MetaAddressStrategy {
"http://0.0.0.0:5690".parse().unwrap() // hard-coded
}
}

Expand Down
14 changes: 12 additions & 2 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use anyhow::Result;
use clap::Parser;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_frontend::FrontendOpts;
Expand Down Expand Up @@ -71,7 +72,7 @@ impl risingwave_common::opts::Opts for ParsedStandaloneOpts {
"standalone"
}

fn meta_addr(&self) -> &str {
fn meta_addr(&self) -> MetaAddressStrategy {
if let Some(opts) = self.meta_opts.as_ref() {
opts.meta_addr()
} else if let Some(opts) = self.compute_opts.as_ref() {
Expand All @@ -81,7 +82,7 @@ impl risingwave_common::opts::Opts for ParsedStandaloneOpts {
} else if let Some(opts) = self.compactor_opts.as_ref() {
opts.meta_addr()
} else {
""
unreachable!("at least one service should be specified as checked during parsing")
}
}
}
Expand Down Expand Up @@ -143,6 +144,15 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts
meta_opts.prometheus_host = Some(prometheus_listener_addr.clone());
}
}

if meta_opts.is_none()
&& compute_opts.is_none()
&& frontend_opts.is_none()
&& compactor_opts.is_none()
{
panic!("No service is specified to start.");
}

ParsedStandaloneOpts {
meta_opts,
compute_opts,
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::util::meta_addr::MetaAddressStrategy;

/// Accessor trait for a component's command-line options.
pub trait Opts {
/// The name of the component.
fn name() -> &'static str;

/// The address to the meta node.
///
/// Should include the port and not start with the protocol like `http://`.
fn meta_addr(&self) -> &str;
fn meta_addr(&self) -> MetaAddressStrategy;
}
16 changes: 16 additions & 0 deletions src/common/src/util/meta_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,22 @@ impl fmt::Display for MetaAddressStrategy {
}
}

impl MetaAddressStrategy {
/// Returns `Some` if there's exactly one address.
pub fn exactly_one(&self) -> Option<&http::Uri> {
match self {
MetaAddressStrategy::LoadBalance(lb) => Some(lb),
MetaAddressStrategy::List(list) => {
if list.len() == 1 {
list.first()
} else {
None
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ impl risingwave_common::opts::Opts for ComputeNodeOpts {
"compute"
}

fn meta_addr(&self) -> &str {
self.meta_address.trim_start_matches("http://")
fn meta_addr(&self) -> MetaAddressStrategy {
self.meta_address.clone()
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ impl risingwave_common::opts::Opts for FrontendOpts {
"frontend"
}

fn meta_addr(&self) -> &str {
self.meta_addr.trim_start_matches("http://")
fn meta_addr(&self) -> MetaAddressStrategy {
self.meta_addr.clone()
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use clap::Parser;
pub use error::{MetaError, MetaResult};
use redact::Secret;
use risingwave_common::config::OverrideConfig;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
Expand All @@ -43,6 +44,7 @@ pub struct MetaNodeOpts {
#[clap(long, env = "RW_VPC_SECURITY_GROUP_ID")]
security_group_id: Option<String>,

// TODO: use `SocketAddr`
#[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
pub listen_addr: String,

Expand Down Expand Up @@ -169,8 +171,10 @@ impl risingwave_common::opts::Opts for MetaNodeOpts {
"meta"
}

fn meta_addr(&self) -> &str {
&self.listen_addr
fn meta_addr(&self) -> MetaAddressStrategy {
format!("http://{}", self.listen_addr)
.parse()
.expect("invalid listen address")
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/compactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ impl risingwave_common::opts::Opts for CompactorOpts {
"compactor"
}

fn meta_addr(&self) -> &str {
self.meta_address.trim_start_matches("http://")
fn meta_addr(&self) -> MetaAddressStrategy {
self.meta_address.clone()
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/utils/runtime/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter, EnvFilter};

pub struct LoggerSettings {
/// The name of the service.
/// The name of the service. Used to identify the service in distributed tracing.
name: String,
/// Enable tokio console output.
enable_tokio_console: bool,
Expand Down Expand Up @@ -63,12 +63,14 @@ impl LoggerSettings {
pub fn from_opts<O: risingwave_common::opts::Opts>(opts: &O) -> Self {
let mut settings = Self::new(O::name());
if settings.tracing_endpoint.is_none() // no explicit env var is set
&& !opts.meta_addr().is_empty() // meta address is valid
&& let Some(addr) = opts.meta_addr().exactly_one() // meta address is valid
&& !Deployment::current().is_ci()
// not in CI
{
// Use embedded collector in the meta service.
settings.tracing_endpoint = Some(format!("http://{}", opts.meta_addr()));
// TODO: when there's multiple meta nodes for high availability, we may send
// to a wrong node here.
settings.tracing_endpoint = Some(addr.to_string());
}
settings
}
Expand Down

0 comments on commit 5319511

Please sign in to comment.