Skip to content

Commit

Permalink
feat(connector): support posix as local fs source
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Jan 3, 2024
1 parent c8cdb9f commit 8f4b503
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 17 deletions.
17 changes: 10 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ mysql_common = { version = "0.31", default-features = false, features = [
] }
nexmark = { version = "0.2", features = ["serde"] }
num-bigint = "0.4"
opendal = { git = "https://github.com/apache/incubator-opendal", rev = "9a222e4d72b328a24d5775b1565292f4636bbe69" }
opendal = "0.44"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ macro_rules! for_all_classified_sources {
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
}
$(
Expand Down
12 changes: 9 additions & 3 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
use super::nexmark::source::message::NexmarkMeta;
use super::OPENDAL_S3_CONNECTOR;
use super::{POSIX_FS_CONNECTOR, OPENDAL_S3_CONNECTOR};
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::FsPageItem;
Expand Down Expand Up @@ -386,14 +386,20 @@ impl ConnectorProperties {
pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap<String, String>) -> bool {
with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
})
.unwrap_or(false)
}

pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
})
.unwrap_or(false)
}
}
Expand Down
38 changes: 38 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;

pub mod gcs_source;
pub mod posix_fs_source;
pub mod s3_source;

use serde::Deserialize;
Expand All @@ -31,6 +32,7 @@ use crate::source::{SourceProperties, UnknownFields};
pub const GCS_CONNECTOR: &str = "gcs";
// The new s3_v2 will use opendal.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
Expand Down Expand Up @@ -89,6 +91,17 @@ impl OpendalSource for OpendalGcs {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OpendalPosixFs;

impl OpendalSource for OpendalPosixFs {
type Properties = PosixFsProperties;

fn new_enumerator(properties: Self::Properties) -> anyhow::Result<OpendalEnumerator<Self>> {
OpendalEnumerator::new_posix_fs_source(properties)
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct OpendalS3Properties {
#[serde(flatten)]
Expand All @@ -115,3 +128,28 @@ impl SourceProperties for OpendalS3Properties {

const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
#[serde(rename = "posix_fs.root")]
pub root: String,
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}

impl UnknownFields for PosixFsProperties {
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}

impl SourceProperties for PosixFsProperties {
type Split = OpendalFsSplit<OpendalPosixFs>;
type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
type SplitReader = OpendalReader<OpendalPosixFs>;

const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use anyhow::Context;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Fs;
use opendal::Operator;

use super::opendal_enumerator::OpendalEnumerator;
use super::{OpendalSource, PosixFsProperties};

impl<Src: OpendalSource> OpendalEnumerator<Src> {
/// create opendal posix fs source.
pub fn new_posix_fs_source(posix_fs_properties: PosixFsProperties) -> anyhow::Result<Self> {
// Create Fs builder.
let mut builder = Fs::default();

builder.root(&posix_fs_properties.root);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();

let (prefix, matcher) = if let Some(pattern) = posix_fs_properties.match_pattern.as_ref() {
// TODO(Kexiang): Currently, FsListnenr in opendal does not support a prefix. (Seems a bug in opendal)
// So we assign prefix to empty string.
let matcher = glob::Pattern::new(pattern)
.with_context(|| format!("Invalid match_pattern: {}", pattern))?;
(Some(String::new()), Some(matcher))
} else {
(None, None)
};
Ok(Self {
op,
prefix,
matcher,
marker: PhantomData,
})
}
}
4 changes: 3 additions & 1 deletion src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ pub use manager::{SourceColumnDesc, SourceColumnType};
pub use crate::parser::additional_columns::{
get_connector_compatible_additional_columns, CompatibleAdditionalColumnsFn,
};
pub use crate::source::filesystem::opendal_source::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR};
pub use crate::source::filesystem::opendal_source::{
GCS_CONNECTOR, POSIX_FS_CONNECTOR, OPENDAL_S3_CONNECTOR,
};
pub use crate::source::filesystem::S3_CONNECTOR;
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
pub use crate::source::pulsar::PULSAR_CONNECTOR;
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use risingwave_connector::source::nexmark::source::{get_event_data_types_with_na
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_connector::source::{
get_connector_compatible_additional_columns, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR,
KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
PULSAR_CONNECTOR, S3_CONNECTOR,
KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
POSIX_FS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};
use risingwave_pb::catalog::{
PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
Expand Down Expand Up @@ -1091,6 +1091,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
GCS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json],
),
POSIX_FS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv],
),
MYSQL_CDC_CONNECTOR => hashmap!(
Format::Debezium => vec![Encode::Json],
// support source stream job
Expand Down
7 changes: 6 additions & 1 deletion src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_connector::dispatch_source_prop;
use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalS3, OpendalSource,
OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::filesystem::FsPageItem;
use risingwave_connector::source::{
Expand Down Expand Up @@ -103,6 +103,11 @@ impl ConnectorSource {
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
Ok(build_opendal_fs_list_stream(lister))
}
ConnectorProperties::PosixFs(prop) => {
let lister: OpendalEnumerator<OpendalPosixFs> =
OpendalEnumerator::new_posix_fs_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister))
}
other => bail!("Unsupported source: {:?}", other),
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{ScalarRef, ScalarRefImpl};
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalS3, OpendalSource,
OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::{
Expand Down Expand Up @@ -125,6 +125,11 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
SplitImpl::from(split)
}
risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
let split: OpendalFsSplit<OpendalPosixFs> =
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
SplitImpl::from(split)
}
_ => unreachable!(),
},
_ => unreachable!(),
Expand Down
15 changes: 14 additions & 1 deletion src/stream/src/from_proto/source/fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::sync::Arc;

use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalPosixFs, OpendalS3,
};
use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts};
use risingwave_pb::stream_plan::StreamFsFetchNode;
use risingwave_source::source_desc::SourceDescBuilder;
Expand Down Expand Up @@ -110,6 +112,17 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
)
.boxed()
}
risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
FsFetchExecutor::<_, OpendalPosixFs>::new(
params.actor_context.clone(),
params.info,
stream_source_core,
upstream,
source_ctrl_opts,
params.env.connector_params(),
)
.boxed()
}
_ => unreachable!(),
};
let rate_limit = source.rate_limit.map(|x| x as _);
Expand Down

0 comments on commit 8f4b503

Please sign in to comment.