feat(connector): support local fs source (#14312)
Co-authored-by: KeXiangWang <[email protected]>
2 people authored and Li0k committed Jan 10, 2024
1 parent ca74b3c commit 1795e6c
Showing 18 changed files with 353 additions and 10 deletions.
3 changes: 3 additions & 0 deletions ci/scripts/e2e-source-test.sh
@@ -56,6 +56,9 @@ echo "--- inline cdc test"
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt'

echo "--- opendal source test"
sqllogictest -p 4566 -d dev './e2e_test/source/opendal/**/*.slt'

echo "--- mysql & postgres cdc validate test"
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt'
3 changes: 2 additions & 1 deletion ci/scripts/notify.py
@@ -19,7 +19,8 @@
"e2e-java-binding-tests": ["yiming"],
"e2e-clickhouse-sink-tests": ["bohan"],
"e2e-pulsar-sink-tests": ["renjie"],
"s3-source-test-for-opendal-fs-engine": ["congyi"],
"s3-source-test-for-opendal-fs-engine": ["congyi", "kexiang"],
"s3-source-tests": ["congyi", "kexiang"],
"pulsar-source-tests": ["renjie"],
"connector-node-integration-test": ["siyuan"],
}
16 changes: 16 additions & 0 deletions ci/workflows/main-cron.yml
@@ -489,6 +489,22 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "PosixFs source on OpenDAL fs engine (csv parser)"
command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
|| build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
depends_on: build
plugins:
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3 source on OpenDAL fs engine"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run"
136 changes: 136 additions & 0 deletions e2e_test/s3/posix_fs_source.py
@@ -0,0 +1,136 @@
import os
import sys
import csv
import random
import psycopg2
import opendal

from time import sleep
from io import StringIO
from functools import partial

def gen_data(file_num, item_num_per_file):
assert item_num_per_file % 2 == 0, \
f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
return [
[{
'id': file_id * item_num_per_file + item_id,
'name': f'{file_id}_{item_id}',
'sex': item_id % 2,
'mark': (-1) ** (item_id % 2),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]

def format_csv(data, with_header):
csv_files = []

for file_data in data:
ostream = StringIO()
writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys())
if with_header:
writer.writeheader()
for item_data in file_data:
writer.writerow(item_data)
csv_files.append(ostream.getvalue())
return csv_files


def do_test(file_num, item_num_per_file, prefix, fmt):
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

# Open a cursor to execute SQL statements
cur = conn.cursor()

def _table():
return f'posix_fs_test_{fmt}'

def _encode():
return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})"

# Execute a SELECT statement
cur.execute(f'''CREATE TABLE {_table()}(
id int,
name TEXT,
sex int,
mark int,
) WITH (
connector = 'posix_fs',
match_pattern = '{prefix}*.{fmt}',
posix_fs.root = '/tmp',
) FORMAT PLAIN ENCODE {_encode()};''')

total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from {_table()}')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s")
sleep(30)

stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}'
print(f'Execute {stmt}')
cur.execute(stmt)
result = cur.fetchone()

print('Got:', result)

def _assert_eq(field, got, expect):
assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

_assert_eq('count(*)', result[0], total_rows)
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)
_assert_eq('sum(sex)', result[2], total_rows / 2)
_assert_eq('sum(mark)', result[3], 0)

print('Test pass')

cur.execute(f'drop table {_table()}')
cur.close()
conn.close()


if __name__ == "__main__":
FILE_NUM = 4001
ITEM_NUM_PER_FILE = 2
data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)

fmt = sys.argv[1]
FORMATTER = {
'csv_with_header': partial(format_csv, with_header=True),
'csv_without_header': partial(format_csv, with_header=False),
}
assert fmt in FORMATTER, f"Unsupported format: {fmt}"
formatted_files = FORMATTER[fmt](data)

run_id = str(random.randint(1000, 9999))
_local = lambda idx: f'data_{idx}.{fmt}'
_posix = lambda idx: f"{run_id}_data_{idx}.{fmt}"
# put local files
op = opendal.Operator("fs", root="/tmp")

print("write file to /tmp")
for idx, file_str in enumerate(formatted_files):
with open(_local(idx), "w") as f:
f.write(file_str)
os.fsync(f.fileno())
file_name = _posix(idx)
file_bytes = file_str.encode('utf-8')
op.write(file_name, file_bytes)

# do test
print("do test")
do_test(FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt)

# clean up local files
print("clean up local files in /tmp")
for idx, _ in enumerate(formatted_files):
file_name = _posix(idx)
op.delete(file_name)
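
For reference, the "PosixFs source on OpenDAL fs engine (csv parser)" step added to main-cron above runs this script through ci/scripts/s3-source-test.sh as posix_fs_source.py csv_without_header. Against a local cluster, that amounts to running python3 e2e_test/s3/posix_fs_source.py csv_without_header with a frontend listening on 4566 and write access to /tmp; per the FORMATTER map, the only accepted format arguments are csv_with_header and csv_without_header.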
1 change: 0 additions & 1 deletion e2e_test/s3/run_csv.py
@@ -1,5 +1,4 @@
import os
import string
import json
import string
from time import sleep
6 changes: 6 additions & 0 deletions e2e_test/source/opendal/data/data1.csv
@@ -0,0 +1,6 @@
carat,cut,color,depth
0.25,Ideal,E,61.4
0.22,Premium,I,62.0
0.28,Good,J,63.1
0.23,Very Good,H,57.5
0.30,Fair,E,64.7
6 changes: 6 additions & 0 deletions e2e_test/source/opendal/data/data2.csv
@@ -0,0 +1,6 @@
carat,cut,color,depth
1.25,Ideal,E,61.4
1.22,Premium,I,62.0
1.28,Good,J,63.1
1.23,Very Good,H,57.5
1.30,Fair,E,64.7
33 changes: 33 additions & 0 deletions e2e_test/source/opendal/posix_fs.slt
@@ -0,0 +1,33 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
CREATE TABLE diamonds (
carat FLOAT,
cut TEXT,
color TEXT,
depth FLOAT,
) WITH (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source/opendal/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s

query TTTT rowsort
select * from diamonds;
----
0.22 Premium I 62
0.23 Very Good H 57.5
0.25 Ideal E 61.4
0.28 Good J 63.1
0.3 Fair E 64.7
1.22 Premium I 62
1.23 Very Good H 57.5
1.25 Ideal E 61.4
1.28 Good J 63.1
1.3 Fair E 64.7

statement ok
DROP TABLE diamonds;
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
@@ -35,6 +35,7 @@ macro_rules! for_all_classified_sources {
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
}
$(
12 changes: 9 additions & 3 deletions src/connector/src/source/base.rs
@@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
use super::nexmark::source::message::NexmarkMeta;
use super::OPENDAL_S3_CONNECTOR;
use super::{OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::FsPageItem;
@@ -386,14 +386,20 @@ impl ConnectorProperties {
pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap<String, String>) -> bool {
with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
})
.unwrap_or(false)
}

pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
})
.unwrap_or(false)
}
}
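
Both helpers perform the same check over different map types: a source counts as a "new fs connector" when its declared connector is one of the opendal-backed fs sources. A self-contained sketch with the constants inlined (assuming UPSTREAM_SOURCE_KEY is the "connector" key from the WITH clause, and using the s3_v2 and posix_fs names defined in this commit):

use std::collections::BTreeMap;

// Stand-in for is_new_fs_connector_b_tree_map, with the constants inlined.
fn is_new_fs_connector(with_properties: &BTreeMap<String, String>) -> bool {
    with_properties
        .get("connector") // UPSTREAM_SOURCE_KEY (assumed value)
        .map(|s| s.eq_ignore_ascii_case("s3_v2") || s.eq_ignore_ascii_case("posix_fs"))
        .unwrap_or(false)
}

fn main() {
    let props = BTreeMap::from([("connector".to_string(), "POSIX_FS".to_string())]);
    assert!(is_new_fs_connector(&props)); // matching is case-insensitive
}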
41 changes: 41 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;

pub mod gcs_source;
pub mod posix_fs_source;
pub mod s3_source;

use serde::Deserialize;
@@ -31,6 +32,7 @@ use crate::source::{SourceProperties, UnknownFields};
pub const GCS_CONNECTOR: &str = "gcs";
// The new s3_v2 will use opendal.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
@@ -89,6 +91,17 @@ impl OpendalSource for OpendalGcs {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OpendalPosixFs;

impl OpendalSource for OpendalPosixFs {
type Properties = PosixFsProperties;

fn new_enumerator(properties: Self::Properties) -> anyhow::Result<OpendalEnumerator<Self>> {
OpendalEnumerator::new_posix_fs_source(properties)
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct OpendalS3Properties {
#[serde(flatten)]
@@ -115,3 +128,31 @@ impl SourceProperties for OpendalS3Properties {

const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
// The root directory of the files to search. The files will be searched recursively.
#[serde(rename = "posix_fs.root")]
pub root: String,

// The glob pattern used to match files under the root directory.
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}

impl UnknownFields for PosixFsProperties {
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}

impl SourceProperties for PosixFsProperties {
type Split = OpendalFsSplit<OpendalPosixFs>;
type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
type SplitReader = OpendalReader<OpendalPosixFs>;

const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
}
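
To make the serde attributes on PosixFsProperties concrete: rename maps the posix_fs.root WITH option onto root, default makes match_pattern optional, and flatten collects any remaining options into unknown_fields (surfaced through the UnknownFields impl above). A minimal sketch with a simplified stand-in struct — this is not the connector's actual option-resolution path:

use std::collections::HashMap;

use serde::Deserialize;

// Simplified mirror of PosixFsProperties' serde attributes.
#[derive(Debug, Deserialize)]
struct Props {
    #[serde(rename = "posix_fs.root")]
    root: String,
    #[serde(rename = "match_pattern", default)]
    match_pattern: Option<String>,
    #[serde(flatten)]
    unknown_fields: HashMap<String, String>,
}

fn main() {
    let with_options = HashMap::from([
        ("posix_fs.root".to_string(), "/tmp".to_string()),
        ("match_pattern".to_string(), "data*.csv".to_string()),
        ("connector".to_string(), "posix_fs".to_string()),
    ]);
    let props: Props =
        serde_json::from_value(serde_json::to_value(with_options).unwrap()).unwrap();
    assert_eq!(props.root, "/tmp");
    assert_eq!(props.match_pattern.as_deref(), Some("data*.csv"));
    // Options without a declared field (here "connector") land in the flattened map.
    assert!(props.unknown_fields.contains_key("connector"));
}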
58 changes: 58 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs
@@ -0,0 +1,58 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use anyhow::Context;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Fs;
use opendal::Operator;

use super::opendal_enumerator::OpendalEnumerator;
use super::{OpendalSource, PosixFsProperties};

// The posix fs source should only be used for testing.
// For a single-CN cluster, the behavior is well-defined: it reads from the local file system.
// For a multi-CN cluster, each CN reads from its own local file system under the given directory.

impl<Src: OpendalSource> OpendalEnumerator<Src> {
/// Creates a posix fs source on top of opendal's `Fs` service.
pub fn new_posix_fs_source(posix_fs_properties: PosixFsProperties) -> anyhow::Result<Self> {
// Create Fs builder.
let mut builder = Fs::default();

builder.root(&posix_fs_properties.root);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();

let (prefix, matcher) = if let Some(pattern) = posix_fs_properties.match_pattern.as_ref() {
// TODO(Kexiang): Currently, the fs lister in opendal does not support a prefix (this seems to be a bug in opendal),
// so we assign an empty string as the prefix.
let matcher = glob::Pattern::new(pattern)
.with_context(|| format!("Invalid match_pattern: {}", pattern))?;
(Some(String::new()), Some(matcher))
} else {
(None, None)
};
Ok(Self {
op,
prefix,
matcher,
marker: PhantomData,
})
}
}
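
Note that match_pattern is interpreted with glob semantics via the glob crate, not as a regular expression, and candidate paths are matched relative to posix_fs.root. A quick sketch of what the matcher built above accepts:

// Illustration only; the enumerator applies a matcher like this while listing files.
fn main() -> Result<(), glob::PatternError> {
    let matcher = glob::Pattern::new("data*.csv")?;
    assert!(matcher.matches("data1.csv"));
    assert!(matcher.matches("data_42.csv"));
    assert!(!matcher.matches("diamonds.json"));
    Ok(())
}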
