Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(state-table): make consistent op an enum in state table #16471

Merged
merged 3 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 104 additions & 29 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub struct StateTableInner<
/// 2. Computing output pk indices to use them for backfill state.
output_indices: Vec<usize>,

is_consistent_op: bool,
op_consistency_level: StateTableOpConsistencyLevel,
}

/// `StateTable` will use `BasicSerde` as default
Expand Down Expand Up @@ -218,7 +218,10 @@ where
}
}

fn consistent_old_value_op(row_serde: impl ValueRowSerde) -> OpConsistencyLevel {
fn consistent_old_value_op(
row_serde: impl ValueRowSerde,
is_log_store: bool,
) -> OpConsistencyLevel {
OpConsistencyLevel::ConsistentOldValue {
check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
if first == second {
Expand All @@ -245,10 +248,23 @@ fn consistent_old_value_op(row_serde: impl ValueRowSerde) -> OpConsistencyLevel
true
}
}),
is_log_store: false,
is_log_store,
}
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously
    /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and store the old value.
    LogStoreEnabled,
}

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here
// should be used by replicated state table.
Expand All @@ -266,7 +282,13 @@ where
store: S,
vnodes: Option<Arc<Bitmap>>,
) -> Self {
Self::from_table_catalog_inner(table_catalog, store, vnodes, true, vec![]).await
Self::from_table_catalog_with_consistency_level(
table_catalog,
store,
vnodes,
StateTableOpConsistencyLevel::ConsistentOldValue,
)
.await
}

/// Create state table from table catalog and store with sanity check disabled.
Expand All @@ -275,15 +297,31 @@ where
store: S,
vnodes: Option<Arc<Bitmap>>,
) -> Self {
Self::from_table_catalog_inner(table_catalog, store, vnodes, false, vec![]).await
Self::from_table_catalog_with_consistency_level(
table_catalog,
store,
vnodes,
StateTableOpConsistencyLevel::Inconsistent,
)
.await
}

/// Create a state table from the table catalog and store, with the op consistency
/// level chosen explicitly by the caller.
///
/// No output columns are projected; for that, use the replicated-table constructor.
pub async fn from_table_catalog_with_consistency_level(
    table_catalog: &Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    consistency_level: StateTableOpConsistencyLevel,
) -> Self {
    let output_column_ids = Vec::new();
    Self::from_table_catalog_inner(
        table_catalog,
        store,
        vnodes,
        consistency_level,
        output_column_ids,
    )
    .await
}

/// Create state table from table catalog and store.
async fn from_table_catalog_inner(
table_catalog: &Table,
store: S,
vnodes: Option<Arc<Bitmap>>,
is_consistent_op: bool,
op_consistency_level: StateTableOpConsistencyLevel,
output_column_ids: Vec<ColumnId>,
) -> Self {
let table_id = TableId::new(table_catalog.id);
Expand Down Expand Up @@ -369,18 +407,23 @@ where
)
};

let is_consistent_op = if crate::consistency::insane() {
let state_table_op_consistency_level = if crate::consistency::insane() {
// In insane mode, we will have inconsistent operations applied on the table, even if
// our executor code do not expect that.
false
StateTableOpConsistencyLevel::Inconsistent
} else {
is_consistent_op
op_consistency_level
};
let op_consistency_level = if is_consistent_op {
let row_serde = make_row_serde();
consistent_old_value_op(row_serde)
} else {
OpConsistencyLevel::Inconsistent
let op_consistency_level = match state_table_op_consistency_level {
StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
StateTableOpConsistencyLevel::ConsistentOldValue => {
let row_serde = make_row_serde();
consistent_old_value_op(row_serde, false)
}
StateTableOpConsistencyLevel::LogStoreEnabled => {
let row_serde = make_row_serde();
consistent_old_value_op(row_serde, true)
}
};

let table_option = TableOption::new(table_catalog.retention_seconds);
Expand Down Expand Up @@ -466,7 +509,7 @@ where
data_types,
output_indices,
i2o_mapping,
is_consistent_op,
op_consistency_level: state_table_op_consistency_level,
}
}

