diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index b701a0b8be431..8c0ade401cb47 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -34,6 +34,7 @@ #![feature(negative_impls)] #![feature(register_tool)] #![feature(assert_matches)] +#![feature(never_type)] #![register_tool(rw)] #![recursion_limit = "256"] diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index 72edad9608bf6..26576cf3e3666 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::convert::Infallible; use std::num::NonZeroU64; use std::time::Instant; @@ -49,7 +48,7 @@ impl DecoupleCheckpointLogSinkerOf { #[async_trait] impl> LogSinker for DecoupleCheckpointLogSinkerOf { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; let sink_metrics = self.sink_metrics; #[derive(Debug)] diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index de0daab48b19f..bdd923f786ec4 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -46,7 +46,6 @@ pub mod utils; pub mod writer; use std::collections::BTreeMap; -use std::convert::Infallible; use std::future::Future; use ::clickhouse::error::Error as ClickHouseError; @@ -387,7 +386,7 @@ impl SinkLogReader for R { #[async_trait] pub trait LogSinker: 'static { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result; + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result; } #[async_trait] diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index b61db79792848..f8b84fc64eb86 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::{BTreeMap, VecDeque}; -use std::convert::Infallible; use std::marker::PhantomData; use std::ops::Deref; use std::pin::pin; @@ -302,7 +301,7 @@ impl RemoteLogSinker { #[async_trait] impl LogSinker for RemoteLogSinker { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut request_tx = self.request_sender; let mut response_err_stream_rx = self.response_stream; let sink_metrics = self.sink_metrics; diff --git a/src/connector/src/sink/trivial.rs b/src/connector/src/sink/trivial.rs index e78cfe936247c..0cfa82c5c4d19 100644 --- a/src/connector/src/sink/trivial.rs +++ b/src/connector/src/sink/trivial.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::convert::Infallible; use std::marker::PhantomData; use async_trait::async_trait; @@ -76,7 +75,7 @@ impl Sink for TrivialSink { #[async_trait] impl LogSinker for TrivialSink { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { loop { let (epoch, item) = log_reader.next_item().await?; match item { diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index dd7c8aef8d204..00917f26c4163 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::convert::Infallible; use std::future::{Future, Ready}; use std::pin::pin; use std::sync::Arc; @@ -127,7 +126,7 @@ impl LogSinkerOf { #[async_trait] impl> LogSinker for LogSinkerOf { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; let sink_metrics = self.sink_metrics; #[derive(Debug)] @@ -243,10 +242,7 @@ impl AsyncTruncateLogSinkerOf { #[async_trait] impl LogSinker for AsyncTruncateLogSinkerOf { - async fn consume_log_and_sink( - mut self, - log_reader: &mut impl SinkLogReader, - ) -> Result { + async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result { loop { let select_result = drop_either_future( select( diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 80a7235137e52..e8dfc09e9130c 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::convert::Infallible; use std::mem; use anyhow::anyhow; @@ -126,7 +125,6 @@ impl SinkExecutor { let sink_id = self.sink_param.sink_id; let actor_id = self.actor_context.id; let fragment_id = self.actor_context.fragment_id; - let executor_id = self.sink_writer_param.executor_id; let stream_key = self.info.pk_indices.clone(); let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics( @@ -219,7 +217,7 @@ impl SinkExecutor { self.actor_context, ) .instrument_await(format!("consume_log (sink_id {sink_id})")) - .map_ok(|f| match f {}); // unify return type to `Message` + .map_ok(|never| match never {}); // unify return type to `Message` // TODO: may try to remove the boxed select(consume_log_stream.into_stream(), write_log_stream).boxed() @@ -403,7 +401,7 @@ impl SinkExecutor { sink_param: SinkParam, mut sink_writer_param: SinkWriterParam, actor_context: ActorContextRef, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult { let metrics = sink_writer_param.sink_metrics.clone(); let visible_columns = columns