Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Apr 20, 2024
1 parent 95bec0b commit c3c04e0
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 64 deletions.
6 changes: 6 additions & 0 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,18 @@ pub struct Reschedule {
pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
}

/// Replacing an old table with a new one. Used for `ALTER TABLE` and sink into table. All actors in the table job will be rebuilt.
#[derive(Debug, Clone)]
pub struct ReplaceTablePlan {
/// Fragments of the table being replaced (the "old" side of the replacement).
pub old_table_fragments: TableFragments,
/// Fragments of the table that takes its place (the "new" side of the replacement).
pub new_table_fragments: TableFragments,
/// NOTE(review): presumably the merge-side updates needed to point downstream actors at the
/// rebuilt actors — confirm against the code that builds this plan.
pub merge_updates: Vec<MergeUpdate>,
/// Dispatchers keyed by actor id.
/// NOTE(review): presumably the key is the upstream actor that receives the dispatchers — confirm.
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
/// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
/// We need to reassign splits for it.
///
/// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about
/// `backfill_splits`.
pub init_split_assignment: SplitAssignment,
}

Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct ActorContext {

pub streaming_metrics: Arc<StreamingMetrics>,

/// This is the number of dispatchers when the actor is created. It will not be updated during runtime when new downstreams are added.
pub dispatch_num: usize,
}

Expand Down
30 changes: 30 additions & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,36 @@ impl Barrier {
}
}

/// Whether this barrier adds new downstream actors for the actor with `upstream_actor_id`.
///
/// Returns `false` when the barrier carries no mutation. For `Add`, the actor must be a key of
/// `adds`; for `AddAndUpdate`, it must be a key of `adds`, `actor_new_dispatchers` or
/// `dispatchers`. Every other mutation kind yields `false`.
pub fn has_new_downstream(&self, upstream_actor_id: ActorId) -> bool {
    let Some(mutation) = self.mutation.as_deref() else {
        return false;
    };
    match mutation {
        // Add is for mv, index and sink creation.
        Mutation::Add(AddMutation { adds, .. }) => adds.contains_key(&upstream_actor_id),
        // AddAndUpdate is for sink-into-table.
        Mutation::AddAndUpdate(
            AddMutation { adds, .. },
            UpdateMutation {
                dispatchers,
                actor_new_dispatchers,
                ..
            },
        ) => {
            adds.contains_key(&upstream_actor_id)
                || actor_new_dispatchers.contains_key(&upstream_actor_id)
                || dispatchers.contains_key(&upstream_actor_id)
        }
        // Variants are listed explicitly (no `_` arm) so that adding a new `Mutation` variant
        // forces a compile-time decision here.
        Mutation::Update(_)
        | Mutation::Stop(_)
        | Mutation::Pause
        | Mutation::Resume
        | Mutation::SourceChangeSplit(_)
        | Mutation::Throttle(_) => false,
    }
}

/// Whether this barrier requires the executor to pause its data stream on startup.
pub fn is_pause_on_startup(&self) -> bool {
match self.mutation.as_deref() {
Expand Down
38 changes: 4 additions & 34 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ use crate::common::table::state_table::StateTableInner;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContext, ActorContextRef, AddMutation, BoxedMessageStream, Execute,
Executor, Message, Mutation, StreamExecutorResult, UpdateMutation,
expect_first_barrier, ActorContext, ActorContextRef, BoxedMessageStream, Execute, Executor,
Message, StreamExecutorResult,
};
use crate::task::{ActorId, AtomicU64Ref};
use crate::task::AtomicU64Ref;

/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
Expand Down Expand Up @@ -231,10 +231,9 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
}
}
Message::Barrier(b) => {
let mutation = b.mutation.clone();
// If a downstream mv depends on the current table, we need to do conflict check again.
if !self.state_table.is_consistent_op()
&& Self::new_downstream_created(mutation, self.actor_context.id)
&& b.has_new_downstream(self.actor_context.id)
{
assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
self.state_table
Expand All @@ -259,35 +258,6 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
}
}
}

