Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): introduce gcs source and new s3 source via OpenDAL #13414

Merged
merged 57 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
eb7b035
save work
wcy-fdu Oct 24, 2023
02f63ee
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
wcy-fdu Oct 31, 2023
4fa7226
save work
wcy-fdu Oct 31, 2023
2871d05
save work
wcy-fdu Oct 31, 2023
7a5321d
save work
wcy-fdu Nov 1, 2023
3023575
implement lister, reader wi[
wcy-fdu Nov 2, 2023
3a7bdef
save work
wcy-fdu Nov 6, 2023
6b5a435
save work
wcy-fdu Nov 7, 2023
28e7286
implement opendal reader
wcy-fdu Nov 9, 2023
0314c2a
minor
wcy-fdu Nov 9, 2023
bb62cc2
introduce OpenDALConnectorTypeMarker
wcy-fdu Nov 13, 2023
ee2f87a
can apply gcs and s3
wcy-fdu Nov 13, 2023
c435b10
make clippy happy
wcy-fdu Nov 13, 2023
536bed4
bring new s3 source
wcy-fdu Nov 14, 2023
e52a599
minor
wcy-fdu Nov 14, 2023
078f506
can use opendal s3 as source, todo: add filter for prefix
wcy-fdu Nov 17, 2023
98de2c5
remove origin s3_v2, and use opendal as new s3_v2
wcy-fdu Nov 21, 2023
743226f
add credential
wcy-fdu Nov 23, 2023
4c774eb
fix list
wcy-fdu Nov 23, 2023
a303fc3
use stream read
wcy-fdu Nov 24, 2023
64f5f24
add python test script
wcy-fdu Nov 24, 2023
a0dc1cd
minor
wcy-fdu Nov 24, 2023
f37a0af
ready for review
wcy-fdu Nov 24, 2023
4ce0a99
format python file
wcy-fdu Nov 24, 2023
b8792b8
fmt
wcy-fdu Nov 27, 2023
e44558b
fmt
wcy-fdu Nov 27, 2023
fdcd292
fmt
wcy-fdu Nov 27, 2023
0ded6e0
strange fmt
wcy-fdu Nov 27, 2023
8d5dc2c
Merge branch 'main' into wcy/gcs_source
wcy-fdu Nov 27, 2023
b762319
add e2e test for gcs source in main cron
wcy-fdu Nov 28, 2023
6d08617
Merge branch 'wcy/gcs_source' of https://github.com/risingwavelabs/ri…
wcy-fdu Nov 28, 2023
1bf66be
install opendal in python
wcy-fdu Nov 28, 2023
0e6fa98
install opendal in python
wcy-fdu Nov 28, 2023
7682ce2
fix python scripts
wcy-fdu Nov 29, 2023
de9f1bf
fix python scripts
wcy-fdu Nov 29, 2023
a29a574
Merge branch 'main' into wcy/gcs_source
wcy-fdu Nov 29, 2023
f41cc59
resolve some comments
wcy-fdu Nov 30, 2023
bacf0af
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
wcy-fdu Nov 30, 2023
52a95f1
gcs source supports csv, add assume role for s3
wcy-fdu Nov 30, 2023
1a08aa7
resolve some comments
wcy-fdu Dec 4, 2023
8453836
resolve some comments
wcy-fdu Dec 5, 2023
5081ee7
refactor: refine OpendalSource types
stdrc Dec 6, 2023
b7c26da
resolve some comments
wcy-fdu Dec 6, 2023
1bab9fc
minor
wcy-fdu Dec 7, 2023
a352b0c
remove Timestamp
wcy-fdu Dec 7, 2023
7beee7c
minor
wcy-fdu Dec 7, 2023
e04e46f
add github label
wcy-fdu Dec 8, 2023
a4188dd
rebase main
wcy-fdu Dec 11, 2023
40ac24b
use prefix list
wcy-fdu Dec 11, 2023
c05c22e
Fix "cargo-hakari"
wcy-fdu Dec 11, 2023
cdce5db
rerun
wcy-fdu Dec 14, 2023
ff1084c
Merge branch 'wcy/gcs_source' of https://github.com/risingwavelabs/ri…
wcy-fdu Dec 14, 2023
3ecb61c
can merge
wcy-fdu Dec 15, 2023
b320bac
minor
wcy-fdu Dec 15, 2023
a09f116
resolve conflict
wcy-fdu Dec 18, 2023
6e3b492
Fix "cargo-hakari"
wcy-fdu Dec 18, 2023
dd22a8d
Merge branch 'main' into wcy/gcs_source
wcy-fdu Dec 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/scripts/s3-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ echo "--- starting risingwave cluster with connector node"
cargo make ci-start ci-1cn-1fe

echo "--- Run test"
python3 -m pip install minio psycopg2-binary
python3 -m pip install minio psycopg2-binary opendal
python3 e2e_test/s3/$script

echo "--- Kill cluster"
Expand Down
17 changes: 17 additions & 0 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,23 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "GCS source check on AWS (json parser)"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test passed in gcp machine but fail in out CI, I guess it's because the gateway/network issue while doing credential check, so I suggest only leave s3 source in CI.
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@huangjw806 any solution for testing GCS?

Copy link
Contributor

@huangjw806 huangjw806 Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gcloud auth activate-service-account

Maybe you need to auth gcloud with service account key.

wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
command: "ci/scripts/s3-source-test.sh -p ci-release -s 'gcs_source.py json'"
depends_on: build
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
GCS_SOURCE_TEST_CONF: ci_gcs_source_test_gcp
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- GCS_SOURCE_TEST_CONF
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3_v2 source check on AWS (csv parser)"
command: "ci/scripts/s3-source-test.sh -p ci-release -s 'fs_source_v2.py csv_without_header'"
if: |
Expand Down
130 changes: 130 additions & 0 deletions e2e_test/s3/gcs_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import os
import sys
import csv
import json
import random
import psycopg2
import opendal

from time import sleep
from io import StringIO
from functools import partial

def gen_data(file_num, item_num_per_file):
assert item_num_per_file % 2 == 0, \
f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
return [
[{
'id': file_id * item_num_per_file + item_id,
'name': f'{file_id}_{item_id}',
'sex': item_id % 2,
'mark': (-1) ** (item_id % 2),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]

def format_json(data):
return [
'\n'.join([json.dumps(item) for item in file])
for file in data
]


def do_test(config, file_num, item_num_per_file, prefix, fmt, credential):
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

# Open a cursor to execute SQL statements
cur = conn.cursor()

def _table():
return f'gcs_test_{fmt}'

def _encode():
return 'JSON'

# Execute a SELECT statement
cur.execute(f'''CREATE TABLE {_table()}(
id int,
name TEXT,
sex int,
mark int,
) WITH (
connector = 'gcs',
match_pattern = '{prefix}*.{fmt}',
gcs.bucket_name = '{config['GCS_BUCKET']}',
gcs.credentials = '{credential}',
) FORMAT PLAIN ENCODE {_encode()};''')

total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from {_table()}')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s")
sleep(30)

stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}'
print(f'Execute {stmt}')
cur.execute(stmt)
result = cur.fetchone()

print('Got:', result)

def _assert_eq(field, got, expect):
assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

_assert_eq('count(*)', result[0], total_rows)
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)
_assert_eq('sum(sex)', result[2], total_rows / 2)
_assert_eq('sum(mark)', result[3], 0)

print('Test pass')

cur.execute(f'drop table {_table()}')
cur.close()
conn.close()


if __name__ == "__main__":
FILE_NUM = 4001
ITEM_NUM_PER_FILE = 2
data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)

