Skip to content

Commit

Permalink
fix: Close connections in gRPC server on shutdown (#2089)
Browse files Browse the repository at this point in the history
* chore: Move `shutdown` module to `dozer-api`

* fix: Close connections in gRPC server on shutdown

* fix: `LogReader` task was not cancelled on drop

* chore: clippy fix

* fix: `AddrStream::pool_shutdown` was not called pooled until completion

* fix: typo pool -> poll
  • Loading branch information
chubei authored Sep 27, 2023
1 parent c1d7f73 commit 8296ead
Show file tree
Hide file tree
Showing 19 changed files with 166 additions and 41 deletions.
3 changes: 2 additions & 1 deletion dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::grpc::auth::AuthService;
use crate::grpc::grpc_web_middleware::enable_grpc_web;
use crate::grpc::health::HealthService;
use crate::grpc::{common, run_server, typed};
use crate::shutdown::ShutdownReceiver;
use crate::{errors::GrpcError, CacheEndpoint};
use dozer_tracing::LabelsAndProgress;
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
Expand Down Expand Up @@ -91,7 +92,7 @@ impl ApiServer {
pub async fn run(
&self,
cache_endpoints: Vec<Arc<CacheEndpoint>>,
shutdown: impl Future<Output = ()> + Send + 'static,
shutdown: ShutdownReceiver,
operations_receiver: Option<Receiver<Operation>>,
labels: LabelsAndProgress,
default_max_num_records: usize,
Expand Down
3 changes: 2 additions & 1 deletion dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tokio::sync::Mutex;

use crate::errors::GrpcError;
use crate::grpc::run_server;
use crate::shutdown::ShutdownReceiver;

#[derive(Debug, Clone)]
pub struct LogEndpoint {
Expand Down Expand Up @@ -173,7 +174,7 @@ async fn serialize_log_response(response: LogResponseFuture) -> Result<LogRespon
pub async fn start_internal_pipeline_server(
endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
options: &AppGrpcOptions,
shutdown: impl Future<Output = ()> + Send + 'static,
shutdown: ShutdownReceiver,
) -> Result<impl Future<Output = Result<(), tonic::transport::Error>>, GrpcError> {
let endpoints = endpoint_and_logs
.into_iter()
Expand Down
135 changes: 120 additions & 15 deletions dozer-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,127 @@ mod shared_impl;
pub mod typed;
pub mod types_helper;

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
pub use client_server::ApiServer;
use dozer_types::errors::internal::BoxedError;
use dozer_types::tonic::transport::server::{Router, Routes, TcpIncoming};
use futures_util::{
stream::{AbortHandle, Abortable, Aborted},
Future,
use dozer_types::tonic::transport::server::{
Connected, Router, Routes, TcpConnectInfo, TcpIncoming,
};
use futures_util::Future;
use futures_util::StreamExt;
pub use grpc_web_middleware::enable_grpc_web;
use http::{Request, Response};
use hyper::server::conn::AddrStream;
use hyper::Body;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tower::{Layer, Service};

use crate::shutdown::ShutdownReceiver;

#[derive(Debug)]
struct ShutdownAddrStream<F> {
inner: AddrStream,
state: ShutdownState<F>,
}

#[derive(Debug)]
enum ShutdownState<F> {
SignalPending(F),
ShutdownPending,
Done,
}

impl<F: Future<Output = ()> + Unpin> ShutdownAddrStream<F> {
fn check_shutdown(&mut self, cx: &mut Context<'_>) -> Result<(), io::Error> {
match &mut self.state {
ShutdownState::SignalPending(signal) => {
if let Poll::Ready(()) = Pin::new(signal).poll(cx) {
self.state = ShutdownState::ShutdownPending;
self.check_shutdown(cx)
} else {
Ok(())
}
}
ShutdownState::ShutdownPending => match Pin::new(&mut self.inner).poll_shutdown(cx) {
Poll::Ready(Ok(())) => {
self.state = ShutdownState::Done;
Ok(())
}
Poll::Ready(Err(e)) => Err(e),
Poll::Pending => Ok(()),
},
ShutdownState::Done => Ok(()),
}
}

fn poll_impl<T>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
func: fn(Pin<&mut AddrStream>, &mut Context<'_>) -> Poll<io::Result<T>>,
) -> Poll<io::Result<T>> {
let this = Pin::into_inner(self);
if let Err(e) = this.check_shutdown(cx) {
return Poll::Ready(Err(e));
}

func(Pin::new(&mut this.inner), cx)
}
}

impl<F: Future<Output = ()> + Unpin> AsyncRead for ShutdownAddrStream<F> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = Pin::into_inner(self);
if let Err(e) = this.check_shutdown(cx) {
return Poll::Ready(Err(e));
}

Pin::new(&mut this.inner).poll_read(cx, buf)
}
}

impl<F: Future<Output = ()> + Unpin> AsyncWrite for ShutdownAddrStream<F> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let this = Pin::into_inner(self);
if let Err(e) = this.check_shutdown(cx) {
return Poll::Ready(Err(e));
}

Pin::new(&mut this.inner).poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.poll_impl(cx, AsyncWrite::poll_flush)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.poll_impl(cx, AsyncWrite::poll_shutdown)
}
}

impl<F> Connected for ShutdownAddrStream<F> {
type ConnectInfo = TcpConnectInfo;

fn connect_info(&self) -> Self::ConnectInfo {
self.inner.connect_info()
}
}

async fn run_server<L, ResBody>(
server: Router<L>,
incoming: TcpIncoming,
shutdown: impl Future<Output = ()> + Send + 'static,
shutdown: ShutdownReceiver,
) -> Result<(), dozer_types::tonic::transport::Error>
where
L: Layer<Routes>,
Expand All @@ -37,16 +141,17 @@ where
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<BoxedError>,
{
// Tonic graceful shutdown doesn't allow us to set a timeout, resulting in hanging if a client doesn't close the connection.
// So we just abort the server when the shutdown signal is received.
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::spawn(async move {
shutdown.await;
abort_handle.abort();
let incoming = incoming.map(|stream| {
stream.map(|stream| {
let shutdown = shutdown.create_shutdown_future();
ShutdownAddrStream {
inner: stream,
state: ShutdownState::SignalPending(Box::pin(shutdown)),
}
})
});

match Abortable::new(server.serve_with_incoming(incoming), abort_registration).await {
Ok(result) => result,
Err(Aborted) => Ok(()),
}
server
.serve_with_incoming_shutdown(incoming, shutdown.create_shutdown_future())
.await
}
1 change: 1 addition & 0 deletions dozer-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ pub use actix_web_httpauth;
pub use api_helper::API_LATENCY_HISTOGRAM_NAME;
pub use api_helper::API_REQUEST_COUNTER_NAME;
pub use async_trait;
pub mod shutdown;
pub use dozer_types::tonic;
use errors::ApiInitError;
pub use openapiv3;
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/shutdown.rs → dozer-api/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::{
Arc,
};

use futures::Future;
use futures_util::Future;
use tokio::{
runtime::Runtime,
sync::watch::{channel, Receiver, Sender},
Expand Down
4 changes: 2 additions & 2 deletions dozer-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ pub mod cli;
pub mod errors;
pub mod live;
pub mod pipeline;
pub mod shutdown;
pub mod simple;
use dozer_api::shutdown::ShutdownSender;
use dozer_core::{app::AppPipeline, errors::ExecutionError};
use dozer_sql::{builder::statement_to_pipeline, errors::PipelineError};
use dozer_types::log::debug;
use errors::OrchestrationError;
use shutdown::ShutdownSender;
use std::{
backtrace::{Backtrace, BacktraceStatus},
panic, process,
Expand Down Expand Up @@ -68,6 +67,7 @@ pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, Pipeline
use crate::cli::cloud::{
Cloud, DeployCommandArgs, ListCommandArgs, LogCommandArgs, SecretsCommand,
};
pub use dozer_api::shutdown;
pub use dozer_types::models::connection::Connection;
use dozer_types::tracing::error;

Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/src/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ mod server;
mod state;
mod watcher;
use self::state::LiveState;
use crate::{cli::types::Live, live::server::LIVE_PORT, shutdown::ShutdownReceiver};
use crate::{cli::types::Live, live::server::LIVE_PORT};
use dozer_api::shutdown::ShutdownReceiver;
use dozer_types::{grpc_types::live::ConnectResponse, log::info};
use std::sync::Arc;
mod progress;
Expand Down
3 changes: 1 addition & 2 deletions dozer-cli/src/live/progress.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::{collections::HashMap, ops::Deref, sync::atomic::Ordering, time::Duration};

use dozer_api::shutdown::ShutdownReceiver;
use dozer_types::grpc_types::live::{ConnectResponse, Metric, ProgressResponse};
use prometheus_parse::Value;
use tokio::time::interval;

use crate::shutdown::ShutdownReceiver;

use super::LiveError;

const PROGRESS_POLL_FREQUENCY: u64 = 100;
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/live/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{sync::Arc, thread::JoinHandle};

use clap::Parser;

use dozer_api::shutdown::{self, ShutdownReceiver, ShutdownSender};
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_core::{app::AppPipeline, dag_schemas::DagSchemas, Dag};
use dozer_sql::builder::statement_to_pipeline;
Expand All @@ -26,7 +27,6 @@ use crate::{
cli::{init_dozer, types::Cli},
errors::OrchestrationError,
pipeline::PipelineBuilder,
shutdown::{self, ShutdownReceiver, ShutdownSender},
simple::{helper::validate_config, Contract, SimpleOrchestrator},
};

Expand Down
3 changes: 1 addition & 2 deletions dozer-cli/src/live/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::{sync::Arc, time::Duration};

use crate::shutdown::ShutdownReceiver;

use super::{state::LiveState, LiveError};

use crate::live::state::BroadcastType;
use dozer_api::shutdown::ShutdownReceiver;
use dozer_types::log::info;
use notify::{RecursiveMode, Watcher};
use notify_debouncer_full::new_debouncer;
Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use clap::Parser;
use dozer_api::shutdown;
#[cfg(feature = "cloud")]
use dozer_cli::cli::cloud::CloudCommands;
use dozer_cli::cli::generate_config_repl;
Expand All @@ -8,7 +9,7 @@ use dozer_cli::errors::{CliError, CloudError, OrchestrationError};
use dozer_cli::simple::SimpleOrchestrator;
#[cfg(feature = "cloud")]
use dozer_cli::CloudOrchestrator;
use dozer_cli::{live, set_ctrl_handler, set_panic_hook, shutdown};
use dozer_cli::{live, set_ctrl_handler, set_panic_hook};
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::telemetry::{TelemetryConfig, TelemetryMetricsConfig};
use dozer_types::serde::Deserialize;
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use dozer_api::shutdown::ShutdownReceiver;
use dozer_cache::dozer_log::replication::Log;
use dozer_core::app::App;
use dozer_core::app::AppPipeline;
Expand All @@ -24,7 +25,6 @@ use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
use crate::shutdown::ShutdownReceiver;

use super::source_builder::SourceBuilder;
use crate::errors::OrchestrationError;
Expand Down
3 changes: 1 addition & 2 deletions dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use dozer_api::shutdown::ShutdownReceiver;
use dozer_core::channels::SourceChannelForwarder;
use dozer_core::node::{
OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceState,
Expand Down Expand Up @@ -25,8 +26,6 @@ use std::sync::Arc;
use std::thread;
use tokio::runtime::Runtime;

use crate::shutdown::ShutdownReceiver;

#[derive(Debug)]
struct Table {
schema_name: Option<String>,
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/source_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::pipeline::connector_source::ConnectorSourceFactory;
use crate::shutdown::ShutdownReceiver;
use crate::OrchestrationError;
use dozer_api::shutdown::ShutdownReceiver;
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
use dozer_ingestion::connectors::TableInfo;

Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/src/pipeline/tests/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use crate::pipeline::source_builder::SourceBuilder;
use crate::pipeline::PipelineBuilder;
use dozer_api::shutdown;
use dozer_types::ingestion_types::{GrpcConfig, GrpcConfigSchemas};
use dozer_types::models::config::Config;

Expand Down Expand Up @@ -83,7 +84,7 @@ fn load_multi_sources() {
.unwrap();

let source_builder = SourceBuilder::new(grouped_connections, Default::default());
let (_sender, shutdown_receiver) = crate::shutdown::new(&runtime);
let (_sender, shutdown_receiver) = shutdown::new(&runtime);
let asm = runtime
.block_on(source_builder.build_source_manager(&runtime, shutdown_receiver))
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
use dozer_api::shutdown::ShutdownReceiver;
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::Log;
Expand All @@ -15,7 +16,6 @@ use dozer_types::models::source::Source;
use dozer_types::models::udf_config::UdfConfig;

use crate::pipeline::PipelineBuilder;
use crate::shutdown::ShutdownReceiver;
use dozer_core::executor::{DagExecutor, ExecutorOptions};

use dozer_types::models::connection::Connection;
Expand Down
Loading

0 comments on commit 8296ead

Please sign in to comment.