Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(meta): ban scaling of the mviews being created #5732

Merged
merged 10 commits
Oct 10, 2022
Merged
25 changes: 17 additions & 8 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,24 @@ service HeartbeatService {

// Fragments of a Materialized View
message TableFragments {
// Current state of actor
enum ActorState {
// The state of the fragments of this table
enum State {
UNSPECIFIED = 0;
// Initial state after creation
INACTIVE = 1;
// Running normally
RUNNING = 2;
// The materialized view is creating
CREATING = 1;
// The materialized view has been created
CREATED = 2;
}
// Runtime information of an actor
message ActorStatus {
// Current state of actor
enum ActorState {
UNSPECIFIED = 0;
// Initial state after creation
INACTIVE = 1;
// Running normally
RUNNING = 2;
}
// Current on which parallel unit
common.ParallelUnit parallel_unit = 1;
// Current state
Expand All @@ -67,8 +75,9 @@ message TableFragments {
repeated uint32 upstream_fragment_ids = 7;
}
uint32 table_id = 1;
map<uint32, Fragment> fragments = 2;
map<uint32, ActorStatus> actor_status = 3;
State state = 2;
map<uint32, Fragment> fragments = 3;
map<uint32, ActorStatus> actor_status = 4;
}

// TODO: remove this when dashboard refactored.
Expand Down
21 changes: 18 additions & 3 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap};

use comfy_table::{Attribute, Cell, Row, Table};
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::meta::table_fragments::State;
use risingwave_pb::meta::GetClusterInfoResponse;

use crate::common::MetaServiceOpts;
Expand All @@ -38,6 +39,8 @@ pub async fn cluster_info() -> anyhow::Result<()> {

// Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
let mut fragments = BTreeMap::new();
// Fragment ID -> Table Fragments' State
let mut fragment_states = HashMap::new();

for table_fragment in &table_fragments {
for (&id, fragment) in &table_fragment.fragments {
Expand All @@ -53,6 +56,7 @@ pub async fn cluster_info() -> anyhow::Result<()> {
.or_insert_with(HashMap::new)
.insert(parallel_unit.id, (parallel_unit, actor));
}
fragment_states.insert(id, table_fragment.state());
}
}

Expand All @@ -69,13 +73,23 @@ pub async fn cluster_info() -> anyhow::Result<()> {

let mut table = Table::new();

let cross_out_if_creating = |cell: Cell, fid: u32| -> Cell {
match fragment_states[&fid] {
State::Unspecified => unreachable!(),
State::Creating => cell.add_attribute(Attribute::CrossedOut),
State::Created => cell,
}
};

// Compute Node, Parallel Unit, Frag 1, Frag 2, ..., Frag N
table.set_header({
let mut row = Row::new();
row.add_cell("Compute Node".into());
row.add_cell("Parallel Unit".into());
for f in fragments.keys() {
row.add_cell(format!("Frag {f}").into());
for &fid in fragments.keys() {
let cell = Cell::new(format!("Frag {fid}"));
let cell = cross_out_if_creating(cell, fid);
row.add_cell(cell);
}
row
});
Expand All @@ -96,12 +110,13 @@ pub async fn cluster_info() -> anyhow::Result<()> {
.add_attribute(Attribute::Bold)
});
row.add_cell(pu.into());
for f in fragments.values() {
for (&fid, f) in &fragments {
let cell = if let Some((_pu, actor)) = f.get(&pu) {
actor.actor_id.into()
} else {
"-".into()
};
let cell = cross_out_if_creating(cell, fid);
row.add_cell(cell);
}
table.add_row(row);
Expand Down
30 changes: 18 additions & 12 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::barrier::CommandChanges;
use crate::manager::{FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::storage::MetaStore;
use crate::stream::{fetch_source_fragments, SourceManagerRef};
use crate::stream::SourceManagerRef;
use crate::MetaResult;

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
Expand Down Expand Up @@ -87,11 +87,13 @@ pub enum Command {

/// `CreateMaterializedView` command generates a `Add` barrier by given info.
///
/// Barriers from the actors to be created, which is marked as `Creating` at first, will STILL
/// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
/// be collected since the barrier should be passthroughed.
/// After the barrier is collected, these newly created actors will be marked as `Created`. And
/// it adds the table fragments info to meta store. However, the creating progress will last
/// for a while until the `finish` channel is signaled.
///
/// After the barrier is collected, these newly created actors will be marked as `Running`. And
/// it adds the table fragments info to meta store. However, the creating progress will **last
/// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
/// will be set to `Created`.
CreateMaterializedView {
table_fragments: TableFragments,
table_sink_map: HashMap<TableId, Vec<ActorId>>,
Expand Down Expand Up @@ -462,7 +464,7 @@ where
dependent_table_actors.push((*table_id, downstream_actors));
}
self.fragment_manager
.finish_create_table_fragments(
.post_create_table_fragments(
&table_fragments.table_id(),
dependent_table_actors,
)
Expand All @@ -474,11 +476,7 @@ where
self.snapshot_manager.pin(self.prev_epoch).await?;

// Extract the fragments that include source operators.
let source_fragments = {
let mut source_fragments = HashMap::new();
fetch_source_fragments(&mut source_fragments, table_fragments);
source_fragments
};
let source_fragments = table_fragments.source_fragments();

self.source_manager
.patch_update(
Expand Down Expand Up @@ -564,7 +562,15 @@ where
pub async fn pre_finish(&self) -> MetaResult<()> {
#[allow(clippy::single_match)]
match &self.command {
Command::CreateMaterializedView { .. } => {
Command::CreateMaterializedView {
table_fragments, ..
} => {
// Update the state of the table fragments from `Creating` to `Created`, so that the
// fragments can be scaled.
self.fragment_manager
.mark_table_fragments_created(table_fragments.table_id())
.await?;

// Since the compute node reports that the chain actors have caught up with the
// upstream and finished the creation, we can unpin the snapshot.
// TODO: we can unpin the snapshot earlier, when the snapshot ingestion is done.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_hummock_sdk::{HummockSstableId, LocalSstableInfo};
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::table_fragments::ActorState;
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::stream_plan::Barrier;
use risingwave_pb::stream_service::{
BarrierCompleteRequest, BarrierCompleteResponse, InjectBarrierRequest,
Expand Down
40 changes: 35 additions & 5 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use risingwave_common::types::ParallelUnitId;
use risingwave_common::{bail, try_match_expand};
use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, State};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{Dispatcher, FragmentType, StreamActor, StreamNode};
use tokio::sync::{RwLock, RwLockReadGuard};
Expand Down Expand Up @@ -183,7 +184,7 @@ where
}

/// Start create a new `TableFragments` and insert it into meta store, currently the actors'
/// state is `ActorState::Inactive`.
/// state is `ActorState::Inactive` and the table fragments' state is `State::Creating`.
pub async fn start_create_table_fragments(
&self,
table_fragment: TableFragments,
Expand Down Expand Up @@ -217,16 +218,22 @@ where
Ok(())
}

/// Finish create a new `TableFragments` and update the actors' state to `ActorState::Running`,
/// besides also update all dependent tables' downstream actors info.
pub async fn finish_create_table_fragments(
/// Called after the barrier collection of `CreateMaterializedView` command, which updates the
/// actors' state to `ActorState::Running`, besides also updates all dependent tables'
/// downstream actors info.
///
/// Note that the table fragments' state will be kept `Creating`, which is only updated when the
/// materialized view is completely created.
pub async fn post_create_table_fragments(
&self,
table_id: &TableId,
dependent_table_actors: Vec<(TableId, HashMap<ActorId, Vec<Dispatcher>>)>,
) -> MetaResult<()> {
let map = &mut self.core.write().await.table_fragments;

if let Some(table_fragments) = map.get(table_id) {
assert_eq!(table_fragments.state(), State::Creating);

let mut transaction = Transaction::default();

let mut table_fragments = table_fragments.clone();
Expand Down Expand Up @@ -265,6 +272,29 @@ where
}
}

/// Called after the finish of `CreateMaterializedView` command, i.e., materialized view is
/// completely created, which updates the state from `Creating` to `Created`.
pub async fn mark_table_fragments_created(&self, table_id: TableId) -> MetaResult<()> {
    let map = &mut self.core.write().await.table_fragments;

    match map.get(&table_id) {
        Some(current) => {
            // Only a materialized view that is still creating may be finished.
            assert_eq!(current.state(), State::Creating);

            let mut updated = current.clone();
            updated.set_state(State::Created);

            // Persist the state change transactionally before mutating the in-memory map,
            // so the map never reflects a state the meta store has not accepted.
            let mut transaction = Transaction::default();
            updated.upsert_in_transaction(&mut transaction)?;
            self.env.meta_store().txn(transaction).await?;

            map.insert(table_id, updated);
            Ok(())
        }
        None => bail!("table_fragment not exist: id={}", table_id),
    }
}

/// Drop table fragments info and remove downstream actor infos in fragments from its dependent
/// tables.
pub async fn drop_table_fragments(&self, table_id: &TableId) -> MetaResult<()> {
Expand Down
48 changes: 45 additions & 3 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::types::ParallelUnitId;
use risingwave_common::util::is_stream_source;
use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping};
use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus, Fragment};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
use risingwave_pb::meta::TableFragments as ProstTableFragments;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{FragmentType, SourceNode, StreamActor, StreamNode};

use super::{ActorId, FragmentId};
use crate::manager::WorkerId;
use crate::manager::{SourceId, WorkerId};
use crate::model::{MetadataModel, MetadataModelResult};

/// Column family name for table fragments.
Expand All @@ -39,6 +41,9 @@ pub struct TableFragments {
/// The table id.
table_id: TableId,

/// The state of the table fragments.
state: State,

/// The table fragments.
pub(crate) fragments: BTreeMap<FragmentId, Fragment>,

Expand All @@ -57,6 +62,7 @@ impl MetadataModel for TableFragments {
fn to_protobuf(&self) -> Self::ProstType {
Self::ProstType {
table_id: self.table_id.table_id(),
state: self.state as _,
fragments: self.fragments.clone().into_iter().collect(),
actor_status: self.actor_status.clone().into_iter().collect(),
}
Expand All @@ -65,6 +71,7 @@ impl MetadataModel for TableFragments {
fn from_protobuf(prost: Self::ProstType) -> Self {
Self {
table_id: TableId::new(prost.table_id),
state: prost.state(),
fragments: prost.fragments.into_iter().collect(),
actor_status: prost.actor_status.into_iter().collect(),
}
Expand All @@ -76,9 +83,11 @@ impl MetadataModel for TableFragments {
}

impl TableFragments {
/// Create a new `TableFragments` with state of `Creating`.
pub fn new(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
Self {
table_id,
state: State::Creating,
fragments,
actor_status: BTreeMap::default(),
}
Expand All @@ -102,6 +111,16 @@ impl TableFragments {
self.table_id
}

/// The current [`State`] of these table fragments (`Creating` until the
/// materialized view is completely built, then `Created`).
pub fn state(&self) -> State {
    self.state
}

/// Overwrite the current [`State`] of these table fragments with `state`.
pub fn set_state(&mut self, state: State) {
    self.state = state;
}

/// Returns sink fragment vnode mapping.
/// Note that: the real sink fragment is also stored as `TableFragments`, it's possible that
/// there's no fragment with `FragmentType::Sink` exists.
Expand Down Expand Up @@ -191,6 +210,29 @@ impl TableFragments {
None
}

/// Extract the fragments that include source operators, grouping by source id.
pub fn source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
let mut source_fragments = HashMap::new();

for fragment in self.fragments() {
for actor in &fragment.actors {
if let Some(source_id) =
TableFragments::find_source_node(actor.nodes.as_ref().unwrap())
.filter(|s| is_stream_source(s))
.map(|s| s.source_id)
{
source_fragments
.entry(source_id)
.or_insert(BTreeSet::new())
.insert(fragment.fragment_id as FragmentId);

break;
}
}
}
source_fragments
}

/// Returns actors that contains Chain node.
pub fn chain_actor_ids(&self) -> HashSet<ActorId> {
self.fragments
Expand Down
Loading