
feat(connector): support azblob file source (#18295) (#18306)
Co-authored-by: congyi wang <[email protected]>
github-actions[bot] and wcy-fdu authored Aug 28, 2024
1 parent 612955d commit 40fdddc
Showing 14 changed files with 207 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
@@ -39,6 +39,7 @@ macro_rules! for_all_classified_sources {
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
{ Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
{ Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
}
6 changes: 4 additions & 2 deletions src/connector/src/parser/additional_columns.rs
@@ -31,8 +31,8 @@ use risingwave_pb::plan_common::{
use crate::error::ConnectorResult;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
S3_CONNECTOR,
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};

// Hidden additional columns connectors which do not support `include` syntax.
@@ -57,6 +57,8 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
(OPENDAL_S3_CONNECTOR, HashSet::from(["file", "offset"])),
(S3_CONNECTOR, HashSet::from(["file", "offset"])),
(GCS_CONNECTOR, HashSet::from(["file", "offset"])),
(AZBLOB_CONNECTOR, HashSet::from(["file", "offset"])),
(POSIX_FS_CONNECTOR, HashSet::from(["file", "offset"])),
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
4 changes: 3 additions & 1 deletion src/connector/src/source/base.rs
@@ -41,7 +41,7 @@ use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nexmark::source::message::NexmarkMeta;
use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::error::ConnectorResult as Result;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::parser::ParserConfig;
@@ -385,6 +385,7 @@ impl ConnectorProperties {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
|| s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
})
.unwrap_or(false)
}
@@ -435,6 +436,7 @@ impl ConnectorProperties {
matches!(self, ConnectorProperties::Kafka(_))
|| matches!(self, ConnectorProperties::OpendalS3(_))
|| matches!(self, ConnectorProperties::Gcs(_))
|| matches!(self, ConnectorProperties::Azblob(_))
}
}

78 changes: 78 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/azblob_source.rs
@@ -0,0 +1,78 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use anyhow::Context;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Azblob;
use opendal::Operator;

use super::opendal_enumerator::OpendalEnumerator;
use super::{AzblobProperties, OpendalSource};
use crate::error::ConnectorResult;
use crate::source::filesystem::s3::enumerator::get_prefix;

impl<Src: OpendalSource> OpendalEnumerator<Src> {
/// Creates an OpenDAL Azblob source enumerator.
pub fn new_azblob_source(azblob_properties: AzblobProperties) -> ConnectorResult<Self> {
// Create azblob builder.
let mut builder = Azblob::default();

builder.container(&azblob_properties.container_name);

builder.endpoint(&azblob_properties.endpoint_url);

if let Some(account_name) = azblob_properties.account_name {
builder.account_name(&account_name);
} else {
tracing::warn!(
"account_name azblob is not set, container {}",
azblob_properties.container_name
);
}

if let Some(account_key) = azblob_properties.account_key {
builder.account_key(&account_key);
} else {
tracing::warn!(
"account_key azblob is not set, container {}",
azblob_properties.container_name
);
}
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();

let (prefix, matcher) = if let Some(pattern) = azblob_properties.match_pattern.as_ref() {
let prefix = get_prefix(pattern);
let matcher = glob::Pattern::new(pattern)
.with_context(|| format!("Invalid match_pattern: {}", pattern))?;
(Some(prefix), Some(matcher))
} else {
(None, None)
};

let compression_format = azblob_properties.compression_format;

Ok(Self {
op,
prefix,
matcher,
marker: PhantomData,
compression_format,
})
}
}
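
For orientation, a minimal sketch of how this constructor might be exercised on its own. The import paths follow this commit; the container, credentials, pattern, and Azurite-style local endpoint are placeholders, not part of the commit:

use std::collections::HashMap;

use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::{AzblobProperties, OpendalAzblob};

fn example_azblob_enumerator() -> ConnectorResult<OpendalEnumerator<OpendalAzblob>> {
    // Placeholder credentials and a local Azurite-style endpoint, for illustration only.
    let props = AzblobProperties {
        container_name: "my-container".to_owned(),
        account_name: Some("devstoreaccount1".to_owned()),
        account_key: Some("<account-key>".to_owned()),
        endpoint_url: "http://127.0.0.1:10000/devstoreaccount1".to_owned(),
        match_pattern: Some("*.json".to_owned()),
        unknown_fields: HashMap::new(),
        compression_format: Default::default(),
    };
    // Builds the operator with logging and retry layers and compiles the glob
    // matcher, as in the constructor above.
    OpendalEnumerator::new_azblob_source(props)
}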
49 changes: 49 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
@@ -14,6 +14,7 @@

use std::collections::HashMap;

pub mod azblob_source;
pub mod gcs_source;
pub mod posix_fs_source;
pub mod s3_source;
@@ -31,6 +32,7 @@ use super::OpendalFsSplit;
use crate::error::ConnectorResult;
use crate::source::{SourceProperties, UnknownFields};

pub const AZBLOB_CONNECTOR: &str = "azblob";
pub const GCS_CONNECTOR: &str = "gcs";
// The new s3_v2 will use opendal.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
@@ -168,3 +170,50 @@ impl SourceProperties for PosixFsProperties {

const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
#[serde(rename = "azblob.container_name")]
pub container_name: String,

#[serde(rename = "azblob.credentials.account_name", default)]
pub account_name: Option<String>,
#[serde(rename = "azblob.credentials.account_key", default)]
pub account_key: Option<String>,
#[serde(rename = "azblob.endpoint_url")]
pub endpoint_url: String,

#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,

#[serde(rename = "compression_format", default = "Default::default")]
pub compression_format: CompressionFormat,
}

impl UnknownFields for AzblobProperties {
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}

impl SourceProperties for AzblobProperties {
type Split = OpendalFsSplit<OpendalAzblob>;
type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
type SplitReader = OpendalReader<OpendalAzblob>;

const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OpendalAzblob;

impl OpendalSource for OpendalAzblob {
type Properties = AzblobProperties;

fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
OpendalEnumerator::new_azblob_source(properties)
}
}
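
The `serde(rename = ...)` attributes above are what bind the string-typed `WITH` options to these fields. A rough sketch of that mapping, using `serde_json` purely for illustration (the option values are hypothetical):

// Unrecognized keys are captured by the `#[serde(flatten)]` map, and
// `compression_format` falls back to its default when absent.
let with_options = serde_json::json!({
    "azblob.container_name": "my-container",
    "azblob.endpoint_url": "https://myaccount.blob.core.windows.net",
    "azblob.credentials.account_name": "myaccount",
    "azblob.credentials.account_key": "<account-key>",
    "match_pattern": "*.parquet",
});
let props: AzblobProperties = serde_json::from_value(with_options).unwrap();
assert_eq!(props.container_name, "my-container");
assert_eq!(props.match_pattern.as_deref(), Some("*.parquet"));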
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
@@ -63,7 +63,7 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
let object_lister = self
.op
.lister_with(prefix)
.recursive(true)
.recursive(false)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
let stream = stream::unfold(object_lister, |mut object_lister| async move {
2 changes: 1 addition & 1 deletion src/connector/src/source/mod.rs
@@ -45,7 +45,7 @@ use risingwave_common::array::{Array, ArrayRef};
use thiserror_ext::AsReport;

pub use crate::source::filesystem::opendal_source::{
GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
};
pub use crate::source::filesystem::S3_CONNECTOR;
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
8 changes: 7 additions & 1 deletion src/connector/src/source/reader/reader.rs
@@ -29,7 +29,7 @@ use crate::error::ConnectorResult;
use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use crate::source::filesystem::opendal_source::{
OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
use crate::source::{
@@ -95,6 +95,11 @@ impl SourceReader {
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
Ok(build_opendal_fs_list_stream(lister))
}
ConnectorProperties::Azblob(prop) => {
let lister: OpendalEnumerator<OpendalAzblob> =
OpendalEnumerator::new_azblob_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister))
}
ConnectorProperties::PosixFs(prop) => {
let lister: OpendalEnumerator<OpendalPosixFs> =
OpendalEnumerator::new_posix_fs_source(*prop)?;
@@ -192,6 +197,7 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(lister: OpendalEnumera
.map(|m| m.matches(&res.name))
.unwrap_or(true)
{
println!("这里 res{:?}", res.name);
yield res
} else {
// Currently, due to the lack of prefix listing, we just skip the unmatched files.
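
The `matcher` consulted above is the `glob::Pattern` compiled from `match_pattern` in the enumerator constructors; when it is `None`, every object is yielded. A small illustration of the matching semantics, with hypothetical object names:

// Object names that match the pattern are yielded; the rest are skipped.
let matcher = glob::Pattern::new("*.json").expect("valid match_pattern");
assert!(matcher.matches("events-2024-08-28.json")); // yielded downstream
assert!(!matcher.matches("events-2024-08-28.csv")); // skipped by the stream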
4 changes: 3 additions & 1 deletion src/connector/src/with_options.rs
@@ -20,7 +20,8 @@ use crate::sink::catalog::SinkFormatDesc;
use crate::source::cdc::external::CdcTableType;
use crate::source::iceberg::ICEBERG_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
UPSTREAM_SOURCE_KEY,
};

/// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually.
@@ -142,6 +143,7 @@ pub trait WithPropertiesExt: Get + Sized {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
|| s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
})
.unwrap_or(false)
}
24 changes: 24 additions & 0 deletions src/connector/with_options_source.yaml
@@ -1,6 +1,30 @@
# THIS FILE IS AUTO_GENERATED. DO NOT EDIT
# UPDATE WITH: ./risedev generate-with-options

AzblobProperties:
fields:
- name: azblob.container_name
field_type: String
required: true
- name: azblob.credentials.account_name
field_type: String
required: false
default: Default::default
- name: azblob.credentials.account_key
field_type: String
required: false
default: Default::default
- name: azblob.endpoint_url
field_type: String
required: true
- name: match_pattern
field_type: String
required: false
default: Default::default
- name: compression_format
field_type: CompressionFormat
required: false
default: Default::default
DatagenProperties:
fields:
- name: datagen.split.num
7 changes: 5 additions & 2 deletions src/frontend/src/handler/create_source.rs
@@ -54,7 +54,7 @@ use risingwave_connector::source::nexmark::source::{get_event_data_types_with_na
use risingwave_connector::source::test_source::TEST_CONNECTOR;
pub use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_connector::source::{
ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
ConnectorProperties, AZBLOB_CONNECTOR, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};
@@ -1064,7 +1064,10 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
),
GCS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json],
Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
),
AZBLOB_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
),
POSIX_FS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv],
17 changes: 15 additions & 2 deletions src/frontend/src/scheduler/plan_fragmenter.rs
@@ -33,7 +33,9 @@ use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::util::scan_range::ScanRange;
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
use risingwave_connector::source::filesystem::opendal_source::{
OpendalAzblob, OpendalGcs, OpendalS3,
};
use risingwave_connector::source::iceberg::{IcebergSplitEnumerator, IcebergTimeTravelInfo};
use risingwave_connector::source::kafka::KafkaSplitEnumerator;
use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch;
@@ -330,6 +332,15 @@ impl SourceScanInfo {

Ok(SourceScanInfo::Complete(res))
}
ConnectorProperties::Azblob(prop) => {
let lister: OpendalEnumerator<OpendalAzblob> =
OpendalEnumerator::new_azblob_source(*prop)?;
let stream = build_opendal_fs_list_for_batch(lister);
let batch_res: Vec<_> = stream.try_collect().await?;
let res = batch_res.into_iter().map(SplitImpl::Azblob).collect_vec();

Ok(SourceScanInfo::Complete(res))
}
ConnectorProperties::Iceberg(prop) => {
let iceberg_enumerator =
IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into())
@@ -740,7 +751,9 @@ impl StageGraph {
let task_parallelism = match &stage.source_info {
Some(SourceScanInfo::Incomplete(source_fetch_info)) => {
match source_fetch_info.connector {
ConnectorProperties::Gcs(_) | ConnectorProperties::OpendalS3(_) => (min(
ConnectorProperties::Gcs(_)
| ConnectorProperties::OpendalS3(_)
| ConnectorProperties::Azblob(_) => (min(
complete_source_info.split_info().unwrap().len() as u32,
(self.batch_parallelism / 2) as u32,
))
7 changes: 6 additions & 1 deletion src/stream/src/executor/source/fetch_executor.rs
@@ -21,7 +21,7 @@ use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::reader::desc::SourceDesc;
@@ -110,6 +110,11 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
SplitImpl::from(split)
}
risingwave_connector::source::ConnectorProperties::Azblob(_) => {
let split: OpendalFsSplit<OpendalAzblob> =
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
SplitImpl::from(split)
}
risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
let split: OpendalFsSplit<OpendalPosixFs> =
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
11 changes: 10 additions & 1 deletion src/stream/src/from_proto/source/fs_fetch.rs
@@ -16,7 +16,7 @@ use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalPosixFs, OpendalS3,
OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
};
use risingwave_connector::source::reader::desc::SourceDescBuilder;
use risingwave_connector::source::ConnectorProperties;
@@ -104,6 +104,15 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
)
.boxed()
}
risingwave_connector::source::ConnectorProperties::Azblob(_) => {
FsFetchExecutor::<_, OpendalAzblob>::new(
params.actor_context.clone(),
stream_source_core,
upstream,
source.rate_limit,
)
.boxed()
}
risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
FsFetchExecutor::<_, OpendalPosixFs>::new(
params.actor_context.clone(),
