Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Jan 3, 2024
1 parent 4eacb34 commit 5e847e1
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ macro_rules! for_all_classified_sources {
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ LocalFs, $crate::source::filesystem::opendal_source::LocalFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalLocalFs> },
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
}
$(
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
use super::nexmark::source::message::NexmarkMeta;
use super::OPENDAL_S3_CONNECTOR;
use super::{LOCAL_FS_CONNECTOR, OPENDAL_S3_CONNECTOR};
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::FsPageItem;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use anyhow::Context;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Fs;
use opendal::Operator;

use super::opendal_enumerator::OpendalEnumerator;
use super::{LocalFsProperties, OpendalSource};
use crate::source::filesystem::s3::enumerator::get_prefix;

impl<Src: OpendalSource> OpendalEnumerator<Src> {
    /// Creates an enumerator backed by opendal's local filesystem (`Fs`) service.
    ///
    /// The operator is rooted at `local_fs_properties.root` and wrapped with the
    /// default logging and retry layers.
    ///
    /// When `match_pattern` is set it is compiled into a glob matcher and the
    /// listing prefix is set to an empty string. When it is absent, both
    /// `prefix` and `matcher` are `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if the operator cannot be constructed, or if
    /// `match_pattern` is not a valid glob pattern.
    pub fn new_local_fs_source(local_fs_properties: LocalFsProperties) -> anyhow::Result<Self> {
        // Point the Fs service at the configured root directory.
        let mut builder = Fs::default();
        builder.root(&local_fs_properties.root);

        let op: Operator = Operator::new(builder)?
            .layer(LoggingLayer::default())
            .layer(RetryLayer::default())
            .finish();

        // Compile the optional glob; an invalid pattern aborts construction.
        let matcher = local_fs_properties
            .match_pattern
            .as_deref()
            .map(|pattern| {
                glob::Pattern::new(pattern)
                    .with_context(|| format!("Invalid match_pattern: {}", pattern))
            })
            .transpose()?;

        // The literal prefix of the glob is intentionally not used: whenever a
        // pattern is present the prefix is the empty string.
        // NOTE(review): presumably listing must start at the operator root and
        // filtering is left entirely to `matcher` -- confirm against the lister.
        let prefix = matcher.as_ref().map(|_| String::new());

        Ok(Self {
            op,
            prefix,
            matcher,
            marker: PhantomData,
        })
    }
}
38 changes: 38 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;

pub mod gcs_source;
pub mod local_fs_source;
pub mod s3_source;

use serde::Deserialize;
Expand All @@ -31,6 +32,7 @@ use crate::source::{SourceProperties, UnknownFields};
pub const GCS_CONNECTOR: &str = "gcs";
// The new s3_v2 will use opendal.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
// Connector name of the opendal-backed local filesystem source.
pub const LOCAL_FS_CONNECTOR: &str = "local_fs";

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
Expand Down Expand Up @@ -89,6 +91,17 @@ impl OpendalSource for OpendalGcs {
}
}

/// Unit type that selects the local filesystem backend when used as the
/// source type parameter of the opendal enumerator, reader and split types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OpendalLocalFs;

