Skip to content

Commit

Permalink
feat: support gRPC cancellation (#4092)
Browse files Browse the repository at this point in the history
* feat: support cancellation

* chore: add unit test for cancellation

* chore: minor refactor

* feat: we do not need to spawn in distributed mode

---------

Co-authored-by: Ruihang Xia <[email protected]>
  • Loading branch information
fengjiachun and waynexia authored Jun 6, 2024
1 parent b03cb38 commit 4719569
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 68 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,11 @@ impl StartCommand {
);
let flownode = Arc::new(flow_builder.build().await);

let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
let datanode = builder.build().await.context(StartDatanodeSnafu)?;
let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.context(StartDatanodeSnafu)?;

let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;

use auth::UserProviderRef;
use common_base::Plugins;
use common_config::Configurable;
use common_config::{Configurable, Mode};
use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
Expand Down Expand Up @@ -140,11 +140,15 @@ where
};

let user_provider = self.plugins.get::<UserProviderRef>();
let runtime = match opts.mode {
Mode::Standalone => Some(builder.runtime().clone()),
_ => None,
};

let greptime_request_handler = GreptimeRequestHandler::new(
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
user_provider.clone(),
builder.runtime().clone(),
runtime,
);

let grpc_server = builder
Expand Down
1 change: 1 addition & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ table.workspace = true
tokio.workspace = true
tokio-rustls = "0.25"
tokio-stream = { workspace = true, features = ["net"] }
tokio-util.workspace = true
tonic.workspace = true
tonic-reflection = "0.11"
tower = { workspace = true, features = ["full"] }
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod authorize;
pub mod builder;
mod cancellation;
mod database;
pub mod flight;
pub mod greptime_handler;
Expand Down
90 changes: 90 additions & 0 deletions src/servers/src/grpc/cancellation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use tokio::select;
use tokio_util::sync::CancellationToken;

type Result<T> = std::result::Result<tonic::Response<T>, tonic::Status>;

pub(crate) async fn with_cancellation_handler<Request, Cancellation, Response>(
request: Request,
cancellation: Cancellation,
) -> Result<Response>
where
Request: Future<Output = Result<Response>> + Send + 'static,
Cancellation: Future<Output = Result<Response>> + Send + 'static,
Response: Send + 'static,
{
let token = CancellationToken::new();
// Will call token.cancel() when the future is dropped, such as when the client cancels the request
let _drop_guard = token.clone().drop_guard();
let select_task = tokio::spawn(async move {
// Can select on token cancellation on any cancellable future while handling the request,
// allowing for custom cleanup code or monitoring
select! {
res = request => res,
_ = token.cancelled() => cancellation.await,
}
});

select_task.await.unwrap()
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time;
use tonic::Response;

use super::*;

#[tokio::test]
async fn test_request_completes_first() {
let request = async { Ok(Response::new("Request Completed")) };

let cancellation = async {
time::sleep(Duration::from_secs(1)).await;
Ok(Response::new("Cancelled"))
};

let result = with_cancellation_handler(request, cancellation).await;
assert_eq!(result.unwrap().into_inner(), "Request Completed");
}

#[tokio::test]
async fn test_cancellation_when_dropped() {
let (tx, mut rx) = mpsc::channel(2);
let tx_cloned = tx.clone();
let request = async move {
time::sleep(Duration::from_secs(1)).await;
tx_cloned.send("Request Completed").await.unwrap();
Ok(Response::new("Completed"))
};
let cancellation = async move {
tx.send("Request Cancelled").await.unwrap();
Ok(Response::new("Cancelled"))
};

let response_future = with_cancellation_handler(request, cancellation);
// It will drop the `response_future` and then call the `cancellation` future
let result = time::timeout(Duration::from_millis(50), response_future).await;

assert!(result.is_err(), "Expected timeout error");
assert_eq!("Request Cancelled", rx.recv().await.unwrap())
}
}
113 changes: 75 additions & 38 deletions src/servers/src/grpc/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::status_code::StatusCode;
use common_query::OutputData;
use common_telemetry::warn;
use futures::StreamExt;
use tonic::{Request, Response, Status, Streaming};

use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::TonicResult;
use crate::grpc::{cancellation, TonicResult};

pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
Expand All @@ -40,55 +41,91 @@ impl GreptimeDatabase for DatabaseService {
&self,
request: Request<GreptimeRequest>,
) -> TonicResult<Response<GreptimeResponse>> {
let request = request.into_inner();
let output = self.handler.handle_request(request).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
let remote_addr = request.remote_addr();
let handler = self.handler.clone();
let request_future = async move {
let request = request.into_inner();
let output = handler.handle_request(request).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
};

Ok(Response::new(message))
};

let cancellation_future = async move {
warn!(
"GreptimeDatabase::Handle: request from {:?} cancelled by client",
remote_addr
);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled(
"GreptimeDatabase::Handle: request cancelled by client",
))
};
Ok(Response::new(message))
cancellation::with_cancellation_handler(request_future, cancellation_future).await
}

async fn handle_requests(
&self,
request: Request<Streaming<GreptimeRequest>>,
) -> Result<Response<GreptimeResponse>, Status> {
let mut affected_rows = 0;
let remote_addr = request.remote_addr();
let handler = self.handler.clone();
let request_future = async move {
let mut affected_rows = 0;

let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = self.handler.handle_request(request).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = handler.handle_request(request).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
}
}
}
}
let message = GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
let message = GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),
};

Ok(Response::new(message))
};

let cancellation_future = async move {
warn!(
"GreptimeDatabase::HandleRequests: request from {:?} cancelled by client",
remote_addr
);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled(
"GreptimeDatabase::HandleRequests: request cancelled by client",
))
};
Ok(Response::new(message))
cancellation::with_cancellation_handler(request_future, cancellation_future).await
}
}
43 changes: 26 additions & 17 deletions src/servers/src/grpc/greptime_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
pub struct GreptimeRequestHandler {
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
runtime: Option<Arc<Runtime>>,
}

impl GreptimeRequestHandler {
pub fn new(
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
runtime: Option<Arc<Runtime>>,
) -> Self {
Self {
handler,
Expand All @@ -73,16 +73,9 @@ impl GreptimeRequestHandler {
let request_type = request_type(&query).to_string();
let db = query_ctx.get_db_string();
let timer = RequestTimer::new(db.clone(), request_type);

// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpected by Tonic runtime;
// - Refer to our blog for the rational behind it:
// https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html
// - Obtaining a `JoinHandle` to get the panic message (if there's any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped.
// 2. avoid the handler blocks the gRPC runtime incidentally.
let tracing_context = TracingContext::from_current_span();
let handle = self.runtime.spawn(async move {

let result_future = async move {
handler
.do_query(query, query_ctx)
.trace(tracing_context.attach(tracing::info_span!(
Expand All @@ -98,12 +91,28 @@ impl GreptimeRequestHandler {
}
e
})
});

handle.await.context(JoinTaskSnafu).map_err(|e| {
timer.record(e.status_code());
e
})?
};

match &self.runtime {
Some(runtime) => {
// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpected by Tonic runtime;
// - Refer to our blog for the rational behind it:
// https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html
// - Obtaining a `JoinHandle` to get the panic message (if there's any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped.
// 2. avoid the handler blocks the gRPC runtime incidentally.
runtime
.spawn(result_future)
.await
.context(JoinTaskSnafu)
.map_err(|e| {
timer.record(e.status_code());
e
})?
}
None => result_future.await,
}
}
}

Expand Down
Loading

0 comments on commit 4719569

Please sign in to comment.