diff --git a/e2e_test/error_ui/extended/main.slt b/e2e_test/error_ui/extended/main.slt
index 6b9be9d26b14f..63253cad85d3a 100644
--- a/e2e_test/error_ui/extended/main.slt
+++ b/e2e_test/error_ui/extended/main.slt
@@ -17,4 +17,4 @@ db error: ERROR: Failed to execute the statement
 Caused by these errors (recent errors listed first):
   1: Expr error
   2: error while evaluating expression `general_div('1', '0')`
-  3: Division by zero
+  3: Division by zero
\ No newline at end of file
diff --git a/e2e_test/error_ui/simple/expr.slt b/e2e_test/error_ui/simple/expr.slt
index b983baae99a01..cb8cb7a89e857 100644
--- a/e2e_test/error_ui/simple/expr.slt
+++ b/e2e_test/error_ui/simple/expr.slt
@@ -68,4 +68,4 @@ db error: ERROR: Failed to run the query
 Caused by these errors (recent errors listed first):
   1: Expr error
   2: error while evaluating expression `format('Hello', 'World')`
-  3: Unsupported function: unsupported specifier type 'L'
+  3: Unsupported function: unsupported specifier type 'L'
\ No newline at end of file
diff --git a/e2e_test/error_ui/simple/license.slt b/e2e_test/error_ui/simple/license.slt
index fa04699efe380..7482f30938f10 100644
--- a/e2e_test/error_ui/simple/license.slt
+++ b/e2e_test/error_ui/simple/license.slt
@@ -74,4 +74,4 @@ SELECT setting FROM pg_settings WHERE name = 'license_key';
 query T
 SELECT rw_test_paid_tier();
 ----
-t
+t
\ No newline at end of file
diff --git a/e2e_test/error_ui/simple/main.slt b/e2e_test/error_ui/simple/main.slt
index a59b8acfa2e88..7c177e9ffc86b 100644
--- a/e2e_test/error_ui/simple/main.slt
+++ b/e2e_test/error_ui/simple/main.slt
@@ -73,4 +73,4 @@ db error: ERROR: Failed to run the query
 Caused by these errors (recent errors listed first):
   1: Failed to get/set session config
   2: Invalid value `maybe` for `rw_implicit_flush`
-  3: Invalid bool
+  3: Invalid bool
\ No newline at end of file
diff --git a/e2e_test/error_ui/simple/recovery.slt b/e2e_test/error_ui/simple/recovery.slt
index 526e9d2f10227..7918fc9369210 100644
--- a/e2e_test/error_ui/simple/recovery.slt
+++ b/e2e_test/error_ui/simple/recovery.slt
@@ -33,4 +33,4 @@ from error;
 ok
 
 statement ok
-drop table t cascade;
+drop table t cascade;
\ No newline at end of file
diff --git a/src/connector/src/parser/parquet_parser.rs b/src/connector/src/parser/parquet_parser.rs
index eeefaaf2d82b5..4f1e720bc47fb 100644
--- a/src/connector/src/parser/parquet_parser.rs
+++ b/src/connector/src/parser/parquet_parser.rs
@@ -11,16 +11,23 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use std::future::IntoFuture;
 use std::sync::Arc;
 
 use arrow_array_iceberg::RecordBatch;
+use deltalake::parquet::arrow::async_reader::AsyncFileReader;
 use futures_async_stream::try_stream;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
+use risingwave_common::bail;
 use risingwave_common::types::{Datum, ScalarImpl};
+use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
 
 use crate::parser::ConnectorResult;
-use crate::source::SourceColumnDesc;
+use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
+use crate::source::filesystem::opendal_source::{OpendalGcs, OpendalPosixFs, OpendalS3};
+use crate::source::reader::desc::SourceDesc;
+use crate::source::{ConnectorProperties, SourceColumnDesc};
 
 /// `ParquetParser` is responsible for converting the incoming `record_batch_stream`
 /// into a `streamChunk`.
 #[derive(Debug)]
@@ -188,3 +195,84 @@ impl ParquetParser {
         Ok(data_chunk.into())
     }
 }
+
+/// Retrieves the total number of rows in the specified Parquet file.
+///
+/// This function constructs an `OpenDAL` operator using the information
+/// from the provided `source_desc`. It then accesses the metadata of the
+/// Parquet file to determine and return the total row count.
+///
+/// # Arguments
+///
+/// * `parquet_file_name` - The Parquet file name.
+/// * `source_desc` - A struct or type containing the necessary information
+///   to construct the `OpenDAL` operator.
+///
+/// # Returns
+///
+/// Returns the total number of rows in the Parquet file as a `usize`.
+pub async fn get_total_row_nums_for_parquet_file(
+    parquet_file_name: &str,
+    source_desc: SourceDesc,
+) -> ConnectorResult<usize> {
+    let total_row_num = match source_desc.source.config {
+        ConnectorProperties::Gcs(prop) => {
+            let connector: OpendalEnumerator<OpendalGcs> =
+                OpendalEnumerator::new_gcs_source(*prop)?;
+            let mut reader = connector
+                .op
+                .reader_with(parquet_file_name)
+                .into_future()
+                .await?
+                .into_futures_async_read(..)
+                .await?
+                .compat();
+
+            reader
+                .get_metadata()
+                .await
+                .map_err(anyhow::Error::from)?
+                .file_metadata()
+                .num_rows()
+        }
+        ConnectorProperties::OpendalS3(prop) => {
+            let connector: OpendalEnumerator<OpendalS3> =
+                OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
+            let mut reader = connector
+                .op
+                .reader_with(parquet_file_name)
+                .into_future()
+                .await?
+                .into_futures_async_read(..)
+                .await?
+                .compat();
+            reader
+                .get_metadata()
+                .await
+                .map_err(anyhow::Error::from)?
+                .file_metadata()
+                .num_rows()
+        }
+
+        ConnectorProperties::PosixFs(prop) => {
+            let connector: OpendalEnumerator<OpendalPosixFs> =
+                OpendalEnumerator::new_posix_fs_source(*prop)?;
+            let mut reader = connector
+                .op
+                .reader_with(parquet_file_name)
+                .into_future()
+                .await?
+                .into_futures_async_read(..)
+                .await?
+                .compat();
+            reader
+                .get_metadata()
+                .await
+                .map_err(anyhow::Error::from)?
+                .file_metadata()
+                .num_rows()
+        }
+        other => bail!("Unsupported source: {:?}", other),
+    };
+    Ok(total_row_num as usize)
+}
diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs
index e012fc9ce1e1b..2cb66b7825ac1 100644
--- a/src/connector/src/source/filesystem/file_common.rs
+++ b/src/connector/src/source/filesystem/file_common.rs
@@ -79,6 +79,7 @@ impl FsSplit {
 pub struct OpendalFsSplit<Src: OpendalSource> {
     pub name: String,
     pub offset: usize,
+    // For Parquet encoding, the `size` represents the number of rows, while for other encodings, the `size` denotes the file size.
     pub size: usize,
     _marker: PhantomData<Src>,
 }
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
index c6d0fe4af0c2c..7396eac2ea38e 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
@@ -30,7 +30,7 @@ use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
 
 #[derive(Debug, Clone)]
 pub struct OpendalEnumerator<Src: OpendalSource> {
-    pub(crate) op: Operator,
+    pub op: Operator,
     // prefix is used to reduce the number of objects to be listed
     pub(crate) prefix: Option<String>,
     pub(crate) matcher: Option<glob::Pattern>,
diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs
index 1576d65487d09..766c42a5e2c87 100644
--- a/src/stream/src/executor/source/fetch_executor.rs
+++ b/src/stream/src/executor/source/fetch_executor.rs
@@ -16,10 +16,13 @@ use std::marker::PhantomData;
 use std::ops::Bound;
 
 use either::Either;
-use futures::{stream, TryStreamExt};
+use futures::{stream, StreamExt, TryStreamExt};
+use futures_async_stream::try_stream;
 use risingwave_common::catalog::{ColumnId, TableId};
 use risingwave_common::hash::VnodeBitmapExt;
 use risingwave_common::types::ScalarRef;
+use risingwave_connector::parser::parquet_parser::get_total_row_nums_for_parquet_file;
+use risingwave_connector::parser::EncodingProperties;
 use risingwave_connector::source::filesystem::opendal_source::{
     OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
 };
@@ -305,19 +308,53 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
                         // Receiving file assignments from upstream list executor,
                         // store into state table.
                         Message::Chunk(chunk) => {
-                            let file_assignment = chunk
-                                .data_chunk()
-                                .rows()
-                                .map(|row| {
-                                    let filename = row.datum_at(0).unwrap().into_utf8();
-                                    let size = row.datum_at(2).unwrap().into_int64();
-                                    OpendalFsSplit::<Src>::new(
-                                        filename.to_owned(),
-                                        0,
-                                        size as usize,
+                            // For Parquet encoding, the offset indicates the current row being read.
+                            // Therefore, to determine if the end of a Parquet file has been reached, we need to compare its offset with the total number of rows.
+                            // We directly obtain the total row count and set the size in `OpendalFsSplit` to this value.
+                            let file_assignment = if let EncodingProperties::Parquet =
+                                source_desc.source.parser_config.encoding_config
+                            {
+                                let filename_list: Vec<_> = chunk
+                                    .data_chunk()
+                                    .rows()
+                                    .map(|row| {
+                                        let filename = row.datum_at(0).unwrap().into_utf8();
+                                        filename.to_string()
+                                    })
+                                    .collect();
+                                let mut parquet_file_assignment = vec![];
+                                for filename in &filename_list {
+                                    let total_row_num =
+                                        get_total_row_nums_for_parquet_file(
+                                            filename,
+                                            source_desc.clone(),
+                                        )
+                                        .await?;
+                                    parquet_file_assignment.push(
+                                        OpendalFsSplit::<Src>::new(
+                                            filename.to_owned(),
+                                            0,
+                                            total_row_num - 1, // -1 because the offset starts from 0.
+                                        ),
                                     )
-                                })
-                                .collect();
+                                }
+                                parquet_file_assignment
+                            } else {
+                                chunk
+                                    .data_chunk()
+                                    .rows()
+                                    .map(|row| {
+                                        let filename = row.datum_at(0).unwrap().into_utf8();
+
+                                        let size = row.datum_at(2).unwrap().into_int64();
+                                        OpendalFsSplit::<Src>::new(
+                                            filename.to_owned(),
+                                            0,
+                                            size as usize,
+                                        )
+                                    })
+                                    .collect()
+                            };
                             state_store_handler.set_states(file_assignment).await?;
                             state_store_handler.state_table.try_flush().await?;
                         }
@@ -343,7 +380,6 @@
                     }
                     _ => unreachable!(),
                 };
-
                 if offset.parse::<usize>().unwrap() >= fs_split.size {
                     splits_on_fetch -= 1;
                     state_store_handler.delete(split_id).await?;
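
A note on the mechanism (not part of the patch): `get_total_row_nums_for_parquet_file` only reads the Parquet footer, so the row count comes from file metadata rather than a data scan. The sketch below shows the same lookup against a local file. It is a minimal sketch, assuming the `parquet` crate's blanket `AsyncFileReader` implementation for any `AsyncRead + AsyncSeek + Unpin + Send` type (which `tokio::fs::File` satisfies); the `parquet_row_count` helper name is hypothetical. The patch's version differs only in building the reader from an `OpendalEnumerator`'s operator instead of `tokio::fs`.

```rust
use parquet::arrow::async_reader::AsyncFileReader;

/// Hypothetical standalone analogue of `get_total_row_nums_for_parquet_file`:
/// fetch the row count from the Parquet footer without touching data pages.
async fn parquet_row_count(path: &str) -> anyhow::Result<usize> {
    // tokio::fs::File satisfies the AsyncRead + AsyncSeek + Unpin + Send
    // bounds required by the parquet crate's AsyncFileReader blanket impl.
    let mut file = tokio::fs::File::open(path).await?;
    // get_metadata() reads only the footer metadata, never the row groups.
    let metadata = file.get_metadata().await?;
    Ok(metadata.file_metadata().num_rows() as usize)
}
```

This footer-only lookup is what keeps the new split convention cheap: for Parquet splits the `size` field stores `total_row_num - 1` (a row index rather than a byte length), so the executor's unchanged `offset >= fs_split.size` check still decides when a split is finished.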