fix: s3 source cannot read incremental files (#18017)
Signed-off-by: tabVersion <[email protected]>
Co-authored-by: congyi <[email protected]>
Co-authored-by: congyi wang <[email protected]>
3 people authored Aug 30, 2024
1 parent 655ce2a commit 70c1146
Showing 7 changed files with 126 additions and 92 deletions.
5 changes: 3 additions & 2 deletions e2e_test/s3/fs_parquet_source_and_sink.py
@@ -69,7 +69,8 @@ def _table():
s3.bucket_name = '{config['S3_BUCKET']}',
s3.credentials.access = '{config['S3_ACCESS_KEY']}',
s3.credentials.secret = '{config['S3_SECRET_KEY']}',
s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
s3.endpoint_url = 'https://{config['S3_ENDPOINT']}',
refresh.interval.sec = 1,
) FORMAT PLAIN ENCODE PARQUET;''')

total_rows = file_num * item_num_per_file
@@ -160,7 +161,7 @@ def _table():
test_timestamp timestamp,
test_timestamptz timestamptz,
) WITH (
connector = 's3_v2',
connector = 's3',
match_pattern = '*.parquet',
s3.region_name = '{config['S3_REGION']}',
s3.bucket_name = '{config['S3_BUCKET']}',
3 changes: 2 additions & 1 deletion e2e_test/s3/fs_source_v2.py
@@ -76,7 +76,8 @@ def _encode():
s3.bucket_name = '{config['S3_BUCKET']}',
s3.credentials.access = '{config['S3_ACCESS_KEY']}',
s3.credentials.secret = '{config['S3_SECRET_KEY']}',
s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
s3.endpoint_url = 'https://{config['S3_ENDPOINT']}',
refresh.interval.sec = 1
) FORMAT PLAIN ENCODE {_encode()};''')

total_rows = file_num * item_num_per_file
4 changes: 4 additions & 0 deletions e2e_test/s3/fs_source_v2_new_file.py
@@ -67,6 +67,7 @@ def _table():
formatted_batch1 = FORMATTER[fmt](data_batch1)
upload_to_s3_bucket(config, client, run_id, formatted_batch1, 0)

# do_test configures the fs source's list interval to 1s
do_test(
config, len(data_batch1), ITEM_NUM_PER_FILE, run_id, fmt, need_drop_table=False
)
@@ -84,3 +85,6 @@ def _table():
# clean up s3 files
for idx, _ in enumerate(data):
client.remove_object(config["S3_BUCKET"], _s3(idx, 0))

if success_flag == False:
exit(1)
23 changes: 23 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
@@ -20,6 +20,7 @@ pub mod posix_fs_source;
pub mod s3_source;

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use with_options::WithOptions;
pub mod opendal_enumerator;
pub mod opendal_reader;
@@ -38,6 +39,16 @@ pub const GCS_CONNECTOR: &str = "gcs";
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";

pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;

#[serde_as]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct FsSourceCommon {
#[serde(rename = "refresh.interval.sec")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub refresh_interval_sec: Option<u64>,
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
#[serde(rename = "gcs.bucket_name")]
@@ -54,6 +65,9 @@ pub struct GcsProperties {
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

#[serde(flatten)]
pub fs_common: FsSourceCommon,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,

@@ -123,6 +137,9 @@ pub struct OpendalS3Properties {
#[serde(rename = "s3.assume_role", default)]
pub assume_role: Option<String>,

#[serde(flatten)]
pub fs_common: FsSourceCommon,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
@@ -151,6 +168,9 @@ pub struct PosixFsProperties {
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

#[serde(flatten)]
pub fs_common: FsSourceCommon,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
#[serde(rename = "compression_format", default = "Default::default")]
@@ -186,6 +206,9 @@ pub struct AzblobProperties {
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

#[serde(flatten)]
pub fs_common: FsSourceCommon,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,

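
For reference, a minimal sketch (a standalone program assuming serde, serde_json, and serde_with as dependencies; the struct and values are illustrative, not the connector's actual parsing path) of how a string-valued WITH option such as refresh.interval.sec deserializes into an Option<u64> through DisplayFromStr, mirroring the FsSourceCommon struct added above:

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;

#[serde_as]
#[derive(Debug, Deserialize)]
struct FsOptions {
    // WITH options reach the connector as string key-value pairs.
    #[serde(rename = "refresh.interval.sec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    refresh_interval_sec: Option<u64>,
}

fn main() {
    let with_options = r#"{ "refresh.interval.sec": "1" }"#;
    let opts: FsOptions = serde_json::from_str(with_options).unwrap();
    assert_eq!(opts.refresh_interval_sec, Some(1));

    // The reader falls back to the default when the option is unset.
    let interval = opts.refresh_interval_sec.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC);
    assert_eq!(interval, 1);
}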
62 changes: 38 additions & 24 deletions src/connector/src/source/reader/reader.rs
@@ -30,6 +30,7 @@ use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use crate::source::filesystem::opendal_source::{
OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
DEFAULT_REFRESH_INTERVAL_SEC,
};
use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
use crate::source::{
@@ -84,26 +85,33 @@ impl SourceReader {

pub fn get_source_list(&self) -> ConnectorResult<BoxTryStream<FsPageItem>> {
let config = self.config.clone();
let list_interval_sec: u64;
let get_list_interval_sec =
|interval: Option<u64>| -> u64 { interval.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC) };
match config {
ConnectorProperties::Gcs(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let lister: OpendalEnumerator<OpendalGcs> =
OpendalEnumerator::new_gcs_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister))
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
}
ConnectorProperties::OpendalS3(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let lister: OpendalEnumerator<OpendalS3> =
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
Ok(build_opendal_fs_list_stream(lister))
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
}
ConnectorProperties::Azblob(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let lister: OpendalEnumerator<OpendalAzblob> =
OpendalEnumerator::new_azblob_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister))
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
}
ConnectorProperties::PosixFs(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let lister: OpendalEnumerator<OpendalPosixFs> =
OpendalEnumerator::new_posix_fs_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister))
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
}
other => bail!("Unsupported source: {:?}", other),
}
@@ -185,29 +193,35 @@ impl SourceReader {
}

#[try_stream(boxed, ok = FsPageItem, error = crate::error::ConnectorError)]
async fn build_opendal_fs_list_stream<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
let matcher = lister.get_matcher();
let mut object_metadata_iter = lister.list().await?;

while let Some(list_res) = object_metadata_iter.next().await {
match list_res {
Ok(res) => {
if matcher
.as_ref()
.map(|m| m.matches(&res.name))
.unwrap_or(true)
{
yield res
} else {
// Currently, due to the lack of prefix list, we just skip the unmatched files.
continue;
async fn build_opendal_fs_list_stream<Src: OpendalSource>(
lister: OpendalEnumerator<Src>,
list_interval_sec: u64,
) {
loop {
let matcher = lister.get_matcher();
let mut object_metadata_iter = lister.list().await?;

while let Some(list_res) = object_metadata_iter.next().await {
match list_res {
Ok(res) => {
if matcher
.as_ref()
.map(|m| m.matches(&res.name))
.unwrap_or(true)
{
yield res
} else {
// Currently, due to the lack of prefix list, we just skip the unmatched files.
continue;
}
}
Err(err) => {
tracing::error!(error = %err.as_report(), "list object fail");
return Err(err);
}
}
Err(err) => {
tracing::error!(error = %err.as_report(), "list object fail");
return Err(err);
}
}
tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;
}
}

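
A minimal sketch (tokio only; list_files is a hypothetical stand-in for lister.list() plus the match_pattern filter) of the re-listing loop introduced above: list the store, emit what was found, sleep for the configured interval, and list again, so the stream never terminates and newly uploaded files are eventually discovered:

use std::time::Duration;

const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;

// Hypothetical stand-in for `lister.list()`; a real implementation would query
// the object store and filter names against `match_pattern`.
async fn list_files() -> Vec<String> {
    vec!["data-0.parquet".to_string()]
}

async fn run_list_loop(refresh_interval_sec: Option<u64>) {
    let interval = refresh_interval_sec.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC);
    loop {
        for name in list_files().await {
            println!("discovered file: {name}");
        }
        // Wait before re-listing, as configured by `refresh.interval.sec`.
        tokio::time::sleep(Duration::from_secs(interval)).await;
    }
}

#[tokio::main]
async fn main() {
    // With `refresh.interval.sec = 1`, the store is re-listed every second.
    run_list_loop(Some(1)).await;
}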
12 changes: 12 additions & 0 deletions src/connector/with_options_source.yaml
@@ -21,6 +21,9 @@ AzblobProperties:
field_type: String
required: false
default: Default::default
- name: refresh.interval.sec
field_type: u64
required: false
- name: compression_format
field_type: CompressionFormat
required: false
@@ -69,6 +72,9 @@ GcsProperties:
field_type: String
required: false
default: Default::default
- name: refresh.interval.sec
field_type: u64
required: false
- name: compression_format
field_type: CompressionFormat
required: false
@@ -819,6 +825,9 @@ OpendalS3Properties:
comments: The following are only supported by `s3_v2` (opendal) source.
required: false
default: Default::default
- name: refresh.interval.sec
field_type: u64
required: false
PosixFsProperties:
fields:
- name: posix_fs.root
@@ -830,6 +839,9 @@ PosixFsProperties:
comments: The regex pattern to match files under root directory.
required: false
default: Default::default
- name: refresh.interval.sec
field_type: u64
required: false
- name: compression_format
field_type: CompressionFormat
required: false
109 changes: 44 additions & 65 deletions src/stream/src/executor/source/list_executor.rs
@@ -26,8 +26,6 @@ use super::{barrier_to_message_stream, StreamSourceCore};
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

const CHUNK_SIZE: usize = 1024;

#[allow(dead_code)]
pub struct FsListExecutor<S: StateStore> {
actor_ctx: ActorContextRef,
@@ -78,38 +76,28 @@ impl<S: StateStore> FsListExecutor<S> {
.map_err(StreamExecutorError::connector_error)?
.map_err(StreamExecutorError::connector_error);

// Group FsPageItem stream into chunks of size 1024.
let chunked_stream = stream.chunks(CHUNK_SIZE).map(|chunk| {
let rows = chunk
.into_iter()
.map(|item| match item {
Ok(page_item) => Ok((
Op::Insert,
OwnedRow::new(vec![
Some(ScalarImpl::Utf8(page_item.name.into_boxed_str())),
Some(ScalarImpl::Timestamptz(page_item.timestamp)),
Some(ScalarImpl::Int64(page_item.size)),
]),
)),
Err(e) => {
tracing::error!(error = %e.as_report(), "Connector fail to list item");
Err(e)
}
})
.collect::<Vec<_>>();

let res: Vec<(Op, OwnedRow)> = rows.into_iter().flatten().collect();
if res.is_empty() {
tracing::warn!("No items were listed from source.");
return Ok(StreamChunk::default());
let processed_stream = stream.map(|item| match item {
Ok(page_item) => {
let row = (
Op::Insert,
OwnedRow::new(vec![
Some(ScalarImpl::Utf8(page_item.name.into_boxed_str())),
Some(ScalarImpl::Timestamptz(page_item.timestamp)),
Some(ScalarImpl::Int64(page_item.size)),
]),
);
Ok(StreamChunk::from_rows(
&[row],
&[DataType::Varchar, DataType::Timestamptz, DataType::Int64],
))
}
Err(e) => {
tracing::error!(error = %e.as_report(), "Connector failed to list item");
Err(e)
}
Ok(StreamChunk::from_rows(
&res,
&[DataType::Varchar, DataType::Timestamptz, DataType::Int64],
))
});

Ok(chunked_stream)
Ok(processed_stream)
}

#[try_stream(ok = Message, error = StreamExecutorError)]
@@ -150,45 +138,36 @@ impl<S: StateStore> FsListExecutor<S> {

yield Message::Barrier(barrier);

loop {
// a list-file stream never ends; keep listing to discover any new files.
while let Some(msg) = stream.next().await {
match msg {
Err(e) => {
tracing::warn!(error = %e.as_report(), "encountered an error, recovering");
stream
.replace_data_stream(self.build_chunked_paginate_stream(&source_desc)?);
}
Ok(msg) => match msg {
// Barrier arrives.
Either::Left(msg) => match &msg {
Message::Barrier(barrier) => {
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
_ => (),
}
while let Some(msg) = stream.next().await {
match msg {
Err(e) => {
tracing::warn!(error = %e.as_report(), "encountered an error, recovering");
stream.replace_data_stream(self.build_chunked_paginate_stream(&source_desc)?);
}
Ok(msg) => match msg {
// Barrier arrives.
Either::Left(msg) => match &msg {
Message::Barrier(barrier) => {
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
_ => (),
}

// Propagate the barrier.
yield msg;
}
// Only barrier can be received.
_ => unreachable!(),
},
// Chunked FsPage arrives.
Either::Right(chunk) => {
yield Message::Chunk(chunk);

// Propagate the barrier.
yield msg;
}
// Only barrier can be received.
_ => unreachable!(),
},
}
// Chunked FsPage arrives.
Either::Right(chunk) => {
yield Message::Chunk(chunk);
}
},
}

stream.replace_data_stream(
self.build_chunked_paginate_stream(&source_desc)
.map_err(StreamExecutorError::from)?,
);
}
}
}
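
A minimal sketch (futures and tokio only; PageItem is a simplified stand-in for FsPageItem) of the executor change above: each listed file is mapped into its own single-row chunk and emitted as soon as it arrives, instead of being buffered into batches of 1024, so downstream fetch executors see newly discovered files without waiting for a full batch:

use futures::{stream, StreamExt};

#[derive(Debug)]
struct PageItem {
    name: String,
    size: i64,
}

#[tokio::main]
async fn main() {
    // Stand-in for the listing stream produced by `get_source_list`.
    let listed = stream::iter(vec![
        PageItem { name: "a.parquet".into(), size: 10 },
        PageItem { name: "b.parquet".into(), size: 20 },
    ]);

    // One single-row chunk per item, emitted immediately (the real executor
    // builds a StreamChunk with name, timestamp and size columns).
    let mut chunks = listed.map(|item| vec![(item.name, item.size)]);

    while let Some(chunk) = chunks.next().await {
        println!("emit chunk: {chunk:?}");
    }
}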