Expand Down Expand Up @@ -602,7 +645,7 @@ where
};
let op_consistency_level = if is_consistent_op {
let row_serde = make_row_serde();
consistent_old_value_op(row_serde)
consistent_old_value_op(row_serde, false)
} else {
OpConsistencyLevel::Inconsistent
};
Expand Down Expand Up @@ -648,7 +691,11 @@ where
data_types,
output_indices: vec![],
i2o_mapping: ColIndexMapping::new(vec![], 0),
is_consistent_op,
op_consistency_level: if is_consistent_op {
StateTableOpConsistencyLevel::ConsistentOldValue
} else {
StateTableOpConsistencyLevel::Inconsistent
},
}
}

Expand Down Expand Up @@ -711,7 +758,11 @@ where
}

/// Whether operations applied on this state table are required to be consistent,
/// i.e. the consistency level is anything other than `Inconsistent`
/// (both `ConsistentOldValue` and `LogStoreEnabled` imply consistent ops).
pub fn is_consistent_op(&self) -> bool {
    self.op_consistency_level != StateTableOpConsistencyLevel::Inconsistent
}
}

Expand All @@ -728,7 +779,14 @@ where
vnodes: Option<Arc<Bitmap>>,
output_column_ids: Vec<ColumnId>,
) -> Self {
Self::from_table_catalog_inner(table_catalog, store, vnodes, false, output_column_ids).await
Self::from_table_catalog_inner(
table_catalog,
store,
vnodes,
StateTableOpConsistencyLevel::Inconsistent,
output_column_ids,
)
.await
}
}

Expand Down Expand Up @@ -1084,22 +1142,39 @@ where
}

pub async fn commit(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
self.commit_with_switch_consistent_op(new_epoch, None).await
self.commit_inner(new_epoch, None).await
}

pub async fn commit_with_switch_consistent_op(
pub async fn commit_may_switch_consistent_op(
&mut self,
new_epoch: EpochPair,
switch_consistent_op: Option<bool>,
op_consistency_level: StateTableOpConsistencyLevel,
) -> StreamExecutorResult<()> {
if self.op_consistency_level != op_consistency_level {
self.commit_inner(new_epoch, Some(op_consistency_level))
.await
} else {
self.commit_inner(new_epoch, None).await
}
}

async fn commit_inner(
&mut self,
new_epoch: EpochPair,
switch_consistent_op: Option<StateTableOpConsistencyLevel>,
) -> StreamExecutorResult<()> {
assert_eq!(self.epoch(), new_epoch.prev);
let switch_op_consistency_level = switch_consistent_op.map(|enable_consistent_op| {
assert_ne!(self.is_consistent_op, enable_consistent_op);
self.is_consistent_op = enable_consistent_op;
if enable_consistent_op {
consistent_old_value_op(self.row_serde.clone())
} else {
OpConsistencyLevel::Inconsistent
let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
assert_ne!(self.op_consistency_level, new_consistency_level);
self.op_consistency_level = new_consistency_level;
match new_consistency_level {
StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
StateTableOpConsistencyLevel::ConsistentOldValue => {
consistent_old_value_op(self.row_serde.clone(), false)
}
StateTableOpConsistencyLevel::LogStoreEnabled => {
consistent_old_value_op(self.row_serde.clone(), true)
}
}
});
trace!(
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct ActorContext {

pub streaming_metrics: Arc<StreamingMetrics>,

pub dispatch_num: usize,
pub initial_dispatch_num: usize,
}

pub type ActorContextRef = Arc<ActorContext>;
Expand All @@ -65,15 +65,15 @@ impl ActorContext {
total_mem_val: Arc::new(TrAdder::new()),
streaming_metrics: Arc::new(StreamingMetrics::unused()),
// Set 1 for test to enable sanity check on table
dispatch_num: 1,
initial_dispatch_num: 1,
})
}

pub fn create(
stream_actor: &PbStreamActor,
total_mem_val: Arc<TrAdder<i64>>,
streaming_metrics: Arc<StreamingMetrics>,
dispatch_num: usize,
initial_dispatch_num: usize,
) -> ActorContextRef {
Arc::new(Self {
id: stream_actor.actor_id,
Expand All @@ -83,7 +83,7 @@ impl ActorContext {
last_mem_val: Arc::new(0.into()),
total_mem_val,
streaming_metrics,
dispatch_num,
initial_dispatch_num,
})
}

Expand Down
58 changes: 39 additions & 19 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTableInner;
use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel};
use crate::executor::prelude::*;
use crate::executor::{AddMutation, UpdateMutation};

Expand All @@ -58,6 +58,21 @@ pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
conflict_behavior: ConflictBehavior,

version_column_index: Option<u32>,

may_have_downstream: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this is may_have_downstream, instead of has_downstream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag only gets changed when there is a newly created downstream, and won't be changed when some downstreams are dropped, so the value can be a false positive.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. That's the same with my understanding. But are there any problems if we also handle the drop and re-enable the switch?

@BugenZhao mentioned this optimization
#16348 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we could not do this because there is not enough information in the barrier with the drop streaming job command. @chenzl25 Can you provide more details?

But I think it can be implemented if we incrementally maintain a full copy of downstream information in the mv executor.

Copy link
Contributor

@chenzl25 chenzl25 Apr 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take AddMutation as an example. It contains pub adds: HashMap<ActorId, Vec<PbDispatcher>> to represent how many dispatchers are to be added for an actor, but for UpdateMutation, it only has dropped_actors: HashSet<ActorId> so we don't have enough information to maintain the dispatcher number for MaterializedView.

}

/// Decide the op consistency level for the materialize executor's state table.
///
/// A table with `Overwrite` conflict behavior that has no downstream MV depending
/// on it can disable conflict checks, so we use an inconsistent op level to skip
/// the sanity checks as well. Every other combination keeps the consistent level.
fn get_op_consistency_level(
    conflict_behavior: ConflictBehavior,
    may_have_downstream: bool,
) -> StateTableOpConsistencyLevel {
    match (conflict_behavior, may_have_downstream) {
        (ConflictBehavior::Overwrite, false) => StateTableOpConsistencyLevel::Inconsistent,
        _ => StateTableOpConsistencyLevel::ConsistentOldValue,
    }
}

impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
Expand Down Expand Up @@ -90,16 +105,16 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
);

