Skip to content

Commit

Permalink
fix: list one page in fs source executor (#20292) (#20299)
Browse files Browse the repository at this point in the history
Co-authored-by: Bohan Zhang <[email protected]>
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
3 people authored Jan 24, 2025
1 parent 35df310 commit ce56921
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use anyhow::anyhow;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::{self, BoxStream};
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use opendal::{Metakey, Operator};
use risingwave_common::types::Timestamptz;

Expand Down Expand Up @@ -54,7 +54,9 @@ impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

match self.op.list(prefix).await {
let mut lister = self.op.lister(prefix).await?;
// fetch one item as validation, no need to get all
match lister.try_next().await {
Ok(_) => return Ok(vec![empty_split]),
Err(e) => {
return Err(anyhow!(e)
Expand Down
17 changes: 8 additions & 9 deletions src/connector/src/source/filesystem/s3/enumerator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,14 @@ impl SplitEnumerator for S3SplitEnumerator {
}

async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
let mut objects = Vec::new();
loop {
let (files, has_finished) = self.get_next_page::<FsSplit>().await?;
objects.extend(files);
if has_finished {
break;
}
}
Ok(objects)
// fetch one page as validation, no need to get all pages
let (_, _) = self.get_next_page::<FsSplit>().await?;

Ok(vec![FsSplit {
name: "empty_split".to_owned(),
offset: 0,
size: 0,
}])
}
}

Expand Down

0 comments on commit ce56921

Please sign in to comment.