feat(connector): implement new fs source #12547

Closed · wants to merge 11 commits
25 changes: 24 additions & 1 deletion src/connector/src/source/base.rs
@@ -33,7 +33,7 @@ use risingwave_rpc_client::ConnectorClient;
use serde::de::DeserializeOwned;

use super::datagen::DatagenMeta;
use super::filesystem::FsSplit;
use super::filesystem::{FsPage, FsSplit};
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
@@ -509,6 +509,29 @@ pub trait SplitMetaData: Sized {
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

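/// Lists the objects of a filesystem-like source and exposes them as a
/// paginated stream of [`FsPage`]s.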
#[async_trait]
pub trait SourceLister: Sized {
type Split: SplitMetaData + Send;
type Properties;

async fn new(properties: Self::Properties) -> Result<Self>;
fn paginate(self) -> BoxTryStream<FsPage>;
}

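/// Builds, for each [`FsSplit`], a stream that reads the object and parses it
/// into stream chunks.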
#[async_trait]
pub trait SourceReader: Sized + Send {
type Properties;

async fn new(
properties: Self::Properties,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
) -> Result<Self>;

fn build_read_stream(&mut self, split: FsSplit) -> BoxSourceWithStateStream;
}

#[cfg(test)]
mod tests {
use maplit::*;
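The new `SourceLister`/`SourceReader` pair covers the two halves of a filesystem source: the lister paginates the objects to ingest, and the reader turns each `FsSplit` into a parsed chunk stream. A rough sketch of how a caller might glue them together follows; the `drive_fs_source` helper is hypothetical and not part of this PR, the `FsSplit::new(name, offset, size)` constructor is assumed, and the other types come from `crate::source`.

// Hypothetical glue code, for illustration only: list every object page by
// page, turn each listed item into an FsSplit starting at offset 0, and build
// one read stream per split.
use futures::StreamExt;

async fn drive_fs_source<L, R>(
    lister: L,
    mut reader: R,
) -> anyhow::Result<Vec<BoxSourceWithStateStream>>
where
    L: SourceLister,
    R: SourceReader,
{
    let mut streams = Vec::new();
    let mut pages = lister.paginate();
    while let Some(page) = pages.next().await {
        for item in page? {
            // Assumed constructor: FsSplit::new(name, start_offset, total_size).
            let split = FsSplit::new(item.name, 0, item.size);
            streams.push(reader.build_read_stream(split));
        }
    }
    Ok(streams)
}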
20 changes: 19 additions & 1 deletion src/connector/src/source/filesystem/file_common.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::anyhow;
use risingwave_common::types::JsonbVal;
use risingwave_common::types::{JsonbVal, Timestamp};
use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};
@@ -55,3 +55,21 @@ impl FsSplit {
}
}
}

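/// Metadata of a single listed object: its name, size in bytes and
/// last-modified timestamp.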
pub struct FsPageItem {
pub name: String,
pub size: usize,
pub timestamp: Timestamp,
}

impl FsPageItem {
pub fn new(name: String, size: usize, timestamp: Timestamp) -> Self {
Self {
name,
size,
timestamp,
}
}
}

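/// One page of listed objects.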
pub type FsPage = Vec<FsPageItem>;
4 changes: 2 additions & 2 deletions src/connector/src/source/filesystem/mod.rs
@@ -16,7 +16,7 @@ pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR};

mod file_common;
pub mod nd_streaming;
pub use file_common::FsSplit;
pub use file_common::{FsPage, FsPageItem, FsSplit};
mod s3;

pub mod s3_v2;
pub const S3_V2_CONNECTOR: &str = "s3_v2";
4 changes: 2 additions & 2 deletions src/connector/src/source/filesystem/s3/source/reader.rs
@@ -55,7 +55,7 @@ pub struct S3FileReader {

impl S3FileReader {
#[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn stream_read_object(
pub async fn stream_read_object(
client_for_s3: s3_client::Client,
bucket_name: String,
split: FsSplit,
@@ -137,7 +137,7 @@
}
}

async fn get_object(
pub async fn get_object(
client_for_s3: &s3_client::Client,
bucket_name: &str,
object_name: &str,
152 changes: 152 additions & 0 deletions src/connector/src/source/filesystem/s3_v2/lister.rs
@@ -0,0 +1,152 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::Client;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::error::RwError;
use risingwave_common::types::Timestamp;

use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::{default_conn_config, s3_client};
use crate::source::filesystem::file_common::{FsPage, FsSplit};
use crate::source::filesystem::{FsPageItem, S3Properties};
use crate::source::{BoxTryStream, SourceLister};

/// Get the prefix from a glob
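///
/// The prefix is the longest leading part of the glob that contains no
/// unescaped wildcard (`*`, `[` or `{`), e.g. `dir/subdir/*.json` yields
/// `dir/subdir/`.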
fn get_prefix(glob: &str) -> String {
let mut escaped = false;
let mut escaped_filter = false;
glob.chars()
.take_while(|c| match (c, &escaped) {
('*', false) => false,
('[', false) => false,
('{', false) => false,
('\\', false) => {
escaped = true;
true
}
(_, false) => true,
(_, true) => {
escaped = false;
true
}
})
.filter(|c| match (c, &escaped_filter) {
(_, true) => {
escaped_filter = false;
true
}
('\\', false) => {
escaped_filter = true;
false
}
(_, _) => true,
})
.collect()
}

pub struct S3SourceLister {
client: Client,
// prefix is used to reduce the number of objects to be listed
prefix: Option<String>,
matcher: Option<glob::Pattern>,
bucket_name: String,
}

impl S3SourceLister {
#[try_stream(boxed, ok = FsPage, error = RwError)]
async fn paginate_inner(self) {
loop { // start a new round
let mut next_continuation_token = None;
'truncated: loop { // loop to paginate
let mut req = self
.client
.list_objects_v2()
.bucket(&self.bucket_name)
.set_prefix(self.prefix.clone());
if let Some(continuation_token) = next_continuation_token.take() {
req = req.continuation_token(continuation_token);
}
let mut res = req
.send()
.await
.map_err(|e| anyhow!(DisplayErrorContext(e)))?;

yield res
.contents
.take()
.unwrap_or_default()
.iter()
.filter(|obj| obj.key().is_some())
.filter(|obj| {
self.matcher
.as_ref()
.map(|m| m.matches(obj.key().unwrap()))
.unwrap_or(true)
})
.map(|obj| {
let aws_ts = obj.last_modified().unwrap();
FsPageItem::new(
obj.key().unwrap().to_owned(),
obj.size() as usize,
Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()),
)
})
.collect_vec();

if res.is_truncated() {
next_continuation_token = Some(res.next_continuation_token.unwrap())
} else {
break 'truncated;
}
}
}
}
}

#[async_trait]
impl SourceLister for S3SourceLister {
type Properties = S3Properties;
type Split = FsSplit;

async fn new(properties: Self::Properties) -> Result<Self> {
let config = AwsAuthProps::from(&properties);
let sdk_config = config.build_config().await?;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() {
let prefix = get_prefix(pattern);
let matcher = glob::Pattern::new(pattern)
.with_context(|| format!("Invalid match_pattern: {}", pattern))?;
(Some(prefix), Some(matcher))
} else {
(None, None)
};

Ok(Self {
bucket_name: properties.bucket_name,
matcher,
prefix,
client: s3_client,
})
}

fn paginate(self) -> BoxTryStream<FsPage> {
self.paginate_inner()
}
}
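For reference, exercising the lister on its own could look roughly like the sketch below; the surrounding function is a placeholder and not part of this PR, and the `S3Properties` value is assumed to be built elsewhere.

// Illustrative only: page through a bucket and print what the matcher accepts.
use futures::StreamExt;

async fn print_matching_objects(props: S3Properties) -> anyhow::Result<()> {
    let lister = S3SourceLister::new(props).await?;
    let mut pages = lister.paginate();
    while let Some(page) = pages.next().await {
        for item in page? {
            println!("{} ({} bytes, modified {})", item.name, item.size, item.timestamp);
        }
    }
    Ok(())
}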
16 changes: 16 additions & 0 deletions src/connector/src/source/filesystem/s3_v2/mod.rs
@@ -0,0 +1,16 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod lister;
pub mod reader;
92 changes: 92 additions & 0 deletions src/connector/src/source/filesystem/s3_v2/reader.rs
@@ -0,0 +1,92 @@
use anyhow::Result;
use async_trait::async_trait;
use aws_sdk_s3::client as s3_client;
use futures_async_stream::try_stream;
use risingwave_common::error::RwError;

use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::{default_conn_config, s3_client};
use crate::parser::{ByteStreamSourceParserImpl, ParserConfig};
use crate::source::{BoxSourceWithStateStream, StreamChunkWithState};
use crate::source::base::SplitMetaData;
use crate::source::filesystem::{FsSplit, S3FileReader, nd_streaming};
use crate::source::{SourceReader, filesystem::S3Properties, SourceContextRef, Column};

pub struct S3SourceReader {
bucket_name: String,
s3_client: s3_client::Client,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
}

impl S3SourceReader {
#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
async fn build_read_stream_inner(
client_for_s3: s3_client::Client,
bucket_name: String,
source_ctx: SourceContextRef,
parser_config: ParserConfig,
split: FsSplit,
) {
let split_id = split.id();
let data_stream = S3FileReader::stream_read_object(client_for_s3, bucket_name, split, source_ctx.clone());
let parser = ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone()).await?;
let msg_stream = if matches!(
parser,
ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_)
) {
parser.into_stream(nd_streaming::split_stream(data_stream))
} else {
parser.into_stream(data_stream)
};

let actor_id = source_ctx.source_info.actor_id.to_string();
let source_id = source_ctx.source_info.source_id.to_string();
#[for_await]
for msg in msg_stream {
let msg = msg?;
source_ctx
.metrics
.partition_input_count
.with_label_values(&[&actor_id, &source_id, &split_id])
.inc_by(msg.chunk.cardinality() as u64);
yield msg;
}
}
}

#[async_trait]
impl SourceReader for S3SourceReader {
type Properties = S3Properties;

async fn new(
properties: Self::Properties,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
_columns: Option<Vec<Column>>,
) -> Result<Self> {
let config = AwsAuthProps::from(&properties);

let sdk_config = config.build_config().await?;

let bucket_name = properties.bucket_name;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));

Ok(S3SourceReader {
bucket_name,
s3_client,
parser_config,
source_ctx,
})
}

fn build_read_stream(&mut self, split: FsSplit) -> BoxSourceWithStateStream {
Self::build_read_stream_inner(
self.s3_client.clone(),
self.bucket_name.clone(),
self.source_ctx.clone(),
self.parser_config.clone(),
split
)
}
}
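Driving the reader for a single object could then look like the sketch below; the object name, size and the `FsSplit::new(name, offset, size)` signature are assumptions for illustration, not part of this PR.

// Illustrative only: build a chunk stream for one object and drain it.
use futures::StreamExt;

async fn read_one_object(
    props: S3Properties,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
) -> anyhow::Result<()> {
    let mut reader = S3SourceReader::new(props, parser_config, source_ctx, None).await?;
    // Assumed constructor: FsSplit::new(name, start_offset, total_size).
    let split = FsSplit::new("data/part-00000.json".into(), 0, 1024);
    let mut chunks = reader.build_read_stream(split);
    while let Some(msg) = chunks.next().await {
        let msg = msg?;
        println!("read {} rows", msg.chunk.cardinality());
    }
    Ok(())
}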