let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();

let may_have_downstream = actor_context.initial_dispatch_num != 0;
let op_consistency_level = get_op_consistency_level(conflict_behavior, may_have_downstream);
// Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
let state_table = if matches!(conflict_behavior, ConflictBehavior::Overwrite)
&& actor_context.dispatch_num == 0
{
// Table with overwrite conflict behavior could disable conflict check if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
StateTableInner::from_table_catalog_inconsistent_op(table_catalog, store, vnodes).await
} else {
StateTableInner::from_table_catalog(table_catalog, store, vnodes).await
};
let state_table = StateTableInner::from_table_catalog_with_consistency_level(
table_catalog,
store,
vnodes,
op_consistency_level,
)
.await;

let metrics_info =
MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
Expand All @@ -117,6 +132,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
),
conflict_behavior,
version_column_index,
may_have_downstream,
}
}

Expand Down Expand Up @@ -223,17 +239,20 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
}
}
Message::Barrier(b) => {
let mutation = b.mutation.clone();
let mutation = b.mutation.as_deref();
// If a downstream mv depends on the current table, we need to do conflict check again.
if !self.state_table.is_consistent_op()
if !self.may_have_downstream
&& Self::new_downstream_created(mutation, self.actor_context.id)
{
self.may_have_downstream = true;
}
let op_consistency_level =
get_op_consistency_level(self.conflict_behavior, self.may_have_downstream);
self.state_table
.commit_may_switch_consistent_op(b.epoch, op_consistency_level)
.await?;
if !self.state_table.is_consistent_op() {
assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
self.state_table
.commit_with_switch_consistent_op(b.epoch, Some(true))
.await?;
} else {
self.state_table.commit(b.epoch).await?;
}

// Update the vnode bitmap for the state table if asked.
Expand All @@ -252,8 +271,8 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
}
}

fn new_downstream_created(mutation: Option<Arc<Mutation>>, actor_id: ActorId) -> bool {
let Some(mutation) = mutation.as_deref() else {
fn new_downstream_created(mutation: Option<&Mutation>, actor_id: ActorId) -> bool {
let Some(mutation) = mutation else {
return false;
};
match mutation {
Expand Down Expand Up @@ -330,6 +349,7 @@ impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
),
conflict_behavior,
version_column_index: None,
may_have_downstream: true,
}
}
}
Expand Down
Loading