Skip to content

Commit

Permalink
refactor list executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 committed Sep 26, 2023
1 parent f6e57e7 commit ae20d0b
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 76 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ pub trait SourceLister: Sized {
type Properties;

async fn new(properties: Self::Properties) -> Result<Self>;
fn paginate(self) -> BoxTryStream<Vec<FsPage>>;
fn paginate(self) -> BoxTryStream<FsPage>;
}

#[cfg(test)]
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/source/filesystem/file_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ impl FsSplit {
}
}

pub struct FsPage {
pub struct FsPageItem {
pub name: String,
pub size: usize,
pub timestamp: Timestamp,
}

impl FsPage {
impl FsPageItem {
pub fn new(name: String, size: usize, timestamp: Timestamp) -> Self {
Self {
name,
Expand All @@ -71,3 +71,5 @@ impl FsPage {
}
}
}

pub type FsPage = Vec<FsPageItem>;
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR};

mod file_common;
pub mod nd_streaming;
pub use file_common::{FsPage, FsSplit};
pub use file_common::{FsPage, FsPageItem, FsSplit};
mod s3;
pub mod s3_v2;
pub const S3_V2_CONNECTOR: &str = "s3_v2";
10 changes: 5 additions & 5 deletions src/connector/src/source/filesystem/s3_v2/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::types::Timestamp;
use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::{default_conn_config, s3_client};
use crate::source::filesystem::file_common::{FsPage, FsSplit};
use crate::source::filesystem::S3Properties;
use crate::source::filesystem::{S3Properties, FsPageItem};
use crate::source::{BoxTryStream, SourceLister};

/// Get the prefix from a glob
Expand Down Expand Up @@ -69,9 +69,9 @@ pub struct S3SourceLister {
}

impl S3SourceLister {
#[try_stream(boxed, ok = Vec<FsPage>, error = RwError)]
#[try_stream(boxed, ok = FsPage, error = RwError)]
async fn paginate_inner(self) {
'round: loop { // start a new round
loop { // start a new round
let mut next_continuation_token = None;
'truncated: loop { // loop to paginate
let mut req = self
Expand Down Expand Up @@ -102,7 +102,7 @@ impl S3SourceLister {
.into_iter()
.map(|obj| {
let aws_ts = obj.last_modified().unwrap();
FsPage::new(
FsPageItem::new(
obj.key().unwrap().to_owned(),
obj.size() as usize,
Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()),
Expand Down Expand Up @@ -146,7 +146,7 @@ impl SourceLister for S3SourceLister {
})
}

fn paginate(self) -> BoxTryStream<Vec<FsPage>> {
fn paginate(self) -> BoxTryStream<FsPage> {
self.paginate_inner()
}
}
2 changes: 1 addition & 1 deletion src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl ConnectorSource {
.collect::<Result<Vec<SourceColumnDesc>>>()
}

pub async fn source_lister(&self) -> Result<BoxTryStream<Vec<FsPage>>> {
pub async fn source_lister(&self) -> Result<BoxTryStream<FsPage>> {
let config = self.config.clone();
let lister = match config {
ConnectorProperties::S3(prop) => S3SourceLister::new(*prop).await?,
Expand Down
108 changes: 43 additions & 65 deletions src/stream/src/executor/source_v2/list_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use risingwave_common::array::Op;
use risingwave_common::catalog::Schema;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_connector::source::filesystem::FsPage;
use risingwave_connector::source::{
BoxTryStream, SourceCtrlOpts, StreamChunkWithState
};
use risingwave_connector::source::{BoxTryStream, SourceCtrlOpts};
use risingwave_connector::ConnectorParams;
use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder};
use risingwave_storage::StateStore;
Expand All @@ -34,7 +32,7 @@ use crate::executor::monitor::StreamingMetrics;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::executor::*;

pub struct ListExecutor<S: StateStore> {
pub struct FsListExecutor<S: StateStore> {
actor_ctx: ActorContextRef,

identity: String,
Expand Down Expand Up @@ -62,7 +60,7 @@ pub struct ListExecutor<S: StateStore> {
connector_params: ConnectorParams,
}

impl<S: StateStore> ListExecutor<S> {
impl<S: StateStore> FsListExecutor<S> {
#[allow(clippy::too_many_arguments)]
pub fn new(
actor_ctx: ActorContextRef,
Expand All @@ -78,7 +76,7 @@ impl<S: StateStore> ListExecutor<S> {
) -> Self {
Self {
actor_ctx,
identity: format!("ListExecutor {:X}", executor_id),
identity: format!("FsListExecutor {:X}", executor_id),
schema,
pk_indices,
stream_source_core,
Expand All @@ -90,46 +88,38 @@ impl<S: StateStore> ListExecutor<S> {
}
}

async fn build_fs_source_lister(
async fn build_chunked_paginate_stream(
&self,
source_desc: &SourceDesc,
) -> StreamExecutorResult<BoxTryStream<Vec<FsPage>>> {
source_desc
) -> StreamExecutorResult<BoxTryStream<StreamChunk>> {
let stream = source_desc
.source
.source_lister()
.await
.map_err(StreamExecutorError::connector_error)
.map_err(StreamExecutorError::connector_error)?;

Ok(stream.map(|item| {
item.map(Self::map_fs_page_to_chunk)
}).boxed())
}

async fn fetch_one_page_chunk(
&self,
paginate_stream: &mut BoxTryStream<Vec<FsPage>>,
) -> StreamExecutorResult<StreamChunk> {
match paginate_stream.next().await {
Some(Ok(page)) => {
let rows = page
.into_iter()
.map(|split| {
(
Op::Insert,
OwnedRow::new(vec![
Some(ScalarImpl::Utf8(split.name.into_boxed_str())),
Some(ScalarImpl::Timestamp(split.timestamp)),
Some(ScalarImpl::Int64(split.size as i64)),
]),
)
})
.collect::<Vec<_>>();
Ok(StreamChunk::from_rows(
&rows,
&[DataType::Varchar, DataType::Timestamp, DataType::Int64],
))
},
Some(Err(err)) => Err(StreamExecutorError::connector_error(err)),
None => unreachable!(), // paginate_stream never ends
}
/// Converts one listed page of filesystem entries into a `StreamChunk`.
///
/// Every entry in `page` becomes a single `Op::Insert` row with three
/// columns, in this order: the entry name (`Varchar`), its timestamp
/// (`Timestamp`), and its size (`Int64`).
fn map_fs_page_to_chunk(page: FsPage) -> StreamChunk {
    let mut insert_rows = Vec::with_capacity(page.len());
    for entry in page {
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Utf8(entry.name.into_boxed_str())),
            Some(ScalarImpl::Timestamp(entry.timestamp)),
            // `size` is a `usize` on the page item; the chunk column is `Int64`.
            Some(ScalarImpl::Int64(entry.size as i64)),
        ]);
        insert_rows.push((Op::Insert, row));
    }

    // Column types must line up with the order of the values pushed above.
    let column_types = [DataType::Varchar, DataType::Timestamp, DataType::Int64];
    StreamChunk::from_rows(&insert_rows, &column_types)
}


#[try_stream(ok = Message, error = StreamExecutorError)]
async fn into_stream(mut self) {
Expand All @@ -154,18 +144,15 @@ impl<S: StateStore> ListExecutor<S> {
.build()
.map_err(StreamExecutorError::connector_error)?;

// TODO: init state store epoch

// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = Some(core);

// TODO: recover state
let mut paginate_stream = self.build_fs_source_lister(&source_desc).await?;
let chunked_paginate_stream = self.build_chunked_paginate_stream(&source_desc).await?;

let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
let mut stream = StreamReaderWithPause::<true, _>::new(
barrier_stream,
tokio_stream::pending().boxed(),
chunked_paginate_stream
);

if barrier.is_pause_on_startup() {
Expand All @@ -178,43 +165,34 @@ impl<S: StateStore> ListExecutor<S> {
match msg {
Err(_) => (),
Ok(msg) => match msg {
// Barrier arrives.
Either::Left(msg) => match &msg {
Message::Barrier(barrier) => {
let mut is_pause_resume = false;
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => {
stream.pause_stream();
is_pause_resume = true;
},
Mutation::Resume => {
stream.resume_stream();
is_pause_resume = true;
}
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
_ => (),
}
}

if !is_pause_resume {
// TODO: persist some state here
let chunk = self.fetch_one_page_chunk(&mut paginate_stream).await?;
yield Message::Chunk(chunk);
}

yield msg; // propagate the barrier

// Propagate the barrier.
yield msg;
}
// Only barrier can be received.
_ => unreachable!(),
},
// Right arm is always pending.
_ => unreachable!(),
// Chunked FsPage arrives.
Either::Right(chunk) => {
yield Message::Chunk(chunk);
}
},
}
}
}
}

impl<S: StateStore> Executor for ListExecutor<S> {
impl<S: StateStore> Executor for FsListExecutor<S> {
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.into_stream().boxed()
}
Expand Down Expand Up @@ -296,7 +274,7 @@ mod tests {

let system_params_manager = LocalSystemParamsManager::for_test();

let executor = ListExecutor::new(
let executor = FsListExecutor::new(
ActorContext::create(0),
schema,
pk_indices,
Expand Down
19 changes: 18 additions & 1 deletion src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::source::external::{ExternalTableType, SchemaTableName};
use risingwave_connector::source::SourceCtrlOpts;
use risingwave_connector::source::{SourceCtrlOpts, S3_V2_CONNECTOR};
use risingwave_pb::stream_plan::SourceNode;
use risingwave_source::source_desc::SourceDescBuilder;
use risingwave_storage::panic_store::PanicStateStore;
Expand All @@ -26,10 +26,12 @@ use super::*;
use crate::executor::external::ExternalStorageTable;
use crate::executor::source::StreamSourceCore;
use crate::executor::source_executor::SourceExecutor;
use crate::executor::source_v2::list_executor::FsListExecutor;
use crate::executor::state_table_handler::SourceStateTableHandler;
use crate::executor::{CdcBackfillExecutor, FlowControlExecutor, FsSourceExecutor};

const FS_CONNECTORS: &[&str] = &["s3"];
const FS_V2_CONNECTORS: &[&str] = &[S3_V2_CONNECTOR];
pub struct SourceExecutorBuilder;

#[async_trait::async_trait]
Expand Down Expand Up @@ -115,6 +117,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
.map(|c| c.to_ascii_lowercase())
.unwrap_or_default();
let is_fs_connector = FS_CONNECTORS.contains(&connector.as_str());
let is_fs_v2_connector = FS_V2_CONNECTORS.contains(&connector.as_str());

if is_fs_connector {
FsSourceExecutor::new(
Expand All @@ -129,6 +132,20 @@ impl ExecutorBuilder for SourceExecutorBuilder {
source_ctrl_opts,
)?
.boxed()
} else if is_fs_v2_connector {
FsListExecutor::new(
params.actor_context.clone(),
schema.clone(),
params.pk_indices.clone(),
Some(stream_source_core),
params.executor_stats.clone(),
barrier_receiver,
system_params,
params.executor_id,
source_ctrl_opts.clone(),
params.env.connector_params(),
)
.boxed()
} else {
let source_exec = SourceExecutor::new(
params.actor_context.clone(),
Expand Down

0 comments on commit ae20d0b

Please sign in to comment.