Skip to content

Commit

Permalink
implement fetch executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 committed Sep 28, 2023
1 parent ae20d0b commit f2a1d4a
Show file tree
Hide file tree
Showing 8 changed files with 617 additions and 7 deletions.
14 changes: 14 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,20 @@ pub trait SourceLister: Sized {
fn paginate(self) -> BoxTryStream<FsPage>;
}

/// Reader trait for file-system based sources: instead of consuming a fixed
/// split assignment up front, it builds one read stream per `FsSplit` on
/// demand (the "v2" fetch path — see `s3_v2` connectors).
#[async_trait]
pub trait SourceReader: Sized + Send {
    /// Connector-specific properties used to construct the reader.
    type Properties;

    /// Creates a reader from connector properties.
    ///
    /// `columns` carries optional column metadata; connectors may ignore it.
    async fn new(
        properties: Self::Properties,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> Result<Self>;

    /// Builds a chunk stream that reads the given `split`.
    fn build_read_stream(&mut self, split: FsSplit) -> BoxSourceWithStateStream;
}

#[cfg(test)]
mod tests {
use maplit::*;
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/filesystem/s3/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct S3FileReader {

impl S3FileReader {
#[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn stream_read_object(
pub async fn stream_read_object(
client_for_s3: s3_client::Client,
bucket_name: String,
split: FsSplit,
Expand Down Expand Up @@ -137,7 +137,7 @@ impl S3FileReader {
}
}

async fn get_object(
pub async fn get_object(
client_for_s3: &s3_client::Client,
bucket_name: &str,
object_name: &str,
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/filesystem/s3_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
// limitations under the License.

pub mod lister;
pub mod reader;
92 changes: 92 additions & 0 deletions src/connector/src/source/filesystem/s3_v2/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use anyhow::Result;
use async_trait::async_trait;
use aws_sdk_s3::client as s3_client;
use futures_async_stream::try_stream;
use risingwave_common::error::RwError;

use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::{s3_client, default_conn_config};
use crate::parser::{ParserConfig, ByteStreamSourceParserImpl};
use crate::source::{BoxSourceWithStateStream, StreamChunkWithState};
use crate::source::base::SplitMetaData;
use crate::source::filesystem::{FsSplit, S3FileReader, nd_streaming};
use crate::source::{SourceReader, filesystem::S3Properties, SourceContextRef, Column};

/// `SourceReader` implementation for S3: holds the shared SDK client and the
/// configuration needed to spawn a parsed chunk stream per `FsSplit`.
pub struct S3SourceReader {
    // Bucket every split of this source is read from.
    bucket_name: String,
    // SDK client, cloned into each per-split read stream.
    s3_client: s3_client::Client,
    // Parser configuration, cloned into each per-split read stream.
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
}

impl S3SourceReader {
    /// Streams parsed chunks for a single S3 object split.
    ///
    /// Reads raw bytes via `S3FileReader::stream_read_object`, feeds them to a
    /// freshly-created parser, and bumps the per-partition input-count metric
    /// for every emitted chunk.
    #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
    async fn build_read_stream_inner(
        client_for_s3: s3_client::Client,
        bucket_name: String,
        source_ctx: SourceContextRef,
        parser_config: ParserConfig,
        split: FsSplit,
    ) {
        let split_id = split.id();
        let data_stream = S3FileReader::stream_read_object(client_for_s3, bucket_name, split, source_ctx.clone());
        let parser = ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone()).await?;
        // JSON/CSV go through nd_streaming::split_stream first — presumably to
        // re-chunk the raw byte stream on record (newline) boundaries before
        // parsing; other formats take the bytes as-is.
        let msg_stream = if matches!(
            parser,
            ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_)
        ) {
            parser.into_stream(nd_streaming::split_stream(data_stream))
        } else {
            parser.into_stream(data_stream)
        };

        // Label values for the metric below; stringified once, outside the loop.
        let actor_id = source_ctx.source_info.actor_id.to_string();
        let source_id = source_ctx.source_info.source_id.to_string();
        #[for_await]
        for msg in msg_stream {
            let msg = msg?;
            // Count rows read for this (actor, source, split) partition.
            source_ctx
                .metrics
                .partition_input_count
                .with_label_values(&[&actor_id, &source_id, &split_id])
                .inc_by(msg.chunk.cardinality() as u64);
            yield msg;
        }
    }
}

#[async_trait]
impl SourceReader for S3SourceReader {
    type Properties = S3Properties;

    /// Builds an S3 reader: resolves AWS credentials/config from the
    /// connector properties and constructs the SDK client.
    ///
    /// Column metadata is not needed for S3 and is ignored.
    async fn new(
        properties: Self::Properties,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> Result<Self> {
        let auth_props = AwsAuthProps::from(&properties);
        let aws_config = auth_props.build_config().await?;
        let client = s3_client(&aws_config, Some(default_conn_config()));

        Ok(Self {
            bucket_name: properties.bucket_name,
            s3_client: client,
            parser_config,
            source_ctx,
        })
    }

    /// Spawns the parsed chunk stream for one split, handing the stream
    /// clones of the client and configs so it owns everything it needs.
    fn build_read_stream(&mut self, split: FsSplit) -> BoxSourceWithStateStream {
        Self::build_read_stream_inner(
            self.s3_client.clone(),
            self.bucket_name.clone(),
            self.source_ctx.clone(),
            self.parser_config.clone(),
            split,
        )
    }
}
47 changes: 45 additions & 2 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use risingwave_common::util::select_all;
use risingwave_connector::dispatch_source_prop;
use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use risingwave_connector::source::filesystem::s3_v2::lister::S3SourceLister;
use risingwave_connector::source::filesystem::FsPage;
use risingwave_connector::source::filesystem::{FsPage, FsSplit};
use risingwave_connector::source::filesystem::s3_v2::reader::S3SourceReader;
use risingwave_connector::source::{
create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties,
ConnectorState, SourceColumnDesc, SourceContext, SourceLister, SplitReader,
ConnectorState, SourceColumnDesc, SourceContext, SourceLister, SourceReader, SplitReader,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -86,6 +87,48 @@ impl ConnectorSource {
Ok(lister.paginate())
}

// TODO: reuse stream_reader, and using SourceContext
// to discriminate source v1/v2
/// Builds a read stream for a single `FsSplit` (file-system "v2" path).
///
/// Resolves the target columns from `column_ids`, assembles the parser
/// config, constructs the connector-specific `SourceReader`, and returns
/// its chunk stream.
pub async fn source_reader(
    &self,
    column_ids: Vec<ColumnId>,
    source_ctx: Arc<SourceContext>,
    split: FsSplit,
) -> Result<BoxSourceWithStateStream> {
    let config = self.config.clone();
    let columns = self.get_target_columns(column_ids)?;

    // Column metadata (name/type/visibility) forwarded to the reader.
    let data_gen_columns = Some(
        columns
            .iter()
            .map(|col| Column {
                name: col.name.clone(),
                data_type: col.data_type.clone(),
                is_visible: col.is_visible(),
            })
            .collect_vec(),
    );

    let parser_config = ParserConfig {
        specific: self.parser_config.clone(),
        common: CommonParserConfig {
            rw_columns: columns,
        },
    };

    // Only S3 implements the v2 reader so far; callers must not route any
    // other connector here (hence the unreachable!).
    let mut reader = match config {
        ConnectorProperties::S3(prop) => S3SourceReader::new(
            *prop,
            parser_config,
            source_ctx,
            data_gen_columns,
        ).await?,
        _ => unreachable!(),
    };

    Ok(reader.build_read_stream(split))
}

pub async fn stream_reader(
&self,
state: ConnectorState,
Expand Down
Loading

0 comments on commit f2a1d4a

Please sign in to comment.