refactor(stream): SourceExecutor minor refactor
xxchan committed Feb 1, 2024
1 parent 82d1277 commit 90fc7e5
Showing 7 changed files with 36 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/stream/Cargo.toml
@@ -22,6 +22,7 @@ async-trait = "0.1"
auto_enums = "0.8"
await-tree = { workspace = true }
bytes = "1"
cfg-if = "1"
delta_btree_map = { path = "../utils/delta_btree_map" }
educe = "0.5"
either = "1"
3 changes: 3 additions & 0 deletions src/stream/src/executor/source/executor_core.rs
@@ -41,6 +41,9 @@ pub struct StreamSourceCore<S: StateStore> {
pub(crate) split_state_store: SourceStateTableHandler<S>,

/// In-memory cache for the splits.
///
/// Source messages only write to the cache.
/// It is read on split change and when the stream reader is rebuilt on error.
pub(crate) state_cache: HashMap<SplitId, SplitImpl>,
}

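A minimal sketch of the access pattern the new doc comment describes: message handling only writes the cache, while split changes and stream-reader rebuilds read it. The `SplitState` struct and the method names below are hypothetical stand-ins, not the actual RisingWave `SplitImpl` or executor API.

```rust
use std::collections::HashMap;

type SplitId = String;

/// Hypothetical stand-in for `SplitImpl`: a split id plus its latest offset.
#[derive(Clone, Debug)]
struct SplitState {
    id: SplitId,
    offset: u64,
}

#[derive(Default)]
struct Core {
    /// Mirrors `state_cache`: written on every batch of source messages,
    /// read on split change or when the stream reader is rebuilt after an error.
    state_cache: HashMap<SplitId, SplitState>,
}

impl Core {
    /// Write path: record the latest state seen for each split.
    fn on_messages(&mut self, latest: Vec<SplitState>) {
        for s in latest {
            self.state_cache.insert(s.id.clone(), s);
        }
    }

    /// Read path: when rebuilding the reader, resume from the cached offsets.
    fn resume_offsets(&self) -> Vec<(SplitId, u64)> {
        self.state_cache
            .values()
            .map(|s| (s.id.clone(), s.offset))
            .collect()
    }
}

fn main() {
    let mut core = Core::default();
    core.on_messages(vec![SplitState { id: "split-0".into(), offset: 42 }]);
    println!("resume from: {:?}", core.resume_offsets());
}
```
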
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/fetch_executor.rs
@@ -293,7 +293,7 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
)
})
.collect();
state_store_handler.take_snapshot(file_assignment).await?;
state_store_handler.set_states(file_assignment).await?;
state_store_handler.state_store.try_flush().await?;
}
_ => unreachable!(),
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/fs_source_executor.rs
@@ -239,7 +239,7 @@ impl<S: StateStore> FsSourceExecutor<S> {

if !incompleted.is_empty() {
tracing::debug!(actor_id = self.actor_ctx.id, incompleted = ?incompleted, "take snapshot");
core.split_state_store.take_snapshot(incompleted).await?
core.split_state_store.set_states(incompleted).await?
}

if !completed.is_empty() {
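
For context, the hunk above persists splits that are still being read via `set_states`; fully read splits are handled in the elided `completed` branch (presumably via the `FsSourceExecutor`-only `set_all_complete` shown later in this commit). A rough sketch of that bookkeeping, using a hypothetical simplified split type rather than the real `SplitImpl`:

```rust
/// Hypothetical simplified file-split descriptor.
#[derive(Clone, Debug)]
struct FsSplit {
    name: String,
    offset: usize,
    size: usize,
}

impl FsSplit {
    fn is_complete(&self) -> bool {
        self.offset >= self.size
    }
}

/// Partition cached splits before persisting: in-progress splits would go
/// through `set_states`, finished ones would be marked complete separately.
fn partition_splits(cache: Vec<FsSplit>) -> (Vec<FsSplit>, Vec<FsSplit>) {
    cache.into_iter().partition(|s| !s.is_complete())
}

fn main() {
    let cache = vec![
        FsSplit { name: "a.csv".into(), offset: 10, size: 100 },
        FsSplit { name: "b.csv".into(), offset: 100, size: 100 },
    ];
    let (incompleted, completed) = partition_splits(cache);
    println!("persist via set_states: {incompleted:?}");
    println!("mark complete: {completed:?}");
}
```
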
13 changes: 8 additions & 5 deletions src/stream/src/executor/source/source_executor.rs
@@ -170,7 +170,7 @@ impl<S: StateStore> SourceExecutor<S> {
Ok(None)
}

// Note: `update_state_if_changed` will modify `state_cache`
/// Note: `update_state_if_changed` will modify `state_cache`
async fn update_state_if_changed(
&mut self,
state: ConnectorState,
@@ -293,7 +293,9 @@ Ok(())
Ok(())
}

async fn take_snapshot_and_clear_cache(
/// - `target_state`: the new split info from barrier. `None` if no split update.
/// - `should_trim_state`: whether to trim state for dropped splits.
async fn persist_state_and_clear_cache(
&mut self,
epoch: EpochPair,
target_state: Option<Vec<SplitImpl>>,
@@ -332,16 +334,17 @@

if !cache.is_empty() {
tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot");
core.split_state_store.take_snapshot(cache).await?
core.split_state_store.set_states(cache).await?
}

// commit anyway, even if no message saved
core.split_state_store.state_store.commit(epoch).await?;

core.state_cache.clear();

Ok(())
}

/// try mem table spill
async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();
core.split_state_store.state_store.try_flush().await?;
@@ -519,7 +522,7 @@ impl<S: StateStore> SourceExecutor<S> {
latest_split_info = target_state.clone();
}

self.take_snapshot_and_clear_cache(
self.persist_state_and_clear_cache(
epoch,
target_state,
should_trim_state,
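
To summarize the barrier-time flow that the renamed `persist_state_and_clear_cache` implements in the hunk above (persist whatever the cache holds, commit the epoch unconditionally, then clear the cache), here is a minimal synchronous sketch. The `StateHandler` trait, `Epoch` alias, and `NoopHandler` are simplified stand-ins, not the real `SourceStateTableHandler` or `EpochPair` APIs.

```rust
use std::collections::HashMap;

type SplitId = String;
type Epoch = u64;

/// Simplified stand-in for the split state table handler.
trait StateHandler {
    fn set_states(&mut self, states: Vec<String>) -> Result<(), String>;
    fn commit(&mut self, epoch: Epoch) -> Result<(), String>;
}

struct SourceExec<H: StateHandler> {
    handler: H,
    state_cache: HashMap<SplitId, String>,
}

impl<H: StateHandler> SourceExec<H> {
    /// Mirrors the order in the diff: write cached states (if any), commit
    /// the epoch even when nothing was written, then clear the cache.
    fn persist_state_and_clear_cache(&mut self, epoch: Epoch) -> Result<(), String> {
        let cache: Vec<String> = self.state_cache.values().cloned().collect();
        if !cache.is_empty() {
            self.handler.set_states(cache)?;
        }
        // Commit anyway, even if no state was saved in this epoch.
        self.handler.commit(epoch)?;
        self.state_cache.clear();
        Ok(())
    }
}

/// No-op handler so the sketch runs standalone.
struct NoopHandler;

impl StateHandler for NoopHandler {
    fn set_states(&mut self, states: Vec<String>) -> Result<(), String> {
        println!("set_states({} splits)", states.len());
        Ok(())
    }
    fn commit(&mut self, epoch: Epoch) -> Result<(), String> {
        println!("commit(epoch = {epoch})");
        Ok(())
    }
}

fn main() -> Result<(), String> {
    let mut exec = SourceExec { handler: NoopHandler, state_cache: HashMap::new() };
    exec.state_cache.insert("split-0".into(), "offset=42".into());
    exec.persist_state_and_clear_cache(1)?;
    assert!(exec.state_cache.is_empty());
    Ok(())
}
```
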
43 changes: 21 additions & 22 deletions src/stream/src/executor/source/state_table_handler.rs
@@ -12,13 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

cfg_if::cfg_if! {
if #[cfg(test)] {
use risingwave_common::catalog::{DatabaseId, SchemaId};
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType;
use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc};
}
}

use std::collections::HashSet;
use std::ops::{Bound, Deref};
use std::sync::Arc;

use futures::{pin_mut, StreamExt};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{DatabaseId, SchemaId};
use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{OwnedRow, Row};
@@ -27,12 +37,7 @@ use risingwave_common::util::epoch::EpochPair;
use risingwave_common::{bail, row};
use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
use risingwave_hummock_sdk::key::next_key;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType;
use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

@@ -140,10 +145,10 @@

/// set all complete
/// can only be used by `FsSourceExecutor`
pub(crate) async fn set_all_complete<SS>(&mut self, states: Vec<SS>) -> StreamExecutorResult<()>
where
SS: SplitMetaData,
{
pub(crate) async fn set_all_complete(
&mut self,
states: Vec<SplitImpl>,
) -> StreamExecutorResult<()> {
if states.is_empty() {
// TODO: should be a clear error code
bail!("states require not null");
@@ -180,11 +185,7 @@ impl<S: StateStore> SourceStateTableHandler<S> {
Ok(())
}

/// This function provides the ability to persist the source state
/// and needs to be invoked by the ``SourceReader`` to call it,
/// and will return the error when the dependent ``StateStore`` handles the error.
/// The caller should ensure that the passed parameters are not empty.
pub async fn take_snapshot<SS>(&mut self, states: Vec<SS>) -> StreamExecutorResult<()>
pub async fn set_states<SS>(&mut self, states: Vec<SS>) -> StreamExecutorResult<()>
where
SS: SplitMetaData,
{
@@ -200,10 +201,7 @@ impl<S: StateStore> SourceStateTableHandler<S> {
Ok(())
}

pub async fn trim_state<SS>(&mut self, to_trim: &[SS]) -> StreamExecutorResult<()>
where
SS: SplitMetaData,
{
pub async fn trim_state(&mut self, to_trim: &[SplitImpl]) -> StreamExecutorResult<()> {
for split in to_trim {
tracing::info!("trimming source state for split {}", split.id());
self.delete(split.id()).await?;
@@ -228,8 +226,9 @@
}
}

// align with schema defined in `LogicalSource::infer_internal_table_catalog`. The function is used
// for test purpose and should not be used in production.
/// Aligns with the schema defined in `LogicalSource::infer_internal_table_catalog`. The function is used
/// for test purposes and should not be used in production.
#[cfg(test)]
pub fn default_source_internal_table(id: u32) -> PbTable {
let make_column = |column_type: TypeName, column_id: i32| -> ColumnCatalog {
ColumnCatalog {
@@ -325,7 +324,7 @@

state_table_handler.init_epoch(epoch_1);
state_table_handler
.take_snapshot(vec![split_impl.clone()])
.set_states(vec![split_impl.clone()])
.await?;
state_table_handler.state_store.commit(epoch_2).await?;

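Finally, a standalone illustration of the `cfg_if::cfg_if!` pattern introduced at the top of this file (and the reason `cfg-if = "1"` was added to `src/stream/Cargo.toml`): it groups items, here test-only imports, so they are compiled only when the `test` cfg is active and never show up as unused imports in production builds. This example is not from the RisingWave codebase; it only assumes a library crate that depends on `cfg-if = "1"`. Plain `#[cfg(test)]` attributes on each `use` would work as well; the macro simply keeps the gated block in one place.

```rust
// Test-only imports, grouped in one place instead of repeating
// `#[cfg(test)]` on every `use` line.
cfg_if::cfg_if! {
    if #[cfg(test)] {
        use std::collections::BTreeMap;
    }
}

/// Ordinary production code; it never sees the gated imports.
pub fn double(x: i32) -> i32 {
    x * 2
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn doubles_values() {
        // `BTreeMap` is in scope here only because the cfg_if block above
        // is compiled under `cfg(test)` and re-imported via `use super::*`.
        let mut expected: BTreeMap<i32, i32> = BTreeMap::new();
        expected.insert(2, 4);
        assert_eq!(expected[&2], double(2));
    }
}
```

Running `cargo test` compiles the gated imports; a plain `cargo build` skips them entirely.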
