fix(source): parquet file source use number of rows to determine the end of the file reading #18149

Merged
merged 14 commits on Aug 29, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

(generated file; diff not shown)

1 change: 1 addition & 0 deletions src/connector/src/source/filesystem/file_common.rs
@@ -79,6 +79,7 @@ impl FsSplit {
pub struct OpendalFsSplit<Src: OpendalSource> {
pub name: String,
pub offset: usize,
// For Parquet encoding, the size represents the number of rows, while for other encodings, the size denotes the file size.
pub size: usize,
_marker: PhantomData<Src>,
}
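The comment above gives `size` two meanings. A minimal sketch of how the two cases would be populated (the `split_size` helper is hypothetical; the real construction happens in `fetch_executor.rs` further down this diff):

// Hypothetical helper, for illustration only.
fn split_size(is_parquet: bool, total_rows: usize, file_size_bytes: usize) -> usize {
    if is_parquet {
        total_rows // compared against a row-based offset while reading
    } else {
        file_size_bytes // compared against a byte-based offset while reading
    }
}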
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
@@ -29,7 +29,7 @@ use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

#[derive(Debug, Clone)]
pub struct OpendalEnumerator<Src: OpendalSource> {
pub op: Operator,
// prefix is used to reduce the number of objects to be listed
pub(crate) prefix: Option<String>,
pub(crate) matcher: Option<glob::Pattern>,
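Widening `op` from `pub(crate)` to `pub` is what lets code outside `risingwave_connector` (the new helper in `src/stream`, later in this diff) drive the operator directly. A minimal cross-crate usage sketch, assuming an already-constructed enumerator; `stat_file` is hypothetical:

use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::OpendalSource;

// Only compiles from another crate because `op` is now `pub`.
async fn stat_file<Src: OpendalSource>(
    connector: &OpendalEnumerator<Src>,
    path: &str,
) -> opendal::Result<u64> {
    let meta = connector.op.stat(path).await?;
    Ok(meta.content_length())
}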
8 changes: 8 additions & 0 deletions src/stream/Cargo.toml
@@ -44,7 +44,15 @@ lru = { workspace = true }
maplit = "1.0.2"
memcomparable = "0.2"
multimap = "0.10"
opendal = { workspace = true, features = [
"executors-tokio",
"services-azblob",
"services-fs",
"services-gcs",
"services-s3",
] }
parking_lot = { workspace = true }
parquet = { workspace = true }
pin-project = "1"
prehash = "1"
prometheus = { version = "0.13", features = ["process"] }
14 changes: 14 additions & 0 deletions src/stream/src/executor/error.rs
@@ -99,6 +99,20 @@ pub enum ErrorKind {
DmlError,
),

#[error(transparent)]
OpendalError(
#[from]
#[backtrace]
opendal::Error,
),

#[error(transparent)]
Parquet(
#[from]
#[backtrace]
parquet::errors::ParquetError,
),

#[error(transparent)]
NotImplemented(#[from] NotImplemented),

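Both new variants follow the file's existing thiserror pattern: `#[error(transparent)]` forwards the underlying message, and `#[from]` lets the `?` operator convert the library error implicitly. A standalone sketch of the mechanism, with a simplified enum and a hypothetical `read_object` caller:

use thiserror::Error;

#[derive(Debug, Error)]
enum MyExecutorError {
    // `#[from]` generates `impl From<opendal::Error> for MyExecutorError`,
    // so `?` on an opendal call converts the error implicitly.
    #[error(transparent)]
    Opendal(#[from] opendal::Error),
}

async fn read_object(op: &opendal::Operator, path: &str) -> Result<Vec<u8>, MyExecutorError> {
    let data = op.read(path).await?; // error converts via the generated From impl
    Ok(data.to_vec())
}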
148 changes: 134 additions & 14 deletions src/stream/src/executor/source/fetch_executor.rs
@@ -12,21 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::IntoFuture;
use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::{stream, TryStreamExt};
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::ScalarRef;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use risingwave_connector::parser::EncodingProperties;
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
BoxChunkSourceStream, ConnectorProperties, SourceContext, SourceCtrlOpts, SplitImpl,
SplitMetaData,
};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;
@@ -299,19 +307,53 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
// Receiving file assignments from upstream list executor,
// store into state table.
Message::Chunk(chunk) => {
// For Parquet encoding, the offset indicates the current row being read.
// Therefore, to determine if the end of a Parquet file has been reached, we need to compare its offset with the total number of rows.
// We directly obtain the total row count and set the size in `OpendalFsSplit` to this value.
let file_assignment = if let EncodingProperties::Parquet =
source_desc.source.parser_config.encoding_config
{
let filename_list: Vec<_> = chunk
.data_chunk()
.rows()
.map(|row| {
let filename = row.datum_at(0).unwrap().into_utf8();
filename.to_string()
})
.collect();
let mut parquet_file_assignment = vec![];
for filename in &filename_list {
let total_row_num =
get_total_row_nums_for_parquet_file(
filename,
source_desc.clone(),
)
.await?;
parquet_file_assignment.push(
OpendalFsSplit::<Src>::new(
filename.to_owned(),
0,
total_row_num - 1, // -1 because the offset starts from 0.
),
);
}
parquet_file_assignment
} else {
chunk
.data_chunk()
.rows()
.map(|row| {
let filename = row.datum_at(0).unwrap().into_utf8();

let size = row.datum_at(2).unwrap().into_int64();
OpendalFsSplit::<Src>::new(
filename.to_owned(),
0,
size as usize,
)
})
.collect()
};
state_store_handler.set_states(file_assignment).await?;
state_store_handler.state_table.try_flush().await?;
}
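With `size` now holding the row count, one plausible shape of the downstream end-of-file check (the actual check lives in the parquet read path, which this diff does not show) is a plain comparison, assuming `offset` tracks the row currently being read:

// Sketch only: finished once the row offset reaches the stored last-row index.
// let parquet_read_finished = split.offset >= split.size;

For other encodings the same comparison still works, since both fields are byte-based there.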
@@ -381,3 +423,81 @@ impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
}
}
}

/// Retrieves the total number of rows in the specified Parquet file.
///
/// This function constructs an `OpenDAL` operator using the information
/// from the provided `source_desc`. It then accesses the metadata of the
/// Parquet file to determine and return the total row count.
///
/// # Arguments
///
/// * `parquet_file_name` - The Parquet file name.
/// * `source_desc` - The source description holding the connector properties
///   needed to construct the `OpenDAL` operator.
///
/// # Returns
///
/// Returns the total number of rows in the Parquet file as a `usize`.
async fn get_total_row_nums_for_parquet_file(
parquet_file_name: &str,
source_desc: SourceDesc,
) -> StreamExecutorResult<usize> {
let total_row_num = match source_desc.source.config {
ConnectorProperties::Gcs(prop) => {
let connector: OpendalEnumerator<OpendalGcs> =
OpendalEnumerator::new_gcs_source(*prop)?;
let reader = connector
.op
.reader_with(parquet_file_name)
.into_future()
.await?
.into_futures_async_read(..)
.await?
.compat();

ParquetRecordBatchStreamBuilder::new(reader)
.await?
.metadata()
.file_metadata()
.num_rows()
}
ConnectorProperties::OpendalS3(prop) => {
let connector: OpendalEnumerator<OpendalS3> =
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
let reader = connector
.op
.reader_with(parquet_file_name)
.into_future()
.await?
.into_futures_async_read(..)
.await?
.compat();
ParquetRecordBatchStreamBuilder::new(reader)
.await?
.metadata()
.file_metadata()
.num_rows()
}

ConnectorProperties::PosixFs(prop) => {
let connector: OpendalEnumerator<OpendalPosixFs> =
OpendalEnumerator::new_posix_fs_source(*prop)?;
let reader = connector
.op
.reader_with(parquet_file_name)
.into_future()
.await?
.into_futures_async_read(..)
.await?
.compat();
ParquetRecordBatchStreamBuilder::new(reader)
.await?
.metadata()
.file_metadata()
.num_rows()
}
other => bail!("Unsupported source: {:?}", other),
};
Ok(total_row_num as usize)
}
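The three connector arms above differ only in how the `Operator` is constructed; the reader chain (`reader_with` → `into_futures_async_read` → `compat`) and the footer lookup are identical, and since no record batch stream is ever consumed, only the parquet footer metadata is fetched. For reference, a minimal standalone sketch of the same footer-only row count against a local file, assuming the `parquet` crate (with its async feature), `tokio`, and `anyhow`; `local_parquet_row_count` is hypothetical:

use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

async fn local_parquet_row_count(path: &str) -> anyhow::Result<i64> {
    let file = File::open(path).await?;
    // The builder reads only the parquet footer; the row count comes
    // from metadata, so no row groups are decoded.
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    Ok(builder.metadata().file_metadata().num_rows())
}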
2 changes: 2 additions & 0 deletions src/stream/src/task/barrier_manager.rs
@@ -1071,6 +1071,8 @@ impl ScoredStreamError {
| ErrorKind::ExprError(_)
| ErrorKind::SerdeError(_)
| ErrorKind::SinkError(_, _)
| ErrorKind::OpendalError(_)
| ErrorKind::Parquet(_)
| ErrorKind::RpcError(_)
| ErrorKind::AlignBarrier(_, _)
| ErrorKind::ConnectorError(_)