Skip to content

Commit

Permalink
feat: add tls support for grpc service
Browse files Browse the repository at this point in the history
  • Loading branch information
poltao committed May 16, 2024
1 parent 9f4a6c6 commit a4bca94
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 14 deletions.
10 changes: 9 additions & 1 deletion src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::{OptionExt, ResultExt};

Expand Down Expand Up @@ -132,6 +133,12 @@ struct StartCommand {
http_timeout: Option<u64>,
#[clap(long, default_value = "GREPTIMEDB_DATANODE")]
env_prefix: String,
#[clap(long)]
tls_mode: Option<TlsMode>,
#[clap(long)]
tls_cert_path: Option<String>,
#[clap(long)]
tls_key_path: Option<String>,
}

impl StartCommand {
Expand Down Expand Up @@ -220,6 +227,7 @@ impl StartCommand {
opts.http.timeout = Duration::from_secs(http_timeout)
}

opts.tls = TlsOption::new(self.tls_mode.clone(), self.tls_cert_path.clone(), self.tls_key_path.clone());
// Disable dashboard in datanode.
opts.http.disable_dashboard = true;

Expand Down Expand Up @@ -258,7 +266,7 @@ impl StartCommand {
.context(StartDatanodeSnafu)?;

let services = DatanodeServiceBuilder::new(&opts)
.with_default_grpc_server(&datanode.region_server())
.with_default_grpc_server(&datanode.region_server()).context(StartDatanodeSnafu)?
.enable_http_service()
.build()
.await
Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl StartCommand {

if let Some(addr) = &self.rpc_addr {
opts.grpc.addr.clone_from(addr);
opts.grpc.tls = tls_opts.clone();
}

if let Some(addr) = &self.mysql_addr {
Expand Down
3 changes: 3 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::tls::TlsOption;
use servers::Mode;

pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256);
Expand Down Expand Up @@ -236,6 +237,7 @@ pub struct DatanodeOptions {
pub enable_telemetry: bool,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
pub tls: TlsOption,
}

impl Default for DatanodeOptions {
Expand Down Expand Up @@ -263,6 +265,7 @@ impl Default for DatanodeOptions {
enable_telemetry: true,
export_metrics: ExportMetricsOption::default(),
tracing: TracingOptions::default(),
tls: TlsOption::default(),
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,14 @@ pub enum Error {
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
},

#[snafu(display("Invalid tls config"))]
InvalidTlsConfig {
#[snafu(source)]
error: common_grpc::error::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -401,6 +409,7 @@ impl ErrorExt for Error {
| MissingWalDirConfig { .. }
| MissingKvBackend { .. }
| TomlFormat { .. } => StatusCode::InvalidArguments,
| InvalidTlsConfig { .. } => StatusCode::InvalidArguments,

PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => {
StatusCode::Unexpected
Expand Down
20 changes: 12 additions & 8 deletions src/datanode/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use servers::server::{ServerHandler, ServerHandlers};
use snafu::ResultExt;

use crate::config::DatanodeOptions;
use crate::error::{ParseAddrSnafu, Result, TomlFormatSnafu};
use crate::error::{InvalidTlsConfigSnafu, ParseAddrSnafu, Result, TomlFormatSnafu};
use crate::region_server::RegionServer;

pub struct DatanodeServiceBuilder<'a> {
Expand All @@ -49,10 +49,10 @@ impl<'a> DatanodeServiceBuilder<'a> {
}
}

pub fn with_default_grpc_server(mut self, region_server: &RegionServer) -> Self {
let grpc_server = Self::grpc_server_builder(self.opts, region_server).build();
self.grpc_server = Some(grpc_server);
self
pub fn with_default_grpc_server(mut self, region_server: &RegionServer) -> Result<Self> {
let grpc_server_build = Self::grpc_server_builder(self.opts, region_server)?;
self.grpc_server = Some(grpc_server_build.build());
Ok(self)
}

pub fn enable_http_service(self) -> Self {
Expand Down Expand Up @@ -91,14 +91,18 @@ impl<'a> DatanodeServiceBuilder<'a> {
pub fn grpc_server_builder(
opts: &DatanodeOptions,
region_server: &RegionServer,
) -> GrpcServerBuilder {
) -> Result<GrpcServerBuilder> {
let config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
tls: opts.tls.clone(),
};

GrpcServerBuilder::new(config, region_server.runtime())
let build = GrpcServerBuilder::new(config, region_server.runtime())
.with_tls_config(opts.tls.clone())
.context(InvalidTlsConfigSnafu)?
.flight_handler(Arc::new(region_server.clone()))
.region_server_handler(Arc::new(region_server.clone()))
.region_server_handler(Arc::new(region_server.clone()));
Ok(build)
}
}
11 changes: 10 additions & 1 deletion src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,14 @@ pub enum Error {
location: Location,
name: String,
},

#[snafu(display("Invalid tls config"))]
InvalidTlsConfig {
#[snafu(source)]
error: common_grpc::error::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -374,7 +382,8 @@ impl ErrorExt for Error {
| Error::IllegalAuthConfig { .. }
| Error::EmptyData { .. }
| Error::ColumnNoneDefaultValue { .. }
| Error::IncompleteGrpcRequest { .. } => StatusCode::InvalidArguments,
| Error::IncompleteGrpcRequest { .. }
| Error::InvalidTlsConfig { .. } => StatusCode::InvalidArguments,

Error::NotSupported { .. } => StatusCode::Unsupported,

Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@ where
let grpc_config = GrpcServerConfig {
max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
tls: opts.tls.clone(),
};

Ok(GrpcServerBuilder::new(grpc_config, grpc_runtime))
let mut builder = GrpcServerBuilder::new(grpc_config, grpc_runtime);
builder = builder
.with_tls_config(opts.tls.clone())
.context(error::InvalidTlsConfigSnafu)?;
Ok(builder)
}

pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/service_config/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
use serde::{Deserialize, Serialize};
use servers::tls::TlsOption;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct GrpcOptions {
Expand All @@ -26,6 +27,8 @@ pub struct GrpcOptions {
pub max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
}

impl Default for GrpcOptions {
Expand All @@ -35,6 +38,7 @@ impl Default for GrpcOptions {
runtime_size: 8,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
tls: TlsOption::default(),
}
}
}
13 changes: 11 additions & 2 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tokio::net::TcpListener;
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tonic::transport::server::{Routes, TcpIncoming};
use tonic::transport::ServerTlsConfig;
use tonic::{Request, Response, Status};
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};

Expand All @@ -44,6 +45,7 @@ use crate::error::{
};
use crate::metrics::MetricsMiddlewareLayer;
use crate::server::Server;
use crate::tls::TlsOption;

type TonicResult<T> = std::result::Result<T, Status>;

Expand All @@ -55,6 +57,8 @@ pub struct GrpcServer {
serve_state: Mutex<Option<Receiver<Result<()>>>>,
// handlers
routes: Mutex<Option<Routes>>,
// tls config
tls_config: Option<ServerTlsConfig>,
}

/// Grpc Server configuration
Expand All @@ -64,13 +68,15 @@ pub struct GrpcServerConfig {
pub max_recv_message_size: usize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
pub tls: TlsOption,
}

impl Default for GrpcServerConfig {
fn default() -> Self {
Self {
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
tls: TlsOption::default(),
}
}
}
Expand Down Expand Up @@ -172,8 +178,11 @@ impl Server for GrpcServer {
.layer(MetricsMiddlewareLayer)
.into_inner();

let builder = tonic::transport::Server::builder()
.layer(metrics_layer)
let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
if let Some(tls_config) = self.tls_config.clone() {
builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
}
let builder = builder
.add_routes(routes)
.add_service(self.create_healthcheck_service())
.add_service(self.create_reflection_service());
Expand Down
23 changes: 23 additions & 0 deletions src/servers/src/grpc/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
use api::v1::region::region_server::RegionServer;
use arrow_flight::flight_service_server::FlightServiceServer;
use auth::UserProviderRef;
use common_grpc::error::{InvalidConfigFilePathSnafu, Result};
use common_runtime::Runtime;
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
use snafu::ResultExt;
use tokio::sync::Mutex;
use tonic::transport::server::RoutesBuilder;
use tonic::transport::{Identity, ServerTlsConfig};
use tower::ServiceBuilder;

use super::flight::{FlightCraftRef, FlightCraftWrapper};
Expand All @@ -36,6 +39,7 @@ use crate::grpc::otlp::OtlpService;
use crate::grpc::prom_query_gateway::PrometheusGatewayService;
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::tls::{TlsMode, TlsOption};

/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
/// This macro will automatically add some gRPC properties to the service.
Expand All @@ -57,6 +61,7 @@ pub struct GrpcServerBuilder {
config: GrpcServerConfig,
runtime: Arc<Runtime>,
routes_builder: RoutesBuilder,
tls_config: Option<ServerTlsConfig>,
}

impl GrpcServerBuilder {
Expand All @@ -65,6 +70,7 @@ impl GrpcServerBuilder {
config,
runtime,
routes_builder: RoutesBuilder::default(),
tls_config: None,
}
}

Expand Down Expand Up @@ -142,11 +148,28 @@ impl GrpcServerBuilder {
&mut self.routes_builder
}

pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
let tls_config = match tls_option.mode {
TlsMode::Require => {
let cert = std::fs::read_to_string(tls_option.cert_path)
.context(InvalidConfigFilePathSnafu)?;
let key = std::fs::read_to_string(tls_option.key_path)
.context(InvalidConfigFilePathSnafu)?;
let identity = Identity::from_pem(cert, key);
Some(ServerTlsConfig::new().identity(identity))
}
_ => None,
};
self.tls_config = tls_config;
Ok(self)
}

pub fn build(self) -> GrpcServer {
GrpcServer {
routes: Mutex::new(Some(self.routes_builder.routes())),
shutdown_tx: Mutex::new(None),
serve_state: Mutex::new(None),
tls_config: self.tls_config,
}
}
}
5 changes: 5 additions & 0 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use servers::http::prometheus::{
PrometheusResponse,
};
use servers::server::Server;
use servers::tls::TlsOption;
use tests_integration::database::Database;
use tests_integration::test_util::{
setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType,
Expand Down Expand Up @@ -128,6 +129,7 @@ pub async fn test_grpc_message_size_ok(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
tls: TlsOption::default(),
};
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;
Expand All @@ -146,6 +148,7 @@ pub async fn test_grpc_message_size_limit_send(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 50,
tls: TlsOption::default(),
};
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;
Expand All @@ -165,6 +168,7 @@ pub async fn test_grpc_message_size_limit_recv(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 10,
max_send_message_size: 1024,
tls: TlsOption::default(),
};
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;
Expand Down Expand Up @@ -643,6 +647,7 @@ pub async fn test_grpc_timezone(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
tls: TlsOption::default(),
};
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;
Expand Down

0 comments on commit a4bca94

Please sign in to comment.