diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 62bb98241b59e..b3c8c4010a9c7 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -34,7 +34,6 @@ use serde::de::DeserializeOwned; use super::cdc::DebeziumCdcMeta; use super::datagen::DatagenMeta; -use super::filesystem::FsSplit; use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::kinesis::KinesisMeta; @@ -416,24 +415,6 @@ impl TryFrom<&ConnectorSplit> for SplitImpl { } } -// for the `FsSourceExecutor` -impl SplitImpl { - #[allow(clippy::result_unit_err)] - pub fn into_fs(self) -> Result { - match self { - Self::S3(split) => Ok(split), - _ => Err(()), - } - } - - pub fn as_fs(&self) -> Option<&FsSplit> { - match self { - Self::S3(split) => Some(split), - _ => None, - } - } -} - impl SplitImpl { fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result { match_source_name_str!( diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index d073863b50d44..3e53702e720f2 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -146,7 +146,7 @@ impl FsSourceExecutor { for sc in rhs { if let Some(s) = core.updated_splits_in_epoch.get(&sc.id()) { let fs = s - .as_fs() + .as_s3() .unwrap_or_else(|| panic!("split {:?} is not fs", s)); // unfinished this epoch if fs.offset < fs.size { @@ -214,7 +214,7 @@ impl FsSourceExecutor { .values() .filter(|split| { let fs = split - .as_fs() + .as_s3() .unwrap_or_else(|| panic!("split {:?} is not fs", split)); fs.offset < fs.size }) @@ -226,7 +226,7 @@ impl FsSourceExecutor { .values() .filter(|split| { let fs = split - .as_fs() + .as_s3() .unwrap_or_else(|| panic!("split {:?} is not fs", split)); fs.offset == fs.size }) diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index c9d967ca56c8f..dbead349822e0 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -82,7 +82,7 @@ impl SourceStateTableHandler { .map_err(StreamExecutorError::from) } - // this method should only be used by `FsSourceExecutor + /// this method should only be used by [`FsSourceExecutor`](super::FsSourceExecutor) pub(crate) async fn get_all_completed(&self) -> StreamExecutorResult> { let start = Bound::Excluded(row::once(Some(Self::string_to_scalar( COMPLETE_SPLIT_PREFIX, @@ -105,7 +105,7 @@ impl SourceStateTableHandler { if let Some(ScalarRefImpl::Jsonb(jsonb_ref)) = row.datum_at(1) { let split = SplitImpl::restore_from_json(jsonb_ref.to_owned_scalar())?; let fs = split - .as_fs() + .as_s3() .unwrap_or_else(|| panic!("split {:?} is not fs", split)); if fs.offset == fs.size { let split_id = split.id(); @@ -133,7 +133,7 @@ impl SourceStateTableHandler { } /// set all complete - /// can only used by `FsSourceExecutor` + /// can only used by [`FsSourceExecutor`](super::FsSourceExecutor) pub(crate) async fn set_all_complete( &mut self, states: Vec,