Skip to content

Commit

Permalink
replace infallible with never
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jun 13, 2024
1 parent 40eb907 commit 6b33f55
Show file tree
Hide file tree
Showing 7 changed files with 9 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#![feature(negative_impls)]
#![feature(register_tool)]
#![feature(assert_matches)]
#![feature(never_type)]
#![register_tool(rw)]
#![recursion_limit = "256"]

Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::convert::Infallible;
use std::num::NonZeroU64;
use std::time::Instant;

Expand Down Expand Up @@ -49,7 +48,7 @@ impl<W> DecoupleCheckpointLogSinkerOf<W> {

#[async_trait]
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<Infallible> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
let mut sink_writer = self.writer;
let sink_metrics = self.sink_metrics;
#[derive(Debug)]
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub mod utils;
pub mod writer;

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::future::Future;

use ::clickhouse::error::Error as ClickHouseError;
Expand Down Expand Up @@ -387,7 +386,7 @@ impl<R: LogReader> SinkLogReader for R {

#[async_trait]
pub trait LogSinker: 'static {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<Infallible>;
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!>;
}

#[async_trait]
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::collections::{BTreeMap, VecDeque};
use std::convert::Infallible;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::pin;
Expand Down Expand Up @@ -302,7 +301,7 @@ impl RemoteLogSinker {

#[async_trait]
impl LogSinker for RemoteLogSinker {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<Infallible> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
let mut request_tx = self.request_sender;
let mut response_err_stream_rx = self.response_stream;
let sink_metrics = self.sink_metrics;
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/trivial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::convert::Infallible;
use std::marker::PhantomData;

use async_trait::async_trait;
Expand Down Expand Up @@ -76,7 +75,7 @@ impl<T: TrivialSinkName> Sink for TrivialSink<T> {

#[async_trait]
impl<T: TrivialSinkName> LogSinker for TrivialSink<T> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<Infallible> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
loop {
let (epoch, item) = log_reader.next_item().await?;
match item {
Expand Down
8 changes: 2 additions & 6 deletions src/connector/src/sink/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::convert::Infallible;
use std::future::{Future, Ready};
use std::pin::pin;
use std::sync::Arc;
Expand Down Expand Up @@ -127,7 +126,7 @@ impl<W> LogSinkerOf<W> {

#[async_trait]
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for LogSinkerOf<W> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<Infallible> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
let mut sink_writer = self.writer;
let sink_metrics = self.sink_metrics;
#[derive(Debug)]
Expand Down Expand Up @@ -243,10 +242,7 @@ impl<W: AsyncTruncateSinkWriter> AsyncTruncateLogSinkerOf<W> {

#[async_trait]
impl<W: AsyncTruncateSinkWriter> LogSinker for AsyncTruncateLogSinkerOf<W> {
async fn consume_log_and_sink(
mut self,
log_reader: &mut impl SinkLogReader,
) -> Result<Infallible> {
async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result<!> {
loop {
let select_result = drop_either_future(
select(
Expand Down
6 changes: 2 additions & 4 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::convert::Infallible;
use std::mem;

use anyhow::anyhow;
Expand Down Expand Up @@ -126,7 +125,6 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
let sink_id = self.sink_param.sink_id;
let actor_id = self.actor_context.id;
let fragment_id = self.actor_context.fragment_id;
let executor_id = self.sink_writer_param.executor_id;

let stream_key = self.info.pk_indices.clone();
let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
Expand Down Expand Up @@ -219,7 +217,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
self.actor_context,
)
.instrument_await(format!("consume_log (sink_id {sink_id})"))
.map_ok(|f| match f {}); // unify return type to `Message`
.map_ok(|never| match never {}); // unify return type to `Message`

// TODO: may try to remove the boxed
select(consume_log_stream.into_stream(), write_log_stream).boxed()
Expand Down Expand Up @@ -403,7 +401,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
sink_param: SinkParam,
mut sink_writer_param: SinkWriterParam,
actor_context: ActorContextRef,
) -> StreamExecutorResult<Infallible> {
) -> StreamExecutorResult<!> {
let metrics = sink_writer_param.sink_metrics.clone();

let visible_columns = columns
Expand Down

0 comments on commit 6b33f55

Please sign in to comment.