From e252212a20dfd25f54f87dc3528745a1cc82180b Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 4 Jun 2024 01:39:18 +0800 Subject: [PATCH] chore: add unit test for cancellation --- src/servers/src/grpc/cancellation.rs | 45 ++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/src/servers/src/grpc/cancellation.rs b/src/servers/src/grpc/cancellation.rs index 0c8fc5e2ee64..e4f6571d3937 100644 --- a/src/servers/src/grpc/cancellation.rs +++ b/src/servers/src/grpc/cancellation.rs @@ -42,3 +42,48 @@ where select_task.await.unwrap() } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::sync::oneshot; + use tokio::time; + use tonic::Response; + + use super::*; + + #[tokio::test] + async fn test_request_completes_first() { + let request = async { Ok(Response::new("Request Completed")) }; + + let cancellation = async { + time::sleep(Duration::from_secs(1)).await; + Ok(Response::new("Cancelled")) + }; + + let result = with_cancellation_handler(request, cancellation).await; + assert_eq!(result.unwrap().into_inner(), "Request Completed"); + } + + #[tokio::test] + async fn test_cancellation_when_dropped() { + let (cancel_tx, cancel_rx) = oneshot::channel(); + let request = async { + time::sleep(Duration::from_secs(1)).await; + Ok(Response::new("Request Completed")) + }; + + let cancellation = async { + cancel_tx.send("Cancelled").unwrap(); + Ok(Response::new("Cancelled")) + }; + + let response_future = with_cancellation_handler(request, cancellation); + // It will drop the `response_future` and then call the `cancellation` future + let result = time::timeout(Duration::from_millis(50), response_future).await; + + assert!(result.is_err(), "Expected timeout error"); + assert_eq!("Cancelled", cancel_rx.await.unwrap()) + } +}