create mv with actor group
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao committed Sep 8, 2023
1 parent 211e991 commit 8841892
Showing 14 changed files with 382 additions and 105 deletions.
2 changes: 2 additions & 0 deletions proto/common.proto
@@ -60,6 +60,8 @@ message WorkerNode {
repeated ParallelUnit parallel_units = 5;
Property property = 6;

// uint32 parallelism = 8;

// Ranges from 0 to 1023, used to generate the machine ID field in the global unique ID.
optional uint32 transactional_id = 7;
}
2 changes: 1 addition & 1 deletion proto/meta.proto
@@ -86,7 +86,7 @@ message TableFragments {
common.ParallelUnitMapping vnode_mapping = 5;

uint32 fragment_group_id = 8;
stream_plan.ActorMapping actor_group_mapping = 9;
stream_plan.ActorMapping actor_group_mapping = 9; // TODO: this can be recorded in other places

repeated uint32 state_table_ids = 6;
// Note that this can be derived backwards from the upstream actors of the Actor held by the Fragment,
10 changes: 9 additions & 1 deletion src/meta/src/barrier/mod.rs
@@ -1085,7 +1085,15 @@ impl GlobalBarrierManager {
.cluster_manager
.list_active_streaming_compute_nodes()
.await;
let all_actor_infos = self.fragment_manager.load_all_actors(check_state).await;
let assignments = self
.cluster_manager
.get_streaming_cluster_info()
.await
.assignments;
let all_actor_infos = self
.fragment_manager
.load_all_actors_2(&assignments, check_state)
.await;

let info = BarrierActorInfo::resolve(all_nodes, all_actor_infos);

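Context for the change above: the barrier manager now asks the cluster manager for the actor-group-to-worker assignments and passes them to `load_all_actors_2`, so an actor's location is looked up via its group rather than via a per-actor parallel unit. A minimal, self-contained sketch of that lookup, using hypothetical simplified type aliases (not the real meta-crate types):

```rust
use std::collections::HashMap;

// Simplified stand-ins for illustration only; not the real meta-crate types.
type ActorGroupId = u32;
type WorkerId = u32;
type ActorId = u32;

/// Resolve the worker hosting an actor from its group, mirroring
/// `assignments[&actor.actor_group_id]` in the diff: a missing assignment
/// is treated as a logic error and panics.
fn resolve_worker(group: ActorGroupId, assignments: &HashMap<ActorGroupId, WorkerId>) -> WorkerId {
    assignments[&group]
}

fn main() {
    let assignments: HashMap<ActorGroupId, WorkerId> = HashMap::from([(1, 1001), (2, 1002)]);
    let actors: &[(ActorId, ActorGroupId)] = &[(10, 1), (11, 1), (12, 2)];
    for &(actor, group) in actors {
        println!("actor {actor} -> worker {}", resolve_worker(group, &assignments));
    }
}
```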
49 changes: 48 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
@@ -20,7 +20,7 @@ use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
use risingwave_common::hash::{ActorGroupId, ActorMapping, ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_connector::source::SplitImpl;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
@@ -183,6 +183,11 @@ impl FragmentManager {
!self.core.read().await.table_fragments.is_empty()
}

async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) {
// TODO: notify actor group mapping
}

#[cfg(any())]
async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) {
// Notify all fragment mapping to frontend nodes
for fragment in table_fragment.fragments.values() {
@@ -564,6 +569,7 @@ impl FragmentManager {

/// Used in [`crate::barrier::GlobalBarrierManager`], load all actors that need to be sent or
/// collected
#[deprecated]
pub async fn load_all_actors(
&self,
check_state: impl Fn(ActorState, TableId, ActorId) -> bool,
@@ -603,6 +609,47 @@ impl FragmentManager {
}
}

/// Used in [`crate::barrier::GlobalBarrierManager`], load all actors that need to be sent or
/// collected
pub async fn load_all_actors_2(
&self,
assignments: &HashMap<ActorGroupId, WorkerId>,
check_state: impl Fn(ActorState, TableId, ActorId) -> bool,
) -> ActorInfos {
let mut actor_maps = HashMap::new();
let mut barrier_inject_actor_maps = HashMap::new();

let map = &self.core.read().await.table_fragments;
for fragments in map.values() {
let table_id = fragments.table_id();
let barrier_inject_actors = fragments.barrier_inject_actor_ids();

for actor in fragments.fragments.values().flat_map(|f| &f.actors) {
let actor_state = fragments.actor_status[&actor.actor_id].state();
if check_state(actor_state, table_id, actor.actor_id) {
let worker_id = assignments[&actor.actor_group_id];

actor_maps
.entry(worker_id)
.or_insert_with(Vec::new)
.push(actor.actor_id);

if barrier_inject_actors.contains(&actor.actor_id) {
barrier_inject_actor_maps
.entry(worker_id)
.or_insert_with(Vec::new)
.push(actor.actor_id);
}
}
}
}

ActorInfos {
actor_maps,
barrier_inject_actor_maps,
}
}

async fn migrate_fragment_actors_inner(
&self,
migration_plan: &MigrationPlan,
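A hedged sketch of the grouping that the `load_all_actors_2` added above performs, with simplified stand-in types (assumption: the `check_state` predicate is reduced to a boolean flag here): actors that pass the state filter are bucketed per worker according to the group assignment, and barrier-inject actors are additionally collected into a second map, matching the shape of `ActorInfos`.

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-ins for the meta-crate types (assumption, illustration only).
type ActorGroupId = u32;
type WorkerId = u32;
type ActorId = u32;

struct Actor {
    actor_id: ActorId,
    actor_group_id: ActorGroupId,
    running: bool, // stands in for the `check_state` predicate
}

/// Bucket actors per worker, mirroring the two maps held by `ActorInfos`.
fn load_all_actors(
    actors: &[Actor],
    barrier_inject: &HashSet<ActorId>,
    assignments: &HashMap<ActorGroupId, WorkerId>,
) -> (HashMap<WorkerId, Vec<ActorId>>, HashMap<WorkerId, Vec<ActorId>>) {
    let mut actor_maps: HashMap<WorkerId, Vec<ActorId>> = HashMap::new();
    let mut barrier_inject_actor_maps: HashMap<WorkerId, Vec<ActorId>> = HashMap::new();

    for actor in actors.iter().filter(|a| a.running) {
        // Location comes from the actor-group assignment, not a parallel unit.
        let worker_id = assignments[&actor.actor_group_id];
        actor_maps.entry(worker_id).or_default().push(actor.actor_id);
        if barrier_inject.contains(&actor.actor_id) {
            barrier_inject_actor_maps
                .entry(worker_id)
                .or_default()
                .push(actor.actor_id);
        }
    }
    (actor_maps, barrier_inject_actor_maps)
}

fn main() {
    let actors = vec![
        Actor { actor_id: 1, actor_group_id: 7, running: true },
        Actor { actor_id: 2, actor_group_id: 7, running: true },
        Actor { actor_id: 3, actor_group_id: 8, running: false },
    ];
    let barrier_inject = HashSet::from([1]);
    let assignments = HashMap::from([(7, 100), (8, 101)]);
    let (all, inject) = load_all_actors(&actors, &barrier_inject, &assignments);
    println!("all: {all:?}, barrier-inject: {inject:?}");
}
```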
20 changes: 18 additions & 2 deletions src/meta/src/manager/cluster.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::hash::ParallelUnitId;
use risingwave_common::hash::{ActorGroupId, ParallelUnitId};
use risingwave_pb::common::worker_node::{Property, State};
use risingwave_pb::common::{HostAddress, ParallelUnit, WorkerNode, WorkerType};
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
@@ -277,6 +277,16 @@ impl ClusterManager {
Ok(())
}

pub async fn record_new_assignments(&self, assignments: &HashMap<ActorGroupId, WorkerId>) {
let mut core = self.core.write().await;

for (&k, &v) in assignments {
if let Some(old_v) = core.assignments.insert(k, v) {
assert_eq!(old_v, v);
}
}
}

pub async fn delete_worker_node(&self, host_address: HostAddress) -> MetaResult<WorkerType> {
let mut core = self.core.write().await;
let worker = core.get_worker_by_host_checked(host_address.clone())?;
@@ -494,7 +504,9 @@ impl ClusterManager {
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
/// All **active** compute nodes in the cluster.
pub worker_nodes: HashMap<u32, WorkerNode>,
pub worker_nodes: HashMap<WorkerId, WorkerNode>,

pub assignments: HashMap<ActorGroupId, WorkerId>,

/// All parallel units of the **active** compute nodes in the cluster.
pub parallel_units: HashMap<ParallelUnitId, ParallelUnit>,
@@ -507,6 +519,8 @@ pub struct ClusterManagerCore {
/// Record for workers in the cluster.
workers: HashMap<WorkerKey, Worker>,

assignments: HashMap<ActorGroupId, WorkerId>, // TODO: should we persist this?

/// Record for tracking available machine ids, one is available.
available_transactional_ids: VecDeque<u32>,
}
@@ -573,6 +587,7 @@ impl ClusterManagerCore {
.into_iter()
.map(|w| (WorkerKey(w.key().unwrap()), w))
.collect(),
assignments: Default::default(), // TODO: recover
available_transactional_ids,
})
}
@@ -693,6 +708,7 @@ impl ClusterManagerCore {
worker_nodes: active_workers,
parallel_units: active_parallel_units,
unschedulable_parallel_units,
assignments: self.assignments.clone(),
}
}

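The merge semantics of `record_new_assignments` in the hunks above can be summarized in a small self-contained sketch (simplified types, illustration only): a group may be recorded again, but only with the same worker it already maps to; the assert guards against silent reassignment.

```rust
use std::collections::HashMap;

// Simplified stand-ins for illustration only.
type ActorGroupId = u32;
type WorkerId = u32;

/// Merge new group-to-worker assignments into the existing map.
/// An already-known group must keep its previous worker.
fn record_new_assignments(
    current: &mut HashMap<ActorGroupId, WorkerId>,
    new: &HashMap<ActorGroupId, WorkerId>,
) {
    for (&group, &worker) in new {
        if let Some(old_worker) = current.insert(group, worker) {
            assert_eq!(old_worker, worker, "group {group} reassigned to a different worker");
        }
    }
}

fn main() {
    let mut current = HashMap::from([(1, 100)]);
    record_new_assignments(&mut current, &HashMap::from([(1, 100), (2, 101)]));
    println!("{current:?}"); // {1: 100, 2: 101}
}
```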
33 changes: 21 additions & 12 deletions src/meta/src/model/stream.rs
@@ -123,29 +123,38 @@ impl MetadataModel for TableFragments {
impl TableFragments {
/// Create a new `TableFragments` with state of `Initial`, with other fields empty.
pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
Self::new(
table_id,
fragments,
&BTreeMap::new(),
StreamEnvironment::default(),
)
Self::new(table_id, fragments, StreamEnvironment::default())
}

/// Create a new `TableFragments` with state of `Initial`, with the status of actors set to
/// `Inactive` on the given parallel units.
pub fn new(
table_id: TableId,
fragments: BTreeMap<FragmentId, Fragment>,
actor_locations: &BTreeMap<ActorId, ParallelUnit>,
// actor_locations: &BTreeMap<ActorId, ParallelUnit>,
env: StreamEnvironment,
) -> Self {
let actor_status = actor_locations
.iter()
.map(|(&actor_id, parallel_unit)| {
// let actor_status = actor_locations
// .iter()
// .map(|(&actor_id, parallel_unit)| {
// (
// actor_id,
// ActorStatus {
// parallel_unit: Some(parallel_unit.clone()),
// state: ActorState::Inactive as i32,
// },
// )
// })
// .collect();

let actor_status = fragments
.values()
.flat_map(|f| &f.actors)
.map(|a| {
(
actor_id,
a.actor_id,
ActorStatus {
parallel_unit: Some(parallel_unit.clone()),
parallel_unit: None,
state: ActorState::Inactive as i32,
},
)
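A sketch of the new status derivation in `TableFragments::new`, with hypothetical simplified types: every actor referenced by the fragments starts `Inactive`, and no parallel-unit location is recorded, since placement is now resolved through actor-group assignments.

```rust
use std::collections::{BTreeMap, HashMap};

// Simplified stand-ins for illustration only.
type ActorId = u32;
type FragmentId = u32;

struct Actor { actor_id: ActorId }
struct Fragment { actors: Vec<Actor> }

#[derive(Debug)]
enum ActorState { Inactive }

/// Every actor referenced by the fragments starts out `Inactive`; the location
/// field (`parallel_unit: None` in the diff) is omitted in this sketch.
fn initial_actor_status(
    fragments: &BTreeMap<FragmentId, Fragment>,
) -> HashMap<ActorId, ActorState> {
    fragments
        .values()
        .flat_map(|f| &f.actors)
        .map(|a| (a.actor_id, ActorState::Inactive))
        .collect()
}

fn main() {
    let fragments = BTreeMap::from([
        (1, Fragment { actors: vec![Actor { actor_id: 10 }, Actor { actor_id: 11 }] }),
        (2, Fragment { actors: vec![Actor { actor_id: 20 }] }),
    ]);
    println!("{:?}", initial_actor_status(&fragments));
}
```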
22 changes: 12 additions & 10 deletions src/meta/src/rpc/ddl_controller.rs
@@ -13,12 +13,13 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VirtualNode;
use risingwave_common::hash::{ActorGroupId, VirtualNode};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
@@ -36,7 +37,8 @@ use crate::barrier::BarrierManagerRef;
use crate::manager::{
CatalogManagerRef, ClusterManagerRef, ConnectionId, DatabaseId, FragmentManagerRef, FunctionId,
IdCategory, IndexId, LocalNotification, MetaSrvEnv, NotificationVersion, RelationIdEnum,
SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, ViewId,
SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, ViewId, WorkerId,
WorkerLocations,
};
use crate::model::{StreamEnvironment, TableFragments};
use crate::rpc::cloud_provider::AwsEc2Client;
@@ -101,6 +103,12 @@ pub enum DdlCommand {
DropConnection(ConnectionId),
}

pub struct NewClusterStreamingInfo {
pub assignments: BTreeMap<ActorGroupId, WorkerId>,

pub worker_locations: WorkerLocations,
}

#[derive(Clone)]
pub struct DdlController {
env: MetaSrvEnv,
@@ -624,8 +632,7 @@ impl DdlController {
// 3. Build the table fragments structure that will be persisted in the stream manager,
// and the context that contains all information needed for building the
// actors on the compute nodes.
let table_fragments =
TableFragments::new(id.into(), graph, &building_locations.actor_locations, env);
let table_fragments = TableFragments::new(id.into(), graph, env);

let ctx = CreateStreamingJobContext {
dispatchers,
@@ -929,12 +936,7 @@ impl DdlController {
// 4. Build the table fragments structure that will be persisted in the stream manager, and
// the context that contains all information needed for building the actors on the compute
// nodes.
let table_fragments = TableFragments::new(
dummy_id.into(),
graph,
&building_locations.actor_locations,
env,
);
let table_fragments = TableFragments::new(dummy_id.into(), graph, env);

let ctx = ReplaceTableContext {
old_table_fragments,
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_graph.rs
@@ -20,4 +20,5 @@ mod schedule;

pub use actor::{ActorGraphBuildResult, ActorGraphBuilder};
pub use fragment::{CompleteStreamFragmentGraph, StreamFragmentGraph};
pub use group_schedule::LocationsV2;
pub use schedule::Locations;
(The remaining 6 changed files are not shown.)
