From 8a4db237bf88be53061f4291b8b136edef39544a Mon Sep 17 00:00:00 2001
From: zwang28 <70626450+zwang28@users.noreply.github.com>
Date: Fri, 8 Mar 2024 12:54:55 +0800
Subject: [PATCH] fix(meta): cherry pick 15355 (#15503)

---
 e2e_test/batch/transaction/now.slt           | 11 +++--
 src/meta/src/barrier/command.rs              |  9 ++--
 src/meta/src/barrier/recovery.rs             | 30 ++++++++----
 .../manager/compaction_group_manager.rs      | 12 ++---
 src/meta/src/hummock/manager/worker.rs       | 49 ++++++++-----------
 src/meta/src/manager/catalog/fragment.rs     |  9 +++-
 src/meta/src/manager/notification.rs         |  1 +
 src/meta/src/stream/stream_manager.rs        | 26 +++++++---
 8 files changed, 85 insertions(+), 62 deletions(-)

diff --git a/e2e_test/batch/transaction/now.slt b/e2e_test/batch/transaction/now.slt
index 4f8d317f0426..e7da013a9cc1 100644
--- a/e2e_test/batch/transaction/now.slt
+++ b/e2e_test/batch/transaction/now.slt
@@ -33,11 +33,12 @@ except
 select * from mv;
 ----
 
-query T
-select * from mv
-except
-select * from v;
-----
+# https://github.com/risingwavelabs/risingwave/issues/15117
+# query T
+# select * from mv
+# except
+# select * from v;
+# ----
 
 statement ok
 commit;
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 86cc3fa07878..afeab58f9806 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -41,7 +41,7 @@ use thiserror_ext::AsReport;
 use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
 use super::trace::TracedEpoch;
 use crate::barrier::GlobalBarrierManagerContext;
-use crate::manager::{DdlType, MetadataManager, WorkerId};
+use crate::manager::{DdlType, LocalNotification, MetadataManager, WorkerId};
 use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
 use crate::MetaResult;
@@ -820,8 +820,11 @@ impl CommandContext {
                 let mut table_ids = table_fragments.internal_table_ids();
                 table_ids.push(table_id);
                 self.barrier_manager_context
-                    .hummock_manager
-                    .unregister_table_ids_fail_fast(&table_ids)
+                    .env
+                    .notification_manager()
+                    .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                        table_ids,
+                    ))
                     .await;
 
                 match &self.barrier_manager_context.metadata_manager {
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 48b91c5e1693..0d167fbaeddd 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -41,7 +41,7 @@ use crate::barrier::schedule::ScheduledBarriers;
 use crate::barrier::state::BarrierManagerState;
 use crate::barrier::{Command, GlobalBarrierManagerContext};
 use crate::controller::catalog::ReleaseContext;
-use crate::manager::{MetadataManager, WorkerId};
+use crate::manager::{LocalNotification, MetadataManager, WorkerId};
 use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
 use crate::MetaResult;
@@ -83,13 +83,17 @@ impl GlobalBarrierManagerContext {
                     .collect();
                 debug!("clean dirty table fragments: {:?}", to_drop_streaming_ids);
 
-                mgr.fragment_manager
+                let unregister_table_ids = mgr
+                    .fragment_manager
                     .drop_table_fragments_vec(&to_drop_streaming_ids)
                     .await?;
 
                 // unregister compaction group for dirty table fragments.
-                self.hummock_manager
-                    .unregister_table_fragments_vec(&to_drop_table_fragments)
+                self.env
+                    .notification_manager()
+                    .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                        unregister_table_ids,
+                    ))
                     .await;
 
                 // clean up source connector dirty changes.
@@ -105,13 +109,14 @@ impl GlobalBarrierManagerContext {
                 } = mgr.catalog_controller.clean_dirty_creating_jobs().await?;
 
                 // unregister compaction group for cleaned state tables.
-                self.hummock_manager
-                    .unregister_table_ids_fail_fast(
-                        &state_table_ids
+                self.env
+                    .notification_manager()
+                    .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                        state_table_ids
                             .into_iter()
                             .map(|id| id as StateTableId)
                             .collect_vec(),
-                    )
+                    ))
                     .await;
 
                 // unregister cleaned sources.
@@ -307,9 +312,16 @@ impl GlobalBarrierManagerContext {
         if !cancelled.is_empty() {
             match &self.metadata_manager {
                 MetadataManager::V1(mgr) => {
-                    mgr.fragment_manager
+                    let unregister_table_ids = mgr
+                        .fragment_manager
                         .drop_table_fragments_vec(&cancelled)
                         .await?;
+                    self.env
+                        .notification_manager()
+                        .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                            unregister_table_ids,
+                        ))
+                        .await;
                 }
                 MetadataManager::V2(mgr) => {
                     for job_id in cancelled {
diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs
index 8db45a228647..1a4ecf9539ab 100644
--- a/src/meta/src/hummock/manager/compaction_group_manager.rs
+++ b/src/meta/src/hummock/manager/compaction_group_manager.rs
@@ -53,7 +53,7 @@ use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}
 use crate::manager::MetaSrvEnv;
 use crate::model::{
     BTreeMapEntryTransaction, BTreeMapEntryTransactionWrapper, BTreeMapTransaction,
-    BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, TableFragments, ValTransaction,
+    BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, ValTransaction,
 };
 use crate::storage::MetaStore;
 
@@ -142,8 +142,12 @@ impl HummockManager {
         Ok(pairs.iter().map(|(table_id, ..)| *table_id).collect_vec())
     }
 
+    #[cfg(test)]
     /// Unregisters `table_fragments` from compaction groups
-    pub async fn unregister_table_fragments_vec(&self, table_fragments: &[TableFragments]) {
+    pub async fn unregister_table_fragments_vec(
+        &self,
+        table_fragments: &[crate::model::TableFragments],
+    ) {
         self.unregister_table_ids_fail_fast(
             &table_fragments
                 .iter()
@@ -169,8 +173,6 @@ impl HummockManager {
         self.unregister_table_ids_fail_fast(&to_unregister).await;
     }
 
-    /// Prefer using `register_table_fragments`.
-    /// Use `register_table_ids` only when [`TableFragments`] is unavailable.
     /// The implementation acquires `versioning` lock.
     #[named]
     pub async fn register_table_ids(
@@ -392,8 +394,6 @@ impl HummockManager {
         Ok(())
     }
 
-    /// Prefer using `unregister_table_fragments_vec`.
-    /// Only use `unregister_table_ids_fail_fast` when [`TableFragments`] is unavailable.
     /// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
     pub async fn unregister_table_ids_fail_fast(&self, table_ids: &[StateTableId]) {
         self.unregister_table_ids(table_ids)
diff --git a/src/meta/src/hummock/manager/worker.rs b/src/meta/src/hummock/manager/worker.rs
index 549365607b9b..7f29acd23e76 100644
--- a/src/meta/src/hummock/manager/worker.rs
+++ b/src/meta/src/hummock/manager/worker.rs
@@ -12,16 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::time::Duration;
-
 use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::common::WorkerType;
 use sync_point::sync_point;
 use thiserror_ext::AsReport;
 use tokio::task::JoinHandle;
-use tokio_retry::strategy::{jitter, ExponentialBackoff};
 
-use crate::hummock::utils::RetryableError;
 use crate::hummock::{HummockManager, HummockManagerRef};
 use crate::manager::LocalNotification;
 
@@ -96,32 +92,27 @@ impl HummockManager {
     }
 
     async fn handle_local_notification(&self, notification: LocalNotification) {
-        let retry_strategy = ExponentialBackoff::from_millis(10)
-            .max_delay(Duration::from_secs(60))
-            .map(jitter);
-        if let LocalNotification::WorkerNodeDeleted(worker_node) = notification {
-            if worker_node.get_type().unwrap() == WorkerType::Compactor {
-                self.compactor_manager.remove_compactor(worker_node.id);
-            }
-            tokio_retry::RetryIf::spawn(
-                retry_strategy.clone(),
-                || async {
-                    if let Err(err) = self.release_contexts(vec![worker_node.id]).await {
-                        tracing::warn!(
-                            error = %err.as_report(),
-                            "Failed to release hummock context {}, will retry",
+        match notification {
+            LocalNotification::WorkerNodeDeleted(worker_node) => {
+                if worker_node.get_type().unwrap() == WorkerType::Compactor {
+                    self.compactor_manager.remove_compactor(worker_node.id);
+                }
+                self.release_contexts(vec![worker_node.id])
+                    .await
+                    .unwrap_or_else(|err| {
+                        panic!(
+                            "Failed to release hummock context {}, error={}",
                             worker_node.id,
-                        );
-                        return Err(err);
-                    }
-                    Ok(())
-                },
-                RetryableError::default(),
-            )
-            .await
-            .expect("retry until success");
-            tracing::info!("Released hummock context {}", worker_node.id);
-            sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
+                            err.as_report()
+                        )
+                    });
+                tracing::info!("Released hummock context {}", worker_node.id);
+                sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
+            }
+            LocalNotification::UnregisterTablesFromHummock(table_ids) => {
+                self.unregister_table_ids_fail_fast(&table_ids).await;
+            }
+            _ => {}
         }
     }
 }
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 4883951c5f99..3e3d06b23be4 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -547,7 +547,10 @@ impl FragmentManager {
     /// tables.
     /// If table fragments already deleted, this should just be noop,
     /// the delete function (`table_fragments.remove`) will not return an error.
-    pub async fn drop_table_fragments_vec(&self, table_ids: &HashSet<TableId>) -> MetaResult<()> {
+    pub async fn drop_table_fragments_vec(
+        &self,
+        table_ids: &HashSet<TableId>,
+    ) -> MetaResult<Vec<u32>> {
         let mut guard = self.core.write().await;
         let current_revision = guard.table_revision;
 
@@ -559,7 +562,9 @@
         let mut dirty_sink_into_table_upstream_fragment_id = HashSet::new();
         let mut table_fragments = BTreeMapTransaction::new(map);
+        let mut table_ids_to_unregister_from_hummock = vec![];
         for table_fragment in &to_delete_table_fragments {
+            table_ids_to_unregister_from_hummock.extend(table_fragment.all_table_ids());
             table_fragments.remove(table_fragment.table_id());
             let to_remove_actor_ids: HashSet<_> = table_fragment.actor_ids().into_iter().collect();
             let dependent_table_ids = table_fragment.dependent_table_ids();
@@ -634,7 +639,7 @@ impl FragmentManager {
             }
         }
 
-        Ok(())
+        Ok(table_ids_to_unregister_from_hummock)
     }
 
     // When dropping sink into a table, there could be an unexpected meta reboot. At this time, the sink’s catalog might have been deleted,
diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs
index 166f35655aa3..e3e8ec6b629f 100644
--- a/src/meta/src/manager/notification.rs
+++ b/src/meta/src/manager/notification.rs
@@ -48,6 +48,7 @@ pub enum LocalNotification {
     SystemParamsChange(SystemParamsReader),
     FragmentMappingsUpsert(Vec<FragmentId>),
     FragmentMappingsDelete(Vec<FragmentId>),
+    UnregisterTablesFromHummock(Vec<u32>),
 }
 
 #[derive(Debug)]
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 4a617fde012a..e544170611d9 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -30,7 +30,7 @@ use tracing::Instrument;
 use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy};
 use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager};
 use crate::hummock::HummockManagerRef;
-use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob};
+use crate::manager::{DdlType, LocalNotification, MetaSrvEnv, MetadataManager, StreamingJob};
 use crate::model::{ActorId, TableFragments, TableParallelism};
 use crate::stream::SourceManagerRef;
 use crate::{MetaError, MetaResult};
@@ -424,10 +424,13 @@ impl GlobalStreamManager {
                 registered_table_ids.len(),
                 table_fragments.internal_table_ids().len() + mv_table_id.map_or(0, |_| 1)
             );
+            let notification_manager_ref = self.env.notification_manager_ref();
             revert_funcs.push(Box::pin(async move {
                 if create_type == CreateType::Foreground {
-                    hummock_manager_ref
-                        .unregister_table_ids_fail_fast(&registered_table_ids)
+                    notification_manager_ref
+                        .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                            registered_table_ids,
+                        ))
                         .await;
                 }
             }));
@@ -568,8 +571,11 @@ impl GlobalStreamManager {
                     tracing::error!(error = ?err.as_report(), "failed to run drop command");
                 });
 
-            self.hummock_manager
-                .unregister_table_ids_fail_fast(&state_table_ids)
+            self.env
+                .notification_manager()
+                .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                    state_table_ids,
+                ))
                 .await;
         }
     }
@@ -586,7 +592,8 @@ impl GlobalStreamManager {
             .await;
 
         // Drop table fragments directly.
-        mgr.fragment_manager
+        let unregister_table_ids = mgr
+            .fragment_manager
             .drop_table_fragments_vec(&table_ids.into_iter().collect())
             .await?;
 
@@ -604,8 +611,11 @@ impl GlobalStreamManager {
             });
 
         // Unregister from compaction group afterwards.
-        self.hummock_manager
-            .unregister_table_fragments_vec(&table_fragments_vec)
+        self.env
+            .notification_manager()
+            .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                unregister_table_ids,
+            ))
             .await;
 
         Ok(())
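
Note: the pattern this patch converges on is that every code path that drops state tables
(barrier recovery, cancelled jobs, drop streaming job, create-job revert) no longer calls
HummockManager directly; it publishes a LocalNotification::UnregisterTablesFromHummock through
the meta node's local notification manager, and HummockManager unregisters those tables from
their compaction groups when it handles the notification. Below is a minimal, self-contained
sketch of that flow; the channel, the trimmed-down LocalNotification enum, and the
HummockManager stub are illustrative stand-ins, not the actual RisingWave meta-node types.

    // Simplified sketch of the local-notification flow; not the real meta-node code.
    use tokio::sync::mpsc;

    #[derive(Debug)]
    enum LocalNotification {
        // Carries the state table ids whose compaction-group registration should be dropped.
        UnregisterTablesFromHummock(Vec<u32>),
    }

    struct NotificationManager {
        local_tx: mpsc::UnboundedSender<LocalNotification>,
    }

    impl NotificationManager {
        // Fire-and-forget publish: producers no longer need a handle on HummockManager.
        fn notify_local_subscribers(&self, n: LocalNotification) {
            let _ = self.local_tx.send(n);
        }
    }

    struct HummockManager;

    impl HummockManager {
        async fn unregister_table_ids_fail_fast(&self, table_ids: &[u32]) {
            // The real implementation removes the tables from their compaction groups.
            println!("unregister tables {:?} from hummock", table_ids);
        }

        async fn handle_local_notification(&self, n: LocalNotification) {
            match n {
                LocalNotification::UnregisterTablesFromHummock(table_ids) => {
                    self.unregister_table_ids_fail_fast(&table_ids).await;
                }
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let notification_manager = NotificationManager { local_tx: tx };
        let hummock_manager = HummockManager;

        // A producer (e.g. a drop-table code path) publishes the table ids to clean up.
        notification_manager.notify_local_subscribers(
            LocalNotification::UnregisterTablesFromHummock(vec![1, 2, 3]),
        );
        drop(notification_manager); // close the channel so the subscriber loop terminates

        // A subscriber loop, like HummockManager's local-notification worker, drains the channel.
        while let Some(n) = rx.recv().await {
            hummock_manager.handle_local_notification(n).await;
        }
    }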