fix(source): parquet file source use number of rows to determine the end of the file reading #18149

Merged: 14 commits, Aug 29, 2024
2 changes: 1 addition & 1 deletion e2e_test/error_ui/extended/main.slt
@@ -17,4 +17,4 @@ db error: ERROR: Failed to execute the statement
Caused by these errors (recent errors listed first):
1: Expr error
2: error while evaluating expression `general_div('1', '0')`
3: Division by zero
3: Division by zero
2 changes: 1 addition & 1 deletion e2e_test/error_ui/simple/expr.slt
@@ -68,4 +68,4 @@ db error: ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: Expr error
2: error while evaluating expression `format('Hello', 'World')`
3: Unsupported function: unsupported specifier type 'L'
3: Unsupported function: unsupported specifier type 'L'
2 changes: 1 addition & 1 deletion e2e_test/error_ui/simple/license.slt
@@ -74,4 +74,4 @@ SELECT setting FROM pg_settings WHERE name = 'license_key';
query T
SELECT rw_test_paid_tier();
----
t
t
15 changes: 2 additions & 13 deletions e2e_test/error_ui/simple/main.slt
@@ -19,19 +19,8 @@ Caused by these errors (recent errors listed first):
2: invalid IPv4 address


statement error
statement error failed to send requests to UDF service
create function int_42() returns int as int_42 using link '55.55.55.55:5555';
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: failed to check UDF signature
2: failed to send requests to UDF service
3: status: Unavailable, message: "error trying to connect: tcp connect error: deadline has elapsed", details: [], metadata: MetadataMap { headers: {} }
4: transport error
5: error trying to connect
6: tcp connect error
7: deadline has elapsed


statement error
@@ -84,4 +73,4 @@ db error: ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: Failed to get/set session config
2: Invalid value `maybe` for `rw_implicit_flush`
3: Invalid bool
3: Invalid bool
2 changes: 1 addition & 1 deletion e2e_test/error_ui/simple/recovery.slt
@@ -33,4 +33,4 @@ from error;
ok

statement ok
drop table t cascade;
drop table t cascade;
90 changes: 89 additions & 1 deletion src/connector/src/parser/parquet_parser.rs
@@ -11,16 +11,23 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::IntoFuture;
use std::sync::Arc;

use arrow_array_iceberg::RecordBatch;
use deltalake::parquet::arrow::async_reader::AsyncFileReader;
use futures_async_stream::try_stream;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::bail;
use risingwave_common::types::{Datum, ScalarImpl};
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::parser::ConnectorResult;
use crate::source::SourceColumnDesc;
use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use crate::source::filesystem::opendal_source::{OpendalGcs, OpendalPosixFs, OpendalS3};
use crate::source::reader::desc::SourceDesc;
use crate::source::{ConnectorProperties, SourceColumnDesc};
/// `ParquetParser` is responsible for converting the incoming `record_batch_stream`
/// into a `StreamChunk`.
#[derive(Debug)]
@@ -188,3 +195,84 @@ impl ParquetParser {
Ok(data_chunk.into())
}
}
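
As background for the `record_batch_stream` mentioned above, here is a minimal, self-contained sketch (not RisingWave code; it assumes the `parquet` crate with its async feature, plus `tokio` and `futures`, and an illustrative local file name) of producing such a stream of Arrow `RecordBatch`es, each of which the parser converts into a `StreamChunk`:

use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open a local Parquet file and build an async record-batch stream.
    let file = File::open("data.parquet").await?;
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(1024)
        .build()?;
    // Each yielded RecordBatch is the unit the parser turns into a StreamChunk.
    while let Some(batch) = stream.try_next().await? {
        println!("read a batch of {} rows", batch.num_rows());
    }
    Ok(())
}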

/// Retrieves the total number of rows in the specified Parquet file.
///
/// This function constructs an `OpenDAL` operator using the information
/// from the provided `source_desc`. It then accesses the metadata of the
/// Parquet file to determine and return the total row count.
///
/// # Arguments
///
/// * `parquet_file_name` - The Parquet file name.
/// * `source_desc` - The source description, which provides the connector properties
///   needed to construct the `OpenDAL` operator.
///
/// # Returns
///
/// Returns the total number of rows in the Parquet file as a `usize`.
pub async fn get_total_row_nums_for_parquet_file(
parquet_file_name: &str,
source_desc: SourceDesc,
) -> ConnectorResult<usize> {
let total_row_num = match source_desc.source.config {
ConnectorProperties::Gcs(prop) => {
let connector: OpendalEnumerator<OpendalGcs> =
OpendalEnumerator::new_gcs_source(*prop)?;
let mut reader = connector
.op
.reader_with(parquet_file_name)
.into_future()
.await?
.into_futures_async_read(..)
.await?
.compat();

reader
.get_metadata()
.await
.map_err(anyhow::Error::from)?
.file_metadata()
.num_rows()
}
ConnectorProperties::OpendalS3(prop) => {
let connector: OpendalEnumerator<OpendalS3> =
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
let mut reader = connector
.op
.reader_with(parquet_file_name)
.into_future()
.await?
.into_futures_async_read(..)
.await?
.compat();
reader
.get_metadata()
.await
.map_err(anyhow::Error::from)?
.file_metadata()
.num_rows()
}

ConnectorProperties::PosixFs(prop) => {
let connector: OpendalEnumerator<OpendalPosixFs> =
OpendalEnumerator::new_posix_fs_source(*prop)?;
let mut reader = connector
.op
.reader_with(parquet_file_name)
.into_future()
.await?
.into_futures_async_read(..)
.await?
.compat();
reader
.get_metadata()
.await
.map_err(anyhow::Error::from)?
.file_metadata()
.num_rows()
}
other => bail!("Unsupported source: {:?}", other),
};
Ok(total_row_num as usize)
}
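
The function above reads the Parquet footer through an async `OpenDAL` reader. As a rough standalone illustration of the same idea (not the connector's code path; it assumes the `parquet` crate and an illustrative local file), the row count can be taken from the footer metadata without scanning any data pages:

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the file and parse only its footer metadata.
    let file = File::open("data.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    // num_rows() comes straight from the file metadata, so no row data is scanned.
    let total_rows = reader.metadata().file_metadata().num_rows();
    println!("total rows: {total_rows}");
    Ok(())
}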
1 change: 1 addition & 0 deletions src/connector/src/source/filesystem/file_common.rs
@@ -79,6 +79,7 @@ impl FsSplit {
pub struct OpendalFsSplit<Src: OpendalSource> {
pub name: String,
pub offset: usize,
// For Parquet encoding, the `size` represents the number of rows, while for other encodings, the `size` denotes the file size.
pub size: usize,
_marker: PhantomData<Src>,
}
@@ -29,7 +29,7 @@ use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

#[derive(Debug, Clone)]
pub struct OpendalEnumerator<Src: OpendalSource> {
pub(crate) op: Operator,
pub op: Operator,
// prefix is used to reduce the number of objects to be listed
pub(crate) prefix: Option<String>,
pub(crate) matcher: Option<glob::Pattern>,
64 changes: 50 additions & 14 deletions src/stream/src/executor/source/fetch_executor.rs
@@ -16,10 +16,13 @@ use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::{stream, TryStreamExt};
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::ScalarRef;
use risingwave_connector::parser::parquet_parser::get_total_row_nums_for_parquet_file;
use risingwave_connector::parser::EncodingProperties;
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
@@ -299,19 +302,53 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
// Receiving file assignments from upstream list executor,
// store into state table.
Message::Chunk(chunk) => {
let file_assignment = chunk
.data_chunk()
.rows()
.map(|row| {
let filename = row.datum_at(0).unwrap().into_utf8();
let size = row.datum_at(2).unwrap().into_int64();
OpendalFsSplit::<Src>::new(
filename.to_owned(),
0,
size as usize,
// For Parquet encoding, the offset indicates the current row being read.
// Therefore, to determine if the end of a Parquet file has been reached, we need to compare its offset with the total number of rows.
// We obtain the total row count up front and store it (minus one, since row offsets start at 0) as the `size` in `OpendalFsSplit`.
let file_assignment = if let EncodingProperties::Parquet =
source_desc.source.parser_config.encoding_config
{
let filename_list: Vec<_> = chunk
.data_chunk()
.rows()
.map(|row| {
let filename = row.datum_at(0).unwrap().into_utf8();
filename.to_string()
})
.collect();
let mut parquet_file_assignment = vec![];
for filename in &filename_list {
let total_row_num =
get_total_row_nums_for_parquet_file(
filename,
source_desc.clone(),
)
.await?;
parquet_file_assignment.push(
OpendalFsSplit::<Src>::new(
filename.to_owned(),
0,
total_row_num - 1, // -1 because row offsets start from 0.
),
)
})
.collect();
}
parquet_file_assignment
} else {
chunk
.data_chunk()
.rows()
.map(|row| {
let filename = row.datum_at(0).unwrap().into_utf8();

let size = row.datum_at(2).unwrap().into_int64();
OpendalFsSplit::<Src>::new(
filename.to_owned(),
0,
size as usize,
)
})
.collect()
};
state_store_handler.set_states(file_assignment).await?;
state_store_handler.state_table.try_flush().await?;
}
@@ -337,7 +374,6 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
}
_ => unreachable!(),
};

if offset.parse::<usize>().unwrap() >= fs_split.size {
splits_on_fetch -= 1;
state_store_handler.delete(split_id).await?;
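
Tying the pieces together, below is a small self-contained sketch of the end-of-split decision whose meaning this PR changes (names and values are illustrative, not the executor's real API): for Parquet splits both `offset` and `size` count rows, while for other encodings they count bytes.

// Mirrors the `offset >= fs_split.size` check in the fetch executor above.
// For a Parquet split, `size` is stored as total_rows - 1 (row offsets start at 0);
// for other formats it is the file size in bytes.
fn reached_end_of_split(offset: usize, size: usize) -> bool {
    offset >= size
}

fn main() {
    // A hypothetical Parquet file with 100 rows is stored with size = 99.
    assert!(!reached_end_of_split(42, 99));
    assert!(reached_end_of_split(99, 99));
}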