fmt = sys.argv[1]
FORMATTER = {
'json': format_json,
}
assert fmt in FORMATTER, f"Unsupported format: {fmt}"
formatted_files = FORMATTER[fmt](data)

config = json.loads(os.environ["GCS_SOURCE_TEST_CONF"])
run_id = str(random.randint(1000, 9999))
_local = lambda idx: f'data_{idx}.{fmt}'
_gcs = lambda idx: f"{run_id}_data_{idx}.{fmt}"
credential_str = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS"])
# put gcs files
op = opendal.Operator("gcs", root="/", bucket=config["GCS_BUCKET"], credential=credential_str)

print("upload file to gcs")
for idx, file_str in enumerate(formatted_files):
with open(_local(idx), "w") as f:
f.write(file_str)
os.fsync(f.fileno())
file_bytes = file_str.encode('utf-8')
op.write(_gcs(idx), file_bytes)

# do test
print("do test")
do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt, credential_str)

# clean up gcs files
print("clean up gcs files")
for idx, _ in enumerate(formatted_files):
op.delete(_gcs(idx))
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ mysql_common = { version = "0.31", default-features = false, features = [
] }
nexmark = { version = "0.2", features = ["serde"] }
num-bigint = "0.4"
opendal = "0.41"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ macro_rules! for_all_classified_sources {
{ GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
{ Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::GcsProperties> },
{ OpenDalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3Properties> },
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
}
$(
Expand Down
60 changes: 34 additions & 26 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
use super::nexmark::source::message::NexmarkMeta;
use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR};
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::{FsPageItem, S3Properties, S3_V2_CONNECTOR};
use crate::source::filesystem::opendal_source::OpendalS3Properties;
use crate::source::filesystem::{FsPageItem, GcsProperties, S3Properties};
use crate::source::monitor::EnumeratorMetrics;
use crate::source::S3_CONNECTOR;
use crate::{
dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties,
impl_split, match_source_name_str,
Expand Down Expand Up @@ -75,7 +76,7 @@ impl<P: DeserializeOwned> TryFromHashmap for P {
}
}

pub async fn create_split_reader<P: SourceProperties>(
pub async fn create_split_reader<P: SourceProperties + std::fmt::Debug>(
prop: P,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
Expand Down Expand Up @@ -364,44 +365,51 @@ impl ConnectorProperties {
pub fn is_new_fs_connector_b_tree_map(props: &BTreeMap<String, String>) -> bool {
props
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(S3_V2_CONNECTOR))
.map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
.unwrap_or(false)
|| props
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(GCS_CONNECTOR))
.unwrap_or(false)
}

pub fn is_new_fs_connector_hash_map(props: &HashMap<String, String>) -> bool {
props
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(S3_V2_CONNECTOR))
.map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
.unwrap_or(false)
}

pub fn rewrite_upstream_source_key_hash_map(props: &mut HashMap<String, String>) {
let connector = props.remove(UPSTREAM_SOURCE_KEY).unwrap();
match connector.as_str() {
S3_V2_CONNECTOR => {
tracing::info!(
"using new fs source, rewrite connector from '{}' to '{}'",
S3_V2_CONNECTOR,
S3_CONNECTOR
);
props.insert(UPSTREAM_SOURCE_KEY.to_string(), S3_CONNECTOR.to_string());
}
_ => {
props.insert(UPSTREAM_SOURCE_KEY.to_string(), connector);
}
}
|| props
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(GCS_CONNECTOR))
.unwrap_or(false)
}
}

