Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Close connections in gRPC server on shutdown #2089

Merged
merged 6 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::grpc::auth::AuthService;
use crate::grpc::grpc_web_middleware::enable_grpc_web;
use crate::grpc::health::HealthService;
use crate::grpc::{common, run_server, typed};
use crate::shutdown::ShutdownReceiver;
use crate::{errors::GrpcError, CacheEndpoint};
use dozer_tracing::LabelsAndProgress;
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
Expand Down Expand Up @@ -91,7 +92,7 @@ impl ApiServer {
pub async fn run(
&self,
cache_endpoints: Vec<Arc<CacheEndpoint>>,
shutdown: impl Future<Output = ()> + Send + 'static,
shutdown: ShutdownReceiver,
operations_receiver: Option<Receiver<Operation>>,
labels: LabelsAndProgress,
default_max_num_records: usize,
Expand Down
3 changes: 2 additions & 1 deletion dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tokio::sync::Mutex;

use crate::errors::GrpcError;
use crate::grpc::run_server;
use crate::shutdown::ShutdownReceiver;

#[derive(Debug, Clone)]
pub struct LogEndpoint {
Expand Down Expand Up @@ -173,7 +174,7 @@ async fn serialize_log_response(response: LogResponseFuture) -> Result<LogRespon
pub async fn start_internal_pipeline_server(
endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
options: &AppGrpcOptions,
shutdown: impl Future<Output = ()> + Send + 'static,
shutdown: ShutdownReceiver,
) -> Result<impl Future<Output = Result<(), tonic::transport::Error>>, GrpcError> {
let endpoints = endpoint_and_logs
.into_iter()
Expand Down
151 changes: 136 additions & 15 deletions dozer-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,143 @@ mod shared_impl;
pub mod typed;
pub mod types_helper;

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
pub use client_server::ApiServer;
use dozer_types::errors::internal::BoxedError;
use dozer_types::tonic::transport::server::{Router, Routes, TcpIncoming};
use futures_util::{
stream::{AbortHandle, Abortable, Aborted},
Future,
use dozer_types::tonic::transport::server::{
Connected, Router, Routes, TcpConnectInfo, TcpIncoming,
};
use futures_util::Future;
use futures_util::StreamExt;
pub use grpc_web_middleware::enable_grpc_web;
use http::{Request, Response};
use hyper::server::conn::AddrStream;
use hyper::Body;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tower::{Layer, Service};

use crate::shutdown::ShutdownReceiver;

#[derive(Debug)]
enum ShutdownAddrStream<F> {
Jesse-Bakker marked this conversation as resolved.
Show resolved Hide resolved
Alive { inner: AddrStream, shutdown: F },
Shutdown { inner: AddrStream },
Temp,
}

impl<F: Future<Output = ()> + Unpin> ShutdownAddrStream<F> {
fn check_shutdown(&mut self, cx: &mut Context<'_>) -> Result<(), io::Error> {
loop {
match self {
Self::Alive { shutdown, .. } => {
if let Poll::Ready(()) = Pin::new(shutdown).poll(cx) {
let mut temp = Self::Temp;
std::mem::swap(self, &mut temp);
let Self::Alive { inner, .. } = temp else {
unreachable!()
};
*self = Self::Shutdown { inner };
continue;
} else {
return Ok(());
}
}
Self::Shutdown { inner } => {
if let Poll::Ready(Err(e)) = Pin::new(inner).poll_shutdown(cx) {
return Err(e);
} else {
return Ok(());
}
}
Self::Temp => unreachable!(),
}
}
}

fn pool_impl<T>(
chubei marked this conversation as resolved.
Show resolved Hide resolved
self: Pin<&mut Self>,
cx: &mut Context<'_>,
func: fn(Pin<&mut AddrStream>, &mut Context<'_>) -> Poll<io::Result<T>>,
) -> Poll<io::Result<T>> {
let this = Pin::into_inner(self);
if let Err(e) = this.check_shutdown(cx) {
return Poll::Ready(Err(e));
}

match this {
Self::Alive { inner, .. } => func(Pin::new(inner), cx),
Self::Shutdown { inner } => func(Pin::new(inner), cx),
Self::Temp => unreachable!(),
}
}
}

impl<F: Future<Output = ()> + Unpin> AsyncRead for ShutdownAddrStream<F> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = Pin::into_inner(self);
if let Err(e) = this.check_shutdown(cx) {
return Poll::Ready(Err(e));
}

match this {
Self::Alive { inner, .. } => Pin::new(inner).poll_read(cx, buf),
Self::Shutdown { inner } => Pin::new(inner).poll_read(cx, buf),
Self::Temp => unreachable!(),
}
}
}

impl<F: Future<Output = ()> + Unpin> AsyncWrite for ShutdownAddrStream<F> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let this = Pin::into_inner(self);
if let Err(e) = this.check_shutdown(cx) {
return Poll::Ready(Err(e));
}

match this {
Self::Alive { inner, .. } => Pin::new(inner).poll_write(cx, buf),
Jesse-Bakker marked this conversation as resolved.
Show resolved Hide resolved
Self::Shutdown { inner } => Pin::new(inner).poll_write(cx, buf),
Self::Temp => unreachable!(),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.pool_impl(cx, AsyncWrite::poll_flush)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.pool_impl(cx, AsyncWrite::poll_shutdown)
}
}

impl<F> Connected for ShutdownAddrStream<F> {
type ConnectInfo = TcpConnectInfo;

fn connect_info(&self) -> Self::ConnectInfo {
match self {
Self::Alive { inner, .. } => inner.connect_info(),
Self::Shutdown { inner } => inner.connect_info(),
Self::Temp => unreachable!(),
}
}
}

async fn run_server<L, ResBody>(
server: Router<L>,
incoming: TcpIncoming,
shutdown: impl Future<Output = ()> + Send + 'static,
shutdown: ShutdownReceiver,
) -> Result<(), dozer_types::tonic::transport::Error>
where
L: Layer<Routes>,
Expand All @@ -37,16 +157,17 @@ where
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<BoxedError>,
{
// Tonic graceful shutdown doesn't allow us to set a timeout, resulting in hanging if a client doesn't close the connection.
// So we just abort the server when the shutdown signal is received.
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::spawn(async move {
shutdown.await;
abort_handle.abort();
let incoming = incoming.map(|stream| {
stream.map(|stream| {
let shutdown = shutdown.create_shutdown_future();
ShutdownAddrStream::Alive {
inner: stream,
shutdown: Box::pin(shutdown),
}
})
});

match Abortable::new(server.serve_with_incoming(incoming), abort_registration).await {
Ok(result) => result,
Err(Aborted) => Ok(()),
}
server
.serve_with_incoming_shutdown(incoming, shutdown.create_shutdown_future())
.await
}
1 change: 1 addition & 0 deletions dozer-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ pub use actix_web_httpauth;
pub use api_helper::API_LATENCY_HISTOGRAM_NAME;
pub use api_helper::API_REQUEST_COUNTER_NAME;
pub use async_trait;
pub mod shutdown;
pub use dozer_types::tonic;
use errors::ApiInitError;
pub use openapiv3;
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/shutdown.rs → dozer-api/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::{
Arc,
};

use futures::Future;
use futures_util::Future;
use tokio::{
runtime::Runtime,
sync::watch::{channel, Receiver, Sender},
Expand Down
4 changes: 2 additions & 2 deletions dozer-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ pub mod cli;
pub mod errors;
pub mod live;
pub mod pipeline;
pub mod shutdown;
pub mod simple;
use dozer_api::shutdown::ShutdownSender;
use dozer_core::{app::AppPipeline, errors::ExecutionError};
use dozer_sql::{builder::statement_to_pipeline, errors::PipelineError};
use dozer_types::log::debug;
use errors::OrchestrationError;
use shutdown::ShutdownSender;
use std::{
backtrace::{Backtrace, BacktraceStatus},
panic, process,
Expand Down Expand Up @@ -68,6 +67,7 @@ pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, Pipeline
use crate::cli::cloud::{
Cloud, DeployCommandArgs, ListCommandArgs, LogCommandArgs, SecretsCommand,
};
pub use dozer_api::shutdown;
pub use dozer_types::models::connection::Connection;
use dozer_types::tracing::error;

Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/src/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ mod server;
mod state;
mod watcher;
use self::state::LiveState;
use crate::{cli::types::Live, live::server::LIVE_PORT, shutdown::ShutdownReceiver};
use crate::{cli::types::Live, live::server::LIVE_PORT};
use dozer_api::shutdown::ShutdownReceiver;
use dozer_types::{grpc_types::live::ConnectResponse, log::info};
use std::sync::Arc;
mod progress;
Expand Down
3 changes: 1 addition & 2 deletions dozer-cli/src/live/progress.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::{collections::HashMap, ops::Deref, sync::atomic::Ordering, time::Duration};

use dozer_api::shutdown::ShutdownReceiver;
use dozer_types::grpc_types::live::{ConnectResponse, Metric, ProgressResponse};
use prometheus_parse::Value;
use tokio::time::interval;

use crate::shutdown::ShutdownReceiver;

use super::LiveError;

const PROGRESS_POLL_FREQUENCY: u64 = 100;
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/live/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{sync::Arc, thread::JoinHandle};

use clap::Parser;

use dozer_api::shutdown::{self, ShutdownReceiver, ShutdownSender};
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_core::{app::AppPipeline, dag_schemas::DagSchemas, Dag};
use dozer_sql::builder::statement_to_pipeline;
Expand All @@ -26,7 +27,6 @@ use crate::{
cli::{init_dozer, types::Cli},
errors::OrchestrationError,
pipeline::PipelineBuilder,
shutdown::{self, ShutdownReceiver, ShutdownSender},
simple::{helper::validate_config, Contract, SimpleOrchestrator},
};

Expand Down
3 changes: 1 addition & 2 deletions dozer-cli/src/live/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::{sync::Arc, time::Duration};

use crate::shutdown::ShutdownReceiver;

use super::{state::LiveState, LiveError};

use crate::live::state::BroadcastType;
use dozer_api::shutdown::ShutdownReceiver;
use dozer_types::log::info;
use notify::{RecursiveMode, Watcher};
use notify_debouncer_full::new_debouncer;
Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use clap::Parser;
use dozer_api::shutdown;
#[cfg(feature = "cloud")]
use dozer_cli::cli::cloud::CloudCommands;
use dozer_cli::cli::generate_config_repl;
Expand All @@ -8,7 +9,7 @@ use dozer_cli::errors::{CliError, CloudError, OrchestrationError};
use dozer_cli::simple::SimpleOrchestrator;
#[cfg(feature = "cloud")]
use dozer_cli::CloudOrchestrator;
use dozer_cli::{live, set_ctrl_handler, set_panic_hook, shutdown};
use dozer_cli::{live, set_ctrl_handler, set_panic_hook};
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::telemetry::{TelemetryConfig, TelemetryMetricsConfig};
use dozer_types::serde::Deserialize;
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use dozer_api::shutdown::ShutdownReceiver;
use dozer_cache::dozer_log::replication::Log;
use dozer_core::app::App;
use dozer_core::app::AppPipeline;
Expand All @@ -24,7 +25,6 @@ use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
use crate::shutdown::ShutdownReceiver;

use super::source_builder::SourceBuilder;
use crate::errors::OrchestrationError;
Expand Down
3 changes: 1 addition & 2 deletions dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use dozer_api::shutdown::ShutdownReceiver;
use dozer_core::channels::SourceChannelForwarder;
use dozer_core::node::{
OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceState,
Expand Down Expand Up @@ -25,8 +26,6 @@ use std::sync::Arc;
use std::thread;
use tokio::runtime::Runtime;

use crate::shutdown::ShutdownReceiver;

#[derive(Debug)]
struct Table {
schema_name: Option<String>,
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/source_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::pipeline::connector_source::ConnectorSourceFactory;
use crate::shutdown::ShutdownReceiver;
use crate::OrchestrationError;
use dozer_api::shutdown::ShutdownReceiver;
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
use dozer_ingestion::connectors::TableInfo;

Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/src/pipeline/tests/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use crate::pipeline::source_builder::SourceBuilder;
use crate::pipeline::PipelineBuilder;
use dozer_api::shutdown;
use dozer_types::ingestion_types::{GrpcConfig, GrpcConfigSchemas};
use dozer_types::models::config::Config;

Expand Down Expand Up @@ -83,7 +84,7 @@ fn load_multi_sources() {
.unwrap();

let source_builder = SourceBuilder::new(grouped_connections, Default::default());
let (_sender, shutdown_receiver) = crate::shutdown::new(&runtime);
let (_sender, shutdown_receiver) = shutdown::new(&runtime);
let asm = runtime
.block_on(source_builder.build_source_manager(&runtime, shutdown_receiver))
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
use dozer_api::shutdown::ShutdownReceiver;
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::Log;
Expand All @@ -15,7 +16,6 @@ use dozer_types::models::source::Source;
use dozer_types::models::udf_config::UdfConfig;

use crate::pipeline::PipelineBuilder;
use crate::shutdown::ShutdownReceiver;
use dozer_core::executor::{DagExecutor, ExecutorOptions};

use dozer_types::models::connection::Connection;
Expand Down
Loading
Loading