diff --git a/Cargo.lock b/Cargo.lock index 5414d6a4e991d..ebaceffb0dac5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4706,7 +4706,7 @@ dependencies = [ "log", "murmur3", "once_cell", - "opendal", + "opendal 0.43.0 (registry+https://github.com/rust-lang/crates.io-index)", "ordered-float 3.9.1", "parquet 49.0.0", "prometheus", @@ -6206,6 +6206,37 @@ dependencies = [ "uuid", ] +[[package]] +name = "opendal" +version = "0.43.0" +source = "git+https://github.com/apache/incubator-opendal?rev=9a222e4d72b328a24d5775b1565292f4636bbe69#9a222e4d72b328a24d5775b1565292f4636bbe69" +dependencies = [ + "anyhow", + "async-compat", + "async-trait", + "backon", + "base64 0.21.4", + "bytes", + "chrono", + "flagset", + "futures", + "http 0.2.9", + "log", + "md-5", + "once_cell", + "parking_lot 0.12.1", + "percent-encoding", + "pin-project", + "quick-xml 0.30.0", + "reqsign", + "reqwest", + "serde", + "serde_json", + "sha2", + "tokio", + "uuid", +] + [[package]] name = "openidconnect" version = "3.4.0" @@ -8469,6 +8500,7 @@ dependencies = [ "mysql_common", "nexmark", "num-bigint", + "opendal 0.43.0 (git+https://github.com/apache/incubator-opendal?rev=9a222e4d72b328a24d5775b1565292f4636bbe69)", "parking_lot 0.12.1", "paste", "pretty_assertions", @@ -9029,7 +9061,7 @@ dependencies = [ "itertools 0.12.0", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal", + "opendal 0.43.0 (registry+https://github.com/rust-lang/crates.io-index)", "prometheus", "risingwave_common", "rustls", @@ -12375,6 +12407,7 @@ dependencies = [ "futures-task", "futures-util", "generic-array", + "getrandom", "governor", "hashbrown 0.12.3", "hashbrown 0.14.0", diff --git a/ci/scripts/s3-source-test.sh b/ci/scripts/s3-source-test.sh index 9fce76f000e31..364883bb90781 100755 --- a/ci/scripts/s3-source-test.sh +++ b/ci/scripts/s3-source-test.sh @@ -29,7 +29,7 @@ echo "--- starting risingwave cluster with connector node" cargo make ci-start ci-1cn-1fe echo "--- Run test" -python3 -m pip install minio psycopg2-binary +python3 -m pip install minio psycopg2-binary opendal python3 e2e_test/s3/$script echo "--- Kill cluster" diff --git a/e2e_test/s3/gcs_source.py b/e2e_test/s3/gcs_source.py new file mode 100644 index 0000000000000..c917f2c2d33fd --- /dev/null +++ b/e2e_test/s3/gcs_source.py @@ -0,0 +1,130 @@ +import os +import sys +import csv +import json +import random +import psycopg2 +import opendal + +from time import sleep +from io import StringIO +from functools import partial + +def gen_data(file_num, item_num_per_file): + assert item_num_per_file % 2 == 0, \ + f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' + return [ + [{ + 'id': file_id * item_num_per_file + item_id, + 'name': f'{file_id}_{item_id}', + 'sex': item_id % 2, + 'mark': (-1) ** (item_id % 2), + } for item_id in range(item_num_per_file)] + for file_id in range(file_num) + ] + +def format_json(data): + return [ + '\n'.join([json.dumps(item) for item in file]) + for file in data + ] + + +def do_test(config, file_num, item_num_per_file, prefix, fmt, credential): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _table(): + return f'gcs_test_{fmt}' + + def _encode(): + return 'JSON' + + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE {_table()}( + id int, + name TEXT, + sex int, + mark int, + ) WITH ( + connector = 'gcs', + match_pattern = '{prefix}*.{fmt}', + gcs.bucket_name = '{config['GCS_BUCKET']}', + gcs.credentials = 
'{credential}', + ) FORMAT PLAIN ENCODE {_encode()};''') + + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from {_table()}') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") + sleep(30) + + stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}' + print(f'Execute {stmt}') + cur.execute(stmt) + result = cur.fetchone() + + print('Got:', result) + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' + + _assert_eq('count(*)', result[0], total_rows) + _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) + _assert_eq('sum(sex)', result[2], total_rows / 2) + _assert_eq('sum(mark)', result[3], 0) + + print('Test pass') + + cur.execute(f'drop table {_table()}') + cur.close() + conn.close() + + +if __name__ == "__main__": + FILE_NUM = 4001 + ITEM_NUM_PER_FILE = 2 + data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) + + fmt = sys.argv[1] + FORMATTER = { + 'json': format_json, + } + assert fmt in FORMATTER, f"Unsupported format: {fmt}" + formatted_files = FORMATTER[fmt](data) + + config = json.loads(os.environ["GCS_SOURCE_TEST_CONF"]) + run_id = str(random.randint(1000, 9999)) + _local = lambda idx: f'data_{idx}.{fmt}' + _gcs = lambda idx: f"{run_id}_data_{idx}.{fmt}" + credential_str = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS"]) + # put gcs files + op = opendal.Operator("gcs", root="/", bucket=config["GCS_BUCKET"], credential=credential_str) + + print("upload file to gcs") + for idx, file_str in enumerate(formatted_files): + with open(_local(idx), "w") as f: + f.write(file_str) + os.fsync(f.fileno()) + file_bytes = file_str.encode('utf-8') + op.write(_gcs(idx), file_bytes) + + # do test + print("do test") + do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt, credential_str) + + # clean up gcs files + print("clean up gcs files") + for idx, _ in enumerate(formatted_files): + op.delete(_gcs(idx)) diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 1d7ad5ef58f77..0d2c88e84aa49 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -79,6 +79,7 @@ mysql_common = { version = "0.31", default-features = false, features = [ ] } nexmark = { version = "0.2", features = ["serde"] } num-bigint = "0.4" +opendal = { git = "https://github.com/apache/incubator-opendal", rev = "9a222e4d72b328a24d5775b1565292f4636bbe69" } parking_lot = "0.12" paste = "1" prometheus = { version = "0.13", features = ["process"] } diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index fdc3ed8867297..d777faf9bb2cc 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -23,6 +23,7 @@ macro_rules! for_all_classified_sources { { Citus } }, // other sources + // todo: file source do not nest with mq source. { { Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit }, { Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit }, @@ -32,6 +33,8 @@ macro_rules! 
for_all_classified_sources { { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit }, { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit }, { S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit }, + { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> }, + { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> }, { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit} } $( diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 8c80afe8d577f..8db65bb9681bf 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -42,11 +42,12 @@ use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::monitor::SourceMetrics; use super::nexmark::source::message::NexmarkMeta; +use super::OPENDAL_S3_CONNECTOR; use crate::parser::ParserConfig; pub(crate) use crate::source::common::CommonSplitReader; -use crate::source::filesystem::{FsPageItem, S3Properties, S3_V2_CONNECTOR}; +use crate::source::filesystem::opendal_source::OpendalS3Properties; +use crate::source::filesystem::{FsPageItem, GcsProperties, S3Properties}; use crate::source::monitor::EnumeratorMetrics; -use crate::source::S3_CONNECTOR; use crate::{ dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties, impl_split, match_source_name_str, @@ -78,7 +79,7 @@ impl TryFromHashmap for P { } } -pub async fn create_split_reader( +pub async fn create_split_reader( prop: P, splits: Vec, parser_config: ParserConfig, @@ -367,44 +368,43 @@ impl ConnectorProperties { pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap) -> bool { with_properties .get(UPSTREAM_SOURCE_KEY) - .map(|s| s.eq_ignore_ascii_case(S3_V2_CONNECTOR)) + .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)) .unwrap_or(false) } pub fn is_new_fs_connector_hash_map(with_properties: &HashMap) -> bool { with_properties .get(UPSTREAM_SOURCE_KEY) - .map(|s| s.eq_ignore_ascii_case(S3_V2_CONNECTOR)) + .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)) .unwrap_or(false) } - - pub fn rewrite_upstream_source_key_hash_map(with_properties: &mut HashMap) { - let connector = with_properties.remove(UPSTREAM_SOURCE_KEY).unwrap(); - match connector.as_str() { - S3_V2_CONNECTOR => { - tracing::info!( - "using new fs source, rewrite connector from '{}' to '{}'", - S3_V2_CONNECTOR, - S3_CONNECTOR - ); - with_properties.insert(UPSTREAM_SOURCE_KEY.to_string(), S3_CONNECTOR.to_string()); - } - _ => { - with_properties.insert(UPSTREAM_SOURCE_KEY.to_string(), connector); - } - } - } } impl ConnectorProperties { pub fn extract(mut with_properties: HashMap) -> Result { if Self::is_new_fs_connector_hash_map(&with_properties) { - _ = with_properties + let connector = with_properties .remove(UPSTREAM_SOURCE_KEY) .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?; - return Ok(ConnectorProperties::S3(Box::new( - S3Properties::try_from_hashmap(with_properties)?, - ))); + match connector.as_str() { + "s3_v2" => { + let assume_role = with_properties.get("s3.assume_role").cloned(); + return Ok(ConnectorProperties::OpendalS3(Box::new( + OpendalS3Properties { + s3_properties: 
S3Properties::try_from_hashmap(with_properties)?, + assume_role, + }, + ))); + } + "gcs" => { + return Ok(ConnectorProperties::Gcs(Box::new( + GcsProperties::try_from_hashmap(with_properties)?, + ))); + } + _ => { + unreachable!() + } + } } let connector = with_properties diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs index ef47724e92233..a865ee7e4b3ed 100644 --- a/src/connector/src/source/filesystem/file_common.rs +++ b/src/connector/src/source/filesystem/file_common.rs @@ -11,11 +11,16 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Debug; +use std::hash::Hash; +use std::marker::PhantomData; + use anyhow::anyhow; use aws_sdk_s3::types::Object; -use risingwave_common::types::{JsonbVal, Timestamp}; +use risingwave_common::types::{JsonbVal, Timestamptz}; use serde::{Deserialize, Serialize}; +use super::opendal_source::OpendalSource; use crate::source::{SplitId, SplitMetaData}; /// [`FsSplit`] Describes a file or a split of a file. A file is a generic concept, @@ -67,22 +72,72 @@ impl FsSplit { } } -#[derive(Clone, Debug)] -pub struct FsPageItem { +/// [`OpendalFsSplit`] Describes a file or a split of a file. A file is a generic concept, +/// and can be a local file, a distributed file system, or am object in S3 bucket. +#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct OpendalFsSplit { pub name: String, - pub size: i64, - pub timestamp: Timestamp, + pub offset: usize, + pub size: usize, + _marker: PhantomData, } -pub type FsPage = Vec; - -impl From<&Object> for FsPageItem { +impl From<&Object> for OpendalFsSplit { fn from(value: &Object) -> Self { - let aws_ts = value.last_modified().unwrap(); Self { name: value.key().unwrap().to_owned(), - size: value.size().unwrap_or_default(), - timestamp: Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()), + offset: 0, + size: value.size().unwrap_or_default() as usize, + _marker: PhantomData, } } } + +impl SplitMetaData for OpendalFsSplit { + fn id(&self) -> SplitId { + self.name.as_str().into() + } + + fn restore_from_json(value: JsonbVal) -> anyhow::Result { + serde_json::from_value(value.take()).map_err(|e| anyhow!(e)) + } + + fn encode_to_json(&self) -> JsonbVal { + serde_json::to_value(self.clone()).unwrap().into() + } + + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { + let offset = start_offset.parse().unwrap(); + self.offset = offset; + Ok(()) + } +} + +impl OpendalFsSplit { + pub fn new(name: String, start: usize, size: usize) -> Self { + Self { + name, + offset: start, + size, + _marker: PhantomData, + } + } + + pub fn empty_split() -> Self { + Self { + name: "empty_split".to_string(), + offset: 0, + size: 0, + _marker: PhantomData, + } + } +} + +#[derive(Clone, Debug)] +pub struct FsPageItem { + pub name: String, + pub size: i64, + pub timestamp: Timestamptz, +} + +pub type FsPage = Vec; diff --git a/src/connector/src/source/filesystem/mod.rs b/src/connector/src/source/filesystem/mod.rs index 8f2587384280b..e7c2a4fa72288 100644 --- a/src/connector/src/source/filesystem/mod.rs +++ b/src/connector/src/source/filesystem/mod.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
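The `OpendalFsSplit` introduced above is checkpointed as JSONB through `encode_to_json` / `restore_from_json`, so recovery only needs the split to round-trip through serde and to accept an updated byte offset. A minimal, self-contained sketch of that round trip; the struct mirrors the fields in the diff, while the `Gcs` marker type and the `#[serde(skip)]` on the marker field are simplifications made for this example:

```rust
use std::marker::PhantomData;

use serde::{Deserialize, Serialize};

// Stand-in for the connector marker types (OpendalGcs / OpendalS3) in the diff.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
struct Gcs;

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
struct FsSplitSketch<Src> {
    name: String,
    offset: usize,
    size: usize,
    #[serde(skip)]
    _marker: PhantomData<Src>,
}

fn main() -> anyhow::Result<()> {
    let split = FsSplitSketch::<Gcs> {
        name: "dir/data_0.json".into(),
        offset: 0,
        size: 128,
        _marker: PhantomData,
    };

    // encode_to_json / restore_from_json in the diff do the same thing through JsonbVal.
    let encoded = serde_json::to_value(&split)?;
    let mut restored: FsSplitSketch<Gcs> = serde_json::from_value(encoded)?;

    // update_with_offset parses the checkpointed byte offset back into the split.
    restored.offset = "64".parse()?;
    assert_eq!(restored.name, split.name);
    Ok(())
}
```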
+pub use opendal_source::GcsProperties; pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR}; - -mod file_common; +pub mod file_common; pub mod nd_streaming; -pub use file_common::{FsPage, FsPageItem, FsSplit}; +pub use file_common::{FsPage, FsPageItem, FsSplit, OpendalFsSplit}; +pub mod opendal_source; mod s3; pub mod s3_v2; -pub const S3_V2_CONNECTOR: &str = "s3_v2"; diff --git a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs new file mode 100644 index 0000000000000..d5f303fbe1b5f --- /dev/null +++ b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs @@ -0,0 +1,63 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::marker::PhantomData; + +use anyhow::Context; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::Gcs; +use opendal::Operator; + +use super::opendal_enumerator::OpendalEnumerator; +use super::{GcsProperties, OpendalSource}; +use crate::source::filesystem::s3::enumerator::get_prefix; + +impl OpendalEnumerator { + /// create opendal gcs source. + pub fn new_gcs_source(gcs_properties: GcsProperties) -> anyhow::Result { + // Create gcs builder. + let mut builder = Gcs::default(); + + builder.bucket(&gcs_properties.bucket_name); + + // if credential env is set, use it. Otherwise, ADC will be used. + let cred = gcs_properties.credential; + if let Some(cred) = cred { + builder.credential(&cred); + } + + if let Some(service_account) = gcs_properties.service_account { + builder.service_account(&service_account); + } + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + + let (prefix, matcher) = if let Some(pattern) = gcs_properties.match_pattern.as_ref() { + let prefix = get_prefix(pattern); + let matcher = glob::Pattern::new(pattern) + .with_context(|| format!("Invalid match_pattern: {}", pattern))?; + (Some(prefix), Some(matcher)) + } else { + (None, None) + }; + Ok(Self { + op, + prefix, + matcher, + marker: PhantomData, + }) + } +} diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs new file mode 100644 index 0000000000000..d72f94badb85c --- /dev/null +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -0,0 +1,94 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
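`new_gcs_source` above assembles a GCS `Operator` from the user-facing properties and wraps it with logging and retry layers. A standalone sketch of the same construction against the opendal API this PR pins; the bucket name is a placeholder, and with `credential = None` opendal falls back to Application Default Credentials, matching the comment in `new_gcs_source`:

```rust
use anyhow::Result;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Gcs;
use opendal::Operator;

fn build_gcs_operator(bucket: &str, credential: Option<&str>) -> Result<Operator> {
    let mut builder = Gcs::default();
    builder.bucket(bucket);
    // Without an explicit credential, ADC is used.
    if let Some(cred) = credential {
        builder.credential(cred);
    }
    let op = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(op)
}

fn main() -> Result<()> {
    // "my-bucket" is a placeholder; building the operator does not contact GCS yet.
    let _op = build_gcs_operator("my-bucket", None)?;
    println!("gcs operator built");
    Ok(())
}
```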
+ +pub mod gcs_source; + +pub use gcs_source::*; +pub mod s3_source; +pub use s3_source::*; +use serde::Deserialize; +pub mod opendal_enumerator; +pub mod opendal_reader; + +use self::opendal_enumerator::OpendalEnumerator; +use self::opendal_reader::OpendalReader; +use super::{OpendalFsSplit, S3Properties}; +use crate::source::SourceProperties; + +pub const GCS_CONNECTOR: &str = "gcs"; +// The new s3_v2 will use opendal. +pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2"; + +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct GcsProperties { + #[serde(rename = "gcs.bucket_name")] + pub bucket_name: String, + #[serde(rename = "gcs.credential")] + pub credential: Option, + #[serde(rename = "gcs.service_account", default)] + pub service_account: Option, + #[serde(rename = "match_pattern", default)] + pub match_pattern: Option, +} + +impl SourceProperties for GcsProperties { + type Split = OpendalFsSplit; + type SplitEnumerator = OpendalEnumerator; + type SplitReader = OpendalReader; + + const SOURCE_NAME: &'static str = GCS_CONNECTOR; +} + +pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq { + type Properties: SourceProperties + Send + Sync; + + fn new_enumerator(properties: Self::Properties) -> anyhow::Result>; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct OpendalS3; + +impl OpendalSource for OpendalS3 { + type Properties = OpendalS3Properties; + + fn new_enumerator(properties: Self::Properties) -> anyhow::Result> { + OpendalEnumerator::new_s3_source(properties.s3_properties, properties.assume_role) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct OpendalGcs; + +impl OpendalSource for OpendalGcs { + type Properties = GcsProperties; + + fn new_enumerator(properties: Self::Properties) -> anyhow::Result> { + OpendalEnumerator::new_gcs_source(properties) + } +} + +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct OpendalS3Properties { + pub s3_properties: S3Properties, + #[serde(rename = "s3.assume_role", default)] + pub assume_role: Option, +} + +impl SourceProperties for OpendalS3Properties { + type Split = OpendalFsSplit; + type SplitEnumerator = OpendalEnumerator; + type SplitReader = OpendalReader; + + const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR; +} diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs new file mode 100644 index 0000000000000..1a10cdef1309b --- /dev/null +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -0,0 +1,103 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
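`GcsProperties` above maps `WITH`-clause keys onto struct fields through `#[serde(rename = ...)]`, which is what lets the generic `try_from_hashmap` path stay connector-agnostic. A small sketch of that mapping, going through `serde_json` with made-up option values; the real code uses the crate's `TryFromHashmap` helper, but the serde behaviour is the same:

```rust
use std::collections::HashMap;

use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
struct GcsPropsSketch {
    #[serde(rename = "gcs.bucket_name")]
    bucket_name: String,
    #[serde(rename = "gcs.credential")]
    credential: Option<String>,
    #[serde(rename = "match_pattern", default)]
    match_pattern: Option<String>,
}

fn main() -> anyhow::Result<()> {
    // Keys as they would appear in the WITH clause; values are made up.
    let with_options: HashMap<String, String> = HashMap::from([
        ("gcs.bucket_name".to_string(), "example-bucket".to_string()),
        ("match_pattern".to_string(), "example_prefix*.json".to_string()),
    ]);

    let props: GcsPropsSketch = serde_json::from_value(serde_json::to_value(&with_options)?)?;
    assert_eq!(props.bucket_name, "example-bucket");
    assert_eq!(props.credential, None); // an absent optional key deserializes to None
    Ok(())
}
```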
+ +use std::marker::PhantomData; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use futures::stream::{self, BoxStream}; +use futures::StreamExt; +use opendal::{Metakey, Operator}; +use risingwave_common::types::Timestamptz; + +use super::OpendalSource; +use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; + +#[derive(Debug, Clone)] +pub struct OpendalEnumerator { + pub(crate) op: Operator, + // prefix is used to reduce the number of objects to be listed + pub(crate) prefix: Option, + pub(crate) matcher: Option, + pub(crate) marker: PhantomData, +} + +#[async_trait] +impl SplitEnumerator for OpendalEnumerator { + type Properties = Src::Properties; + type Split = OpendalFsSplit; + + async fn new( + properties: Src::Properties, + _context: SourceEnumeratorContextRef, + ) -> anyhow::Result { + Src::new_enumerator(properties) + } + + async fn list_splits(&mut self) -> anyhow::Result>> { + let empty_split: OpendalFsSplit = OpendalFsSplit::empty_split(); + + Ok(vec![empty_split]) + } +} + +impl OpendalEnumerator { + pub async fn list(&self) -> anyhow::Result { + let prefix = match &self.prefix { + Some(prefix) => prefix, + None => "", + }; + + let object_lister = self + .op + .lister_with(prefix) + .recursive(true) + .metakey(Metakey::ContentLength | Metakey::LastModified) + .await?; + let stream = stream::unfold(object_lister, |mut object_lister| async move { + match object_lister.next().await { + Some(Ok(object)) => { + let name = object.path().to_string(); + let om = object.metadata(); + + let t = match om.last_modified() { + Some(t) => t, + None => DateTime::::from_timestamp(0, 0).unwrap_or_default(), + }; + let timestamp = Timestamptz::from(t); + let size = om.content_length() as i64; + let metadata = FsPageItem { + name, + size, + timestamp, + }; + Some((Ok(metadata), object_lister)) + } + Some(Err(err)) => Some((Err(err.into()), object_lister)), + None => { + tracing::info!("list object completed."); + None + } + } + }); + + Ok(stream.boxed()) + } + + pub fn get_matcher(&self) -> &Option { + &self.matcher + } +} +pub type ObjectMetadataIter = BoxStream<'static, anyhow::Result>; diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs new file mode 100644 index 0000000000000..d2758ba9bb0ef --- /dev/null +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -0,0 +1,166 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
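`OpendalEnumerator::list` above walks the bucket with a recursive lister and maps each entry to an `FsPageItem` (name, size, last-modified). A trimmed, runnable version of the same listing loop; the in-memory service stands in for GCS/S3 so the sketch needs no credentials, which is an assumption made purely for the example:

```rust
use futures::StreamExt;
use opendal::services::Memory;
use opendal::{Metakey, Operator};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Memory backend stands in for GCS/S3 so the example runs without credentials.
    let op = Operator::new(Memory::default())?.finish();
    op.write("dir/data_0.json", b"{\"id\": 1}".to_vec()).await?;

    // Same listing call the enumerator issues: recursive, asking for length + mtime.
    let mut lister = op
        .lister_with("")
        .recursive(true)
        .metakey(Metakey::ContentLength | Metakey::LastModified)
        .await?;

    while let Some(entry) = lister.next().await {
        let entry = entry?;
        let meta = entry.metadata();
        println!("{} ({} bytes)", entry.path(), meta.content_length());
    }
    Ok(())
}
```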
+ +use anyhow::{Ok, Result}; +use async_trait::async_trait; +use futures::TryStreamExt; +use futures_async_stream::try_stream; +use opendal::Operator; +use risingwave_common::error::RwError; +use tokio::io::BufReader; +use tokio_util::io::{ReaderStream, StreamReader}; + +use super::opendal_enumerator::OpendalEnumerator; +use super::OpendalSource; +use crate::parser::{ByteStreamSourceParserImpl, ParserConfig}; +use crate::source::filesystem::{nd_streaming, OpendalFsSplit}; +use crate::source::{ + BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData, + SplitReader, StreamChunkWithState, +}; + +const MAX_CHANNEL_BUFFER_SIZE: usize = 2048; +const STREAM_READER_CAPACITY: usize = 4096; +#[derive(Debug, Clone)] +pub struct OpendalReader { + connector: OpendalEnumerator, + splits: Vec>, + parser_config: ParserConfig, + source_ctx: SourceContextRef, +} +#[async_trait] +impl SplitReader for OpendalReader { + type Properties = Src::Properties; + type Split = OpendalFsSplit; + + async fn new( + properties: Src::Properties, + splits: Vec>, + parser_config: ParserConfig, + source_ctx: SourceContextRef, + _columns: Option>, + ) -> Result { + let connector = Src::new_enumerator(properties)?; + let opendal_reader = OpendalReader { + connector, + splits, + parser_config, + source_ctx, + }; + Ok(opendal_reader) + } + + fn into_stream(self) -> BoxSourceWithStateStream { + self.into_chunk_stream() + } +} + +impl OpendalReader { + #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] + async fn into_chunk_stream(self) { + for split in self.splits { + let actor_id = self.source_ctx.source_info.actor_id.to_string(); + let source_id = self.source_ctx.source_info.source_id.to_string(); + let source_ctx = self.source_ctx.clone(); + + let split_id = split.id(); + + let data_stream = + Self::stream_read_object(self.connector.op.clone(), split, self.source_ctx.clone()); + + let parser = + ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?; + let msg_stream = if matches!( + parser, + ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_) + ) { + parser.into_stream(nd_streaming::split_stream(data_stream)) + } else { + parser.into_stream(data_stream) + }; + #[for_await] + for msg in msg_stream { + let msg = msg?; + self.source_ctx + .metrics + .partition_input_count + .with_label_values(&[&actor_id, &source_id, &split_id]) + .inc_by(msg.chunk.cardinality() as u64); + yield msg; + } + } + } + + #[try_stream(boxed, ok = Vec, error = anyhow::Error)] + pub async fn stream_read_object( + op: Operator, + split: OpendalFsSplit, + source_ctx: SourceContextRef, + ) { + let actor_id = source_ctx.source_info.actor_id.to_string(); + let source_id = source_ctx.source_info.source_id.to_string(); + let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size; + let split_id = split.id(); + + let object_name = split.name.clone(); + + let reader = op + .reader_with(&object_name) + .range(split.offset as u64..) 
+ .await?; + + let stream_reader = StreamReader::new( + reader.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), + ); + let buf_reader = BufReader::new(stream_reader); + let stream = ReaderStream::with_capacity(buf_reader, STREAM_READER_CAPACITY); + + let mut offset: usize = split.offset; + let mut batch_size: usize = 0; + let mut batch = Vec::new(); + #[for_await] + for read in stream { + let bytes = read?; + let len = bytes.len(); + let msg = SourceMessage { + key: None, + payload: Some(bytes.as_ref().to_vec()), + offset: offset.to_string(), + split_id: split.id(), + meta: SourceMeta::Empty, + }; + offset += len; + batch_size += len; + batch.push(msg); + if batch.len() >= max_chunk_size { + source_ctx + .metrics + .partition_input_bytes + .with_label_values(&[&actor_id, &source_id, &split_id]) + .inc_by(batch_size as u64); + let yield_batch = std::mem::take(&mut batch); + batch_size = 0; + yield yield_batch; + } + } + if !batch.is_empty() { + source_ctx + .metrics + .partition_input_bytes + .with_label_values(&[&actor_id, &source_id, &split_id]) + .inc_by(batch_size as u64); + yield batch; + } + } +} diff --git a/src/connector/src/source/filesystem/opendal_source/s3_source.rs b/src/connector/src/source/filesystem/opendal_source/s3_source.rs new file mode 100644 index 0000000000000..22bac20be8dc0 --- /dev/null +++ b/src/connector/src/source/filesystem/opendal_source/s3_source.rs @@ -0,0 +1,88 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::marker::PhantomData; + +use anyhow::Context; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::S3; +use opendal::Operator; + +use super::opendal_enumerator::OpendalEnumerator; +use super::OpendalSource; +use crate::source::filesystem::s3::enumerator::get_prefix; +use crate::source::filesystem::S3Properties; + +impl OpendalEnumerator { + /// create opendal s3 source. + pub fn new_s3_source( + s3_properties: S3Properties, + assume_role: Option, + ) -> anyhow::Result { + // Create s3 builder. 
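`stream_read_object` above adapts an opendal ranged reader into a tokio byte stream via `StreamReader` and `ReaderStream` before batching bytes into `SourceMessage`s. A self-contained sketch of just that adapter, again over the in-memory backend and with an illustrative object name and offset; the 4096 capacity matches `STREAM_READER_CAPACITY` above:

```rust
use futures::{pin_mut, TryStreamExt};
use opendal::services::Memory;
use opendal::Operator;
use tokio::io::BufReader;
use tokio_util::io::{ReaderStream, StreamReader};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let op = Operator::new(Memory::default())?.finish();
    op.write("split_object", vec![0u8; 10_000]).await?;

    // Ranged read starting at a (made-up) split offset, as in stream_read_object.
    let reader = op.reader_with("split_object").range(4_000u64..).await?;
    let stream_reader = StreamReader::new(
        reader.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
    );
    let buf_reader = BufReader::new(stream_reader);
    let stream = ReaderStream::with_capacity(buf_reader, 4096);
    pin_mut!(stream);

    let mut total = 0usize;
    while let Some(bytes) = stream.try_next().await? {
        total += bytes.len();
    }
    assert_eq!(total, 6_000); // everything after the offset is streamed
    Ok(())
}
```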
+ let mut builder = S3::default(); + builder.bucket(&s3_properties.bucket_name); + builder.region(&s3_properties.region_name); + + if let Some(endpoint_url) = s3_properties.endpoint_url { + builder.endpoint(&endpoint_url); + } + + if let Some(access) = s3_properties.access { + builder.access_key_id(&access); + } else { + tracing::error!( + "access key id of aws s3 is not set, bucket {}", + s3_properties.bucket_name + ); + } + + if let Some(secret) = s3_properties.secret { + builder.secret_access_key(&secret); + } else { + tracing::error!( + "secret access key of aws s3 is not set, bucket {}", + s3_properties.bucket_name + ); + } + + builder.enable_virtual_host_style(); + + if let Some(assume_role) = assume_role { + builder.role_arn(&assume_role); + } + + builder.disable_config_load(); + let (prefix, matcher) = if let Some(pattern) = s3_properties.match_pattern.as_ref() { + let prefix = get_prefix(pattern); + let matcher = glob::Pattern::new(pattern) + .with_context(|| format!("Invalid match_pattern: {}", pattern))?; + (Some(prefix), Some(matcher)) + } else { + (None, None) + }; + + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + + Ok(Self { + op, + prefix, + matcher, + marker: PhantomData, + }) + } +} diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs index 517afd2dfe411..c60f8994f76f1 100644 --- a/src/connector/src/source/filesystem/s3/mod.rs +++ b/src/connector/src/source/filesystem/s3/mod.rs @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -mod enumerator; +pub mod enumerator; pub use enumerator::S3SplitEnumerator; mod source; @@ -24,7 +24,7 @@ use crate::source::SourceProperties; pub const S3_CONNECTOR: &str = "s3"; -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, PartialEq)] pub struct S3Properties { #[serde(rename = "s3.region_name")] pub region_name: String, diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index e4132ee615d3f..4d51dbc4d2b44 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -124,9 +124,9 @@ impl S3FileReader { .partition_input_bytes .with_label_values(&[&actor_id, &source_id, &split_id]) .inc_by(batch_size as u64); + let yield_batch = std::mem::take(&mut batch); batch_size = 0; - yield batch.clone(); - batch.clear(); + yield yield_batch; } } if !batch.is_empty() { diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 7bdf0602efa3a..ca62c76789873 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -39,7 +39,8 @@ pub mod test_source; pub use manager::{SourceColumnDesc, SourceColumnType}; pub use mock_external_table::MockExternalTableReader; -pub use crate::source::filesystem::{S3_CONNECTOR, S3_V2_CONNECTOR}; +pub use crate::source::filesystem::opendal_source::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR}; +pub use crate::source::filesystem::S3_CONNECTOR; pub use crate::source::nexmark::NEXMARK_CONNECTOR; pub use crate::source::pulsar::PULSAR_CONNECTOR; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 8b285f6359752..a04627cf2b5ad 100644 --- a/src/frontend/src/handler/create_source.rs +++ 
b/src/frontend/src/handler/create_source.rs @@ -43,8 +43,8 @@ use risingwave_connector::source::external::CdcTableType; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::test_source::TEST_CONNECTOR; use risingwave_connector::source::{ - GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, - PULSAR_CONNECTOR, S3_CONNECTOR, S3_V2_CONNECTOR, + GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, + NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, }; use risingwave_pb::catalog::{ PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc, @@ -1007,7 +1007,10 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock hashmap!( Format::Plain => vec![Encode::Csv, Encode::Json], ), - S3_V2_CONNECTOR => hashmap!( + OPENDAL_S3_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Csv, Encode::Json], + ), + GCS_CONNECTOR => hashmap!( Format::Plain => vec![Encode::Csv, Encode::Json], ), MYSQL_CDC_CONNECTOR => hashmap!( diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index db2e0817b450b..1b44e434842ec 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -182,7 +182,7 @@ impl LogicalSource { column_desc: ColumnDesc::from_field_with_column_id( &Field { name: "last_edit_time".to_string(), - data_type: DataType::Timestamp, + data_type: DataType::Timestamptz, sub_fields: vec![], type_name: "".to_string(), }, diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index b5388e3390aab..46e28231a23e3 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -74,7 +74,7 @@ impl ObjectStore for OpendalObjectStore { async fn streaming_upload(&self, path: &str) -> ObjectResult { Ok(Box::new( - OpenDalStreamingUploader::new(self.op.clone(), path.to_string()).await?, + OpendalStreamingUploader::new(self.op.clone(), path.to_string()).await?, )) } @@ -118,7 +118,7 @@ impl ObjectStore for OpendalObjectStore { let reader = self.op.reader_with(path).range(range).await?; let stream = reader .into_stream() - .map(|item| item.map_err(|e| ObjectError::internal(format!("OpenDalError: {:?}", e)))); + .map(|item| item.map_err(|e| ObjectError::internal(format!("OpendalError: {:?}", e)))); Ok(Box::pin(stream)) } @@ -229,10 +229,10 @@ impl OpendalObjectStore { } /// Store multiple parts in a map, and concatenate them on finish. 
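The `reader.rs` change above (mirrored in the new opendal reader) replaces `yield batch.clone(); batch.clear()` with `std::mem::take`, which hands the accumulated `Vec` to the consumer and leaves a fresh empty one behind instead of copying every message. The pattern in isolation:

```rust
fn main() {
    let mut batch: Vec<u32> = (0..4).collect();

    // Before: `yield batch.clone()` copied every element and `batch.clear()` dropped the originals.
    // After: `take` moves the Vec out and leaves `Vec::default()` (empty, no copy) in its place.
    let yielded = std::mem::take(&mut batch);

    assert_eq!(yielded, vec![0, 1, 2, 3]);
    assert!(batch.is_empty());
}
```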
-pub struct OpenDalStreamingUploader { +pub struct OpendalStreamingUploader { writer: Writer, } -impl OpenDalStreamingUploader { +impl OpendalStreamingUploader { pub async fn new(op: Operator, path: String) -> ObjectResult { let writer = op.writer_with(&path).buffer(OPENDAL_BUFFER_SIZE).await?; Ok(Self { writer }) @@ -242,7 +242,7 @@ impl OpenDalStreamingUploader { const OPENDAL_BUFFER_SIZE: usize = 16 * 1024 * 1024; #[async_trait::async_trait] -impl StreamingUploader for OpenDalStreamingUploader { +impl StreamingUploader for OpendalStreamingUploader { async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> { self.writer.write(data).await?; Ok(()) diff --git a/src/risedevtool/src/bin/risedev-compose.rs b/src/risedevtool/src/bin/risedev-compose.rs index 63925d919bb2b..7f3df20893438 100644 --- a/src/risedevtool/src/bin/risedev-compose.rs +++ b/src/risedevtool/src/bin/risedev-compose.rs @@ -203,7 +203,7 @@ fn main() -> Result<()> { ServiceConfig::ZooKeeper(_) => { return Err(anyhow!("not supported, please use redpanda instead")) } - ServiceConfig::OpenDal(_) => continue, + ServiceConfig::Opendal(_) => continue, ServiceConfig::AwsS3(_) => continue, ServiceConfig::RedPanda(c) => { if opts.deploy { diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index 3d922b161bda4..94d8f9305a021 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -112,7 +112,7 @@ fn task_main( ServiceConfig::Redis(c) => Some((c.port, c.id.clone())), ServiceConfig::ZooKeeper(c) => Some((c.port, c.id.clone())), ServiceConfig::AwsS3(_) => None, - ServiceConfig::OpenDal(_) => None, + ServiceConfig::Opendal(_) => None, ServiceConfig::RedPanda(_) => None, }; @@ -271,7 +271,7 @@ fn task_main( ctx.pb .set_message(format!("using AWS s3 bucket {}", c.bucket)); } - ServiceConfig::OpenDal(c) => { + ServiceConfig::Opendal(c) => { let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); diff --git a/src/risedevtool/src/config.rs b/src/risedevtool/src/config.rs index 09e530487d4f0..2b1248627e927 100644 --- a/src/risedevtool/src/config.rs +++ b/src/risedevtool/src/config.rs @@ -166,7 +166,7 @@ impl ConfigExpander { "prometheus" => ServiceConfig::Prometheus(serde_yaml::from_str(&out_str)?), "grafana" => ServiceConfig::Grafana(serde_yaml::from_str(&out_str)?), "tempo" => ServiceConfig::Tempo(serde_yaml::from_str(&out_str)?), - "opendal" => ServiceConfig::OpenDal(serde_yaml::from_str(&out_str)?), + "opendal" => ServiceConfig::Opendal(serde_yaml::from_str(&out_str)?), "aws-s3" => ServiceConfig::AwsS3(serde_yaml::from_str(&out_str)?), "kafka" => ServiceConfig::Kafka(serde_yaml::from_str(&out_str)?), "pubsub" => ServiceConfig::Pubsub(serde_yaml::from_str(&out_str)?), diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 516ae872d6c31..8421b53d44572 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -327,7 +327,7 @@ pub enum ServiceConfig { Prometheus(PrometheusConfig), Grafana(GrafanaConfig), Tempo(TempoConfig), - OpenDal(OpendalConfig), + Opendal(OpendalConfig), AwsS3(AwsS3Config), Kafka(KafkaConfig), Pubsub(PubsubConfig), @@ -354,7 +354,7 @@ impl ServiceConfig { Self::Pubsub(c) => &c.id, Self::Redis(c) => &c.id, Self::RedPanda(c) => &c.id, - Self::OpenDal(c) => &c.id, + Self::Opendal(c) => &c.id, } } } diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index a336719503b00..00c7e33196abf 
100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -28,14 +28,17 @@ use risingwave_common::error::{Result, RwError}; use risingwave_common::util::select_all; use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; -use risingwave_connector::source::filesystem::{FsPage, FsPageItem, S3SplitEnumerator}; +use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; +use risingwave_connector::source::filesystem::opendal_source::{ + OpendalGcs, OpendalS3, OpendalSource, +}; +use risingwave_connector::source::filesystem::FsPageItem; use risingwave_connector::source::{ create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties, - ConnectorState, FsFilterCtrlCtx, FsListInner, SourceColumnDesc, SourceContext, - SourceEnumeratorContext, SplitEnumerator, SplitReader, + ConnectorState, FsFilterCtrlCtx, SourceColumnDesc, SourceContext, SplitReader, }; use tokio::time; -use tokio::time::{Duration, MissedTickBehavior}; +use tokio::time::Duration; #[derive(Clone, Debug)] pub struct ConnectorSource { @@ -87,23 +90,21 @@ impl ConnectorSource { .collect::>>() } - pub async fn get_source_list(&self) -> Result> { + pub fn get_source_list(&self) -> Result> { let config = self.config.clone(); - let lister = match config { - ConnectorProperties::S3(prop) => { - S3SplitEnumerator::new(*prop, Arc::new(SourceEnumeratorContext::default())).await? + match config { + ConnectorProperties::Gcs(prop) => { + let lister: OpendalEnumerator = + OpendalEnumerator::new_gcs_source(*prop)?; + Ok(build_opendal_fs_list_stream(lister)) + } + ConnectorProperties::OpendalS3(prop) => { + let lister: OpendalEnumerator = + OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?; + Ok(build_opendal_fs_list_stream(lister)) } other => bail!("Unsupported source: {:?}", other), - }; - - Ok(build_fs_list_stream( - FsListCtrlContext { - interval: Duration::from_secs(60), - last_tick: None, - filter_ctx: FsFilterCtrlCtx, - }, - lister, - )) + } } pub async fn stream_reader( @@ -137,14 +138,12 @@ impl ConnectorSource { }; let support_multiple_splits = config.support_multiple_splits(); - dispatch_source_prop!(config, prop, { let readers = if support_multiple_splits { tracing::debug!( "spawning connector split reader for multiple splits {:?}", splits ); - let reader = create_split_reader(*prop, splits, parser_config, source_ctx, data_gen_columns) .await?; @@ -171,34 +170,29 @@ impl ConnectorSource { } } -#[try_stream(boxed, ok = FsPage, error = RwError)] -async fn build_fs_list_stream( - mut ctrl_ctx: FsListCtrlContext, - mut list_op: impl FsListInner + Send + 'static, -) { - let mut interval = time::interval(ctrl_ctx.interval); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - // controlling whether request for next page - fn page_ctrl_logic(_ctx: &FsListCtrlContext, has_finished: bool, _page_num: usize) -> bool { - !has_finished - } - - loop { - let mut page_num = 0; - ctrl_ctx.last_tick = Some(time::Instant::now()); - 'inner: loop { - let (fs_page, has_finished) = list_op.get_next_page::().await?; - let matched_items = fs_page - .into_iter() - .filter(|item| list_op.filter_policy(&ctrl_ctx.filter_ctx, page_num, item)) - .collect_vec(); - yield matched_items; - page_num += 1; - if !page_ctrl_logic(&ctrl_ctx, has_finished, page_num) { - break 'inner; +#[try_stream(boxed, ok = FsPageItem, error = RwError)] +async fn 
build_opendal_fs_list_stream(lister: OpendalEnumerator) { + let matcher = lister.get_matcher(); + let mut object_metadata_iter = lister.list().await?; + + while let Some(list_res) = object_metadata_iter.next().await { + match list_res { + Ok(res) => { + if matcher + .as_ref() + .map(|m| m.matches(&res.name)) + .unwrap_or(true) + { + yield res + } else { + // Currrntly due to the lack of prefix list, we just skip the unmatched files. + continue; + } + } + Err(err) => { + tracing::error!("list object fail, err {}", err); + return Err(err.into()); } } - interval.tick().await; } } diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index 5d0ff54ecf38d..1d9c828c35cdc 100644 --- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -20,7 +20,7 @@ use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; use risingwave_connector::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig}; use risingwave_connector::source::monitor::SourceMetrics; -use risingwave_connector::source::{ConnectorProperties, SourceColumnDesc, SourceColumnType}; +use risingwave_connector::source::{SourceColumnDesc, SourceColumnType}; use risingwave_connector::ConnectorParams; use risingwave_pb::catalog::PbStreamSourceInfo; use risingwave_pb::plan_common::PbColumnCatalog; @@ -36,8 +36,6 @@ pub struct SourceDesc { pub source: ConnectorSource, pub columns: Vec, pub metrics: Arc, - - pub is_new_fs_source: bool, } /// `FsSourceDesc` describes a stream source. @@ -99,18 +97,11 @@ impl SourceDescBuilder { columns } - pub fn build(mut self) -> Result { + pub fn build(self) -> Result { let columns = self.column_catalogs_to_source_column_descs(); let psrser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?; - let is_new_fs_source = - ConnectorProperties::is_new_fs_connector_hash_map(&self.with_properties); - if is_new_fs_source { - // new fs source requires `connector='s3_v2' but we simply reuse S3 connector` - ConnectorProperties::rewrite_upstream_source_key_hash_map(&mut self.with_properties); - } - let source = ConnectorSource::new( self.with_properties, columns.clone(), @@ -122,7 +113,6 @@ impl SourceDescBuilder { source, columns, metrics: self.metrics, - is_new_fs_source, }) } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index c0b33c389e5bb..d7c19ade79063 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -13,6 +13,7 @@ // limitations under the License. 
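`build_opendal_fs_list_stream` above filters the listed objects against the user's glob `match_pattern` and skips non-matching names rather than erroring, pending prefix-based listing. The same filter in isolation, using the `glob` crate the enumerator already depends on, with made-up file names:

```rust
use glob::Pattern;

fn main() -> anyhow::Result<()> {
    // match_pattern as it might appear in the WITH clause; file names are made up.
    let matcher = Pattern::new("example_prefix*.json")?;

    let listed = [
        "example_prefix_data_0.json",
        "example_prefix_data_1.csv",
        "other.json",
    ];
    let matched: Vec<&str> = listed
        .into_iter()
        .filter(|name| matcher.matches(name))
        .collect();

    assert_eq!(matched, vec!["example_prefix_data_0.json"]);
    Ok(())
}
```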
use std::fmt::{Debug, Formatter}; +use std::marker::PhantomData; use std::ops::Bound; use std::sync::Arc; @@ -24,7 +25,10 @@ use risingwave_common::catalog::{ColumnId, Schema, TableId}; use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{ScalarRef, ScalarRefImpl}; -use risingwave_connector::source::filesystem::FsSplit; +use risingwave_connector::source::filesystem::opendal_source::{ + OpendalGcs, OpendalS3, OpendalSource, +}; +use risingwave_connector::source::filesystem::OpendalFsSplit; use risingwave_connector::source::{ BoxSourceWithStateStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData, StreamChunkWithState, @@ -46,7 +50,7 @@ const SPLIT_BATCH_SIZE: usize = 1000; type SplitBatch = Option>; -pub struct FsFetchExecutor { +pub struct FsFetchExecutor { actor_ctx: ActorContextRef, info: ExecutorInfo, @@ -61,9 +65,11 @@ pub struct FsFetchExecutor { // config for the connector node connector_params: ConnectorParams, + + _marker: PhantomData, } -impl FsFetchExecutor { +impl FsFetchExecutor { #[allow(clippy::too_many_arguments)] pub fn new( actor_ctx: ActorContextRef, @@ -80,6 +86,7 @@ impl FsFetchExecutor { upstream: Some(upstream), source_ctrl_opts, connector_params, + _marker: PhantomData, } } @@ -103,13 +110,23 @@ impl FsFetchExecutor { ) .await?; pin_mut!(table_iter); - + let properties = source_desc.source.config.clone(); while let Some(item) = table_iter.next().await { let row = item?; let split = match row.datum_at(1) { - Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { - SplitImpl::from(FsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?) - } + Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match properties { + risingwave_connector::source::ConnectorProperties::Gcs(_) => { + let split: OpendalFsSplit = + OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?; + SplitImpl::from(split) + } + risingwave_connector::source::ConnectorProperties::OpendalS3(_) => { + let split: OpendalFsSplit = + OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?; + SplitImpl::from(split) + } + _ => unreachable!(), + }, _ => unreachable!(), }; batch.push(split); @@ -119,7 +136,6 @@ impl FsFetchExecutor { } } } - if batch.is_empty() { stream.replace_data_stream(stream::pending().boxed()); } else { @@ -264,7 +280,11 @@ impl FsFetchExecutor { .map(|row| { let filename = row.datum_at(0).unwrap().into_utf8(); let size = row.datum_at(2).unwrap().into_int64(); - FsSplit::new(filename.to_owned(), 0, size as usize) + OpendalFsSplit::::new( + filename.to_owned(), + 0, + size as usize, + ) }) .collect(); state_store_handler.take_snapshot(file_assignment).await?; @@ -280,14 +300,17 @@ impl FsFetchExecutor { split_offset_mapping, }) => { let mapping = split_offset_mapping.unwrap(); - for (split_id, offset) in mapping { + debug_assert_eq!(mapping.len(), 1); + if let Some((split_id, offset)) = mapping.into_iter().next() { let row = state_store_handler .get(split_id.clone()) .await? .expect("The fs_split should be in the state table."); let fs_split = match row.datum_at(1) { Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { - FsSplit::restore_from_json(jsonb_ref.to_owned_scalar())? + OpendalFsSplit::::restore_from_json( + jsonb_ref.to_owned_scalar(), + )? 
} _ => unreachable!(), }; @@ -311,7 +334,7 @@ impl FsFetchExecutor { } } -impl Executor for FsFetchExecutor { +impl Executor for FsFetchExecutor { fn execute(self: Box) -> BoxedMessageStream { self.into_stream().boxed() } @@ -329,7 +352,7 @@ impl Executor for FsFetchExecutor { } } -impl Debug for FsFetchExecutor { +impl Debug for FsFetchExecutor { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { if let Some(core) = &self.stream_source_core { f.debug_struct("FsFetchExecutor") diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index 500e8a58e89f0..8df2e9aadb5af 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -22,7 +22,7 @@ use futures_async_stream::try_stream; use risingwave_common::array::Op; use risingwave_common::catalog::Schema; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; -use risingwave_connector::source::filesystem::FsPage; +use risingwave_connector::source::filesystem::FsPageItem; use risingwave_connector::source::{BoxTryStream, SourceCtrlOpts}; use risingwave_connector::ConnectorParams; use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; @@ -35,6 +35,8 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::*; +const CHUNK_SIZE: usize = 1024; + #[allow(dead_code)] pub struct FsListExecutor { actor_ctx: ActorContextRef, @@ -83,39 +85,42 @@ impl FsListExecutor { } } - async fn build_chunked_paginate_stream( + #[allow(clippy::disallowed_types)] + fn build_chunked_paginate_stream( &self, source_desc: &SourceDesc, ) -> StreamExecutorResult> { - let stream = source_desc + let stream: std::pin::Pin< + Box> + Send>, + > = source_desc .source .get_source_list() - .await .map_err(StreamExecutorError::connector_error)?; - Ok(stream - .map(|item| item.map(Self::map_fs_page_to_chunk)) - .boxed()) - } - - fn map_fs_page_to_chunk(page: FsPage) -> StreamChunk { - let rows = page - .into_iter() - .map(|split| { - ( - Op::Insert, - OwnedRow::new(vec![ - Some(ScalarImpl::Utf8(split.name.into_boxed_str())), - Some(ScalarImpl::Timestamp(split.timestamp)), - Some(ScalarImpl::Int64(split.size)), - ]), - ) - }) - .collect::>(); - StreamChunk::from_rows( - &rows, - &[DataType::Varchar, DataType::Timestamp, DataType::Int64], - ) + // Group FsPageItem stream into chunks of size 1024. + let chunked_stream = stream.chunks(CHUNK_SIZE).map(|chunk| { + let rows = chunk + .into_iter() + .map(|item| { + let page_item = item.unwrap(); + ( + Op::Insert, + OwnedRow::new(vec![ + Some(ScalarImpl::Utf8(page_item.name.into_boxed_str())), + Some(ScalarImpl::Timestamptz(page_item.timestamp)), + Some(ScalarImpl::Int64(page_item.size)), + ]), + ) + }) + .collect::>(); + + Ok(StreamChunk::from_rows( + &rows, + &[DataType::Varchar, DataType::Timestamptz, DataType::Int64], + )) + }); + + Ok(chunked_stream.boxed()) } #[try_stream(ok = Message, error = StreamExecutorError)] @@ -144,7 +149,7 @@ impl FsListExecutor { // Return the ownership of `stream_source_core` to the source executor. 
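`build_chunked_paginate_stream` above groups the flat `FsPageItem` stream into `StreamChunk`s of `CHUNK_SIZE` (1024) rows via `StreamExt::chunks`. The chunking behaviour in isolation, with plain integers standing in for page items:

```rust
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    const CHUNK_SIZE: usize = 1024;

    // 2500 "page items" become chunks of 1024, 1024 and 452 rows,
    // each of which the executor turns into one StreamChunk.
    let items = stream::iter(0..2500u32);
    let sizes: Vec<usize> = items
        .chunks(CHUNK_SIZE)
        .map(|chunk| chunk.len())
        .collect()
        .await;

    assert_eq!(sizes, vec![1024, 1024, 452]);
}
```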
self.stream_source_core = Some(core); - let chunked_paginate_stream = self.build_chunked_paginate_stream(&source_desc).await?; + let chunked_paginate_stream = self.build_chunked_paginate_stream(&source_desc)?; let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); let mut stream = diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index 101a6fb39b600..4ffa13d6fae05 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -15,7 +15,8 @@ use std::sync::Arc; use risingwave_common::catalog::{ColumnId, TableId}; -use risingwave_connector::source::SourceCtrlOpts; +use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3}; +use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts}; use risingwave_pb::stream_plan::StreamFsFetchNode; use risingwave_source::source_desc::SourceDescBuilder; use risingwave_storage::StateStore; @@ -46,7 +47,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { let source_id = TableId::new(source.source_id); let source_name = source.source_name.clone(); let source_info = source.get_info()?; - + let properties = ConnectorProperties::extract(source.with_properties.clone())?; let source_desc_builder = SourceDescBuilder::new( source.columns.clone(), params.env.source_metrics(), @@ -57,7 +58,6 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { params.env.config().developer.connector_message_buffer_size, params.info.pk_indices.clone(), ); - let source_ctrl_opts = SourceCtrlOpts { chunk_size: params.env.config().developer.chunk_size, }; @@ -87,16 +87,31 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { state_table_handler, ); - let executor = FsFetchExecutor::new( - params.actor_context.clone(), - params.info, - stream_source_core, - upstream, - source_ctrl_opts, - params.env.connector_params(), - ) - .boxed(); - + let executor = match properties { + risingwave_connector::source::ConnectorProperties::Gcs(_) => { + FsFetchExecutor::<_, OpendalGcs>::new( + params.actor_context.clone(), + params.info, + stream_source_core, + upstream, + source_ctrl_opts, + params.env.connector_params(), + ) + .boxed() + } + risingwave_connector::source::ConnectorProperties::OpendalS3(_) => { + FsFetchExecutor::<_, OpendalS3>::new( + params.actor_context.clone(), + params.info, + stream_source_core, + upstream, + source_ctrl_opts, + params.env.connector_params(), + ) + .boxed() + } + _ => unreachable!(), + }; let rate_limit = source.rate_limit.map(|x| x as _); Ok(FlowControlExecutor::new(executor, params.actor_context, rate_limit).boxed()) } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 63b7e0c31e546..1fd062e702b66 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -55,6 +55,7 @@ futures-sink = { version = "0.3" } futures-task = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] } +getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae", default-features = false, features = ["std"] } governor = { version = "0.6", default-features = false, features = ["dashmap", "jitter", "std"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["nightly", "raw"] } @@ -105,7 
+106,7 @@ redis = { version = "0.24", features = ["async-std-comp", "tokio-comp"] } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } regex-syntax = { version = "0.8" } -reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } +reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] } ring = { version = "0.16", features = ["std"] } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } rustls = { version = "0.21" } @@ -159,6 +160,7 @@ deranged = { version = "0.3", default-features = false, features = ["powerfmt", either = { version = "1", features = ["serde"] } fixedbitset = { version = "0.4" } frunk_core = { version = "0.4", default-features = false, features = ["std"] } +getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae", default-features = false, features = ["std"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" } itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" }
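`FsFetchExecutorBuilder` in the `fs_fetch.rs` hunk above picks the executor's `OpendalSource` type parameter by matching on the extracted `ConnectorProperties`, then erases it behind a boxed executor. A stripped-down sketch of that dispatch pattern; the `Source`/`Executor` traits, the `Gcs`/`S3` markers, and the `Fetch` struct here are illustrative stand-ins, not the crate's types:

```rust
use std::marker::PhantomData;

trait Source {
    const NAME: &'static str;
}

struct Gcs;
struct S3;
impl Source for Gcs {
    const NAME: &'static str = "gcs";
}
impl Source for S3 {
    const NAME: &'static str = "s3_v2";
}

trait Executor {
    fn identity(&self) -> String;
}

// Generic over the source type, like FsFetchExecutor<S, Src>.
struct Fetch<Src: Source>(PhantomData<Src>);

impl<Src: Source> Executor for Fetch<Src> {
    fn identity(&self) -> String {
        format!("FsFetchExecutor<{}>", Src::NAME)
    }
}

// Stand-in for the extracted ConnectorProperties variants.
enum Properties {
    Gcs,
    OpendalS3,
}

// Same shape as FsFetchExecutorBuilder: pick the generic parameter per connector,
// then box it so callers stay agnostic to the concrete type.
fn build(props: Properties) -> Box<dyn Executor> {
    match props {
        Properties::Gcs => Box::new(Fetch::<Gcs>(PhantomData)),
        Properties::OpendalS3 => Box::new(Fetch::<S3>(PhantomData)),
    }
}

fn main() {
    assert_eq!(build(Properties::Gcs).identity(), "FsFetchExecutor<gcs>");
    assert_eq!(build(Properties::OpendalS3).identity(), "FsFetchExecutor<s3_v2>");
}
```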