Skip to content

Commit

Permalink
refactor(source): remove into_fs and as_fs methods from `SplitImp…
Browse files Browse the repository at this point in the history
…l` (#16210)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Apr 9, 2024
1 parent 7cb2b3c commit e4456fa
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 25 deletions.
19 changes: 0 additions & 19 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use serde::de::DeserializeOwned;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::filesystem::FsSplit;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
Expand Down Expand Up @@ -416,24 +415,6 @@ impl TryFrom<&ConnectorSplit> for SplitImpl {
}
}

// for the `FsSourceExecutor`
impl SplitImpl {
#[allow(clippy::result_unit_err)]
pub fn into_fs(self) -> Result<FsSplit, ()> {
match self {
Self::S3(split) => Ok(split),
_ => Err(()),
}
}

pub fn as_fs(&self) -> Option<&FsSplit> {
match self {
Self::S3(split) => Some(split),
_ => None,
}
}
}

impl SplitImpl {
fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
match_source_name_str!(
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
for sc in rhs {
if let Some(s) = core.updated_splits_in_epoch.get(&sc.id()) {
let fs = s
.as_fs()
.as_s3()
.unwrap_or_else(|| panic!("split {:?} is not fs", s));
// unfinished this epoch
if fs.offset < fs.size {
Expand Down Expand Up @@ -214,7 +214,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
.values()
.filter(|split| {
let fs = split
.as_fs()
.as_s3()
.unwrap_or_else(|| panic!("split {:?} is not fs", split));
fs.offset < fs.size
})
Expand All @@ -226,7 +226,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
.values()
.filter(|split| {
let fs = split
.as_fs()
.as_s3()
.unwrap_or_else(|| panic!("split {:?} is not fs", split));
fs.offset == fs.size
})
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/source/state_table_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<S: StateStore> SourceStateTableHandler<S> {
.map_err(StreamExecutorError::from)
}

// this method should only be used by `FsSourceExecutor
/// this method should only be used by [`FsSourceExecutor`](super::FsSourceExecutor)
pub(crate) async fn get_all_completed(&self) -> StreamExecutorResult<HashSet<SplitId>> {
let start = Bound::Excluded(row::once(Some(Self::string_to_scalar(
COMPLETE_SPLIT_PREFIX,
Expand All @@ -105,7 +105,7 @@ impl<S: StateStore> SourceStateTableHandler<S> {
if let Some(ScalarRefImpl::Jsonb(jsonb_ref)) = row.datum_at(1) {
let split = SplitImpl::restore_from_json(jsonb_ref.to_owned_scalar())?;
let fs = split
.as_fs()
.as_s3()
.unwrap_or_else(|| panic!("split {:?} is not fs", split));
if fs.offset == fs.size {
let split_id = split.id();
Expand Down Expand Up @@ -133,7 +133,7 @@ impl<S: StateStore> SourceStateTableHandler<S> {
}

/// set all complete
/// can only used by `FsSourceExecutor`
/// can only used by [`FsSourceExecutor`](super::FsSourceExecutor)
pub(crate) async fn set_all_complete(
&mut self,
states: Vec<SplitImpl>,
Expand Down

0 comments on commit e4456fa

Please sign in to comment.