From 1be711f6c4643002863e15946abd5b5ddc44790a Mon Sep 17 00:00:00 2001 From: chubei <914745487@qq.com> Date: Tue, 26 Sep 2023 22:28:02 +0800 Subject: [PATCH 1/6] chore: Move `shutdown` module to `dozer-api` --- dozer-api/src/lib.rs | 1 + {dozer-cli => dozer-api}/src/shutdown.rs | 2 +- dozer-cli/src/lib.rs | 4 ++-- dozer-cli/src/live/mod.rs | 3 ++- dozer-cli/src/live/progress.rs | 3 +-- dozer-cli/src/live/state.rs | 2 +- dozer-cli/src/live/watcher.rs | 3 +-- dozer-cli/src/main.rs | 3 ++- dozer-cli/src/pipeline/builder.rs | 2 +- dozer-cli/src/pipeline/connector_source.rs | 3 +-- dozer-cli/src/pipeline/source_builder.rs | 2 +- dozer-cli/src/pipeline/tests/builder.rs | 3 ++- dozer-cli/src/simple/executor.rs | 2 +- dozer-cli/src/simple/orchestrator.rs | 2 +- dozer-tests/src/tests/e2e/mod.rs | 6 +++--- 15 files changed, 21 insertions(+), 20 deletions(-) rename {dozer-cli => dozer-api}/src/shutdown.rs (97%) diff --git a/dozer-api/src/lib.rs b/dozer-api/src/lib.rs index 76535c9a5d..fe984597dd 100644 --- a/dozer-api/src/lib.rs +++ b/dozer-api/src/lib.rs @@ -181,6 +181,7 @@ pub use actix_web_httpauth; pub use api_helper::API_LATENCY_HISTOGRAM_NAME; pub use api_helper::API_REQUEST_COUNTER_NAME; pub use async_trait; +pub mod shutdown; pub use dozer_types::tonic; use errors::ApiInitError; pub use openapiv3; diff --git a/dozer-cli/src/shutdown.rs b/dozer-api/src/shutdown.rs similarity index 97% rename from dozer-cli/src/shutdown.rs rename to dozer-api/src/shutdown.rs index d65b1d1cdf..dae29f7883 100644 --- a/dozer-cli/src/shutdown.rs +++ b/dozer-api/src/shutdown.rs @@ -3,7 +3,7 @@ use std::sync::{ Arc, }; -use futures::Future; +use futures_util::Future; use tokio::{ runtime::Runtime, sync::watch::{channel, Receiver, Sender}, diff --git a/dozer-cli/src/lib.rs b/dozer-cli/src/lib.rs index 8ed645d374..dfeb6a556d 100644 --- a/dozer-cli/src/lib.rs +++ b/dozer-cli/src/lib.rs @@ -2,13 +2,12 @@ pub mod cli; pub mod errors; pub mod live; pub mod pipeline; -pub mod shutdown; pub mod simple; +use dozer_api::shutdown::ShutdownSender; use dozer_core::{app::AppPipeline, errors::ExecutionError}; use dozer_sql::{builder::statement_to_pipeline, errors::PipelineError}; use dozer_types::log::debug; use errors::OrchestrationError; -use shutdown::ShutdownSender; use std::{ backtrace::{Backtrace, BacktraceStatus}, panic, process, @@ -68,6 +67,7 @@ pub fn wrapped_statement_to_pipeline(sql: &str) -> Result, diff --git a/dozer-cli/src/pipeline/source_builder.rs b/dozer-cli/src/pipeline/source_builder.rs index 03bc6d4d45..2d54f23e62 100644 --- a/dozer-cli/src/pipeline/source_builder.rs +++ b/dozer-cli/src/pipeline/source_builder.rs @@ -1,6 +1,6 @@ use crate::pipeline::connector_source::ConnectorSourceFactory; -use crate::shutdown::ShutdownReceiver; use crate::OrchestrationError; +use dozer_api::shutdown::ShutdownReceiver; use dozer_core::appsource::{AppSourceManager, AppSourceMappings}; use dozer_ingestion::connectors::TableInfo; diff --git a/dozer-cli/src/pipeline/tests/builder.rs b/dozer-cli/src/pipeline/tests/builder.rs index 46c050aebc..3e4e4bfb68 100644 --- a/dozer-cli/src/pipeline/tests/builder.rs +++ b/dozer-cli/src/pipeline/tests/builder.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::pipeline::source_builder::SourceBuilder; use crate::pipeline::PipelineBuilder; +use dozer_api::shutdown; use dozer_types::ingestion_types::{GrpcConfig, GrpcConfigSchemas}; use dozer_types::models::config::Config; @@ -83,7 +84,7 @@ fn load_multi_sources() { .unwrap(); let source_builder = SourceBuilder::new(grouped_connections, Default::default()); - let (_sender, shutdown_receiver) = crate::shutdown::new(&runtime); + let (_sender, shutdown_receiver) = shutdown::new(&runtime); let asm = runtime .block_on(source_builder.build_source_manager(&runtime, shutdown_receiver)) .unwrap(); diff --git a/dozer-cli/src/simple/executor.rs b/dozer-cli/src/simple/executor.rs index 73dc4630e4..b3158afb7e 100644 --- a/dozer-cli/src/simple/executor.rs +++ b/dozer-cli/src/simple/executor.rs @@ -1,4 +1,5 @@ use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint; +use dozer_api::shutdown::ShutdownReceiver; use dozer_cache::dozer_log::camino::Utf8Path; use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir}; use dozer_cache::dozer_log::replication::Log; @@ -15,7 +16,6 @@ use dozer_types::models::source::Source; use dozer_types::models::udf_config::UdfConfig; use crate::pipeline::PipelineBuilder; -use crate::shutdown::ShutdownReceiver; use dozer_core::executor::{DagExecutor, ExecutorOptions}; use dozer_types::models::connection::Connection; diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index 9a5596e4dc..39596bfa2e 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -2,7 +2,6 @@ use super::executor::{run_dag_executor, Executor}; use super::Contract; use crate::errors::OrchestrationError; use crate::pipeline::PipelineBuilder; -use crate::shutdown::ShutdownReceiver; use crate::simple::build; use crate::simple::helper::validate_config; use crate::utils::{ @@ -13,6 +12,7 @@ use crate::utils::{ use crate::{flatten_join_handle, join_handle_map_err}; use dozer_api::auth::{Access, Authorizer}; use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server; +use dozer_api::shutdown::ShutdownReceiver; use dozer_api::{get_api_security, grpc, rest, CacheEndpoint}; use dozer_cache::cache::LmdbRwCacheManager; use dozer_cache::dozer_log::camino::Utf8PathBuf; diff --git a/dozer-tests/src/tests/e2e/mod.rs b/dozer-tests/src/tests/e2e/mod.rs index 7c9a5ed8c2..2ff27b9f3e 100644 --- a/dozer-tests/src/tests/e2e/mod.rs +++ b/dozer-tests/src/tests/e2e/mod.rs @@ -1,10 +1,10 @@ use std::{future::Future, sync::Arc, thread::JoinHandle, time::Duration}; -use dozer_api::tonic::transport::Channel; -use dozer_cli::{ +use dozer_api::{ shutdown::{self, ShutdownSender}, - simple::SimpleOrchestrator, + tonic::transport::Channel, }; +use dozer_cli::simple::SimpleOrchestrator; use dozer_types::{ grpc_types::{ common::common_grpc_service_client::CommonGrpcServiceClient, From 66074f9199c136357a7837122c3198c367adc0e1 Mon Sep 17 00:00:00 2001 From: chubei <914745487@qq.com> Date: Tue, 26 Sep 2023 23:39:08 +0800 Subject: [PATCH 2/6] fix: Close connections in gRPC server on shutdown --- dozer-api/src/grpc/client_server.rs | 3 +- .../grpc/internal/internal_pipeline_server.rs | 3 +- dozer-api/src/grpc/mod.rs | 151 ++++++++++++++++-- dozer-cli/src/simple/orchestrator.rs | 5 +- 4 files changed, 142 insertions(+), 20 deletions(-) diff --git a/dozer-api/src/grpc/client_server.rs b/dozer-api/src/grpc/client_server.rs index 057d98b4ad..66ab37aa5b 100644 --- a/dozer-api/src/grpc/client_server.rs +++ b/dozer-api/src/grpc/client_server.rs @@ -6,6 +6,7 @@ use crate::grpc::auth::AuthService; use crate::grpc::grpc_web_middleware::enable_grpc_web; use crate::grpc::health::HealthService; use crate::grpc::{common, run_server, typed}; +use crate::shutdown::ShutdownReceiver; use crate::{errors::GrpcError, CacheEndpoint}; use dozer_tracing::LabelsAndProgress; use dozer_types::grpc_types::health::health_check_response::ServingStatus; @@ -91,7 +92,7 @@ impl ApiServer { pub async fn run( &self, cache_endpoints: Vec>, - shutdown: impl Future + Send + 'static, + shutdown: ShutdownReceiver, operations_receiver: Option>, labels: LabelsAndProgress, default_max_num_records: usize, diff --git a/dozer-api/src/grpc/internal/internal_pipeline_server.rs b/dozer-api/src/grpc/internal/internal_pipeline_server.rs index aa935d3a67..0f5ddb27f2 100644 --- a/dozer-api/src/grpc/internal/internal_pipeline_server.rs +++ b/dozer-api/src/grpc/internal/internal_pipeline_server.rs @@ -26,6 +26,7 @@ use tokio::sync::Mutex; use crate::errors::GrpcError; use crate::grpc::run_server; +use crate::shutdown::ShutdownReceiver; #[derive(Debug, Clone)] pub struct LogEndpoint { @@ -173,7 +174,7 @@ async fn serialize_log_response(response: LogResponseFuture) -> Result, options: &AppGrpcOptions, - shutdown: impl Future + Send + 'static, + shutdown: ShutdownReceiver, ) -> Result>, GrpcError> { let endpoints = endpoint_and_logs .into_iter() diff --git a/dozer-api/src/grpc/mod.rs b/dozer-api/src/grpc/mod.rs index 49f250ca21..c1898fadd2 100644 --- a/dozer-api/src/grpc/mod.rs +++ b/dozer-api/src/grpc/mod.rs @@ -11,23 +11,143 @@ mod shared_impl; pub mod typed; pub mod types_helper; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + use bytes::Bytes; pub use client_server::ApiServer; use dozer_types::errors::internal::BoxedError; -use dozer_types::tonic::transport::server::{Router, Routes, TcpIncoming}; -use futures_util::{ - stream::{AbortHandle, Abortable, Aborted}, - Future, +use dozer_types::tonic::transport::server::{ + Connected, Router, Routes, TcpConnectInfo, TcpIncoming, }; +use futures_util::Future; +use futures_util::StreamExt; pub use grpc_web_middleware::enable_grpc_web; use http::{Request, Response}; +use hyper::server::conn::AddrStream; use hyper::Body; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tower::{Layer, Service}; +use crate::shutdown::ShutdownReceiver; + +#[derive(Debug)] +enum ShutdownAddrStream { + Alive { inner: AddrStream, shutdown: F }, + Shutdown { inner: AddrStream }, + Temp, +} + +impl + Unpin> ShutdownAddrStream { + fn check_shutdown(&mut self, cx: &mut Context<'_>) -> Result<(), io::Error> { + loop { + match self { + Self::Alive { shutdown, .. } => { + if let Poll::Ready(()) = Pin::new(shutdown).poll(cx) { + let mut temp = Self::Temp; + std::mem::swap(self, &mut temp); + let Self::Alive { inner, .. } = temp else { + unreachable!() + }; + *self = Self::Shutdown { inner }; + continue; + } else { + return Ok(()); + } + } + Self::Shutdown { inner } => { + if let Poll::Ready(Err(e)) = Pin::new(inner).poll_shutdown(cx) { + return Err(e); + } else { + return Ok(()); + } + } + Self::Temp => unreachable!(), + } + } + } + + fn pool_impl( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + func: fn(Pin<&mut AddrStream>, &mut Context<'_>) -> Poll>, + ) -> Poll> { + let this = Pin::into_inner(self); + if let Err(e) = this.check_shutdown(cx) { + return Poll::Ready(Err(e)); + } + + match this { + Self::Alive { inner, .. } => func(Pin::new(inner), cx), + Self::Shutdown { inner } => func(Pin::new(inner), cx), + Self::Temp => unreachable!(), + } + } +} + +impl + Unpin> AsyncRead for ShutdownAddrStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = Pin::into_inner(self); + if let Err(e) = this.check_shutdown(cx) { + return Poll::Ready(Err(e)); + } + + match this { + Self::Alive { inner, .. } => Pin::new(inner).poll_read(cx, buf), + Self::Shutdown { inner } => Pin::new(inner).poll_read(cx, buf), + Self::Temp => unreachable!(), + } + } +} + +impl + Unpin> AsyncWrite for ShutdownAddrStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = Pin::into_inner(self); + if let Err(e) = this.check_shutdown(cx) { + return Poll::Ready(Err(e)); + } + + match this { + Self::Alive { inner, .. } => Pin::new(inner).poll_write(cx, buf), + Self::Shutdown { inner } => Pin::new(inner).poll_write(cx, buf), + Self::Temp => unreachable!(), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.pool_impl(cx, AsyncWrite::poll_flush) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.pool_impl(cx, AsyncWrite::poll_shutdown) + } +} + +impl Connected for ShutdownAddrStream { + type ConnectInfo = TcpConnectInfo; + + fn connect_info(&self) -> Self::ConnectInfo { + match self { + Self::Alive { inner, .. } => inner.connect_info(), + Self::Shutdown { inner } => inner.connect_info(), + Self::Temp => unreachable!(), + } + } +} + async fn run_server( server: Router, incoming: TcpIncoming, - shutdown: impl Future + Send + 'static, + shutdown: ShutdownReceiver, ) -> Result<(), dozer_types::tonic::transport::Error> where L: Layer, @@ -37,16 +157,17 @@ where ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { - // Tonic graceful shutdown doesn't allow us to set a timeout, resulting in hanging if a client doesn't close the connection. - // So we just abort the server when the shutdown signal is received. - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - tokio::spawn(async move { - shutdown.await; - abort_handle.abort(); + let incoming = incoming.map(|stream| { + stream.map(|stream| { + let shutdown = shutdown.create_shutdown_future(); + ShutdownAddrStream::Alive { + inner: stream, + shutdown: Box::pin(shutdown), + } + }) }); - match Abortable::new(server.serve_with_incoming(incoming), abort_registration).await { - Ok(result) => result, - Err(Aborted) => Ok(()), - } + server + .serve_with_incoming_shutdown(incoming, shutdown.create_shutdown_future()) + .await } diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index 39596bfa2e..8b969a2bfb 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -160,11 +160,10 @@ impl SimpleOrchestrator { let grpc_handle = if grpc_config.enabled.unwrap_or(true) { let api_security = self.config.api.api_security.clone(); let grpc_server = grpc::ApiServer::new(grpc_config, api_security, flags); - let shutdown = shutdown.create_shutdown_future(); let grpc_server = grpc_server .run( cache_endpoints, - shutdown, + shutdown.clone(), operations_receiver, self.labels.clone(), default_max_num_records, @@ -247,7 +246,7 @@ impl SimpleOrchestrator { .block_on(start_internal_pipeline_server( endpoint_and_logs, app_grpc_config, - shutdown.create_shutdown_future(), + shutdown.clone(), )) .map_err(OrchestrationError::InternalServerFailed)?; From 7f19d8eb80b948cc8f91db09c1b552d90ed12253 Mon Sep 17 00:00:00 2001 From: chubei <914745487@qq.com> Date: Wed, 27 Sep 2023 00:30:40 +0800 Subject: [PATCH 3/6] fix: `LogReader` task was not cancelled on drop --- dozer-log/src/reader.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/dozer-log/src/reader.rs b/dozer-log/src/reader.rs index 627c529b27..ba0620935d 100644 --- a/dozer-log/src/reader.rs +++ b/dozer-log/src/reader.rs @@ -16,6 +16,7 @@ use dozer_types::models::api_endpoint::{ use dozer_types::tonic::transport::Channel; use dozer_types::tonic::Streaming; use dozer_types::{bincode, serde_json}; +use tokio::select; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; @@ -265,10 +266,27 @@ async fn call_get_log_once( } async fn log_reader_worker( + log_client: LogClient, + pos: u64, + options: LogReaderOptions, + op_sender: Sender, +) -> Result<(), ReaderError> { + select! { + _ = op_sender.closed() => { + debug!("Log reader thread quit because LogReader was dropped"); + return Ok(()); + } + result = log_reader_worker_loop(log_client, pos, options, &op_sender) => { + result + } + } +} + +async fn log_reader_worker_loop( mut log_client: LogClient, mut pos: u64, options: LogReaderOptions, - op_sender: Sender, + op_sender: &Sender, ) -> Result<(), ReaderError> { loop { // Request ops. From b4f59136a6c2ead8baeb99f5ad0ad24ac403dd68 Mon Sep 17 00:00:00 2001 From: chubei <914745487@qq.com> Date: Wed, 27 Sep 2023 00:41:25 +0800 Subject: [PATCH 4/6] chore: clippy fix --- dozer-log/src/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dozer-log/src/reader.rs b/dozer-log/src/reader.rs index ba0620935d..dff4965a9a 100644 --- a/dozer-log/src/reader.rs +++ b/dozer-log/src/reader.rs @@ -274,7 +274,7 @@ async fn log_reader_worker( select! { _ = op_sender.closed() => { debug!("Log reader thread quit because LogReader was dropped"); - return Ok(()); + Ok(()) } result = log_reader_worker_loop(log_client, pos, options, &op_sender) => { result From 8e9d3226d61cfc0cc75cee8bfc2a1b452f18a066 Mon Sep 17 00:00:00 2001 From: chubei <914745487@qq.com> Date: Wed, 27 Sep 2023 16:13:43 +0800 Subject: [PATCH 5/6] fix: `AddrStream::pool_shutdown` was not called pooled until completion --- dozer-api/src/grpc/mod.rs | 80 ++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 48 deletions(-) diff --git a/dozer-api/src/grpc/mod.rs b/dozer-api/src/grpc/mod.rs index c1898fadd2..b56696921c 100644 --- a/dozer-api/src/grpc/mod.rs +++ b/dozer-api/src/grpc/mod.rs @@ -33,38 +33,38 @@ use tower::{Layer, Service}; use crate::shutdown::ShutdownReceiver; #[derive(Debug)] -enum ShutdownAddrStream { - Alive { inner: AddrStream, shutdown: F }, - Shutdown { inner: AddrStream }, - Temp, +struct ShutdownAddrStream { + inner: AddrStream, + state: ShutdownState, +} + +#[derive(Debug)] +enum ShutdownState { + SignalPending(F), + ShutdownPending, + Done, } impl + Unpin> ShutdownAddrStream { fn check_shutdown(&mut self, cx: &mut Context<'_>) -> Result<(), io::Error> { - loop { - match self { - Self::Alive { shutdown, .. } => { - if let Poll::Ready(()) = Pin::new(shutdown).poll(cx) { - let mut temp = Self::Temp; - std::mem::swap(self, &mut temp); - let Self::Alive { inner, .. } = temp else { - unreachable!() - }; - *self = Self::Shutdown { inner }; - continue; - } else { - return Ok(()); - } + match &mut self.state { + ShutdownState::SignalPending(signal) => { + if let Poll::Ready(()) = Pin::new(signal).poll(cx) { + self.state = ShutdownState::ShutdownPending; + self.check_shutdown(cx) + } else { + Ok(()) } - Self::Shutdown { inner } => { - if let Poll::Ready(Err(e)) = Pin::new(inner).poll_shutdown(cx) { - return Err(e); - } else { - return Ok(()); - } - } - Self::Temp => unreachable!(), } + ShutdownState::ShutdownPending => match Pin::new(&mut self.inner).poll_shutdown(cx) { + Poll::Ready(Ok(())) => { + self.state = ShutdownState::Done; + Ok(()) + } + Poll::Ready(Err(e)) => Err(e), + Poll::Pending => Ok(()), + }, + ShutdownState::Done => Ok(()), } } @@ -78,11 +78,7 @@ impl + Unpin> ShutdownAddrStream { return Poll::Ready(Err(e)); } - match this { - Self::Alive { inner, .. } => func(Pin::new(inner), cx), - Self::Shutdown { inner } => func(Pin::new(inner), cx), - Self::Temp => unreachable!(), - } + func(Pin::new(&mut this.inner), cx) } } @@ -97,11 +93,7 @@ impl + Unpin> AsyncRead for ShutdownAddrStream { return Poll::Ready(Err(e)); } - match this { - Self::Alive { inner, .. } => Pin::new(inner).poll_read(cx, buf), - Self::Shutdown { inner } => Pin::new(inner).poll_read(cx, buf), - Self::Temp => unreachable!(), - } + Pin::new(&mut this.inner).poll_read(cx, buf) } } @@ -116,11 +108,7 @@ impl + Unpin> AsyncWrite for ShutdownAddrStream { return Poll::Ready(Err(e)); } - match this { - Self::Alive { inner, .. } => Pin::new(inner).poll_write(cx, buf), - Self::Shutdown { inner } => Pin::new(inner).poll_write(cx, buf), - Self::Temp => unreachable!(), - } + Pin::new(&mut this.inner).poll_write(cx, buf) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -136,11 +124,7 @@ impl Connected for ShutdownAddrStream { type ConnectInfo = TcpConnectInfo; fn connect_info(&self) -> Self::ConnectInfo { - match self { - Self::Alive { inner, .. } => inner.connect_info(), - Self::Shutdown { inner } => inner.connect_info(), - Self::Temp => unreachable!(), - } + self.inner.connect_info() } } @@ -160,9 +144,9 @@ where let incoming = incoming.map(|stream| { stream.map(|stream| { let shutdown = shutdown.create_shutdown_future(); - ShutdownAddrStream::Alive { + ShutdownAddrStream { inner: stream, - shutdown: Box::pin(shutdown), + state: ShutdownState::SignalPending(Box::pin(shutdown)), } }) }); From ece772d2b36c0b7505bc50cdad7922205e3322c9 Mon Sep 17 00:00:00 2001 From: chubei <914745487@qq.com> Date: Wed, 27 Sep 2023 16:26:33 +0800 Subject: [PATCH 6/6] fix: typo pool -> poll --- dozer-api/src/grpc/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dozer-api/src/grpc/mod.rs b/dozer-api/src/grpc/mod.rs index b56696921c..aaa46b9ea7 100644 --- a/dozer-api/src/grpc/mod.rs +++ b/dozer-api/src/grpc/mod.rs @@ -68,7 +68,7 @@ impl + Unpin> ShutdownAddrStream { } } - fn pool_impl( + fn poll_impl( self: Pin<&mut Self>, cx: &mut Context<'_>, func: fn(Pin<&mut AddrStream>, &mut Context<'_>) -> Poll>, @@ -112,11 +112,11 @@ impl + Unpin> AsyncWrite for ShutdownAddrStream { } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.pool_impl(cx, AsyncWrite::poll_flush) + self.poll_impl(cx, AsyncWrite::poll_flush) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.pool_impl(cx, AsyncWrite::poll_shutdown) + self.poll_impl(cx, AsyncWrite::poll_shutdown) } }