diff --git a/dozer-api/src/grpc/client_server.rs b/dozer-api/src/grpc/client_server.rs index 057d98b4ad..66ab37aa5b 100644 --- a/dozer-api/src/grpc/client_server.rs +++ b/dozer-api/src/grpc/client_server.rs @@ -6,6 +6,7 @@ use crate::grpc::auth::AuthService; use crate::grpc::grpc_web_middleware::enable_grpc_web; use crate::grpc::health::HealthService; use crate::grpc::{common, run_server, typed}; +use crate::shutdown::ShutdownReceiver; use crate::{errors::GrpcError, CacheEndpoint}; use dozer_tracing::LabelsAndProgress; use dozer_types::grpc_types::health::health_check_response::ServingStatus; @@ -91,7 +92,7 @@ impl ApiServer { pub async fn run( &self, cache_endpoints: Vec>, - shutdown: impl Future + Send + 'static, + shutdown: ShutdownReceiver, operations_receiver: Option>, labels: LabelsAndProgress, default_max_num_records: usize, diff --git a/dozer-api/src/grpc/internal/internal_pipeline_server.rs b/dozer-api/src/grpc/internal/internal_pipeline_server.rs index aa935d3a67..0f5ddb27f2 100644 --- a/dozer-api/src/grpc/internal/internal_pipeline_server.rs +++ b/dozer-api/src/grpc/internal/internal_pipeline_server.rs @@ -26,6 +26,7 @@ use tokio::sync::Mutex; use crate::errors::GrpcError; use crate::grpc::run_server; +use crate::shutdown::ShutdownReceiver; #[derive(Debug, Clone)] pub struct LogEndpoint { @@ -173,7 +174,7 @@ async fn serialize_log_response(response: LogResponseFuture) -> Result, options: &AppGrpcOptions, - shutdown: impl Future + Send + 'static, + shutdown: ShutdownReceiver, ) -> Result>, GrpcError> { let endpoints = endpoint_and_logs .into_iter() diff --git a/dozer-api/src/grpc/mod.rs b/dozer-api/src/grpc/mod.rs index 49f250ca21..aaa46b9ea7 100644 --- a/dozer-api/src/grpc/mod.rs +++ b/dozer-api/src/grpc/mod.rs @@ -11,23 +11,127 @@ mod shared_impl; pub mod typed; pub mod types_helper; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + use bytes::Bytes; pub use client_server::ApiServer; use dozer_types::errors::internal::BoxedError; -use dozer_types::tonic::transport::server::{Router, Routes, TcpIncoming}; -use futures_util::{ - stream::{AbortHandle, Abortable, Aborted}, - Future, +use dozer_types::tonic::transport::server::{ + Connected, Router, Routes, TcpConnectInfo, TcpIncoming, }; +use futures_util::Future; +use futures_util::StreamExt; pub use grpc_web_middleware::enable_grpc_web; use http::{Request, Response}; +use hyper::server::conn::AddrStream; use hyper::Body; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tower::{Layer, Service}; +use crate::shutdown::ShutdownReceiver; + +#[derive(Debug)] +struct ShutdownAddrStream { + inner: AddrStream, + state: ShutdownState, +} + +#[derive(Debug)] +enum ShutdownState { + SignalPending(F), + ShutdownPending, + Done, +} + +impl + Unpin> ShutdownAddrStream { + fn check_shutdown(&mut self, cx: &mut Context<'_>) -> Result<(), io::Error> { + match &mut self.state { + ShutdownState::SignalPending(signal) => { + if let Poll::Ready(()) = Pin::new(signal).poll(cx) { + self.state = ShutdownState::ShutdownPending; + self.check_shutdown(cx) + } else { + Ok(()) + } + } + ShutdownState::ShutdownPending => match Pin::new(&mut self.inner).poll_shutdown(cx) { + Poll::Ready(Ok(())) => { + self.state = ShutdownState::Done; + Ok(()) + } + Poll::Ready(Err(e)) => Err(e), + Poll::Pending => Ok(()), + }, + ShutdownState::Done => Ok(()), + } + } + + fn poll_impl( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + func: fn(Pin<&mut AddrStream>, &mut Context<'_>) -> Poll>, + ) -> Poll> { + let this = Pin::into_inner(self); + if let Err(e) = this.check_shutdown(cx) { + return Poll::Ready(Err(e)); + } + + func(Pin::new(&mut this.inner), cx) + } +} + +impl + Unpin> AsyncRead for ShutdownAddrStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = Pin::into_inner(self); + if let Err(e) = this.check_shutdown(cx) { + return Poll::Ready(Err(e)); + } + + Pin::new(&mut this.inner).poll_read(cx, buf) + } +} + +impl + Unpin> AsyncWrite for ShutdownAddrStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = Pin::into_inner(self); + if let Err(e) = this.check_shutdown(cx) { + return Poll::Ready(Err(e)); + } + + Pin::new(&mut this.inner).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_impl(cx, AsyncWrite::poll_flush) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_impl(cx, AsyncWrite::poll_shutdown) + } +} + +impl Connected for ShutdownAddrStream { + type ConnectInfo = TcpConnectInfo; + + fn connect_info(&self) -> Self::ConnectInfo { + self.inner.connect_info() + } +} + async fn run_server( server: Router, incoming: TcpIncoming, - shutdown: impl Future + Send + 'static, + shutdown: ShutdownReceiver, ) -> Result<(), dozer_types::tonic::transport::Error> where L: Layer, @@ -37,16 +141,17 @@ where ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { - // Tonic graceful shutdown doesn't allow us to set a timeout, resulting in hanging if a client doesn't close the connection. - // So we just abort the server when the shutdown signal is received. - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - tokio::spawn(async move { - shutdown.await; - abort_handle.abort(); + let incoming = incoming.map(|stream| { + stream.map(|stream| { + let shutdown = shutdown.create_shutdown_future(); + ShutdownAddrStream { + inner: stream, + state: ShutdownState::SignalPending(Box::pin(shutdown)), + } + }) }); - match Abortable::new(server.serve_with_incoming(incoming), abort_registration).await { - Ok(result) => result, - Err(Aborted) => Ok(()), - } + server + .serve_with_incoming_shutdown(incoming, shutdown.create_shutdown_future()) + .await } diff --git a/dozer-api/src/lib.rs b/dozer-api/src/lib.rs index 76535c9a5d..fe984597dd 100644 --- a/dozer-api/src/lib.rs +++ b/dozer-api/src/lib.rs @@ -181,6 +181,7 @@ pub use actix_web_httpauth; pub use api_helper::API_LATENCY_HISTOGRAM_NAME; pub use api_helper::API_REQUEST_COUNTER_NAME; pub use async_trait; +pub mod shutdown; pub use dozer_types::tonic; use errors::ApiInitError; pub use openapiv3; diff --git a/dozer-cli/src/shutdown.rs b/dozer-api/src/shutdown.rs similarity index 97% rename from dozer-cli/src/shutdown.rs rename to dozer-api/src/shutdown.rs index d65b1d1cdf..dae29f7883 100644 --- a/dozer-cli/src/shutdown.rs +++ b/dozer-api/src/shutdown.rs @@ -3,7 +3,7 @@ use std::sync::{ Arc, }; -use futures::Future; +use futures_util::Future; use tokio::{ runtime::Runtime, sync::watch::{channel, Receiver, Sender}, diff --git a/dozer-cli/src/lib.rs b/dozer-cli/src/lib.rs index 8ed645d374..dfeb6a556d 100644 --- a/dozer-cli/src/lib.rs +++ b/dozer-cli/src/lib.rs @@ -2,13 +2,12 @@ pub mod cli; pub mod errors; pub mod live; pub mod pipeline; -pub mod shutdown; pub mod simple; +use dozer_api::shutdown::ShutdownSender; use dozer_core::{app::AppPipeline, errors::ExecutionError}; use dozer_sql::{builder::statement_to_pipeline, errors::PipelineError}; use dozer_types::log::debug; use errors::OrchestrationError; -use shutdown::ShutdownSender; use std::{ backtrace::{Backtrace, BacktraceStatus}, panic, process, @@ -68,6 +67,7 @@ pub fn wrapped_statement_to_pipeline(sql: &str) -> Result, diff --git a/dozer-cli/src/pipeline/source_builder.rs b/dozer-cli/src/pipeline/source_builder.rs index 03bc6d4d45..2d54f23e62 100644 --- a/dozer-cli/src/pipeline/source_builder.rs +++ b/dozer-cli/src/pipeline/source_builder.rs @@ -1,6 +1,6 @@ use crate::pipeline::connector_source::ConnectorSourceFactory; -use crate::shutdown::ShutdownReceiver; use crate::OrchestrationError; +use dozer_api::shutdown::ShutdownReceiver; use dozer_core::appsource::{AppSourceManager, AppSourceMappings}; use dozer_ingestion::connectors::TableInfo; diff --git a/dozer-cli/src/pipeline/tests/builder.rs b/dozer-cli/src/pipeline/tests/builder.rs index 46c050aebc..3e4e4bfb68 100644 --- a/dozer-cli/src/pipeline/tests/builder.rs +++ b/dozer-cli/src/pipeline/tests/builder.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::pipeline::source_builder::SourceBuilder; use crate::pipeline::PipelineBuilder; +use dozer_api::shutdown; use dozer_types::ingestion_types::{GrpcConfig, GrpcConfigSchemas}; use dozer_types::models::config::Config; @@ -83,7 +84,7 @@ fn load_multi_sources() { .unwrap(); let source_builder = SourceBuilder::new(grouped_connections, Default::default()); - let (_sender, shutdown_receiver) = crate::shutdown::new(&runtime); + let (_sender, shutdown_receiver) = shutdown::new(&runtime); let asm = runtime .block_on(source_builder.build_source_manager(&runtime, shutdown_receiver)) .unwrap(); diff --git a/dozer-cli/src/simple/executor.rs b/dozer-cli/src/simple/executor.rs index 73dc4630e4..b3158afb7e 100644 --- a/dozer-cli/src/simple/executor.rs +++ b/dozer-cli/src/simple/executor.rs @@ -1,4 +1,5 @@ use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint; +use dozer_api::shutdown::ShutdownReceiver; use dozer_cache::dozer_log::camino::Utf8Path; use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir}; use dozer_cache::dozer_log::replication::Log; @@ -15,7 +16,6 @@ use dozer_types::models::source::Source; use dozer_types::models::udf_config::UdfConfig; use crate::pipeline::PipelineBuilder; -use crate::shutdown::ShutdownReceiver; use dozer_core::executor::{DagExecutor, ExecutorOptions}; use dozer_types::models::connection::Connection; diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index 9a5596e4dc..8b969a2bfb 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -2,7 +2,6 @@ use super::executor::{run_dag_executor, Executor}; use super::Contract; use crate::errors::OrchestrationError; use crate::pipeline::PipelineBuilder; -use crate::shutdown::ShutdownReceiver; use crate::simple::build; use crate::simple::helper::validate_config; use crate::utils::{ @@ -13,6 +12,7 @@ use crate::utils::{ use crate::{flatten_join_handle, join_handle_map_err}; use dozer_api::auth::{Access, Authorizer}; use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server; +use dozer_api::shutdown::ShutdownReceiver; use dozer_api::{get_api_security, grpc, rest, CacheEndpoint}; use dozer_cache::cache::LmdbRwCacheManager; use dozer_cache::dozer_log::camino::Utf8PathBuf; @@ -160,11 +160,10 @@ impl SimpleOrchestrator { let grpc_handle = if grpc_config.enabled.unwrap_or(true) { let api_security = self.config.api.api_security.clone(); let grpc_server = grpc::ApiServer::new(grpc_config, api_security, flags); - let shutdown = shutdown.create_shutdown_future(); let grpc_server = grpc_server .run( cache_endpoints, - shutdown, + shutdown.clone(), operations_receiver, self.labels.clone(), default_max_num_records, @@ -247,7 +246,7 @@ impl SimpleOrchestrator { .block_on(start_internal_pipeline_server( endpoint_and_logs, app_grpc_config, - shutdown.create_shutdown_future(), + shutdown.clone(), )) .map_err(OrchestrationError::InternalServerFailed)?; diff --git a/dozer-log/src/reader.rs b/dozer-log/src/reader.rs index 627c529b27..dff4965a9a 100644 --- a/dozer-log/src/reader.rs +++ b/dozer-log/src/reader.rs @@ -16,6 +16,7 @@ use dozer_types::models::api_endpoint::{ use dozer_types::tonic::transport::Channel; use dozer_types::tonic::Streaming; use dozer_types::{bincode, serde_json}; +use tokio::select; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; @@ -265,10 +266,27 @@ async fn call_get_log_once( } async fn log_reader_worker( + log_client: LogClient, + pos: u64, + options: LogReaderOptions, + op_sender: Sender, +) -> Result<(), ReaderError> { + select! { + _ = op_sender.closed() => { + debug!("Log reader thread quit because LogReader was dropped"); + Ok(()) + } + result = log_reader_worker_loop(log_client, pos, options, &op_sender) => { + result + } + } +} + +async fn log_reader_worker_loop( mut log_client: LogClient, mut pos: u64, options: LogReaderOptions, - op_sender: Sender, + op_sender: &Sender, ) -> Result<(), ReaderError> { loop { // Request ops. diff --git a/dozer-tests/src/tests/e2e/mod.rs b/dozer-tests/src/tests/e2e/mod.rs index 7c9a5ed8c2..2ff27b9f3e 100644 --- a/dozer-tests/src/tests/e2e/mod.rs +++ b/dozer-tests/src/tests/e2e/mod.rs @@ -1,10 +1,10 @@ use std::{future::Future, sync::Arc, thread::JoinHandle, time::Duration}; -use dozer_api::tonic::transport::Channel; -use dozer_cli::{ +use dozer_api::{ shutdown::{self, ShutdownSender}, - simple::SimpleOrchestrator, + tonic::transport::Channel, }; +use dozer_cli::simple::SimpleOrchestrator; use dozer_types::{ grpc_types::{ common::common_grpc_service_client::CommonGrpcServiceClient,