From 8f4b503f60ec63ff7c4fe5d9cdc1dfd53242fdfc Mon Sep 17 00:00:00 2001
From: Kexiang Wang <kx.wang@hotmail.com>
Date: Tue, 2 Jan 2024 20:41:12 -0500
Subject: [PATCH] feat(connector): support posix as local fs source

---
 Cargo.lock                                    | 17 +++---
 src/connector/Cargo.toml                      |  2 +-
 src/connector/src/macros.rs                   |  1 +
 src/connector/src/source/base.rs              | 12 +++--
 .../source/filesystem/opendal_source/mod.rs   | 38 +++++++++++++
 .../opendal_source/posix_fs_source.rs         | 54 +++++++++++++++++++
 src/connector/src/source/mod.rs               |  4 +-
 src/frontend/src/handler/create_source.rs     |  7 ++-
 src/source/src/connector_source.rs            |  7 ++-
 .../src/executor/source/fetch_executor.rs     |  7 ++-
 src/stream/src/from_proto/source/fs_fetch.rs  | 15 +++++-
 11 files changed, 147 insertions(+), 17 deletions(-)
 create mode 100644 src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs

diff --git a/Cargo.lock b/Cargo.lock
index ed87be2ae251d..31e9bfc3a889e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4714,7 +4714,7 @@ dependencies = [
  "log",
  "murmur3",
  "once_cell",
- "opendal 0.43.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "opendal 0.43.0",
  "ordered-float 3.9.1",
  "parquet 49.0.0",
  "prometheus",
@@ -6216,8 +6216,9 @@ dependencies = [
 
 [[package]]
 name = "opendal"
-version = "0.43.0"
-source = "git+https://github.com/apache/incubator-opendal?rev=9a222e4d72b328a24d5775b1565292f4636bbe69#9a222e4d72b328a24d5775b1565292f4636bbe69"
+version = "0.44.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66"
 dependencies = [
  "anyhow",
  "async-compat",
@@ -6228,6 +6229,7 @@ dependencies = [
  "chrono",
  "flagset",
  "futures",
+ "getrandom",
  "http 0.2.9",
  "log",
  "md-5",
@@ -7826,15 +7828,16 @@ dependencies = [
 
 [[package]]
 name = "reqsign"
-version = "0.14.5"
+version = "0.14.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f52b6eef6975eb2decff7d7e95744c8a6b6bb8558bc9b4230c0a3431a74f59c"
+checksum = "dce87f66ba6c6acef277a729f989a0eca946cb9ce6a15bcc036bda0f72d4b9fd"
 dependencies = [
  "anyhow",
  "async-trait",
  "base64 0.21.4",
  "chrono",
  "form_urlencoded",
+ "getrandom",
  "hex",
  "hmac",
  "home",
@@ -8471,7 +8474,7 @@ dependencies = [
  "mysql_common",
  "nexmark",
  "num-bigint",
- "opendal 0.43.0 (git+https://github.com/apache/incubator-opendal?rev=9a222e4d72b328a24d5775b1565292f4636bbe69)",
+ "opendal 0.44.0",
  "parking_lot 0.12.1",
  "paste",
  "pretty_assertions",
@@ -9039,7 +9042,7 @@ dependencies = [
  "itertools 0.12.0",
  "madsim-aws-sdk-s3",
  "madsim-tokio",
- "opendal 0.43.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "opendal 0.43.0",
  "prometheus",
  "risingwave_common",
  "rustls",
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 82303fd620f29..5d74d749f929d 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -79,7 +79,7 @@ mysql_common = { version = "0.31", default-features = false, features = [
 ] }
 nexmark = { version = "0.2", features = ["serde"] }
 num-bigint = "0.4"
-opendal = { git = "https://github.com/apache/incubator-opendal", rev = "9a222e4d72b328a24d5775b1565292f4636bbe69" }
+opendal = "0.44"
 parking_lot = "0.12"
 paste = "1"
 prometheus = { version = "0.13", features = ["process"] }
diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs
index 47839f6dc5d5a..9a2383dbb4a96 100644
--- a/src/connector/src/macros.rs
+++ b/src/connector/src/macros.rs
@@ -35,6 +35,7 @@ macro_rules! for_all_classified_sources {
                 { S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
                 { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
                 { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
+                { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
                 { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
             }
             $(
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 1a4a594d44285..58b499421d48b 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta;
 use super::kafka::KafkaMeta;
 use super::monitor::SourceMetrics;
 use super::nexmark::source::message::NexmarkMeta;
-use super::OPENDAL_S3_CONNECTOR;
+use super::{POSIX_FS_CONNECTOR, OPENDAL_S3_CONNECTOR};
 use crate::parser::ParserConfig;
 pub(crate) use crate::source::common::CommonSplitReader;
 use crate::source::filesystem::FsPageItem;
@@ -386,14 +386,20 @@ impl ConnectorProperties {
     pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap<String, String>) -> bool {
         with_properties
             .get(UPSTREAM_SOURCE_KEY)
-            .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
+            .map(|s| {
+                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
+                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
+            })
             .unwrap_or(false)
     }
 
     pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
         with_properties
             .get(UPSTREAM_SOURCE_KEY)
-            .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
+            .map(|s| {
+                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
+                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
+            })
             .unwrap_or(false)
     }
 }
diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs
index 93e09be83ebf1..084c3601a3aba 100644
--- a/src/connector/src/source/filesystem/opendal_source/mod.rs
+++ b/src/connector/src/source/filesystem/opendal_source/mod.rs
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 
 pub mod gcs_source;
+pub mod posix_fs_source;
 pub mod s3_source;
 
 use serde::Deserialize;
@@ -31,6 +32,7 @@ use crate::source::{SourceProperties, UnknownFields};
 pub const GCS_CONNECTOR: &str = "gcs";
 // The new s3_v2 will use opendal.
 pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
+pub const POSIX_FS_CONNECTOR: &str = "posix_fs";
 
 #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
 pub struct GcsProperties {
@@ -89,6 +91,17 @@ impl OpendalSource for OpendalGcs {
     }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct OpendalPosixFs;
+
+impl OpendalSource for OpendalPosixFs {
+    type Properties = PosixFsProperties;
+
+    fn new_enumerator(properties: Self::Properties) -> anyhow::Result<OpendalEnumerator<Self>> {
+        OpendalEnumerator::new_posix_fs_source(properties)
+    }
+}
+
 #[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
 pub struct OpendalS3Properties {
     #[serde(flatten)]
@@ -115,3 +128,28 @@ impl SourceProperties for OpendalS3Properties {
 
     const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
 }
+
+#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
+pub struct PosixFsProperties {
+    #[serde(rename = "posix_fs.root")]
+    pub root: String,
+    #[serde(rename = "match_pattern", default)]
+    pub match_pattern: Option<String>,
+
+    #[serde(flatten)]
+    pub unknown_fields: HashMap<String, String>,
+}
+
+impl UnknownFields for PosixFsProperties {
+    fn unknown_fields(&self) -> HashMap<String, String> {
+        self.unknown_fields.clone()
+    }
+}
+
+impl SourceProperties for PosixFsProperties {
+    type Split = OpendalFsSplit<OpendalPosixFs>;
+    type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
+    type SplitReader = OpendalReader<OpendalPosixFs>;
+
+    const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
+}
diff --git a/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs
new file mode 100644
index 0000000000000..43ca390674e08
--- /dev/null
+++ b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs
@@ -0,0 +1,54 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::marker::PhantomData;
+
+use anyhow::Context;
+use opendal::layers::{LoggingLayer, RetryLayer};
+use opendal::services::Fs;
+use opendal::Operator;
+
+use super::opendal_enumerator::OpendalEnumerator;
+use super::{OpendalSource, PosixFsProperties};
+
+impl<Src: OpendalSource> OpendalEnumerator<Src> {
+    /// create opendal posix fs source.
+    pub fn new_posix_fs_source(posix_fs_properties: PosixFsProperties) -> anyhow::Result<Self> {
+        // Create Fs builder.
+        let mut builder = Fs::default();
+
+        builder.root(&posix_fs_properties.root);
+
+        let op: Operator = Operator::new(builder)?
+            .layer(LoggingLayer::default())
+            .layer(RetryLayer::default())
+            .finish();
+
+        let (prefix, matcher) = if let Some(pattern) = posix_fs_properties.match_pattern.as_ref() {
+            // TODO(Kexiang): Currently, FsListnenr in opendal does not support a prefix. (Seems a bug in opendal)
+            // So we assign prefix to empty string.
+            let matcher = glob::Pattern::new(pattern)
+                .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
+            (Some(String::new()), Some(matcher))
+        } else {
+            (None, None)
+        };
+        Ok(Self {
+            op,
+            prefix,
+            matcher,
+            marker: PhantomData,
+        })
+    }
+}
diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs
index 9dd1ef5b76cde..1b7927e23008b 100644
--- a/src/connector/src/source/mod.rs
+++ b/src/connector/src/source/mod.rs
@@ -39,7 +39,9 @@ pub use manager::{SourceColumnDesc, SourceColumnType};
 pub use crate::parser::additional_columns::{
     get_connector_compatible_additional_columns, CompatibleAdditionalColumnsFn,
 };
-pub use crate::source::filesystem::opendal_source::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR};
+pub use crate::source::filesystem::opendal_source::{
+    GCS_CONNECTOR, POSIX_FS_CONNECTOR, OPENDAL_S3_CONNECTOR,
+};
 pub use crate::source::filesystem::S3_CONNECTOR;
 pub use crate::source::nexmark::NEXMARK_CONNECTOR;
 pub use crate::source::pulsar::PULSAR_CONNECTOR;
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index a315fd88fc9bc..61b47eb8fe975 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -44,8 +44,8 @@ use risingwave_connector::source::nexmark::source::{get_event_data_types_with_na
 use risingwave_connector::source::test_source::TEST_CONNECTOR;
 use risingwave_connector::source::{
     get_connector_compatible_additional_columns, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR,
-    KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
-    PULSAR_CONNECTOR, S3_CONNECTOR,
+    KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
+    POSIX_FS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
 };
 use risingwave_pb::catalog::{
     PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
@@ -1091,6 +1091,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
                 GCS_CONNECTOR => hashmap!(
                     Format::Plain => vec![Encode::Csv, Encode::Json],
                 ),
+                POSIX_FS_CONNECTOR => hashmap!(
+                    Format::Plain => vec![Encode::Csv],
+                ),
                 MYSQL_CDC_CONNECTOR => hashmap!(
                     Format::Debezium => vec![Encode::Json],
                     // support source stream job
diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs
index ecb1846c01249..441a91836bb0a 100644
--- a/src/source/src/connector_source.rs
+++ b/src/source/src/connector_source.rs
@@ -30,7 +30,7 @@ use risingwave_connector::dispatch_source_prop;
 use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
 use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
 use risingwave_connector::source::filesystem::opendal_source::{
-    OpendalGcs, OpendalS3, OpendalSource,
+    OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
 };
 use risingwave_connector::source::filesystem::FsPageItem;
 use risingwave_connector::source::{
@@ -103,6 +103,11 @@ impl ConnectorSource {
                     OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
                 Ok(build_opendal_fs_list_stream(lister))
             }
+            ConnectorProperties::PosixFs(prop) => {
+                let lister: OpendalEnumerator<OpendalPosixFs> =
+                    OpendalEnumerator::new_posix_fs_source(*prop)?;
+                Ok(build_opendal_fs_list_stream(lister))
+            }
             other => bail!("Unsupported source: {:?}", other),
         }
     }
diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs
index 32cd08e9c87fe..633dce3b0b9a8 100644
--- a/src/stream/src/executor/source/fetch_executor.rs
+++ b/src/stream/src/executor/source/fetch_executor.rs
@@ -26,7 +26,7 @@ use risingwave_common::hash::VnodeBitmapExt;
 use risingwave_common::row::{OwnedRow, Row};
 use risingwave_common::types::{ScalarRef, ScalarRefImpl};
 use risingwave_connector::source::filesystem::opendal_source::{
-    OpendalGcs, OpendalS3, OpendalSource,
+    OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
 };
 use risingwave_connector::source::filesystem::OpendalFsSplit;
 use risingwave_connector::source::{
@@ -125,6 +125,11 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
                                 OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                             SplitImpl::from(split)
                         }
+                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
+                            let split: OpendalFsSplit<OpendalPosixFs> =
+                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
+                            SplitImpl::from(split)
+                        }
                         _ => unreachable!(),
                     },
                     _ => unreachable!(),
diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs
index dc6b718f566ea..8d8cb78a80b19 100644
--- a/src/stream/src/from_proto/source/fs_fetch.rs
+++ b/src/stream/src/from_proto/source/fs_fetch.rs
@@ -15,7 +15,9 @@
 use std::sync::Arc;
 
 use risingwave_common::catalog::{ColumnId, TableId};
-use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
+use risingwave_connector::source::filesystem::opendal_source::{
+    OpendalGcs, OpendalPosixFs, OpendalS3,
+};
 use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts};
 use risingwave_pb::stream_plan::StreamFsFetchNode;
 use risingwave_source::source_desc::SourceDescBuilder;
@@ -110,6 +112,17 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
                 )
                 .boxed()
             }
+            risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
+                FsFetchExecutor::<_, OpendalPosixFs>::new(
+                    params.actor_context.clone(),
+                    params.info,
+                    stream_source_core,
+                    upstream,
+                    source_ctrl_opts,
+                    params.env.connector_params(),
+                )
+                .boxed()
+            }
             _ => unreachable!(),
         };
         let rate_limit = source.rate_limit.map(|x| x as _);