Skip to content

Commit

Permalink
feat: support cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jun 3, 2024
1 parent 4e5dd1e commit db27e7c
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ table.workspace = true
tokio.workspace = true
tokio-rustls = "0.25"
tokio-stream = { workspace = true, features = ["net"] }
tokio-util.workspace = true
tonic.workspace = true
tonic-reflection = "0.11"
tower = { workspace = true, features = ["full"] }
Expand Down
128 changes: 91 additions & 37 deletions src/servers/src/grpc/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use api::v1::greptime_database_server::GreptimeDatabase;
use api::v1::greptime_response::Response as RawResponse;
use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::status_code::StatusCode;
use common_query::OutputData;
use common_telemetry::warn;
use futures::StreamExt;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tonic::{Request, Response, Status, Streaming};

use crate::grpc::greptime_handler::GreptimeRequestHandler;
Expand All @@ -40,55 +45,104 @@ impl GreptimeDatabase for DatabaseService {
&self,
request: Request<GreptimeRequest>,
) -> TonicResult<Response<GreptimeResponse>> {
let request = request.into_inner();
let output = self.handler.handle_request(request).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
let remote_addr = request.remote_addr();
let handler = self.handler.clone();
let request_future = async move {
let request = request.into_inner();
let output = handler.handle_request(request).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
};

Ok(Response::new(message))
};
Ok(Response::new(message))

let cancellation_future = async move {
warn!("Request from {:?} cancelled by client", remote_addr);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled("Request cancelled by client"))
};
with_cancellation_handler(request_future, cancellation_future).await
}

async fn handle_requests(
&self,
request: Request<Streaming<GreptimeRequest>>,
) -> Result<Response<GreptimeResponse>, Status> {
let mut affected_rows = 0;
let remote_addr = request.remote_addr();
let handler = self.handler.clone();
let request_future = async move {
let mut affected_rows = 0;

let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = self.handler.handle_request(request).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = handler.handle_request(request).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
}
}
}
}
let message = GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
let message = GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),
};

Ok(Response::new(message))
};
Ok(Response::new(message))

let cancellation_future = async move {
warn!("Request from {:?} cancelled by client", remote_addr);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled("Request cancelled by client"))
};
with_cancellation_handler(request_future, cancellation_future).await
}
}

async fn with_cancellation_handler<Request, Cancellation>(
request: Request,
cancellation: Cancellation,
) -> Result<Response<GreptimeResponse>, Status>
where
Request: Future<Output = Result<Response<GreptimeResponse>, Status>> + Send + 'static,
Cancellation: Future<Output = Result<Response<GreptimeResponse>, Status>> + Send + 'static,
{
let token = CancellationToken::new();
// Will call token.cancel() when the future is dropped, such as when the client cancels the request
let _drop_guard = token.clone().drop_guard();
let select_task = tokio::spawn(async move {
// Can select on token cancellation on any cancellable future while handling the request,
// allowing for custom cleanup code or monitoring
select! {
res = request => res,
_ = token.cancelled() => cancellation.await,
}
});

select_task.await.unwrap()
}
51 changes: 46 additions & 5 deletions src/servers/src/grpc/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::sync::Arc;

use api::v1::region::region_server::Region as RegionServer;
Expand All @@ -21,9 +22,11 @@ use common_error::ext::ErrorExt;
use common_runtime::Runtime;
use common_telemetry::tracing::info_span;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, error};
use common_telemetry::{debug, error, warn};
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tonic::{Request, Response, Status};

use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result};
use crate::grpc::TonicResult;
Expand Down Expand Up @@ -94,8 +97,46 @@ impl RegionServer for RegionServerRequestHandler {
&self,
request: Request<RegionRequest>,
) -> TonicResult<Response<RegionResponse>> {
let request = request.into_inner();
let response = self.handle(request).await?;
Ok(Response::new(response))
let remote_addr = request.remote_addr();
let self_cloned = self.clone();
let request_future = async move {
let request = request.into_inner();
let response = self_cloned.handle(request).await?;

Ok(Response::new(response))
};

let cancellation_future = async move {
warn!("Region request from {:?} cancelled by client", remote_addr);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled("Region request cancelled by client"))
};
with_cancellation_handler(request_future, cancellation_future).await
}
}

async fn with_cancellation_handler<Request, Cancellation>(
request: Request,
cancellation: Cancellation,
) -> std::result::Result<Response<RegionResponse>, Status>
where
Request:
Future<Output = std::result::Result<Response<RegionResponse>, Status>> + Send + 'static,
Cancellation:
Future<Output = std::result::Result<Response<RegionResponse>, Status>> + Send + 'static,
{
let token = CancellationToken::new();
// Will call token.cancel() when the future is dropped, such as when the client cancels the request
let _drop_guard = token.clone().drop_guard();
let select_task = tokio::spawn(async move {
// Can select on token cancellation on any cancellable future while handling the request,
// allowing for custom cleanup code or monitoring
select! {
res = request => res,
_ = token.cancelled() => cancellation.await,
}
});

select_task.await.unwrap()
}

0 comments on commit db27e7c

Please sign in to comment.