diff --git a/e2e_test/s3/fs_parquet_source_and_sink.py b/e2e_test/s3/fs_parquet_source_and_sink.py new file mode 100644 index 0000000000000..3ae00d3fcee15 --- /dev/null +++ b/e2e_test/s3/fs_parquet_source_and_sink.py @@ -0,0 +1,243 @@ +import os +import sys +import random +import psycopg2 +import json +import pyarrow as pa +import pyarrow.parquet as pq +import pandas as pd +from datetime import datetime, timezone +from time import sleep +from minio import Minio +from random import uniform + +def gen_data(file_num, item_num_per_file): + assert item_num_per_file % 2 == 0, \ + f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' + return [ + [{ + 'id': file_id * item_num_per_file + item_id, + 'name': f'{file_id}_{item_id}_{file_id * item_num_per_file + item_id}', + 'sex': item_id % 2, + 'mark': (-1) ** (item_id % 2), + 'test_int': pa.scalar(1, type=pa.int32()), + 'test_real': pa.scalar(4.0, type=pa.float32()), + 'test_double_precision': pa.scalar(5.0, type=pa.float64()), + 'test_varchar': pa.scalar('7', type=pa.string()), + 'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()), + 'test_date': pa.scalar(datetime.now().date(), type=pa.date32()), + 'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')), + 'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')), + 'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')), + } for item_id in range(item_num_per_file)] + for file_id in range(file_num) + ] + +def do_test(config, file_num, item_num_per_file, prefix): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _table(): + return 's3_test_parquet' + + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE {_table()}( + id bigint primary key, + name TEXT, + sex bigint, + mark bigint, + test_int int, + test_real real, + test_double_precision double precision, + test_varchar varchar, + test_bytea bytea, + test_date date, + test_time time, + test_timestamp timestamp, + test_timestamptz timestamptz, + ) WITH ( + connector = 's3', + match_pattern = '*.parquet', + s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}', + refresh.interval.sec = 1, + ) FORMAT PLAIN ENCODE PARQUET;''') + + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from {_table()}') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s") + sleep(10) + + stmt = f'select count(*), sum(id) from {_table()}' + print(f'Execute {stmt}') + cur.execute(stmt) + result = cur.fetchone() + + print('Got:', result) + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' + + _assert_eq('count(*)', result[0], total_rows) + _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) + + print('File source test pass!') + + cur.close() + conn.close() + +def do_sink(config, file_num, item_num_per_file, prefix): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _table(): + return 's3_test_parquet' + + # Execute a SELECT statement + cur.execute(f'''CREATE sink test_file_sink as select + id, + name, + sex, + mark, + test_int, + test_real, + test_double_precision, + test_varchar, + test_bytea, + test_date, + test_time, + test_timestamp, + test_timestamptz + from {_table()} WITH ( + connector = 's3', + match_pattern = '*.parquet', + s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + s3.path = '', + s3.file_type = 'parquet', + type = 'append-only', + force_append_only='true' + ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') + + print('Sink into s3...') + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE test_sink_table( + id bigint primary key, + name TEXT, + sex bigint, + mark bigint, + test_int int, + test_real real, + test_double_precision double precision, + test_varchar varchar, + test_bytea bytea, + test_date date, + test_time time, + test_timestamp timestamp, + test_timestamptz timestamptz, + ) WITH ( + connector = 's3', + match_pattern = '*.parquet', + s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + ) FORMAT PLAIN ENCODE PARQUET;''') + + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from test_sink_table') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s") + sleep(10) + + stmt = f'select count(*), sum(id) from test_sink_table' + print(f'Execute reading sink files: {stmt}') + cur.execute(stmt) + result = cur.fetchone() + + print('Got:', result) + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' + + _assert_eq('count(*)', result[0], total_rows) + _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) + + print('File sink test pass!') + cur.execute(f'drop sink test_file_sink') + cur.execute(f'drop table test_sink_table') + cur.close() + conn.close() + + + +if __name__ == "__main__": + FILE_NUM = 10 + ITEM_NUM_PER_FILE = 2000 + data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) + + config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) + client = Minio( + config["S3_ENDPOINT"], + access_key=config["S3_ACCESS_KEY"], + secret_key=config["S3_SECRET_KEY"], + secure=True, + ) + run_id = str(random.randint(1000, 9999)) + _local = lambda idx: f'data_{idx}.parquet' + _s3 = lambda idx: f"{run_id}_data_{idx}.parquet" + + # put s3 files + for idx, file_data in enumerate(data): + table = pa.Table.from_pandas(pd.DataFrame(file_data)) + pq.write_table(table, _local(idx)) + + client.fput_object( + config["S3_BUCKET"], + _s3(idx), + _local(idx) + ) + + # do test + do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) + + # clean up s3 files + for idx, _ in enumerate(data): + client.remove_object(config["S3_BUCKET"], _s3(idx)) + + do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) + + # clean up s3 files + for idx, _ in enumerate(data): + client.remove_object(config["S3_BUCKET"], _s3(idx)) + diff --git a/e2e_test/s3/fs_source_v2.py b/e2e_test/s3/fs_source_v2.py index 203524a74b557..ffc88e8e6aa97 100644 --- a/e2e_test/s3/fs_source_v2.py +++ b/e2e_test/s3/fs_source_v2.py @@ -75,7 +75,8 @@ def _encode(): s3.bucket_name = '{config['S3_BUCKET']}', s3.credentials.access = '{config['S3_ACCESS_KEY']}', s3.credentials.secret = '{config['S3_SECRET_KEY']}', - s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}', + refresh.interval.sec = 1 ) FORMAT PLAIN ENCODE {_encode()};''') total_rows = file_num * item_num_per_file diff --git a/e2e_test/s3/fs_source_v2_new_file.py b/e2e_test/s3/fs_source_v2_new_file.py index a7ae53f3a37dd..c90103e15c127 100644 --- a/e2e_test/s3/fs_source_v2_new_file.py +++ b/e2e_test/s3/fs_source_v2_new_file.py @@ -67,6 +67,7 @@ def _table(): formatted_batch1 = FORMATTER[fmt](data_batch1) upload_to_s3_bucket(config, client, run_id, formatted_batch1, 0) + # config in do_test that fs source's list interval is 1s do_test( config, len(data_batch1), ITEM_NUM_PER_FILE, run_id, fmt, need_drop_table=False ) @@ -84,3 +85,6 @@ def _table(): # clean up s3 files for idx, _ in enumerate(data): client.remove_object(config["S3_BUCKET"], _s3(idx, 0)) + + if success_flag == False: + exit(1) diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index a41876543419b..ff5099b60103e 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -109,7 +109,7 @@ rdkafka = { workspace = true, features = [ "gssapi", "zstd", ] } -redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] } +redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp", "cluster-async"] } regex = "1.4" reqwest = { version = "0.12.2", features = ["json", "stream"] } risingwave_common = { workspace = true } @@ -125,7 +125,11 @@ rustls-native-certs = "0.7" rustls-pemfile = "2" rustls-pki-types = "1" rw_futures_util = { workspace = true } -sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] } +sea-schema = { version = "0.14", default-features = false, features = [ + "discovery", + "sqlx-postgres", + "sqlx-mysql", +] } serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" serde_json = "1" diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index a9689a921d7f0..5a530df5b8702 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -19,6 +19,7 @@ pub mod posix_fs_source; pub mod s3_source; use serde::Deserialize; +use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; pub mod opendal_enumerator; pub mod opendal_reader; @@ -35,6 +36,16 @@ pub const GCS_CONNECTOR: &str = "gcs"; pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2"; pub const POSIX_FS_CONNECTOR: &str = "posix_fs"; +pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60; + +#[serde_as] +#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] +pub struct FsSourceCommon { + #[serde(rename = "refresh.interval.sec")] + #[serde_as(as = "Option")] + pub refresh_interval_sec: Option, +} + #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] pub struct GcsProperties { #[serde(rename = "gcs.bucket_name")] @@ -51,6 +62,9 @@ pub struct GcsProperties { #[serde(rename = "match_pattern", default)] pub match_pattern: Option, + #[serde(flatten)] + pub fs_common: FsSourceCommon, + #[serde(flatten)] pub unknown_fields: HashMap, } @@ -117,6 +131,9 @@ pub struct OpendalS3Properties { #[serde(rename = "s3.assume_role", default)] pub assume_role: Option, + #[serde(flatten)] + pub fs_common: FsSourceCommon, + #[serde(flatten)] pub unknown_fields: HashMap, } @@ -145,6 +162,9 @@ pub struct PosixFsProperties { #[serde(rename = "match_pattern", default)] pub match_pattern: Option, + #[serde(flatten)] + pub fs_common: FsSourceCommon, + #[serde(flatten)] pub unknown_fields: HashMap, } diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 02012841c5a48..56c670bc9c010 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -31,7 +31,7 @@ use crate::error::ConnectorResult; use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use crate::source::filesystem::opendal_source::{ - OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, + OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, DEFAULT_REFRESH_INTERVAL_SEC, }; use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; use crate::source::{ @@ -85,21 +85,27 @@ impl SourceReader { pub fn get_source_list(&self) -> ConnectorResult> { let config = self.config.clone(); + let list_interval_sec: u64; + let get_list_interval_sec = + |interval: Option| -> u64 { interval.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC) }; match config { ConnectorProperties::Gcs(prop) => { + list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec); let lister: OpendalEnumerator = OpendalEnumerator::new_gcs_source(*prop)?; - Ok(build_opendal_fs_list_stream(lister)) + Ok(build_opendal_fs_list_stream(lister, list_interval_sec)) } ConnectorProperties::OpendalS3(prop) => { + list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec); let lister: OpendalEnumerator = OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?; - Ok(build_opendal_fs_list_stream(lister)) + Ok(build_opendal_fs_list_stream(lister, list_interval_sec)) } ConnectorProperties::PosixFs(prop) => { + list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec); let lister: OpendalEnumerator = OpendalEnumerator::new_posix_fs_source(*prop)?; - Ok(build_opendal_fs_list_stream(lister)) + Ok(build_opendal_fs_list_stream(lister, list_interval_sec)) } other => bail!("Unsupported source: {:?}", other), } @@ -183,29 +189,35 @@ impl SourceReader { } #[try_stream(boxed, ok = FsPageItem, error = crate::error::ConnectorError)] -async fn build_opendal_fs_list_stream(lister: OpendalEnumerator) { - let matcher = lister.get_matcher(); - let mut object_metadata_iter = lister.list().await?; - - while let Some(list_res) = object_metadata_iter.next().await { - match list_res { - Ok(res) => { - if matcher - .as_ref() - .map(|m| m.matches(&res.name)) - .unwrap_or(true) - { - yield res - } else { - // Currrntly due to the lack of prefix list, we just skip the unmatched files. - continue; +async fn build_opendal_fs_list_stream( + lister: OpendalEnumerator, + list_interval_sec: u64, +) { + loop { + let matcher = lister.get_matcher(); + let mut object_metadata_iter = lister.list().await?; + + while let Some(list_res) = object_metadata_iter.next().await { + match list_res { + Ok(res) => { + if matcher + .as_ref() + .map(|m| m.matches(&res.name)) + .unwrap_or(true) + { + yield res + } else { + // Currrntly due to the lack of prefix list, we just skip the unmatched files. + continue; + } + } + Err(err) => { + tracing::error!(error = %err.as_report(), "list object fail"); + return Err(err); } - } - Err(err) => { - tracing::error!(error = %err.as_report(), "list object fail"); - return Err(err); } } + tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await; } } diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 822ab25ea3ef7..c6276cc030fdc 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -33,6 +33,9 @@ GcsProperties: field_type: String required: false default: Default::default + - name: refresh.interval.sec + field_type: u64 + required: false IcebergProperties: fields: - name: catalog.type @@ -614,6 +617,9 @@ OpendalS3Properties: comments: The following are only supported by `s3_v2` (opendal) source. required: false default: Default::default + - name: refresh.interval.sec + field_type: u64 + required: false PosixFsProperties: fields: - name: posix_fs.root @@ -625,6 +631,9 @@ PosixFsProperties: comments: The regex pattern to match files under root directory. required: false default: Default::default + - name: refresh.interval.sec + field_type: u64 + required: false PubsubProperties: fields: - name: pubsub.subscription diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index 25b32c0a0e4b8..cfba8b8d29dc5 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -26,8 +26,6 @@ use super::{barrier_to_message_stream, StreamSourceCore}; use crate::executor::prelude::*; use crate::executor::stream_reader::StreamReaderWithPause; -const CHUNK_SIZE: usize = 1024; - #[allow(dead_code)] pub struct FsListExecutor { actor_ctx: ActorContextRef, @@ -78,34 +76,28 @@ impl FsListExecutor { .map_err(StreamExecutorError::connector_error)? .map_err(StreamExecutorError::connector_error); - // Group FsPageItem stream into chunks of size 1024. - let chunked_stream = stream.chunks(CHUNK_SIZE).map(|chunk| { - let rows = chunk - .into_iter() - .map(|item| match item { - Ok(page_item) => Ok(( - Op::Insert, - OwnedRow::new(vec![ - Some(ScalarImpl::Utf8(page_item.name.into_boxed_str())), - Some(ScalarImpl::Timestamptz(page_item.timestamp)), - Some(ScalarImpl::Int64(page_item.size)), - ]), - )), - Err(e) => { - tracing::error!(error = %e.as_report(), "Connector fail to list item"); - Err(e) - } - }) - .collect::>(); - - let res: Vec<(Op, OwnedRow)> = rows.into_iter().flatten().collect(); - Ok(StreamChunk::from_rows( - &res, - &[DataType::Varchar, DataType::Timestamptz, DataType::Int64], - )) + let processed_stream = stream.map(|item| match item { + Ok(page_item) => { + let row = ( + Op::Insert, + OwnedRow::new(vec![ + Some(ScalarImpl::Utf8(page_item.name.into_boxed_str())), + Some(ScalarImpl::Timestamptz(page_item.timestamp)), + Some(ScalarImpl::Int64(page_item.size)), + ]), + ); + Ok(StreamChunk::from_rows( + &[row], + &[DataType::Varchar, DataType::Timestamptz, DataType::Int64], + )) + } + Err(e) => { + tracing::error!(error = %e.as_report(), "Connector failed to list item"); + Err(e) + } }); - Ok(chunked_stream) + Ok(processed_stream) } #[try_stream(ok = Message, error = StreamExecutorError)] @@ -146,45 +138,36 @@ impl FsListExecutor { yield Message::Barrier(barrier); - loop { - // a list file stream never ends, keep list to find if there is any new file. - while let Some(msg) = stream.next().await { - match msg { - Err(e) => { - tracing::warn!(error = %e.as_report(), "encountered an error, recovering"); - stream - .replace_data_stream(self.build_chunked_paginate_stream(&source_desc)?); - } - Ok(msg) => match msg { - // Barrier arrives. - Either::Left(msg) => match &msg { - Message::Barrier(barrier) => { - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Pause => stream.pause_stream(), - Mutation::Resume => stream.resume_stream(), - _ => (), - } + while let Some(msg) = stream.next().await { + match msg { + Err(e) => { + tracing::warn!(error = %e.as_report(), "encountered an error, recovering"); + stream.replace_data_stream(self.build_chunked_paginate_stream(&source_desc)?); + } + Ok(msg) => match msg { + // Barrier arrives. + Either::Left(msg) => match &msg { + Message::Barrier(barrier) => { + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), + _ => (), } - - // Propagate the barrier. - yield msg; } - // Only barrier can be received. - _ => unreachable!(), - }, - // Chunked FsPage arrives. - Either::Right(chunk) => { - yield Message::Chunk(chunk); + + // Propagate the barrier. + yield msg; } + // Only barrier can be received. + _ => unreachable!(), }, - } + // Chunked FsPage arrives. + Either::Right(chunk) => { + yield Message::Chunk(chunk); + } + }, } - - stream.replace_data_stream( - self.build_chunked_paginate_stream(&source_desc) - .map_err(StreamExecutorError::from)?, - ); } } }