also add azblob for batch read
wcy-fdu committed Aug 28, 2024
1 parent 428b645 commit 4ac66c4
Showing 4 changed files with 21 additions and 5 deletions.
6 changes: 4 additions & 2 deletions src/connector/src/parser/additional_columns.rs
@@ -31,8 +31,8 @@ use risingwave_pb::plan_common::{
use crate::error::ConnectorResult;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
-GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
-S3_CONNECTOR,
+AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR,
+POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};

// Connectors whose additional columns are hidden, i.e. which do not support `include` syntax.
@@ -57,6 +57,8 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
(OPENDAL_S3_CONNECTOR, HashSet::from(["file", "offset"])),
(S3_CONNECTOR, HashSet::from(["file", "offset"])),
(GCS_CONNECTOR, HashSet::from(["file", "offset"])),
+(AZBLOB_CONNECTOR, HashSet::from(["file", "offset"])),
+(POSIX_FS_CONNECTOR, HashSet::from(["file", "offset"])),
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
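Registering `AZBLOB_CONNECTOR` and `POSIX_FS_CONNECTOR` in `COMPATIBLE_ADDITIONAL_COLUMNS` is what lets sources on those connectors accept `INCLUDE file` and `INCLUDE offset`. A minimal sketch of how a check against such a map could work; the constant values and the `supports_include` helper are hypothetical illustrations, not the repo's actual API:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::LazyLock;

// Hypothetical connector-name constants standing in for the real ones.
const AZBLOB_CONNECTOR: &str = "azblob";
const POSIX_FS_CONNECTOR: &str = "posix_fs";

static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
    LazyLock::new(|| {
        HashMap::from([
            (AZBLOB_CONNECTOR, HashSet::from(["file", "offset"])),
            (POSIX_FS_CONNECTOR, HashSet::from(["file", "offset"])),
        ])
    });

/// Returns true if `INCLUDE <column>` is valid for the given connector.
fn supports_include(connector: &str, column: &str) -> bool {
    COMPATIBLE_ADDITIONAL_COLUMNS
        .get(connector)
        .map(|cols| cols.contains(column))
        .unwrap_or(false)
}

fn main() {
    assert!(supports_include("azblob", "file"));
    assert!(!supports_include("azblob", "key")); // not a compatible column
}
```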
@@ -72,7 +72,7 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
let object_lister = self
.op
.lister_with(prefix)
-.recursive(true)
+.recursive(false)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
let stream = stream::unfold(object_lister, |mut object_lister| async move {
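Switching the lister from `recursive(true)` to `recursive(false)` restricts listing to entries directly under `prefix` instead of walking the whole subtree. A minimal, self-contained sketch of the same OpenDAL listing calls; the `list_one_level` helper is hypothetical and assumes the opendal version this repo pins (the one providing `recursive` and `metakey`):

```rust
use futures::TryStreamExt;
use opendal::{Metakey, Operator};

// List only the entries directly under `prefix`; with `recursive(true)`
// the lister would instead walk the whole subtree.
async fn list_one_level(op: &Operator, prefix: &str) -> opendal::Result<()> {
    let mut lister = op
        .lister_with(prefix)
        .recursive(false)
        .metakey(Metakey::ContentLength | Metakey::LastModified)
        .await?;
    while let Some(entry) = lister.try_next().await? {
        let meta = entry.metadata();
        println!("{} ({} bytes)", entry.path(), meta.content_length());
    }
    Ok(())
}
```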
1 change: 1 addition & 0 deletions src/connector/src/source/reader/reader.rs
@@ -197,6 +197,7 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(lister: OpendalEnumera
.map(|m| m.matches(&res.name))
.unwrap_or(true)
{
println!("这里 res{:?}", res.name);
yield res
} else {
// Currently, due to the lack of prefix list, we just skip the unmatched files.
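The surrounding hunk filters listed objects client-side: each `res.name` is tested against an optional matcher and unmatched files are skipped, since there is no prefix listing to filter server-side. A small sketch of that filtering pattern, assuming `glob::Pattern` as the matcher (the repo's actual matcher type may differ):

```rust
use glob::Pattern;

// Client-side filtering: every listed object name is tested against a glob,
// and names with no matcher configured pass through unchanged.
fn filter_names<'a>(names: Vec<&'a str>, matcher: Option<&Pattern>) -> Vec<&'a str> {
    names
        .into_iter()
        .filter(|name| matcher.map(|m| m.matches(name)).unwrap_or(true))
        .collect()
}

fn main() {
    let pat = Pattern::new("data/*.csv").unwrap();
    let names = vec!["data/a.csv", "data/b.json", "tmp/c.csv"];
    assert_eq!(filter_names(names, Some(&pat)), vec!["data/a.csv"]);
}
```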
17 changes: 15 additions & 2 deletions src/frontend/src/scheduler/plan_fragmenter.rs
@@ -33,7 +33,9 @@ use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::util::scan_range::ScanRange;
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
-use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
+use risingwave_connector::source::filesystem::opendal_source::{
+OpendalAzblob, OpendalGcs, OpendalS3,
+};
use risingwave_connector::source::iceberg::{IcebergSplitEnumerator, IcebergTimeTravelInfo};
use risingwave_connector::source::kafka::KafkaSplitEnumerator;
use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch;
@@ -330,6 +332,15 @@ impl SourceScanInfo {

Ok(SourceScanInfo::Complete(res))
}
+ConnectorProperties::Azblob(prop) => {
+let lister: OpendalEnumerator<OpendalAzblob> =
+OpendalEnumerator::new_azblob_source(*prop)?;
+let stream = build_opendal_fs_list_for_batch(lister);
+let batch_res: Vec<_> = stream.try_collect().await?;
+let res = batch_res.into_iter().map(SplitImpl::Azblob).collect_vec();
+
+Ok(SourceScanInfo::Complete(res))
+}
ConnectorProperties::Iceberg(prop) => {
let iceberg_enumerator =
IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into())
@@ -740,7 +751,9 @@ impl StageGraph {
let task_parallelism = match &stage.source_info {
Some(SourceScanInfo::Incomplete(source_fetch_info)) => {
match source_fetch_info.connector {
-ConnectorProperties::Gcs(_) | ConnectorProperties::OpendalS3(_) => (min(
+ConnectorProperties::Gcs(_)
+| ConnectorProperties::OpendalS3(_)
+| ConnectorProperties::Azblob(_) => (min(
complete_source_info.split_info().unwrap().len() as u32,
(self.batch_parallelism / 2) as u32,
))
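With `Azblob` added to this arm, Azure Blob batch scans get the same file-count-aware parallelism as Gcs and OpendalS3: roughly `min(number of splits, batch_parallelism / 2)` tasks, so a handful of files does not spawn a full worker fan-out. A sketch of the arithmetic; the `.max(1)` floor is an assumption, since the hunk is truncated right after the `min`:

```rust
use std::cmp::min;

// One task per file split, capped at half the configured batch parallelism.
// The `.max(1)` floor is assumed, not visible in the truncated hunk.
fn file_scan_parallelism(num_splits: u32, batch_parallelism: u32) -> u32 {
    min(num_splits, batch_parallelism / 2).max(1)
}

fn main() {
    assert_eq!(file_scan_parallelism(3, 16), 3); // few files => few tasks
    assert_eq!(file_scan_parallelism(100, 16), 8); // many files => capped at half
    assert_eq!(file_scan_parallelism(0, 16), 1); // floor at one task
}
```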