/// Reports whether `mutation` attaches new downstream actors to the actor `actor_id`.
///
/// A missing mutation yields `false`, as does any mutation kind other than `Add` and
/// `AddAndUpdate`.
fn new_downstream_created(mutation: Option<Arc<Mutation>>, actor_id: ActorId) -> bool {
    match mutation.as_deref() {
        None => false,
        // Plain `Add`: mv, index and sink creation.
        Some(Mutation::Add(added)) => added.adds.contains_key(&actor_id),
        // `AddAndUpdate`: sink-into-table.
        Some(Mutation::AddAndUpdate(added, updated)) => {
            added.adds.contains_key(&actor_id)
                || updated.actor_new_dispatchers.contains_key(&actor_id)
                || updated.dispatchers.contains_key(&actor_id)
        }
        Some(
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_),
        ) => false,
    }
}
}

impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct BackfillStateTableHandler<S: StateStore> {
}

impl<S: StateStore> BackfillStateTableHandler<S> {
/// See also [`super::SourceStateTableHandler::from_table_catalog`] for what the state table looks like.
pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
Self {
state_store: StateTable::from_table_catalog(table_catalog, store, None).await,
Expand Down
19 changes: 8 additions & 11 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl<S: StateStore> SourceExecutor<S> {

// init in-memory split states with persisted state if any
core.init_split_state(boot_state.clone());
let mut is_uninitialized = core.split_state_store.is_empty().await?;
let mut is_uninitialized = self.actor_ctx.dispatch_num == 0;

// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = Some(core);
Expand Down Expand Up @@ -532,6 +532,13 @@ impl<S: StateStore> SourceExecutor<S> {

let epoch = barrier.epoch;

if barrier.has_new_downstream(self.actor_ctx.id) {
if is_uninitialized {
stream.resume_stream();
is_uninitialized = false;
}
}

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
// XXX: Is it possible that the stream is self_paused, and we have pause mutation now? In this case, it will panic.
Expand Down Expand Up @@ -570,16 +577,6 @@ impl<S: StateStore> SourceExecutor<S> {
.await?;
}
}
Mutation::Add(AddMutation { adds, .. }) => {
// The shared source executor has a downstream MV now. Let's start working!
if adds.contains_key(&self.actor_ctx.id)
&& self.is_shared
&& is_uninitialized
{
stream.resume_stream();
is_uninitialized = false;
}
}
_ => {}
}
}
Expand Down
19 changes: 0 additions & 19 deletions src/stream/src/executor/source/state_table_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
use risingwave_hummock_sdk::key::next_key;
use risingwave_pb::catalog::PbTable;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::TableDistribution;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
Expand Down Expand Up @@ -92,24 +91,6 @@ impl<S: StateStore> SourceStateTableHandler<S> {
.map_err(StreamExecutorError::from)
}

/// Returns `true` when a full scan of the state table yields no item, i.e. this source has
/// not consumed any data yet.
///
/// Panics if the state table's vnode bitmap is not the singleton one.
pub async fn is_empty(&self) -> StreamExecutorResult<bool> {
    assert_eq!(
        self.state_table.vnodes(),
        TableDistribution::singleton_vnode_bitmap_ref(),
        "SourceExecutor's state table should have singleton distribution"
    );

    // Unbounded on both ends: the scan covers the whole table.
    let full_range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Bound::Unbounded, Bound::Unbounded);
    let rows = self
        .state_table
        .iter_with_prefix(None::<OwnedRow>, &full_range, Default::default())
        .await?;
    pin_mut!(rows);

    // Only the presence of a first item matters; the item itself is not inspected.
    let first = rows.next().await;
    Ok(first.is_none())
}

/// this method should only be used by [`FsSourceExecutor`](super::FsSourceExecutor)
pub(crate) async fn get_all_completed(&self) -> StreamExecutorResult<HashSet<SplitId>> {
let start = Bound::Excluded(row::once(Some(Self::string_to_scalar(
Expand Down

0 comments on commit c3c04e0

Please sign in to comment.