From 1795e6cdccf8bb61019026c693b0874faa204fad Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 5 Jan 2024 00:19:07 -0500 Subject: [PATCH] feat(connector): support local fs source (#14312) Co-authored-by: KeXiangWang --- ci/scripts/e2e-source-test.sh | 3 + ci/scripts/notify.py | 3 +- ci/workflows/main-cron.yml | 16 +++ e2e_test/s3/posix_fs_source.py | 136 ++++++++++++++++++ e2e_test/s3/run_csv.py | 1 - e2e_test/source/opendal/data/data1.csv | 6 + e2e_test/source/opendal/data/data2.csv | 6 + e2e_test/source/opendal/posix_fs.slt | 33 +++++ src/connector/src/macros.rs | 1 + src/connector/src/source/base.rs | 12 +- .../source/filesystem/opendal_source/mod.rs | 41 ++++++ .../opendal_source/posix_fs_source.rs | 58 ++++++++ src/connector/src/source/mod.rs | 4 +- src/connector/with_options_source.yaml | 9 ++ src/frontend/src/handler/create_source.rs | 5 +- src/source/src/connector_source.rs | 7 +- .../src/executor/source/fetch_executor.rs | 7 +- src/stream/src/from_proto/source/fs_fetch.rs | 15 +- 18 files changed, 353 insertions(+), 10 deletions(-) create mode 100644 e2e_test/s3/posix_fs_source.py create mode 100644 e2e_test/source/opendal/data/data1.csv create mode 100644 e2e_test/source/opendal/data/data2.csv create mode 100644 e2e_test/source/opendal/posix_fs.slt create mode 100644 src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index c1624230abc7e..64144d051ad58 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -56,6 +56,9 @@ echo "--- inline cdc test" export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt' +echo "--- opendal source test" +sqllogictest -p 4566 -d dev './e2e_test/source/opendal/**/*.slt' + echo "--- mysql & postgres cdc validate test" sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt' sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt' diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py index 818dfce72143a..5266998b0045f 100755 --- a/ci/scripts/notify.py +++ b/ci/scripts/notify.py @@ -19,7 +19,8 @@ "e2e-java-binding-tests": ["yiming"], "e2e-clickhouse-sink-tests": ["bohan"], "e2e-pulsar-sink-tests": ["renjie"], - "s3-source-test-for-opendal-fs-engine": ["congyi"], + "s3-source-test-for-opendal-fs-engine": ["congyi", "kexiang"], + "s3-source-tests": ["congyi", "kexiang"], "pulsar-source-tests": ["renjie"], "connector-node-integration-test": ["siyuan"], } diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 75f58eadf2492..653578e4688e2 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -489,6 +489,22 @@ steps: timeout_in_minutes: 25 retry: *auto-retry + - label: "PosixFs source on OpenDAL fs engine (csv parser)" + command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'" + if: | + !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-s3-source-tests" + || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ + depends_on: build + plugins: + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 25 + retry: *auto-retry + - label: "S3 source on OpenDAL fs engine" key: "s3-source-test-for-opendal-fs-engine" command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run" diff --git a/e2e_test/s3/posix_fs_source.py b/e2e_test/s3/posix_fs_source.py new file mode 100644 index 0000000000000..a7cea46fa496a --- /dev/null +++ b/e2e_test/s3/posix_fs_source.py @@ -0,0 +1,136 @@ +import os +import sys +import csv +import random +import psycopg2 +import opendal + +from time import sleep +from io import StringIO +from functools import partial + +def gen_data(file_num, item_num_per_file): + assert item_num_per_file % 2 == 0, \ + f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' + return [ + [{ + 'id': file_id * item_num_per_file + item_id, + 'name': f'{file_id}_{item_id}', + 'sex': item_id % 2, + 'mark': (-1) ** (item_id % 2), + } for item_id in range(item_num_per_file)] + for file_id in range(file_num) + ] + +def format_csv(data, with_header): + csv_files = [] + + for file_data in data: + ostream = StringIO() + writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys()) + if with_header: + writer.writeheader() + for item_data in file_data: + writer.writerow(item_data) + csv_files.append(ostream.getvalue()) + return csv_files + + +def do_test(file_num, item_num_per_file, prefix, fmt): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _table(): + return f'posix_fs_test_{fmt}' + + def _encode(): + return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})" + + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE {_table()}( + id int, + name TEXT, + sex int, + mark int, + ) WITH ( + connector = 'posix_fs', + match_pattern = '{prefix}*.{fmt}', + posix_fs.root = '/tmp', + ) FORMAT PLAIN ENCODE {_encode()};''') + + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from {_table()}') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") + sleep(30) + + stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}' + print(f'Execute {stmt}') + cur.execute(stmt) + result = cur.fetchone() + + print('Got:', result) + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' + + _assert_eq('count(*)', result[0], total_rows) + _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) + _assert_eq('sum(sex)', result[2], total_rows / 2) + _assert_eq('sum(mark)', result[3], 0) + + print('Test pass') + + cur.execute(f'drop table {_table()}') + cur.close() + conn.close() + + +if __name__ == "__main__": + FILE_NUM = 4001 + ITEM_NUM_PER_FILE = 2 + data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) + + fmt = sys.argv[1] + FORMATTER = { + 'csv_with_header': partial(format_csv, with_header=True), + 'csv_without_header': partial(format_csv, with_header=False), + } + assert fmt in FORMATTER, f"Unsupported format: {fmt}" + formatted_files = FORMATTER[fmt](data) + + run_id = str(random.randint(1000, 9999)) + _local = lambda idx: f'data_{idx}.{fmt}' + _posix = lambda idx: f"{run_id}_data_{idx}.{fmt}" + # put local files + op = opendal.Operator("fs", root="/tmp") + + print("write file to /tmp") + for idx, file_str in enumerate(formatted_files): + with open(_local(idx), "w") as f: + f.write(file_str) + os.fsync(f.fileno()) + file_name = _posix(idx) + file_bytes = file_str.encode('utf-8') + op.write(file_name, file_bytes) + + # do test + print("do test") + do_test(FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) + + # clean up local files + print("clean up local files in /tmp") + for idx, _ in enumerate(formatted_files): + file_name = _posix(idx) + op.delete(file_name) diff --git a/e2e_test/s3/run_csv.py b/e2e_test/s3/run_csv.py index b721e3c796066..a6c0dc37bc4ca 100644 --- a/e2e_test/s3/run_csv.py +++ b/e2e_test/s3/run_csv.py @@ -1,5 +1,4 @@ import os -import string import json import string from time import sleep diff --git a/e2e_test/source/opendal/data/data1.csv b/e2e_test/source/opendal/data/data1.csv new file mode 100644 index 0000000000000..9279ffa24aa25 --- /dev/null +++ b/e2e_test/source/opendal/data/data1.csv @@ -0,0 +1,6 @@ +carat,cut,color,depth +0.25,Ideal,E,61.4 +0.22,Premium,I,62.0 +0.28,Good,J,63.1 +0.23,Very Good,H,57.5 +0.30,Fair,E,64.7 diff --git a/e2e_test/source/opendal/data/data2.csv b/e2e_test/source/opendal/data/data2.csv new file mode 100644 index 0000000000000..b5e39f73f2d51 --- /dev/null +++ b/e2e_test/source/opendal/data/data2.csv @@ -0,0 +1,6 @@ +carat,cut,color,depth +1.25,Ideal,E,61.4 +1.22,Premium,I,62.0 +1.28,Good,J,63.1 +1.23,Very Good,H,57.5 +1.30,Fair,E,64.7 diff --git a/e2e_test/source/opendal/posix_fs.slt b/e2e_test/source/opendal/posix_fs.slt new file mode 100644 index 0000000000000..3fc572a1a1cc8 --- /dev/null +++ b/e2e_test/source/opendal/posix_fs.slt @@ -0,0 +1,33 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +CREATE TABLE diamonds ( + carat FLOAT, + cut TEXT, + color TEXT, + depth FLOAT, +) WITH ( + connector = 'posix_fs', + match_pattern = 'data*.csv', + posix_fs.root = 'e2e_test/source/opendal/data', +) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ','); + +sleep 10s + +query TTTT rowsort +select * from diamonds; +---- +0.22 Premium I 62 +0.23 Very Good H 57.5 +0.25 Ideal E 61.4 +0.28 Good J 63.1 +0.3 Fair E 64.7 +1.22 Premium I 62 +1.23 Very Good H 57.5 +1.25 Ideal E 61.4 +1.28 Good J 63.1 +1.3 Fair E 64.7 + +statement ok +DROP TABLE diamonds; diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 47839f6dc5d5a..9a2383dbb4a96 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -35,6 +35,7 @@ macro_rules! for_all_classified_sources { { S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit }, { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> }, { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> }, + { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> }, { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit} } $( diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 1a4a594d44285..b6093a351783b 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::monitor::SourceMetrics; use super::nexmark::source::message::NexmarkMeta; -use super::OPENDAL_S3_CONNECTOR; +use super::{OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR}; use crate::parser::ParserConfig; pub(crate) use crate::source::common::CommonSplitReader; use crate::source::filesystem::FsPageItem; @@ -386,14 +386,20 @@ impl ConnectorProperties { pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap) -> bool { with_properties .get(UPSTREAM_SOURCE_KEY) - .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)) + .map(|s| { + s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR) + || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR) + }) .unwrap_or(false) } pub fn is_new_fs_connector_hash_map(with_properties: &HashMap) -> bool { with_properties .get(UPSTREAM_SOURCE_KEY) - .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)) + .map(|s| { + s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR) + || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR) + }) .unwrap_or(false) } } diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index 93e09be83ebf1..d6223c467d08b 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; pub mod gcs_source; +pub mod posix_fs_source; pub mod s3_source; use serde::Deserialize; @@ -31,6 +32,7 @@ use crate::source::{SourceProperties, UnknownFields}; pub const GCS_CONNECTOR: &str = "gcs"; // The new s3_v2 will use opendal. pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2"; +pub const POSIX_FS_CONNECTOR: &str = "posix_fs"; #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] pub struct GcsProperties { @@ -89,6 +91,17 @@ impl OpendalSource for OpendalGcs { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct OpendalPosixFs; + +impl OpendalSource for OpendalPosixFs { + type Properties = PosixFsProperties; + + fn new_enumerator(properties: Self::Properties) -> anyhow::Result> { + OpendalEnumerator::new_posix_fs_source(properties) + } +} + #[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)] pub struct OpendalS3Properties { #[serde(flatten)] @@ -115,3 +128,31 @@ impl SourceProperties for OpendalS3Properties { const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR; } + +#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] +pub struct PosixFsProperties { + // The root directly of the files to search. The files will be searched recursively. + #[serde(rename = "posix_fs.root")] + pub root: String, + + // The regex pattern to match files under root directory. + #[serde(rename = "match_pattern", default)] + pub match_pattern: Option, + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +impl UnknownFields for PosixFsProperties { + fn unknown_fields(&self) -> HashMap { + self.unknown_fields.clone() + } +} + +impl SourceProperties for PosixFsProperties { + type Split = OpendalFsSplit; + type SplitEnumerator = OpendalEnumerator; + type SplitReader = OpendalReader; + + const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR; +} diff --git a/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs new file mode 100644 index 0000000000000..748230ba5a16e --- /dev/null +++ b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs @@ -0,0 +1,58 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::marker::PhantomData; + +use anyhow::Context; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::Fs; +use opendal::Operator; + +use super::opendal_enumerator::OpendalEnumerator; +use super::{OpendalSource, PosixFsProperties}; + +// Posix fs source should only be used for testing. +// For a single-CN cluster, the behavior is well-defined. It will read from the local file system. +// For a multi-CN cluster, each CN will read from its own local file system under the given directory. + +impl OpendalEnumerator { + /// create opendal posix fs source. + pub fn new_posix_fs_source(posix_fs_properties: PosixFsProperties) -> anyhow::Result { + // Create Fs builder. + let mut builder = Fs::default(); + + builder.root(&posix_fs_properties.root); + + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + + let (prefix, matcher) = if let Some(pattern) = posix_fs_properties.match_pattern.as_ref() { + // TODO(Kexiang): Currently, FsListnenr in opendal does not support a prefix. (Seems a bug in opendal) + // So we assign prefix to empty string. + let matcher = glob::Pattern::new(pattern) + .with_context(|| format!("Invalid match_pattern: {}", pattern))?; + (Some(String::new()), Some(matcher)) + } else { + (None, None) + }; + Ok(Self { + op, + prefix, + matcher, + marker: PhantomData, + }) + } +} diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 9dd1ef5b76cde..6cdc7d30e277a 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -39,7 +39,9 @@ pub use manager::{SourceColumnDesc, SourceColumnType}; pub use crate::parser::additional_columns::{ get_connector_compatible_additional_columns, CompatibleAdditionalColumnsFn, }; -pub use crate::source::filesystem::opendal_source::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR}; +pub use crate::source::filesystem::opendal_source::{ + GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, +}; pub use crate::source::filesystem::S3_CONNECTOR; pub use crate::source::nexmark::NEXMARK_CONNECTOR; pub use crate::source::pulsar::PULSAR_CONNECTOR; diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 6ee69ccd2ab18..98a45599b56f2 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -455,6 +455,15 @@ OpendalS3Properties: field_type: String required: false default: Default::default +PosixFsProperties: + fields: + - name: posix_fs.root + field_type: String + required: true + - name: match_pattern + field_type: String + required: false + default: Default::default PubsubProperties: fields: - name: pubsub.split_count diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 80e526b01c15f..c0d3248b5af8a 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -45,7 +45,7 @@ use risingwave_connector::source::test_source::TEST_CONNECTOR; use risingwave_connector::source::{ get_connector_compatible_additional_columns, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, - PULSAR_CONNECTOR, S3_CONNECTOR, + POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, }; use risingwave_pb::catalog::{ PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc, @@ -917,6 +917,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock hashmap!( Format::Plain => vec![Encode::Csv, Encode::Json], ), + POSIX_FS_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Csv], + ), MYSQL_CDC_CONNECTOR => hashmap!( Format::Debezium => vec![Encode::Json], // support source stream job diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index ecb1846c01249..441a91836bb0a 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -30,7 +30,7 @@ use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use risingwave_connector::source::filesystem::opendal_source::{ - OpendalGcs, OpendalS3, OpendalSource, + OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, }; use risingwave_connector::source::filesystem::FsPageItem; use risingwave_connector::source::{ @@ -103,6 +103,11 @@ impl ConnectorSource { OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?; Ok(build_opendal_fs_list_stream(lister)) } + ConnectorProperties::PosixFs(prop) => { + let lister: OpendalEnumerator = + OpendalEnumerator::new_posix_fs_source(*prop)?; + Ok(build_opendal_fs_list_stream(lister)) + } other => bail!("Unsupported source: {:?}", other), } } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 32cd08e9c87fe..633dce3b0b9a8 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -26,7 +26,7 @@ use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{ScalarRef, ScalarRefImpl}; use risingwave_connector::source::filesystem::opendal_source::{ - OpendalGcs, OpendalS3, OpendalSource, + OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, }; use risingwave_connector::source::filesystem::OpendalFsSplit; use risingwave_connector::source::{ @@ -125,6 +125,11 @@ impl FsFetchExecutor { OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?; SplitImpl::from(split) } + risingwave_connector::source::ConnectorProperties::PosixFs(_) => { + let split: OpendalFsSplit = + OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?; + SplitImpl::from(split) + } _ => unreachable!(), }, _ => unreachable!(), diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index dc6b718f566ea..8d8cb78a80b19 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -15,7 +15,9 @@ use std::sync::Arc; use risingwave_common::catalog::{ColumnId, TableId}; -use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3}; +use risingwave_connector::source::filesystem::opendal_source::{ + OpendalGcs, OpendalPosixFs, OpendalS3, +}; use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts}; use risingwave_pb::stream_plan::StreamFsFetchNode; use risingwave_source::source_desc::SourceDescBuilder; @@ -110,6 +112,17 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { ) .boxed() } + risingwave_connector::source::ConnectorProperties::PosixFs(_) => { + FsFetchExecutor::<_, OpendalPosixFs>::new( + params.actor_context.clone(), + params.info, + stream_source_core, + upstream, + source_ctrl_opts, + params.env.connector_params(), + ) + .boxed() + } _ => unreachable!(), }; let rate_limit = source.rate_limit.map(|x| x as _);