Commit a5cbeb7

fix(source): parquet file source use number of rows to determine the end of the file reading (#18149)
wcy-fdu authored Aug 29, 2024
1 parent ca99aee commit a5cbeb7
Showing 9 changed files with 146 additions and 21 deletions.
2 changes: 1 addition & 1 deletion e2e_test/error_ui/extended/main.slt
@@ -17,4 +17,4 @@ db error: ERROR: Failed to execute the statement
Caused by these errors (recent errors listed first):
1: Expr error
2: error while evaluating expression `general_div('1', '0')`
-3: Division by zero
\ No newline at end of file
+3: Division by zero
2 changes: 1 addition & 1 deletion e2e_test/error_ui/simple/expr.slt
@@ -68,4 +68,4 @@ db error: ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: Expr error
2: error while evaluating expression `format('Hello', 'World')`
-3: Unsupported function: unsupported specifier type 'L'
\ No newline at end of file
+3: Unsupported function: unsupported specifier type 'L'
2 changes: 1 addition & 1 deletion e2e_test/error_ui/simple/license.slt
@@ -74,4 +74,4 @@ SELECT setting FROM pg_settings WHERE name = 'license_key';
query T
SELECT rw_test_paid_tier();
----
-t
\ No newline at end of file
+t
2 changes: 1 addition & 1 deletion e2e_test/error_ui/simple/main.slt
@@ -73,4 +73,4 @@ db error: ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: Failed to get/set session config
2: Invalid value `maybe` for `rw_implicit_flush`
-3: Invalid bool
\ No newline at end of file
+3: Invalid bool
2 changes: 1 addition & 1 deletion e2e_test/error_ui/simple/recovery.slt
@@ -33,4 +33,4 @@ from error;
ok

statement ok
-drop table t cascade;
\ No newline at end of file
+drop table t cascade;
90 changes: 89 additions & 1 deletion src/connector/src/parser/parquet_parser.rs
@@ -11,16 +11,23 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::future::IntoFuture;
use std::sync::Arc;

use arrow_array_iceberg::RecordBatch;
+use deltalake::parquet::arrow::async_reader::AsyncFileReader;
use futures_async_stream::try_stream;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
+use risingwave_common::bail;
use risingwave_common::types::{Datum, ScalarImpl};
+use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::parser::ConnectorResult;
-use crate::source::SourceColumnDesc;
+use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
+use crate::source::filesystem::opendal_source::{OpendalGcs, OpendalPosixFs, OpendalS3};
+use crate::source::reader::desc::SourceDesc;
+use crate::source::{ConnectorProperties, SourceColumnDesc};
/// `ParquetParser` is responsible for converting the incoming `record_batch_stream`
/// into a `StreamChunk`.
#[derive(Debug)]
@@ -188,3 +195,84 @@ impl ParquetParser {
        Ok(data_chunk.into())
    }
}

/// Retrieves the total number of rows in the specified Parquet file.
///
/// This function constructs an `OpenDAL` operator using the information
/// from the provided `source_desc`. It then accesses the metadata of the
/// Parquet file to determine and return the total row count.
///
/// # Arguments
///
/// * `parquet_file_name` - The Parquet file name.
/// * `source_desc` - A struct or type containing the necessary information
/// to construct the `OpenDAL` operator.
///
/// # Returns
///
/// Returns the total number of rows in the Parquet file as a `usize`.
pub async fn get_total_row_nums_for_parquet_file(
    parquet_file_name: &str,
    source_desc: SourceDesc,
) -> ConnectorResult<usize> {
    let total_row_num = match source_desc.source.config {
        ConnectorProperties::Gcs(prop) => {
            let connector: OpendalEnumerator<OpendalGcs> =
                OpendalEnumerator::new_gcs_source(*prop)?;
            let mut reader = connector
                .op
                .reader_with(parquet_file_name)
                .into_future()
                .await?
                .into_futures_async_read(..)
                .await?
                .compat();

            reader
                .get_metadata()
                .await
                .map_err(anyhow::Error::from)?
                .file_metadata()
                .num_rows()
        }
        ConnectorProperties::OpendalS3(prop) => {
            let connector: OpendalEnumerator<OpendalS3> =
                OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
            let mut reader = connector
                .op
                .reader_with(parquet_file_name)
                .into_future()
                .await?
                .into_futures_async_read(..)
                .await?
                .compat();
            reader
                .get_metadata()
                .await
                .map_err(anyhow::Error::from)?
                .file_metadata()
                .num_rows()
        }

        ConnectorProperties::PosixFs(prop) => {
            let connector: OpendalEnumerator<OpendalPosixFs> =
                OpendalEnumerator::new_posix_fs_source(*prop)?;
            let mut reader = connector
                .op
                .reader_with(parquet_file_name)
                .into_future()
                .await?
                .into_futures_async_read(..)
                .await?
                .compat();
            reader
                .get_metadata()
                .await
                .map_err(anyhow::Error::from)?
                .file_metadata()
                .num_rows()
        }
        other => bail!("Unsupported source: {:?}", other),
    };
    Ok(total_row_num as usize)
}
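For orientation, here is a sketch of how the new helper might be invoked — hypothetical caller code, not part of this commit; it assumes a `SourceDesc` for one of the supported sources is in scope, and the object path is a placeholder:

    // Hypothetical usage: resolve the total row count for one Parquet object.
    // `source_desc` is assumed to describe a GCS, S3, or posix-fs source.
    let total_rows: usize =
        get_total_row_nums_for_parquet_file("dir/part-0000.parquet", source_desc.clone()).await?;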
1 change: 1 addition & 0 deletions src/connector/src/source/filesystem/file_common.rs
@@ -79,6 +79,7 @@ impl FsSplit {
pub struct OpendalFsSplit<Src: OpendalSource> {
    pub name: String,
    pub offset: usize,
+    // For Parquet encoding, the `size` represents the number of rows, while for other encodings, the `size` denotes the file size.
    pub size: usize,
    _marker: PhantomData<Src>,
}
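To make the dual meaning of `size` concrete, here are two hypothetical constructions (file names and numbers are illustrative only, and `OpendalS3` merely stands in for any `OpendalSource`):

    // Parquet: `size` is a row count taken from the file metadata; the fetch
    // executor below stores `total_rows - 1`, i.e. the last valid row offset.
    let parquet_split = OpendalFsSplit::<OpendalS3>::new("a.parquet".to_owned(), 0, 999);
    // Other encodings: `size` is the file length, so `offset` counts bytes consumed.
    let json_split = OpendalFsSplit::<OpendalS3>::new("a.json".to_owned(), 0, 4096);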
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
@@ -30,7 +30,7 @@ use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

#[derive(Debug, Clone)]
pub struct OpendalEnumerator<Src: OpendalSource> {
-    pub(crate) op: Operator,
+    pub op: Operator,
    // prefix is used to reduce the number of objects to be listed
    pub(crate) prefix: Option<String>,
    pub(crate) matcher: Option<glob::Pattern>,
64 changes: 50 additions & 14 deletions src/stream/src/executor/source/fetch_executor.rs
@@ -16,10 +16,13 @@ use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
-use futures::{stream, TryStreamExt};
+use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::hash::VnodeBitmapExt;
+use risingwave_common::types::ScalarRef;
+use risingwave_connector::parser::parquet_parser::get_total_row_nums_for_parquet_file;
+use risingwave_connector::parser::EncodingProperties;
use risingwave_connector::source::filesystem::opendal_source::{
OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
@@ -305,19 +308,53 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
                        // Receiving file assignments from upstream list executor,
                        // store into state table.
                        Message::Chunk(chunk) => {
-                            let file_assignment = chunk
-                                .data_chunk()
-                                .rows()
-                                .map(|row| {
-                                    let filename = row.datum_at(0).unwrap().into_utf8();
-                                    let size = row.datum_at(2).unwrap().into_int64();
-                                    OpendalFsSplit::<Src>::new(
-                                        filename.to_owned(),
-                                        0,
-                                        size as usize,
-                                    )
-                                })
-                                .collect();
+                            // For Parquet encoding, the offset indicates the current row being read.
+                            // Therefore, to determine whether the end of a Parquet file has been reached, we compare its offset with the total number of rows.
+                            // We obtain the total row count directly and set the size in `OpendalFsSplit` to this value.
+                            let file_assignment = if let EncodingProperties::Parquet =
+                                source_desc.source.parser_config.encoding_config
+                            {
+                                let filename_list: Vec<_> = chunk
+                                    .data_chunk()
+                                    .rows()
+                                    .map(|row| {
+                                        let filename = row.datum_at(0).unwrap().into_utf8();
+                                        filename.to_string()
+                                    })
+                                    .collect();
+                                let mut parquet_file_assignment = vec![];
+                                for filename in &filename_list {
+                                    let total_row_num =
+                                        get_total_row_nums_for_parquet_file(
+                                            filename,
+                                            source_desc.clone(),
+                                        )
+                                        .await?;
+                                    parquet_file_assignment.push(
+                                        OpendalFsSplit::<Src>::new(
+                                            filename.to_owned(),
+                                            0,
+                                            total_row_num - 1, // -1 because the offset starts from 0.
+                                        ),
+                                    )
+                                }
+                                parquet_file_assignment
+                            } else {
+                                chunk
+                                    .data_chunk()
+                                    .rows()
+                                    .map(|row| {
+                                        let filename = row.datum_at(0).unwrap().into_utf8();
+
+                                        let size = row.datum_at(2).unwrap().into_int64();
+                                        OpendalFsSplit::<Src>::new(
+                                            filename.to_owned(),
+                                            0,
+                                            size as usize,
+                                        )
+                                    })
+                                    .collect()
+                            };
                            state_store_handler.set_states(file_assignment).await?;
                            state_store_handler.state_table.try_flush().await?;
                        }
@@ -343,7 +380,6 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
                                }
                                _ => unreachable!(),
                            };
-
                            if offset.parse::<usize>().unwrap() >= fs_split.size {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(split_id).await?;
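With splits sized this way, the unchanged comparison above terminates both kinds of reads: for Parquet, `offset` counts rows and `fs_split.size` holds the last row offset; for other encodings, `offset` counts bytes and `fs_split.size` holds the file length. A schematic restatement of the check, with hypothetical values:

    // Parquet: a 1,000-row file is stored with size = 999; the split is
    // considered finished once the row offset reaches 999.
    // Byte-based encodings: a 4,096-byte file is stored with size = 4096;
    // the split is finished once the byte offset reaches 4096.
    let finished = offset.parse::<usize>().unwrap() >= fs_split.size;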
