diff --git a/Cargo.lock b/Cargo.lock index 95d83794211c..b3a760815161 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9664,6 +9664,7 @@ dependencies = [ "tokio-rustls 0.25.0", "tokio-stream", "tokio-test", + "tokio-util", "tonic 0.11.0", "tonic-reflection", "tower", diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index e1ac35c98b06..9216dc35ea9d 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -454,9 +454,11 @@ impl StartCommand { ); let flownode = Arc::new(flow_builder.build().await); - let builder = - DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone()); - let datanode = builder.build().await.context(StartDatanodeSnafu)?; + let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone()) + .with_kv_backend(kv_backend.clone()) + .build() + .await + .context(StartDatanodeSnafu)?; let node_manager = Arc::new(StandaloneDatanodeManager { region_server: datanode.region_server(), diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 268bc7db4ae6..a72269f863e6 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use auth::UserProviderRef; use common_base::Plugins; -use common_config::Configurable; +use common_config::{Configurable, Mode}; use common_runtime::Builder as RuntimeBuilder; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; @@ -140,11 +140,15 @@ where }; let user_provider = self.plugins.get::(); + let runtime = match opts.mode { + Mode::Standalone => Some(builder.runtime().clone()), + _ => None, + }; let greptime_request_handler = GreptimeRequestHandler::new( ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()), user_provider.clone(), - builder.runtime().clone(), + runtime, ); let grpc_server = builder diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 369476e6e5ba..b30426d2e7ac 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -98,6 +98,7 @@ table.workspace = true tokio.workspace = true tokio-rustls = "0.25" tokio-stream = { workspace = true, features = ["net"] } +tokio-util.workspace = true tonic.workspace = true tonic-reflection = "0.11" tower = { workspace = true, features = ["full"] } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index aaf64511bd63..eac2d874159c 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -14,6 +14,7 @@ mod authorize; pub mod builder; +mod cancellation; mod database; pub mod flight; pub mod greptime_handler; diff --git a/src/servers/src/grpc/cancellation.rs b/src/servers/src/grpc/cancellation.rs new file mode 100644 index 000000000000..9c66ff81a248 --- /dev/null +++ b/src/servers/src/grpc/cancellation.rs @@ -0,0 +1,90 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; + +use tokio::select; +use tokio_util::sync::CancellationToken; + +type Result = std::result::Result, tonic::Status>; + +pub(crate) async fn with_cancellation_handler( + request: Request, + cancellation: Cancellation, +) -> Result +where + Request: Future> + Send + 'static, + Cancellation: Future> + Send + 'static, + Response: Send + 'static, +{ + let token = CancellationToken::new(); + // Will call token.cancel() when the future is dropped, such as when the client cancels the request + let _drop_guard = token.clone().drop_guard(); + let select_task = tokio::spawn(async move { + // Can select on token cancellation on any cancellable future while handling the request, + // allowing for custom cleanup code or monitoring + select! { + res = request => res, + _ = token.cancelled() => cancellation.await, + } + }); + + select_task.await.unwrap() +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::sync::mpsc; + use tokio::time; + use tonic::Response; + + use super::*; + + #[tokio::test] + async fn test_request_completes_first() { + let request = async { Ok(Response::new("Request Completed")) }; + + let cancellation = async { + time::sleep(Duration::from_secs(1)).await; + Ok(Response::new("Cancelled")) + }; + + let result = with_cancellation_handler(request, cancellation).await; + assert_eq!(result.unwrap().into_inner(), "Request Completed"); + } + + #[tokio::test] + async fn test_cancellation_when_dropped() { + let (tx, mut rx) = mpsc::channel(2); + let tx_cloned = tx.clone(); + let request = async move { + time::sleep(Duration::from_secs(1)).await; + tx_cloned.send("Request Completed").await.unwrap(); + Ok(Response::new("Completed")) + }; + let cancellation = async move { + tx.send("Request Cancelled").await.unwrap(); + Ok(Response::new("Cancelled")) + }; + + let response_future = with_cancellation_handler(request, cancellation); + // It will drop the `response_future` and then call the `cancellation` future + let result = time::timeout(Duration::from_millis(50), response_future).await; + + assert!(result.is_err(), "Expected timeout error"); + assert_eq!("Request Cancelled", rx.recv().await.unwrap()) + } +} diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs index 3e242fde1152..f8c9e298d4b5 100644 --- a/src/servers/src/grpc/database.rs +++ b/src/servers/src/grpc/database.rs @@ -18,11 +18,12 @@ use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader}; use async_trait::async_trait; use common_error::status_code::StatusCode; use common_query::OutputData; +use common_telemetry::warn; use futures::StreamExt; use tonic::{Request, Response, Status, Streaming}; use crate::grpc::greptime_handler::GreptimeRequestHandler; -use crate::grpc::TonicResult; +use crate::grpc::{cancellation, TonicResult}; pub(crate) struct DatabaseService { handler: GreptimeRequestHandler, @@ -40,55 +41,91 @@ impl GreptimeDatabase for DatabaseService { &self, request: Request, ) -> TonicResult> { - let request = request.into_inner(); - let output = self.handler.handle_request(request).await?; - let message = match output.data { - OutputData::AffectedRows(rows) => GreptimeResponse { - header: Some(ResponseHeader { - status: Some(api::v1::Status { - status_code: StatusCode::Success as _, - ..Default::default() + let remote_addr = request.remote_addr(); + let handler = self.handler.clone(); + let request_future = async move { + let request = request.into_inner(); + let output = handler.handle_request(request).await?; + let message = match output.data { + OutputData::AffectedRows(rows) => GreptimeResponse { + header: Some(ResponseHeader { + status: Some(api::v1::Status { + status_code: StatusCode::Success as _, + ..Default::default() + }), }), - }), - response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })), - }, - OutputData::Stream(_) | OutputData::RecordBatches(_) => { - return Err(Status::unimplemented("GreptimeDatabase::Handle for query")); - } + response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })), + }, + OutputData::Stream(_) | OutputData::RecordBatches(_) => { + return Err(Status::unimplemented("GreptimeDatabase::Handle for query")); + } + }; + + Ok(Response::new(message)) + }; + + let cancellation_future = async move { + warn!( + "GreptimeDatabase::Handle: request from {:?} cancelled by client", + remote_addr + ); + // If this future is executed it means the request future was dropped, + // so it doesn't actually matter what is returned here + Err(Status::cancelled( + "GreptimeDatabase::Handle: request cancelled by client", + )) }; - Ok(Response::new(message)) + cancellation::with_cancellation_handler(request_future, cancellation_future).await } async fn handle_requests( &self, request: Request>, ) -> Result, Status> { - let mut affected_rows = 0; + let remote_addr = request.remote_addr(); + let handler = self.handler.clone(); + let request_future = async move { + let mut affected_rows = 0; - let mut stream = request.into_inner(); - while let Some(request) = stream.next().await { - let request = request?; - let output = self.handler.handle_request(request).await?; - match output.data { - OutputData::AffectedRows(rows) => affected_rows += rows, - OutputData::Stream(_) | OutputData::RecordBatches(_) => { - return Err(Status::unimplemented( - "GreptimeDatabase::HandleRequests for query", - )); + let mut stream = request.into_inner(); + while let Some(request) = stream.next().await { + let request = request?; + let output = handler.handle_request(request).await?; + match output.data { + OutputData::AffectedRows(rows) => affected_rows += rows, + OutputData::Stream(_) | OutputData::RecordBatches(_) => { + return Err(Status::unimplemented( + "GreptimeDatabase::HandleRequests for query", + )); + } } } - } - let message = GreptimeResponse { - header: Some(ResponseHeader { - status: Some(api::v1::Status { - status_code: StatusCode::Success as _, - ..Default::default() + let message = GreptimeResponse { + header: Some(ResponseHeader { + status: Some(api::v1::Status { + status_code: StatusCode::Success as _, + ..Default::default() + }), }), - }), - response: Some(RawResponse::AffectedRows(AffectedRows { - value: affected_rows as u32, - })), + response: Some(RawResponse::AffectedRows(AffectedRows { + value: affected_rows as u32, + })), + }; + + Ok(Response::new(message)) + }; + + let cancellation_future = async move { + warn!( + "GreptimeDatabase::HandleRequests: request from {:?} cancelled by client", + remote_addr + ); + // If this future is executed it means the request future was dropped, + // so it doesn't actually matter what is returned here + Err(Status::cancelled( + "GreptimeDatabase::HandleRequests: request cancelled by client", + )) }; - Ok(Response::new(message)) + cancellation::with_cancellation_handler(request_future, cancellation_future).await } } diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 081e180ebf61..b4e9f26f5b86 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -42,14 +42,14 @@ use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; pub struct GreptimeRequestHandler { handler: ServerGrpcQueryHandlerRef, user_provider: Option, - runtime: Arc, + runtime: Option>, } impl GreptimeRequestHandler { pub fn new( handler: ServerGrpcQueryHandlerRef, user_provider: Option, - runtime: Arc, + runtime: Option>, ) -> Self { Self { handler, @@ -73,16 +73,9 @@ impl GreptimeRequestHandler { let request_type = request_type(&query).to_string(); let db = query_ctx.get_db_string(); let timer = RequestTimer::new(db.clone(), request_type); - - // Executes requests in another runtime to - // 1. prevent the execution from being cancelled unexpected by Tonic runtime; - // - Refer to our blog for the rational behind it: - // https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html - // - Obtaining a `JoinHandle` to get the panic message (if there's any). - // From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped. - // 2. avoid the handler blocks the gRPC runtime incidentally. let tracing_context = TracingContext::from_current_span(); - let handle = self.runtime.spawn(async move { + + let result_future = async move { handler .do_query(query, query_ctx) .trace(tracing_context.attach(tracing::info_span!( @@ -98,12 +91,28 @@ impl GreptimeRequestHandler { } e }) - }); - - handle.await.context(JoinTaskSnafu).map_err(|e| { - timer.record(e.status_code()); - e - })? + }; + + match &self.runtime { + Some(runtime) => { + // Executes requests in another runtime to + // 1. prevent the execution from being cancelled unexpected by Tonic runtime; + // - Refer to our blog for the rational behind it: + // https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html + // - Obtaining a `JoinHandle` to get the panic message (if there's any). + // From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped. + // 2. avoid the handler blocks the gRPC runtime incidentally. + runtime + .spawn(result_future) + .await + .context(JoinTaskSnafu) + .map_err(|e| { + timer.record(e.status_code()); + e + })? + } + None => result_future.await, + } } } diff --git a/src/servers/src/grpc/region_server.rs b/src/servers/src/grpc/region_server.rs index 30c6eb97cbe0..e25ee209f0f5 100644 --- a/src/servers/src/grpc/region_server.rs +++ b/src/servers/src/grpc/region_server.rs @@ -21,12 +21,12 @@ use common_error::ext::ErrorExt; use common_runtime::Runtime; use common_telemetry::tracing::info_span; use common_telemetry::tracing_context::{FutureExt, TracingContext}; -use common_telemetry::{debug, error}; +use common_telemetry::{debug, error, warn}; use snafu::{OptionExt, ResultExt}; -use tonic::{Request, Response}; +use tonic::{Request, Response, Status}; use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result}; -use crate::grpc::TonicResult; +use crate::grpc::{cancellation, TonicResult}; #[async_trait] pub trait RegionServerHandler: Send + Sync { @@ -94,8 +94,21 @@ impl RegionServer for RegionServerRequestHandler { &self, request: Request, ) -> TonicResult> { - let request = request.into_inner(); - let response = self.handle(request).await?; - Ok(Response::new(response)) + let remote_addr = request.remote_addr(); + let self_cloned = self.clone(); + let request_future = async move { + let request = request.into_inner(); + let response = self_cloned.handle(request).await?; + + Ok(Response::new(response)) + }; + + let cancellation_future = async move { + warn!("Region request from {:?} cancelled by client", remote_addr); + // If this future is executed it means the request future was dropped, + // so it doesn't actually matter what is returned here + Err(Status::cancelled("Region request cancelled by client")) + }; + cancellation::with_cancellation_handler(request_future, cancellation_future).await } } diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index 9faad45b0c24..b2b198d851b8 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -60,7 +60,7 @@ impl MockGrpcServer { let service: FlightCraftWrapper<_> = GreptimeRequestHandler::new( self.query_handler.clone(), self.user_provider.clone(), - self.runtime.clone(), + Some(self.runtime.clone()), ) .into(); FlightServiceServer::new(service) diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 4c1b4641c9f4..2aa8b756d935 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -506,7 +506,7 @@ pub async fn setup_grpc_server_with( let greptime_request_handler = GreptimeRequestHandler::new( ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()), user_provider.clone(), - runtime.clone(), + Some(runtime.clone()), ); let flight_handler = Arc::new(greptime_request_handler.clone());