impl ConnectorProperties {
pub fn extract(mut props: HashMap<String, String>) -> Result<Self> {
if Self::is_new_fs_connector_hash_map(&props) {
_ = props
let connector = props
.remove(UPSTREAM_SOURCE_KEY)
.ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
return Ok(ConnectorProperties::S3(Box::new(
S3Properties::try_from_hashmap(props)?,
)));
match connector.as_str() {
"s3_v2" => {
let assume_role = props.get("s3.assume_role").cloned();
return Ok(ConnectorProperties::OpenDalS3(Box::new(
OpendalS3Properties {
s3_properties: S3Properties::try_from_hashmap(props)?,
assume_role,
},
)));
}
"gcs" => {
return Ok(ConnectorProperties::Gcs(Box::new(
GcsProperties::try_from_hashmap(props)?,
)));
}
_ => {
unreachable!()
}
}
}

let connector = props
Expand Down
68 changes: 68 additions & 0 deletions src/connector/src/source/filesystem/file_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;

use anyhow::anyhow;
use aws_sdk_s3::types::Object;
use risingwave_common::types::{JsonbVal, Timestamp};
Expand Down Expand Up @@ -67,6 +71,70 @@ impl FsSplit {
}
}

/// [`OpendalFsSplit`] Describes a file or a split of a file. A file is a generic concept,
/// and can be a local file, a distributed file system, or am object in S3 bucket.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct OpendalFsSplit<C>
where
C: Send + Clone + 'static,
{
pub name: String,
pub offset: usize,
pub size: usize,
marker: PhantomData<C>,
}

impl<C> From<&Object> for OpendalFsSplit<C>
where
C: Send + Clone + 'static,
{
fn from(value: &Object) -> Self {
Self {
name: value.key().unwrap().to_owned(),
offset: 0,
size: value.size().unwrap_or_default() as usize,
marker: PhantomData,
}
}
}

impl<C> SplitMetaData for OpendalFsSplit<C>
where
C: Sized + Send + Clone + 'static,
{
fn id(&self) -> SplitId {
self.name.as_str().into()
}

fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

fn encode_to_json(&self) -> JsonbVal {
serde_json::to_value(self.clone()).unwrap().into()
}

fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> {
let offset = start_offset.parse().unwrap();
self.offset = offset;
Ok(())
}
}

impl<C> OpendalFsSplit<C>
where
C: Send + Clone + 'static,
{
pub fn new(name: String, start: usize, size: usize) -> Self {
Self {
name,
offset: start,
size,
marker: PhantomData,
}
}
}

#[derive(Clone, Debug)]
pub struct FsPageItem {
pub name: String,
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/source/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use opendal_source::GcsProperties;
pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR};

mod file_common;
pub mod file_common;
pub mod nd_streaming;
pub use file_common::{FsPage, FsPageItem, FsSplit};
pub use file_common::{FsPage, FsPageItem, FsSplit, OpendalFsSplit};
pub mod opendal_source;
mod s3;
pub mod s3_v2;
pub const S3_V2_CONNECTOR: &str = "s3_v2";
Loading
Loading