diff --git a/src/cmd/src/cli/database.rs b/src/cmd/src/cli/database.rs index a4eab9982609..9e6b752ea51b 100644 --- a/src/cmd/src/cli/database.rs +++ b/src/cmd/src/cli/database.rs @@ -30,7 +30,7 @@ pub(crate) struct DatabaseClient { addr: String, catalog: String, auth_header: Option, - timeout: Option, + timeout: Duration, } impl DatabaseClient { @@ -38,7 +38,7 @@ impl DatabaseClient { addr: String, catalog: String, auth_basic: Option, - timeout: Option, + timeout: Duration, ) -> Self { let auth_header = if let Some(basic) = auth_basic { let encoded = general_purpose::STANDARD.encode(basic); @@ -73,12 +73,11 @@ impl DatabaseClient { if let Some(ref auth) = self.auth_header { request = request.header("Authorization", auth); } - if let Some(ref timeout) = self.timeout { - request = request.header( - GREPTIME_DB_HEADER_TIMEOUT, - format_duration(*timeout).to_string(), - ); - } + + request = request.header( + GREPTIME_DB_HEADER_TIMEOUT, + format_duration(self.timeout).to_string(), + ); let response = request.send().await.with_context(|_| HttpQuerySqlSnafu { reason: format!("bad url: {}", url), diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 760fdbbc02f6..6d6cb6756b82 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -86,6 +86,9 @@ pub struct ExportCommand { auth_basic: Option, /// The timeout of invoking the database. + /// + /// It is used to override the server-side timeout setting. + /// The default behavior will disable server-side default timeout(i.e. `0s`). #[clap(long, value_parser = humantime::parse_duration)] timeout: Option, } @@ -98,7 +101,8 @@ impl ExportCommand { self.addr.clone(), catalog.clone(), self.auth_basic.clone(), - self.timeout, + // Treats `None` as `0s` to disable server-side default timeout. + self.timeout.unwrap_or_default(), ); Ok(Instance::new( diff --git a/src/cmd/src/cli/import.rs b/src/cmd/src/cli/import.rs index 908fc944bd03..9cb7b60f59e7 100644 --- a/src/cmd/src/cli/import.rs +++ b/src/cmd/src/cli/import.rs @@ -71,6 +71,9 @@ pub struct ImportCommand { auth_basic: Option, /// The timeout of invoking the database. + /// + /// It is used to override the server-side timeout setting. + /// The default behavior will disable server-side default timeout(i.e. `0s`). #[clap(long, value_parser = humantime::parse_duration)] timeout: Option, } @@ -82,7 +85,8 @@ impl ImportCommand { self.addr.clone(), catalog.clone(), self.auth_basic.clone(), - self.timeout, + // Treats `None` as `0s` to disable server-side default timeout. + self.timeout.unwrap_or_default(), ); Ok(Instance::new( diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index c3cd977242a1..62ae15b8acc9 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -1119,6 +1119,29 @@ mod test { assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT); let elapsed = now.elapsed(); assert!(elapsed > Duration::from_millis(15)); + + tokio::time::timeout( + Duration::from_millis(15), + client + .get("/test/timeout") + .header(GREPTIME_DB_HEADER_TIMEOUT, "0s") + .send(), + ) + .await + .unwrap_err(); + + tokio::time::timeout( + Duration::from_millis(15), + client + .get("/test/timeout") + .header( + GREPTIME_DB_HEADER_TIMEOUT, + humantime::format_duration(Duration::default()).to_string(), + ) + .send(), + ) + .await + .unwrap_err(); } #[tokio::test] diff --git a/src/servers/src/http/test_helpers.rs b/src/servers/src/http/test_helpers.rs index 0142fc5180fc..c5fe975a1472 100644 --- a/src/servers/src/http/test_helpers.rs +++ b/src/servers/src/http/test_helpers.rs @@ -194,6 +194,7 @@ impl RequestBuilder { /// This is convenient for tests where panics are what you want. For access to /// non-panicking versions or the complete `Response` API use `into_inner()` or /// `as_ref()`. +#[derive(Debug)] pub struct TestResponse { response: reqwest::Response, } diff --git a/src/servers/src/http/timeout.rs b/src/servers/src/http/timeout.rs index 7a42918124d4..c03620011e9d 100644 --- a/src/servers/src/http/timeout.rs +++ b/src/servers/src/http/timeout.rs @@ -21,7 +21,7 @@ use axum::body::Body; use axum::http::Request; use axum::response::Response; use pin_project::pin_project; -use tokio::time::Sleep; +use tokio::time::{Instant, Sleep}; use tower::timeout::error::Elapsed; use tower::{BoxError, Layer, Service}; @@ -128,7 +128,7 @@ where } fn call(&mut self, request: Request) -> Self::Future { - let user_timeout = request + let timeout = request .headers() .get(GREPTIME_DB_HEADER_TIMEOUT) .and_then(|value| { @@ -136,9 +136,17 @@ where .to_str() .ok() .and_then(|value| humantime::parse_duration(value).ok()) - }); + }) + .unwrap_or(self.default_timeout); let response = self.inner.call(request); - let sleep = tokio::time::sleep(user_timeout.unwrap_or(self.default_timeout)); - ResponseFuture::new(response, sleep) + + if timeout.is_zero() { + // 30 years. See `Instant::far_future`. + let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30); + ResponseFuture::new(response, tokio::time::sleep_until(far_future)) + } else { + let sleep = tokio::time::sleep(timeout); + ResponseFuture::new(response, sleep) + } } }