refactor(stream): SourceExecutor minor refactor #14706

Merged
merged 1 commit on Feb 4, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/stream/Cargo.toml
@@ -22,6 +22,7 @@ async-trait = "0.1"
auto_enums = "0.8"
await-tree = { workspace = true }
bytes = "1"
cfg-if = "1"
delta_btree_map = { path = "../utils/delta_btree_map" }
educe = "0.5"
either = "1"
3 changes: 3 additions & 0 deletions src/stream/src/executor/source/executor_core.rs
@@ -41,6 +41,9 @@ pub struct StreamSourceCore<S: StateStore> {
pub(crate) split_state_store: SourceStateTableHandler<S>,

/// In-memory cache for the splits.
///
/// Source messages only write to the cache.
/// The cache is read on split change and when the stream reader is rebuilt on error.
pub(crate) state_cache: HashMap<SplitId, SplitImpl>,
}

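The new doc comment on `state_cache` describes a write-then-flush pattern: the message loop only writes to the cache, and the cache is read back when splits change or the reader is rebuilt on error. Below is a minimal sketch of that pattern with hypothetical stand-in types (`SplitState`, `SourceCore`, `on_message`, `on_barrier` are illustrative names, not the actual RisingWave API):

```rust
use std::collections::HashMap;

// Hypothetical stand-ins; the real executor uses `SplitId`/`SplitImpl` from
// `risingwave_connector::source` and persists to a source state table.
type SplitId = String;

#[derive(Clone)]
struct SplitState {
    offset: u64,
}

// Simplified mirror of `StreamSourceCore`: the message loop only writes the
// in-memory cache; the cache is flushed to the "state table" on a barrier.
struct SourceCore {
    state_cache: HashMap<SplitId, SplitState>,
    persisted: HashMap<SplitId, SplitState>, // stand-in for the state table
}

impl SourceCore {
    /// Called per source message: remember the latest offset of the split.
    fn on_message(&mut self, split_id: &str, offset: u64) {
        self.state_cache
            .insert(split_id.to_owned(), SplitState { offset });
    }

    /// Called on a barrier: persist whatever the cache holds, then clear it.
    fn on_barrier(&mut self) {
        for (id, state) in self.state_cache.drain() {
            self.persisted.insert(id, state);
        }
    }
}

fn main() {
    let mut core = SourceCore {
        state_cache: HashMap::new(),
        persisted: HashMap::new(),
    };
    core.on_message("split-0", 42);
    core.on_barrier();
    assert_eq!(core.persisted["split-0"].offset, 42);
    assert!(core.state_cache.is_empty());
}
```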
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/fetch_executor.rs
@@ -293,7 +293,7 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
)
})
.collect();
state_store_handler.take_snapshot(file_assignment).await?;
state_store_handler.set_states(file_assignment).await?;
state_store_handler.state_store.try_flush().await?;
}
_ => unreachable!(),
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/fs_source_executor.rs
@@ -239,7 +239,7 @@ impl<S: StateStore> FsSourceExecutor<S> {

if !incompleted.is_empty() {
tracing::debug!(actor_id = self.actor_ctx.id, incompleted = ?incompleted, "take snapshot");
core.split_state_store.take_snapshot(incompleted).await?
core.split_state_store.set_states(incompleted).await?
}

if !completed.is_empty() {
13 changes: 8 additions & 5 deletions src/stream/src/executor/source/source_executor.rs
@@ -170,7 +170,7 @@ impl<S: StateStore> SourceExecutor<S> {
Ok(None)
}

// Note: `update_state_if_changed` will modify `state_cache`
/// Note: `update_state_if_changed` will modify `state_cache`
async fn update_state_if_changed(
&mut self,
state: ConnectorState,
@@ -293,7 +293,9 @@ impl<S: StateStore> SourceExecutor<S> {
Ok(())
}

async fn take_snapshot_and_clear_cache(
/// - `target_state`: the new split info from barrier. `None` if no split update.
/// - `should_trim_state`: whether to trim state for dropped splits.
async fn persist_state_and_clear_cache(
&mut self,
epoch: EpochPair,
target_state: Option<Vec<SplitImpl>>,
@@ -332,16 +334,17 @@

if !cache.is_empty() {
tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot");
core.split_state_store.take_snapshot(cache).await?
core.split_state_store.set_states(cache).await?
}

// commit anyway, even if no message saved
core.split_state_store.state_store.commit(epoch).await?;

core.state_cache.clear();

Ok(())
}

/// try mem table spill
async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();
core.split_state_store.state_store.try_flush().await?;
@@ -519,7 +522,7 @@ impl<S: StateStore> SourceExecutor<S> {
latest_split_info = target_state.clone();
}

self.take_snapshot_and_clear_cache(
self.persist_state_and_clear_cache(
epoch,
target_state,
should_trim_state,
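The renamed `persist_state_and_clear_cache` makes the barrier-time ordering explicit: flush the cache with `set_states`, optionally trim rows of dropped splits, commit the epoch even when nothing was written, then clear the cache. A rough, self-contained sketch of that ordering follows; all types and the drop-detection detail are illustrative assumptions, not the exact RisingWave logic:

```rust
use std::collections::HashMap;

// Illustrative stand-ins for `SplitImpl`, `EpochPair`, and the state table handler.
type SplitId = String;
type Epoch = u64;

#[derive(Default)]
struct StateTable {
    rows: HashMap<SplitId, u64>, // split id -> persisted offset
}

impl StateTable {
    fn set_states(&mut self, states: &HashMap<SplitId, u64>) {
        for (id, offset) in states {
            self.rows.insert(id.clone(), *offset);
        }
    }
    fn trim_state(&mut self, dropped: &[SplitId]) {
        for id in dropped {
            self.rows.remove(id);
        }
    }
    fn commit(&mut self, _epoch: Epoch) {
        // The real handler commits the state-store epoch here,
        // "anyway, even if no message saved".
    }
}

#[derive(Default)]
struct Executor {
    state_cache: HashMap<SplitId, u64>,
    state_table: StateTable,
}

impl Executor {
    /// - `target_splits`: new split assignment from the barrier, `None` if unchanged.
    /// - `should_trim_state`: whether rows of dropped splits are deleted.
    fn persist_state_and_clear_cache(
        &mut self,
        epoch: Epoch,
        target_splits: Option<Vec<SplitId>>,
        should_trim_state: bool,
    ) {
        // 1. Persist whatever the message loop cached since the last barrier.
        if !self.state_cache.is_empty() {
            self.state_table.set_states(&self.state_cache);
        }
        // 2. Optionally drop rows for splits that are no longer assigned
        //    (the dropped-split detection here is a simplification).
        if let (true, Some(targets)) = (should_trim_state, target_splits) {
            let dropped: Vec<SplitId> = self
                .state_table
                .rows
                .keys()
                .filter(|id| !targets.contains(*id))
                .cloned()
                .collect();
            self.state_table.trim_state(&dropped);
        }
        // 3. Commit the epoch unconditionally, then reset the cache.
        self.state_table.commit(epoch);
        self.state_cache.clear();
    }
}

fn main() {
    let mut exec = Executor::default();
    exec.state_cache.insert("split-0".into(), 7);
    exec.state_cache.insert("split-1".into(), 9);
    // The barrier says only "split-0" is still assigned; trim the dropped split.
    exec.persist_state_and_clear_cache(1, Some(vec!["split-0".into()]), true);
    assert_eq!(exec.state_table.rows.len(), 1);
    assert!(exec.state_cache.is_empty());
}
```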
43 changes: 21 additions & 22 deletions src/stream/src/executor/source/state_table_handler.rs
@@ -12,13 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

cfg_if::cfg_if! {
if #[cfg(test)] {
use risingwave_common::catalog::{DatabaseId, SchemaId};
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType;
use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc};
}
}

use std::collections::HashSet;
use std::ops::{Bound, Deref};
use std::sync::Arc;

use futures::{pin_mut, StreamExt};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{DatabaseId, SchemaId};
use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{OwnedRow, Row};
@@ -27,12 +37,7 @@ use risingwave_common::util::epoch::EpochPair;
use risingwave_common::{bail, row};
use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
use risingwave_hummock_sdk::key::next_key;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType;
use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

@@ -140,10 +145,10 @@ impl<S: StateStore> SourceStateTableHandler<S> {

/// set all complete
/// can only be used by `FsSourceExecutor`
pub(crate) async fn set_all_complete<SS>(&mut self, states: Vec<SS>) -> StreamExecutorResult<()>
where
SS: SplitMetaData,
{
pub(crate) async fn set_all_complete(
&mut self,
states: Vec<SplitImpl>,
) -> StreamExecutorResult<()> {
if states.is_empty() {
// TODO should be a clear Error Code
bail!("states require not null");
@@ -180,11 +185,7 @@ impl<S: StateStore> SourceStateTableHandler<S> {
Ok(())
}

/// This function provides the ability to persist the source state
/// and needs to be invoked by the ``SourceReader`` to call it,
/// and will return the error when the dependent ``StateStore`` handles the error.
/// The caller should ensure that the passed parameters are not empty.
pub async fn take_snapshot<SS>(&mut self, states: Vec<SS>) -> StreamExecutorResult<()>
Comment on lines -183 to -187
Member Author


I think the comments are verbose (and a little outdated?).

Function name `set_states` might be enough to describe what it does.

pub async fn set_states<SS>(&mut self, states: Vec<SS>) -> StreamExecutorResult<()>
where
SS: SplitMetaData,
{
@@ -200,10 +201,7 @@ impl<S: StateStore> SourceStateTableHandler<S> {
Ok(())
}

pub async fn trim_state<SS>(&mut self, to_trim: &[SS]) -> StreamExecutorResult<()>
where
SS: SplitMetaData,
{
pub async fn trim_state(&mut self, to_trim: &[SplitImpl]) -> StreamExecutorResult<()> {
for split in to_trim {
tracing::info!("trimming source state for split {}", split.id());
self.delete(split.id()).await?;
@@ -228,8 +226,9 @@ impl<S: StateStore> SourceStateTableHandler<S> {
}
}

// align with schema defined in `LogicalSource::infer_internal_table_catalog`. The function is used
// for test purpose and should not be used in production.
/// align with schema defined in `LogicalSource::infer_internal_table_catalog`. The function is used
/// for test purpose and should not be used in production.
#[cfg(test)]
pub fn default_source_internal_table(id: u32) -> PbTable {
let make_column = |column_type: TypeName, column_id: i32| -> ColumnCatalog {
ColumnCatalog {
@@ -325,7 +324,7 @@ pub(crate) mod tests {

state_table_handler.init_epoch(epoch_1);
state_table_handler
.take_snapshot(vec![split_impl.clone()])
.set_states(vec![split_impl.clone()])
.await?;
state_table_handler.state_store.commit(epoch_2).await?;

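As discussed in the review comment above, `take_snapshot` becomes `set_states`, still generic over the split type, while `trim_state` is narrowed to the concrete `&[SplitImpl]`. A small sketch of the generic shape, using a hypothetical `SplitMeta` trait in the spirit of `SplitMetaData` (illustrative only; the real trait and handler live in `risingwave_connector` and `risingwave_stream`):

```rust
use std::collections::HashMap;

// A hypothetical trait in the spirit of `SplitMetaData`: anything that can name
// itself and serialize its progress can be stored by the handler.
trait SplitMeta {
    fn id(&self) -> String;
    fn encode(&self) -> String;
}

// Simplified handler; the real `SourceStateTableHandler` writes to a state table.
#[derive(Default)]
struct Handler {
    table: HashMap<String, String>,
}

impl Handler {
    // Generic over the split type, mirroring `set_states<SS: SplitMetaData>`.
    fn set_states<SS: SplitMeta>(&mut self, states: Vec<SS>) {
        for s in states {
            self.table.insert(s.id(), s.encode());
        }
    }

    // `trim_state` in the PR takes concrete splits; ids are enough for this sketch.
    fn trim_state(&mut self, to_trim: &[String]) {
        for id in to_trim {
            self.table.remove(id);
        }
    }
}

// An invented split type for the example, not a real connector split.
struct KafkaLikeSplit {
    partition: i32,
    offset: i64,
}

impl SplitMeta for KafkaLikeSplit {
    fn id(&self) -> String {
        self.partition.to_string()
    }
    fn encode(&self) -> String {
        format!("{{\"offset\":{}}}", self.offset)
    }
}

fn main() {
    let mut handler = Handler::default();
    handler.set_states(vec![KafkaLikeSplit { partition: 0, offset: 100 }]);
    handler.trim_state(&["0".to_string()]);
    assert!(handler.table.is_empty());
}
```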