From ec22e86f4f10d5488c533e88ff4efb81e85c5996 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 8 Jul 2022 23:25:12 +0800 Subject: [PATCH] refactor(meta): improve create mview / scheduler readability (#3748) * refactor(meta): improve create mview / scheduler readability Signed-off-by: Bugen Zhao * add some comments for hash mapping Signed-off-by: Bugen Zhao * refine docs Signed-off-by: Bugen Zhao * make yuanxin happy Signed-off-by: Bugen Zhao Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- src/meta/src/lib.rs | 1 + src/meta/src/manager/hash_mapping.rs | 17 +- src/meta/src/stream/meta.rs | 1 + src/meta/src/stream/scheduler.rs | 164 ++++++------- src/meta/src/stream/stream_manager.rs | 336 ++++++++++++++------------ 5 files changed, 263 insertions(+), 256 deletions(-) diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index e07a31e584ede..0ab7b95366403 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -36,6 +36,7 @@ #![feature(drain_filter)] #![feature(custom_test_frameworks)] #![feature(lint_reasons)] +#![feature(map_try_insert)] #![cfg_attr(coverage, feature(no_coverage))] #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] diff --git a/src/meta/src/manager/hash_mapping.rs b/src/meta/src/manager/hash_mapping.rs index a84834120b16b..139f4a7f673ed 100644 --- a/src/meta/src/manager/hash_mapping.rs +++ b/src/meta/src/manager/hash_mapping.rs @@ -88,16 +88,6 @@ impl HashMappingManager { .map(|info| info.vnode_mapping.clone()) } - pub fn set_need_consolidation(&self, newflag: bool) { - let mut core = self.core.lock(); - core.need_sst_consolidation = newflag; - } - - pub fn get_need_consolidation(&self) -> bool { - let core = self.core.lock(); - core.need_sst_consolidation - } - /// For test. 
#[cfg(test)] fn get_fragment_mapping_info(&self, fragment_id: &FragmentId) -> Option { @@ -121,7 +111,6 @@ struct HashMappingInfo { } struct HashMappingManagerCore { - need_sst_consolidation: bool, /// Mapping from fragment to hash mapping information. One fragment will have exactly one vnode /// mapping, which describes the data distribution of the fragment. hash_mapping_infos: HashMap, @@ -132,12 +121,15 @@ struct HashMappingManagerCore { impl HashMappingManagerCore { fn new() -> Self { Self { - need_sst_consolidation: true, hash_mapping_infos: HashMap::new(), state_table_fragment_mapping: HashMap::new(), } } + /// Build a [`HashMappingInfo`] and record it for the fragment based on given `parallel_units`. + /// + /// For example, if `parallel_units` is `[0, 1, 2]`, and the total vnode count is 10, we'll + /// generate mapping like `[0, 0, 0, 0, 1, 1, 1, 2, 2, 2]`. fn build_fragment_hash_mapping( &mut self, fragment_id: FragmentId, @@ -187,6 +179,7 @@ impl HashMappingManagerCore { vnode_mapping } + /// Construct a [`HashMappingInfo`] with given `vnode_mapping` and record it for the fragment. fn set_fragment_hash_mapping( &mut self, fragment_id: FragmentId, diff --git a/src/meta/src/stream/meta.rs b/src/meta/src/stream/meta.rs index 72cbefa2506d1..0f4fac6774a0a 100644 --- a/src/meta/src/stream/meta.rs +++ b/src/meta/src/stream/meta.rs @@ -76,6 +76,7 @@ where .map(|tf| (tf.table_id(), tf)) .collect(); + // Extract vnode mapping info from listed `table_fragments` to hash mapping manager. Self::restore_vnode_mappings(env.hash_mapping_manager_ref(), &table_fragments)?; Ok(Self { diff --git a/src/meta/src/stream/scheduler.rs b/src/meta/src/stream/scheduler.rs index 527135a497327..d02851bf910f8 100644 --- a/src/meta/src/stream/scheduler.rs +++ b/src/meta/src/stream/scheduler.rs @@ -13,14 +13,18 @@ // limitations under the License. 
use std::collections::{BTreeMap, HashMap}; +use std::iter::empty; use std::sync::atomic::{AtomicUsize, Ordering}; +use anyhow::anyhow; +use risingwave_common::bail; use risingwave_common::buffer::BitmapBuilder; -use risingwave_common::error::ErrorCode::InternalError; -use risingwave_common::error::{internal_error, Result}; +use risingwave_common::error::Result; use risingwave_common::types::VIRTUAL_NODE_COUNT; use risingwave_common::util::compress::compress_data; -use risingwave_pb::common::{ActorInfo, ParallelUnit, ParallelUnitMapping, ParallelUnitType}; +use risingwave_pb::common::{ + ActorInfo, ParallelUnit, ParallelUnitMapping, ParallelUnitType, WorkerNode, +}; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::Fragment; @@ -38,38 +42,44 @@ pub struct Scheduler { /// Round robin counter for singleton fragments single_rr: AtomicUsize, } + /// [`ScheduledLocations`] represents the location of scheduled result. pub struct ScheduledLocations { /// actor location map. pub actor_locations: BTreeMap, /// worker location map. - pub node_locations: WorkerLocations, + pub worker_locations: WorkerLocations, } impl ScheduledLocations { + #[cfg_attr(not(test), expect(dead_code))] pub fn new() -> Self { + Self::with_workers(empty()) + } + + pub fn with_workers(workers: impl IntoIterator) -> Self { Self { - actor_locations: BTreeMap::new(), - node_locations: HashMap::new(), + actor_locations: Default::default(), + worker_locations: workers.into_iter().map(|w| (w.id, w)).collect(), } } - /// [`Self::node_actors`] returns all actors for every node. - pub fn node_actors(&self) -> HashMap> { - let mut node_actors = HashMap::new(); + /// Returns all actors for every worker node. 
+ pub fn worker_actors(&self) -> HashMap> { + let mut worker_actors = HashMap::new(); self.actor_locations .iter() .for_each(|(actor_id, parallel_unit)| { - node_actors + worker_actors .entry(parallel_unit.worker_node_id) .or_insert_with(Vec::new) .push(*actor_id); }); - node_actors + worker_actors } - /// [`Self::actor_info_map`] returns the `ActorInfo` map for every actor. + /// Returns the `ActorInfo` map for every actor. pub fn actor_info_map(&self) -> HashMap { self.actor_locations .iter() @@ -78,7 +88,7 @@ impl ScheduledLocations { *actor_id, ActorInfo { actor_id: *actor_id, - host: self.node_locations[&parallel_unit.worker_node_id] + host: self.worker_locations[&parallel_unit.worker_node_id] .host .clone(), }, @@ -87,40 +97,37 @@ impl ScheduledLocations { .collect::>() } - /// [`Self::actor_infos`] returns the `ActorInfo` slice. - pub fn actor_infos(&self) -> Vec { + /// Returns an iterator of `ActorInfo`. + pub fn actor_infos(&self) -> impl Iterator + '_ { self.actor_locations .iter() .map(|(actor_id, parallel_unit)| ActorInfo { actor_id: *actor_id, - host: self.node_locations[&parallel_unit.worker_node_id] + host: self.worker_locations[&parallel_unit.worker_node_id] .host .clone(), }) - .collect::>() } - /// Find a placement location that is on the same node of given actor ids. + /// Find a placement location that is on the same worker node of given actor ids. 
pub fn schedule_colocate_with(&self, actor_ids: &[ActorId]) -> Result { let mut result_location = None; for actor_id in actor_ids { - let location = self.actor_locations.get(actor_id); - if let Some(location) = location { - if result_location.is_none() { - result_location = Some(location.clone()); - } else if location != result_location.as_ref().unwrap() { - return Err(internal_error(format!( + let location = self + .actor_locations + .get(actor_id) + .ok_or_else(|| anyhow!("actor location not found: {}", actor_id))?; + match &result_location { + None => result_location = Some(location.clone()), + Some(result_location) if result_location != location => { + bail!( "cannot satisfy placement rule: {} is at {:?}, while others are on {:?}", actor_id, location, - result_location.as_ref().unwrap() - ))); + result_location + ); } - } else { - return Err(internal_error(format!( - "actor location not found: {}", - actor_id - ))); + _ => {} } } Ok(result_location.unwrap()) @@ -154,51 +161,40 @@ where locations: &mut ScheduledLocations, ) -> Result<()> { if fragment.actors.is_empty() { - return Err(InternalError("fragment has no actor".to_string()).into()); + bail!("fragment has no actor"); } - self.hash_mapping_manager.set_need_consolidation(true); - if fragment.distribution_type == FragmentDistributionType::Single as i32 { // Singleton fragment let actor = &fragment.actors[0]; - if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() { - // Schedule the fragment to the same parallel unit as upstream. - let parallel_unit = locations.schedule_colocate_with(&actor.upstream_actor_id)?; + let parallel_unit = + if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() { + // Schedule the fragment to the same parallel unit as upstream. + locations.schedule_colocate_with(&actor.upstream_actor_id)? + } else { + // Choose one parallel unit to schedule from single parallel units. 
+ let single_parallel_units = self + .cluster_manager + .list_parallel_units(Some(ParallelUnitType::Single)) + .await; + let single_idx = self + .single_rr + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |idx| { + Some((idx + 1) % single_parallel_units.len()) + }) + .unwrap(); + + single_parallel_units[single_idx].clone() + }; - // Build vnode mapping. - self.set_fragment_vnode_mapping(fragment, &[parallel_unit.clone()])?; + // Build vnode mapping. However, we'll leave vnode field of actors unset for singletons. + self.set_fragment_vnode_mapping(fragment, &[parallel_unit.clone()])?; - // Record actor locations. - locations - .actor_locations - .insert(fragment.actors[0].actor_id, parallel_unit); - } else { - // Choose one parallel unit to schedule from single parallel units. - let single_parallel_units = self - .cluster_manager - .list_parallel_units(Some(ParallelUnitType::Single)) - .await; - let single_idx = self - .single_rr - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |idx| { - Some((idx + 1) % single_parallel_units.len()) - }) - .map_err(|_| internal_error("failed to round robin id"))?; - - // Build vnode mapping. - self.set_fragment_vnode_mapping( - fragment, - &[single_parallel_units[single_idx].clone()], - )?; - - // Record actor locations. - locations.actor_locations.insert( - fragment.actors[0].actor_id, - single_parallel_units[single_idx].clone(), - ); - } + // Record actor locations. + locations + .actor_locations + .insert(fragment.actors[0].actor_id, parallel_unit); } else { // Normal fragment @@ -237,26 +233,18 @@ where // Record actor locations and set vnodes into the actors. 
for (idx, actor) in fragment.actors.iter_mut().enumerate() { - if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() { - let parallel_unit = - locations.schedule_colocate_with(&actor.upstream_actor_id)?; - actor.vnode_bitmap = - Some(vnode_bitmaps.get(&parallel_unit.id).unwrap().to_protobuf()); - locations - .actor_locations - .insert(actor.actor_id, parallel_unit); - } else { - actor.vnode_bitmap = Some( - vnode_bitmaps - .get(&parallel_units[idx % parallel_units.len()].id) - .unwrap() - .to_protobuf(), - ); - locations.actor_locations.insert( - actor.actor_id, - parallel_units[idx % parallel_units.len()].clone(), - ); - } + let parallel_unit = + if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() { + locations.schedule_colocate_with(&actor.upstream_actor_id)? + } else { + parallel_units[idx % parallel_units.len()].clone() + }; + + actor.vnode_bitmap = + Some(vnode_bitmaps.get(&parallel_unit.id).unwrap().to_protobuf()); + locations + .actor_locations + .insert(actor.actor_id, parallel_unit); } } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 52fd817cfb331..084ad9ea35eef 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -16,8 +16,9 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; use itertools::Itertools; +use risingwave_common::bail; use risingwave_common::catalog::TableId; -use risingwave_common::error::{internal_error, Result}; +use risingwave_common::error::Result; use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT}; use risingwave_pb::catalog::{Source, Table}; use risingwave_pb::common::{ActorInfo, ParallelUnitMapping, WorkerType}; @@ -290,37 +291,43 @@ where vec![], |revert_funcs: Vec>| { tokio::spawn(async move { - for revert_func in revert_funcs { + for revert_func in revert_funcs.into_iter().rev() { revert_func.await; } }); }, ); - let nodes = self - .cluster_manager - 
.list_worker_node( - WorkerType::ComputeNode, - Some(risingwave_pb::common::worker_node::State::Running), - ) - .await; - if nodes.is_empty() { - return Err(internal_error("no available compute node in the cluster")); - } + // Schedule actors to parallel units. `locations` will record the parallel unit that an + // actor is scheduled to, and the worker node this parallel unit is on. + let locations = { + // List all running worker nodes. + let workers = self + .cluster_manager + .list_worker_node( + WorkerType::ComputeNode, + Some(risingwave_pb::common::worker_node::State::Running), + ) + .await; + if workers.is_empty() { + bail!("no available compute node in the cluster"); + } - let mut locations = ScheduledLocations::new(); - locations.node_locations = nodes.into_iter().map(|node| (node.id, node)).collect(); + // Create empty locations. + let mut locations = ScheduledLocations::with_workers(workers); - let topological_order = table_fragments.generate_topological_order(); + // Schedule each fragment(actors) to nodes, recorded in `locations`. + // Vnode mapping in fragment will be filled in as well. + let topological_order = table_fragments.generate_topological_order(); + for fragment_id in topological_order { + let fragment = table_fragments.fragments.get_mut(&fragment_id).unwrap(); + self.scheduler.schedule(fragment, &mut locations).await?; + } - // Schedule each fragment(actors) to nodes. Vnode mapping in fragment will be filled in - // as well. - for fragment_id in topological_order { - let fragment = table_fragments.fragments.get_mut(&fragment_id).unwrap(); - self.scheduler.schedule(fragment, &mut locations).await?; - } + locations + }; - // resolve chain node infos, including: + // Resolve chain node infos, including: // 1. insert upstream actor id in merge node // 2. insert parallel unit id in batch query node self.resolve_chain_node( @@ -332,92 +339,87 @@ where ) .await?; - // Verify whether all same_as_upstream constraints are satisfied. 
- // - // Currently, the scheduler (when there's no scale-in or scale-out) will always schedule - // chain node on the same node as upstreams. However, this constraint will easily be broken - // if parallel units are not aligned between upstream nodes. - - // Record actor -> fragment mapping for finding out downstream fragments. - let mut actor_to_vnode_mapping = HashMap::new(); - for fragment in table_fragments.fragments.values() { - for actor in &fragment.actors { - actor_to_vnode_mapping.insert(actor.actor_id, fragment.vnode_mapping.clone()); + // Record vnode to parallel unit mapping for actors. + let actor_to_vnode_mapping = { + let mut mapping = HashMap::new(); + for fragment in table_fragments.fragments.values() { + for actor in &fragment.actors { + mapping + .try_insert(actor.actor_id, fragment.vnode_mapping.clone()) + .unwrap(); + } } - } + mapping + }; // Fill hash dispatcher's mapping with scheduled locations. - table_fragments - .fragments - .iter_mut() - .for_each(|(_, fragment)| { - fragment.actors.iter_mut().for_each(|actor| { - actor.dispatcher.iter_mut().for_each(|dispatcher| { - if dispatcher.get_type().unwrap() == DispatcherType::Hash { - let downstream_actor_id = - dispatcher.downstream_actor_id.first().unwrap_or_else(|| { - panic!( - "hash dispatcher should have at least one downstream actor" - ); - }); - let hash_mapping = actor_to_vnode_mapping - .get(downstream_actor_id) - .unwrap() - .as_ref() - .unwrap_or_else(|| { - panic!( - "actor {} should have a vnode mapping", - downstream_actor_id - ); - }); - - let downstream_actors = &dispatcher.downstream_actor_id; - - // `self.hash_parallel_count` as the number of its downstream actors. - // However, since the frontend optimizer is still WIP, there exists some - // unoptimized situation where a hash dispatcher has ONLY ONE downstream - // actor, which makes it behave like a simple dispatcher. As a - // workaround, we specially compute the consistent hash mapping here. 
- // The `if` branch could be removed after the optimizer has been fully - // implemented. - if downstream_actors.len() == 1 { - dispatcher.hash_mapping = Some(ActorMapping { - original_indices: vec![VIRTUAL_NODE_COUNT as u64 - 1], - data: vec![downstream_actors[0]], - }); - } else { - // extract "parallel unit -> downstream actor" mapping from - // locations. - let parallel_unit_actor_map = downstream_actors - .iter() - .map(|actor_id| { - ( - locations.actor_locations.get(actor_id).unwrap().id, - *actor_id, - ) - }) - .collect::>(); - let ParallelUnitMapping { - original_indices, - data, - .. - } = hash_mapping; - let data = data - .iter() - .map(|parallel_unit_id| { - parallel_unit_actor_map[parallel_unit_id] - }) - .collect_vec(); - dispatcher.hash_mapping = Some(ActorMapping { - original_indices: original_indices.clone(), - data, - }); - }; - } - }); - }) - }); + for fragment in table_fragments.fragments.values_mut() { + // Filter out hash dispatchers in this fragment. + let dispatchers = fragment + .actors + .iter_mut() + .flat_map(|actor| actor.dispatcher.iter_mut()) + .filter(|d| d.get_type().unwrap() == DispatcherType::Hash); + + for dispatcher in dispatchers { + match dispatcher.downstream_actor_id.as_slice() { + [] => panic!("hash dispatcher should have at least one downstream actor"), + + // There exists some unoptimized situation where a hash dispatcher has ONLY ONE + // downstream actor, which makes it behave like a simple dispatcher. As a + // workaround, we specially compute the consistent hash mapping here. + // This arm could be removed after the optimizer has been fully implemented. + &[single_downstream_actor] => { + dispatcher.hash_mapping = Some(ActorMapping { + original_indices: vec![VIRTUAL_NODE_COUNT as u64 - 1], + data: vec![single_downstream_actor], + }); + } + + // For normal cases, we can simply transform the mapping from downstream actors + // to current hash dispatchers. + downstream_actors @ &[first_downstream_actor, ..] 
=> { + // All actors in the downstream fragment should have the same parallel unit + // mapping, find it with the first downstream actor. + let downstream_vnode_mapping = actor_to_vnode_mapping + .get(&first_downstream_actor) + .unwrap() + .as_ref() + .unwrap_or_else(|| { + panic!("no vnode mapping for actor {}", &first_downstream_actor); + }); + + // Mapping from the parallel unit to downstream actors. + let parallel_unit_actor_map = downstream_actors + .iter() + .map(|actor_id| { + ( + locations.actor_locations.get(actor_id).unwrap().id, + *actor_id, + ) + }) + .collect::>(); + + // Transform the mapping of parallel unit to the mapping of actor. + let ParallelUnitMapping { + original_indices, + data, + .. + } = downstream_vnode_mapping; + let data = data + .iter() + .map(|parallel_unit_id| parallel_unit_actor_map[parallel_unit_id]) + .collect_vec(); + dispatcher.hash_mapping = Some(ActorMapping { + original_indices: original_indices.clone(), + data, + }); + } + } + } + } + // Mark the actors to be built as `State::Building`. let actor_info = locations .actor_locations .iter() @@ -431,27 +433,36 @@ where ) }) .collect(); - table_fragments.set_actor_status(actor_info); let actor_map = table_fragments.actor_map(); // Actors on each stream node will need to know where their upstream lies. `actor_info` - // includes such information. It contains: 1. actors in the current create - // materialized view request. 2. all upstream actors. - let mut actor_infos_to_broadcast = locations.actor_infos(); - actor_infos_to_broadcast.extend(upstream_node_actors.iter().flat_map( - |(node_id, upstreams)| { - upstreams.iter().map(|up_id| ActorInfo { - actor_id: *up_id, - host: locations.node_locations.get(node_id).unwrap().host.clone(), - }) - }, - )); + // includes such information. It contains: + // 1. actors in the current create-materialized-view request. + // 2. all upstream actors. 
+ let actor_infos_to_broadcast = { + let current = locations.actor_infos(); + let upstream = upstream_node_actors + .iter() + .flat_map(|(node_id, upstreams)| { + upstreams.iter().map(|up_id| ActorInfo { + actor_id: *up_id, + host: locations + .worker_locations + .get(node_id) + .unwrap() + .host + .clone(), + }) + }); + current.chain(upstream).collect_vec() + }; let actor_host_infos = locations.actor_info_map(); - let node_actors = locations.node_actors(); + let node_actors = locations.worker_actors(); + // (upstream_actor_id, dispatcher_id) -> Vec let dispatches = dispatches .iter() .map(|(up_id, down_ids)| { @@ -470,42 +481,46 @@ where }) .collect::>(); - let up_id_to_down_info = dispatches - .iter() - .map(|((up_id, _dispatcher_id), down_info)| (*up_id, down_info.clone())) - .collect::>(); + // Hanging channels for each worker node. + let mut node_hanging_channels = { + // upstream_actor_id -> Vec + let up_id_to_down_info = dispatches + .iter() + .map(|((up_id, _dispatcher_id), down_info)| (*up_id, down_info)) + .collect::>(); - let mut node_hanging_channels = upstream_node_actors - .iter() - .map(|(node_id, up_ids)| { - ( - *node_id, - up_ids - .iter() - .flat_map(|up_id| { - up_id_to_down_info - .get(up_id) - .expect("expected dispatches info") - .iter() - .map(|down_info| HangingChannel { - upstream: Some(ActorInfo { - actor_id: *up_id, - host: None, - }), - downstream: Some(down_info.clone()), - }) - }) - .collect_vec(), - ) - }) - .collect::>(); + upstream_node_actors + .iter() + .map(|(node_id, up_ids)| { + ( + *node_id, + up_ids + .iter() + .flat_map(|up_id| { + up_id_to_down_info + .get(up_id) + .expect("expected dispatches info") + .iter() + .map(|down_info| HangingChannel { + upstream: Some(ActorInfo { + actor_id: *up_id, + host: None, + }), + downstream: Some(down_info.clone()), + }) + }) + .collect_vec(), + ) + }) + .collect::>() + }; // We send RPC request in two stages. 
// The first stage does 2 things: broadcast actor info, and send local actor ids to // different WorkerNodes. Such that each WorkerNode knows the overall actor // allocation, but not actually builds it. We initialize all channels in this stage. for (node_id, actors) in &node_actors { - let node = locations.node_locations.get(node_id).unwrap(); + let node = locations.worker_locations.get(node_id).unwrap(); let client = self.client_pool.get(node).await?; @@ -533,8 +548,9 @@ where .await?; } + // Build remaining hanging channels on compute nodes. for (node_id, hanging_channels) in node_hanging_channels { - let node = locations.node_locations.get(&node_id).unwrap(); + let node = locations.worker_locations.get(&node_id).unwrap(); let client = self.client_pool.get(node).await?; let request_id = Uuid::new_v4().to_string(); @@ -567,7 +583,7 @@ where // In the second stage, each [`WorkerNode`] builds local actors and connect them with // channels. for (node_id, actors) in node_actors { - let node = locations.node_locations.get(&node_id).unwrap(); + let node = locations.worker_locations.get(&node_id).unwrap(); let client = self.client_pool.get(node).await?; @@ -582,8 +598,12 @@ where .await?; } - let mut source_fragments = HashMap::new(); - fetch_source_fragments(&mut source_fragments, &table_fragments); + // Extract the fragments that include source operators. + let source_fragments = { + let mut source_fragments = HashMap::new(); + fetch_source_fragments(&mut source_fragments, &table_fragments); + source_fragments + }; // Add table fragments to meta store with state: `State::Creating`. 
self.fragment_manager @@ -610,12 +630,12 @@ where .cancel_create_table_fragments(&table_id) .await?; return Err(err); - } else { - self.source_manager - .patch_update(Some(source_fragments), Some(init_split_assignment)) - .await?; } + self.source_manager + .patch_update(Some(source_fragments), Some(init_split_assignment)) + .await?; + revert_funcs.clear(); Ok(()) } @@ -628,8 +648,12 @@ where .select_table_fragments_by_table_id(table_id) .await?; - let mut source_fragments = HashMap::new(); - fetch_source_fragments(&mut source_fragments, &table_fragments); + // Extract the fragments that include source operators. + let source_fragments = { + let mut source_fragments = HashMap::new(); + fetch_source_fragments(&mut source_fragments, &table_fragments); + source_fragments + }; self.barrier_manager .run_command(Command::DropMaterializedView(*table_id))