/// Hooks the local filesystem backend into the generic opendal source.
impl OpendalSource for OpendalLocalFs {
// Connector options consumed by this backend.
type Properties = LocalFsProperties;

/// Builds the file enumerator for this backend by delegating to
/// `OpendalEnumerator::new_local_fs_source`; any error from that
/// constructor is returned unchanged.
fn new_enumerator(properties: Self::Properties) -> anyhow::Result<OpendalEnumerator<Self>> {
OpendalEnumerator::new_local_fs_source(properties)
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct OpendalS3Properties {
#[serde(flatten)]
Expand All @@ -115,3 +128,28 @@ impl SourceProperties for OpendalS3Properties {

const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
}

// Options accepted by the `local_fs` connector, deserialized from the
// connector's key/value properties.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct LocalFsProperties {
// Root directory handed to the opendal `Fs` builder. Required; read from
// the `local_fs.root` option.
#[serde(rename = "local_fs.root")]
pub root: String,
// Optional glob pattern, read from the `match_pattern` option. Defaults to
// `None` when the option is not supplied.
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

// Every remaining option that did not match a field above.
#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}

impl UnknownFields for LocalFsProperties {
/// Returns a copy of the options that were not recognized as fields of
/// `LocalFsProperties`.
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}

/// Binds the local filesystem options to the opendal split, enumerator and
/// reader types, all parameterized by `OpendalLocalFs`.
impl SourceProperties for LocalFsProperties {
type Split = OpendalFsSplit<OpendalLocalFs>;
type SplitEnumerator = OpendalEnumerator<OpendalLocalFs>;
type SplitReader = OpendalReader<OpendalLocalFs>;

// Name under which this source is registered: `local_fs`.
const SOURCE_NAME: &'static str = LOCAL_FS_CONNECTOR;
}
4 changes: 3 additions & 1 deletion src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ pub use manager::{SourceColumnDesc, SourceColumnType};
pub use crate::parser::additional_columns::{
get_connector_compatible_additional_columns, CompatibleAdditionalColumnsFn,
};
pub use crate::source::filesystem::opendal_source::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR};
pub use crate::source::filesystem::opendal_source::{
GCS_CONNECTOR, LOCAL_FS_CONNECTOR, OPENDAL_S3_CONNECTOR,
};
pub use crate::source::filesystem::S3_CONNECTOR;
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
pub use crate::source::pulsar::PULSAR_CONNECTOR;
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use risingwave_connector::source::nexmark::source::{get_event_data_types_with_na
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_connector::source::{
get_connector_compatible_additional_columns, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR,
KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
PULSAR_CONNECTOR, S3_CONNECTOR,
KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
LOCAL_FS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};
use risingwave_pb::catalog::{
PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
Expand Down Expand Up @@ -1091,6 +1091,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
GCS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json],
),
LOCAL_FS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv],
),
MYSQL_CDC_CONNECTOR => hashmap!(
Format::Debezium => vec![Encode::Json],
// support source stream job
Expand Down
7 changes: 6 additions & 1 deletion src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_connector::dispatch_source_prop;
use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalS3, OpendalSource,
OpendalGcs, OpendalLocalFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::filesystem::FsPageItem;
use risingwave_connector::source::{
Expand Down Expand Up @@ -103,6 +103,11 @@ impl ConnectorSource {
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
Ok(build_opendal_fs_list_stream(lister))
}
ConnectorProperties::LocalFs(prop) => {
let lister: OpendalEnumerator<OpendalLocalFs> =
OpendalEnumerator::new_local_fs_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister))
}
other => bail!("Unsupported source: {:?}", other),
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{ScalarRef, ScalarRefImpl};
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalS3, OpendalSource,
OpendalGcs, OpendalLocalFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::{
Expand Down Expand Up @@ -125,6 +125,11 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
SplitImpl::from(split)
}
risingwave_connector::source::ConnectorProperties::LocalFs(_) => {
let split: OpendalFsSplit<OpendalLocalFs> =
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
SplitImpl::from(split)
}
_ => unreachable!(),
},
_ => unreachable!(),
Expand Down
15 changes: 14 additions & 1 deletion src/stream/src/from_proto/source/fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::sync::Arc;

use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalLocalFs, OpendalS3,
};
use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts};
use risingwave_pb::stream_plan::StreamFsFetchNode;
use risingwave_source::source_desc::SourceDescBuilder;
Expand Down Expand Up @@ -110,6 +112,17 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
)
.boxed()
}
risingwave_connector::source::ConnectorProperties::LocalFs(_) => {
FsFetchExecutor::<_, OpendalLocalFs>::new(
params.actor_context.clone(),
params.info,
stream_source_core,
upstream,
source_ctrl_opts,
params.env.connector_params(),
)
.boxed()
}
_ => unreachable!(),
};
let rate_limit = source.rate_limit.map(|x| x as _);
Expand Down

0 comments on commit 5e847e1

Please sign in to comment.