refactor(meta): simplify post-collect and do the logic where the command is scheduled #15106

Closed · wants to merge 1 commit
184 changes: 9 additions & 175 deletions src/meta/src/barrier/command.rs
@@ -36,7 +36,6 @@ use risingwave_pb::stream_plan::{
ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use thiserror_ext::AsReport;

use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
use super::trace::TracedEpoch;
@@ -787,16 +786,7 @@ impl CommandContext {

Command::Resume(_) => {}

Command::SourceSplitAssignment(split_assignment) => {
self.barrier_manager_context
.metadata_manager
.update_actor_splits_by_split_assignment(split_assignment)
.await?;
self.barrier_manager_context
.source_manager
.apply_source_change(None, Some(split_assignment.clone()), None)
.await;
}
Command::SourceSplitAssignment(_) => {}

Command::DropStreamingJobs(actors) => {
// Tell compute nodes to drop actors.
@@ -823,191 +813,35 @@
.hummock_manager
.unregister_table_ids_fail_fast(&table_ids)
.await;

match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
// NOTE(kwannoel): At this point, catalog manager has persisted the tables already.
// We need to cleanup the table state. So we can do it here.
// The logic is the same as above, for hummock_manager.unregister_table_ids.
if let Err(e) = mgr
.catalog_manager
.cancel_create_table_procedure(
table_fragments.table_id().table_id,
table_fragments.internal_table_ids(),
)
.await
{
let table_id = table_fragments.table_id().table_id;
tracing::warn!(
table_id,
error = %e.as_report(),
"cancel_create_table_procedure failed for CancelStreamingJob",
);
// If failed, check that table is not in meta store.
// If any table is, just panic, let meta do bootstrap recovery.
// Otherwise our persisted state is dirty.
let mut table_ids = table_fragments.internal_table_ids();
table_ids.push(table_id);
mgr.catalog_manager.assert_tables_deleted(table_ids).await;
}

// We need to drop table fragments here,
// since this is not done in stream manager (foreground ddl)
// OR barrier manager (background ddl)
mgr.fragment_manager
.drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(
table_fragments.table_id(),
)))
.await?;
}
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.try_abort_creating_streaming_job(table_id as _, true)
.await?;
}
}
}

Command::CreateStreamingJob {
table_fragments,
dispatchers,
upstream_mview_actors,
init_split_assignment,
definition: _,
replace_table,
..
} => {
match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
let mut dependent_table_actors =
Vec::with_capacity(upstream_mview_actors.len());
for (table_id, actors) in upstream_mview_actors {
let downstream_actors = dispatchers
.iter()
.filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
.map(|(&k, v)| (k, v.clone()))
.collect();
dependent_table_actors.push((*table_id, downstream_actors));
}
mgr.fragment_manager
.post_create_table_fragments(
&table_fragments.table_id(),
dependent_table_actors,
init_split_assignment.clone(),
)
.await?;

if let Some(ReplaceTablePlan {
old_table_fragments,
new_table_fragments,
merge_updates,
dispatchers,
init_split_assignment,
}) = replace_table
{
self.clean_up(old_table_fragments.actor_ids()).await?;

// Drop fragment info in meta store.
mgr.fragment_manager
.post_replace_table(
old_table_fragments,
new_table_fragments,
merge_updates,
dispatchers,
init_split_assignment.clone(),
)
.await?;
}
}
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.post_collect_table_fragments(
table_fragments.table_id().table_id as _,
table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;
}
if let Some(ReplaceTablePlan {
old_table_fragments,
..
}) = &replace_table
{
self.clean_up(old_table_fragments.actor_ids()).await?;
}

// Extract the fragments that include source operators.
let source_fragments = table_fragments.stream_source_fragments();

self.barrier_manager_context
.source_manager
.apply_source_change(
Some(source_fragments),
Some(init_split_assignment.clone()),
None,
)
.await;
}

Command::RescheduleFragment {
reschedules,
table_parallelism,
} => {
Command::RescheduleFragment { reschedules, .. } => {
let removed_actors = reschedules
.values()
.flat_map(|reschedule| reschedule.removed_actors.clone().into_iter())
.collect_vec();
self.clean_up(removed_actors).await?;
self.barrier_manager_context
.scale_controller
.post_apply_reschedule(reschedules, table_parallelism)
.await?;
}

Command::ReplaceTable(ReplaceTablePlan {
old_table_fragments,
new_table_fragments,
merge_updates,
dispatchers,
init_split_assignment,
..
}) => {
self.clean_up(old_table_fragments.actor_ids()).await?;

match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
// Drop fragment info in meta store.
mgr.fragment_manager
.post_replace_table(
old_table_fragments,
new_table_fragments,
merge_updates,
dispatchers,
init_split_assignment.clone(),
)
.await?;
}
MetadataManager::V2(mgr) => {
// Update actors and actor_dispatchers for new table fragments.
mgr.catalog_controller
.post_collect_table_fragments(
new_table_fragments.table_id().table_id as _,
new_table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;
}
}

// Apply the split changes in source manager.
self.barrier_manager_context
.source_manager
.drop_source_fragments(std::slice::from_ref(old_table_fragments))
.await;
let source_fragments = new_table_fragments.stream_source_fragments();
self.barrier_manager_context
.source_manager
.apply_source_change(
Some(source_fragments),
Some(init_split_assignment.clone()),
None,
)
.await;
}
}

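Taken together, the deletions above move per-command bookkeeping out of `CommandContext::post_collect`: the metadata and source-manager updates now run in the component that schedules the command, once the barrier command has been collected. A minimal, self-contained sketch of that control-flow change for the `SourceSplitAssignment` case — using synchronous toy stand-ins rather than the real `risingwave_meta` APIs — might look like this:

```rust
// Toy stand-ins for the real risingwave_meta types; the real calls are async,
// but the control flow is the same.
struct SplitAssignment(Vec<u32>);

struct BarrierScheduler;
impl BarrierScheduler {
    // Pretend this injects the command into a barrier and waits for it to be collected.
    fn run_command(&self, _cmd: &SplitAssignment) -> Result<(), String> {
        Ok(())
    }
}

struct MetadataStore;
impl MetadataStore {
    fn update_actor_splits(&self, assignment: &SplitAssignment) -> Result<(), String> {
        println!("persisting splits for {} actors", assignment.0.len());
        Ok(())
    }
}

// Before this PR, the barrier manager's post_collect performed the metadata update.
// After it, the scheduling site does it once run_command returns, so post_collect
// keeps only an empty arm: `Command::SourceSplitAssignment(_) => {}`.
fn schedule_split_assignment(
    scheduler: &BarrierScheduler,
    meta: &MetadataStore,
    assignment: SplitAssignment,
) -> Result<(), String> {
    scheduler.run_command(&assignment)?; // barrier collected here
    meta.update_actor_splits(&assignment) // then apply the metadata change
}

fn main() -> Result<(), String> {
    schedule_split_assignment(&BarrierScheduler, &MetadataStore, SplitAssignment(vec![1, 2]))
}
```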
2 changes: 1 addition & 1 deletion src/meta/src/manager/catalog/mod.rs
@@ -806,7 +806,7 @@ impl CatalogManager {
}
}

pub async fn assert_tables_deleted(&self, table_ids: Vec<TableId>) {
pub async fn assert_tables_deleted(&self, table_ids: impl Iterator<Item = TableId>) {
let core = self.core.lock().await;
let tables = &core.database.tables;
for id in table_ids {
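Changing the parameter from `Vec<TableId>` to `impl Iterator<Item = TableId>` lets callers chain the internal table ids with the job's own id without allocating an intermediate vector, as the new `cancel_streaming_job` below does. A standalone sketch of the call pattern, with a toy `TableId` and a toy assertion function rather than the real catalog manager:

```rust
use std::iter::once;

// Toy stand-in for the real catalog table id.
#[derive(Clone, Copy, Debug, PartialEq)]
struct TableId(u32);

// Accepting an iterator instead of a Vec avoids forcing callers to allocate.
fn assert_tables_deleted(existing: &[TableId], table_ids: impl Iterator<Item = TableId>) {
    for id in table_ids {
        assert!(!existing.contains(&id), "table {id:?} still present in meta store");
    }
}

fn main() {
    let internal_table_ids = vec![TableId(2), TableId(3)];
    let job_table_id = TableId(1);
    // Chain the internal ids with the job id, mirroring the call in cancel_streaming_job.
    assert_tables_deleted(
        &[],
        internal_table_ids.iter().cloned().chain(once(job_table_id)),
    );
}
```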
49 changes: 49 additions & 0 deletions src/meta/src/manager/metadata.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter::once;

use risingwave_common::catalog::{TableId, TableOption};
use risingwave_meta_model_v2::SourceId;
@@ -22,6 +23,7 @@ use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, PbFragment};
use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor, StreamActor};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tracing::warn;

@@ -727,4 +729,51 @@ impl MetadataManager {
}
}
}

pub async fn cancel_streaming_job(&self, table_fragments: TableFragments) -> MetaResult<()> {
let table_id = table_fragments.table_id();
let internal_table_ids = table_fragments.internal_table_ids();
match self {
MetadataManager::V1(mgr) => {
// NOTE(kwannoel): At this point, catalog manager has persisted the tables already.
// We need to cleanup the table state. So we can do it here.
// The logic is the same as above, for hummock_manager.unregister_table_ids.
if let Err(e) = mgr
.catalog_manager
.cancel_create_table_procedure(table_id.table_id, internal_table_ids.clone())
.await
{
warn!(
table_id = table_id.table_id,
error = %e.as_report(),
"cancel_create_table_procedure failed for CancelStreamingJob",
);
// If failed, check that table is not in meta store.
// If any table is, just panic, let meta do bootstrap recovery.
// Otherwise our persisted state is dirty.
mgr.catalog_manager
.assert_tables_deleted(
internal_table_ids
.iter()
.cloned()
.chain(once(table_id.table_id)),
)
.await;
}

// We need to drop table fragments here,
// since this is not done in stream manager (foreground ddl)
// OR barrier manager (background ddl)
mgr.fragment_manager
.drop_table_fragments_vec(&HashSet::from_iter(once(table_id)))
.await
}
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.try_abort_creating_streaming_job(table_id.table_id as _, true)
.await?;
Ok(())
}
}
}
}
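The V1/V2 branches removed from command.rs are consolidated here into a single `MetadataManager::cancel_streaming_job` helper; the V1 fallback logic is unchanged — best-effort cancel, verify the tables are really gone if that fails, then always drop the fragments. A simplified, synchronous sketch of that pattern with toy stand-in types (not the real catalog or fragment managers):

```rust
use std::iter::once;

#[derive(Clone, Copy, Debug)]
struct TableId(u32);

fn cancel_create_table_procedure(id: TableId) -> Result<(), String> {
    // Pretend this can fail if the catalog has already moved past the creating state.
    Err(format!("table {id:?} not in creating state"))
}

fn assert_tables_deleted(ids: impl Iterator<Item = TableId>) {
    for id in ids {
        println!("checking {id:?} is absent from the meta store");
    }
}

fn drop_table_fragments(id: TableId) {
    println!("dropping fragments of {id:?}");
}

fn cancel_streaming_job(table_id: TableId, internal_table_ids: Vec<TableId>) {
    // Best-effort cancel; if it fails, verify nothing was persisted before moving on.
    if let Err(e) = cancel_create_table_procedure(table_id) {
        eprintln!("cancel_create_table_procedure failed: {e}");
        assert_tables_deleted(
            internal_table_ids.iter().copied().chain(once(table_id)),
        );
    }
    // Fragment cleanup happens here because neither the stream manager (foreground DDL)
    // nor the barrier manager (background DDL) does it.
    drop_table_fragments(table_id);
}

fn main() {
    cancel_streaming_job(TableId(1), vec![TableId(2), TableId(3)]);
}
```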
10 changes: 8 additions & 2 deletions src/meta/src/stream/scale.rs
@@ -2496,9 +2496,11 @@ impl GlobalStreamManager {

tracing::debug!("reschedule plan: {:?}", reschedule_fragment);

let table_parallelism = table_parallelism.unwrap_or_default();

let command = Command::RescheduleFragment {
reschedules: reschedule_fragment,
table_parallelism: table_parallelism.unwrap_or_default(),
reschedules: reschedule_fragment.clone(),
table_parallelism: table_parallelism.clone(),
};

match &self.metadata_manager {
@@ -2523,6 +2525,10 @@
.run_config_change_command_with_pause(command)
.await?;

self.scale_controller
.post_apply_reschedule(&reschedule_fragment, &table_parallelism)
.await?;

tracing::info!("reschedule done");

Ok(())
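With this change, `post_apply_reschedule` runs in `GlobalStreamManager` right after the barrier command finishes, instead of in the `RescheduleFragment` arm of post_collect; the plan and parallelism are cloned because the command takes ownership and the same values are reused afterwards. A toy, synchronous sketch of the new ordering (stand-in types, not the real scale controller):

```rust
#[derive(Clone, Debug)]
struct Reschedules(Vec<u32>);

#[derive(Clone, Debug, Default)]
struct TableParallelism;

struct BarrierScheduler;
impl BarrierScheduler {
    fn run_config_change_command_with_pause(&self, _r: Reschedules, _p: TableParallelism) {
        // Pretend this pauses sources, injects the barrier, and waits for collection.
    }
}

struct ScaleController;
impl ScaleController {
    fn post_apply_reschedule(&self, r: &Reschedules, _p: &TableParallelism) {
        println!("persisting reschedule for {} fragments", r.0.len());
    }
}

fn reschedule(
    scheduler: &BarrierScheduler,
    scale: &ScaleController,
    plan: Reschedules,
    parallelism: Option<TableParallelism>,
) {
    let parallelism = parallelism.unwrap_or_default();
    // The command takes ownership, so the plan and parallelism are cloned and reused below.
    scheduler.run_config_change_command_with_pause(plan.clone(), parallelism.clone());
    // Post-apply now happens here, at the scheduling site, instead of in post_collect.
    scale.post_apply_reschedule(&plan, &parallelism);
}

fn main() {
    reschedule(&BarrierScheduler, &ScaleController, Reschedules(vec![1, 2, 3]), None);
}
```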
13 changes: 11 additions & 2 deletions src/meta/src/stream/source_manager.rs
@@ -54,6 +54,7 @@ pub struct SourceManager {
pub paused: Mutex<()>,
env: MetaSrvEnv,
barrier_scheduler: BarrierScheduler,
metadata_manager: MetadataManager,
core: Mutex<SourceManagerCore>,
metrics: Arc<MetaMetrics>,
}
@@ -598,7 +599,7 @@ impl SourceManager {
}

let core = Mutex::new(SourceManagerCore::new(
metadata_manager,
metadata_manager.clone(),
managed_sources,
source_fragments,
actor_splits,
@@ -607,6 +608,7 @@
Ok(Self {
env,
barrier_scheduler,
metadata_manager,
core,
paused: Mutex::new(()),
metrics,
@@ -923,9 +925,16 @@
};

if !split_assignment.is_empty() {
let command = Command::SourceSplitAssignment(split_assignment);
let command = Command::SourceSplitAssignment(split_assignment.clone());
tracing::info!(command = ?command, "pushing down split assignment command");
self.barrier_scheduler.run_command(command).await?;

self.metadata_manager
.update_actor_splits_by_split_assignment(&split_assignment)
.await?;

self.apply_source_change(None, Some(split_assignment.clone()), None)
.await;
}

Ok(())