fix(meta): cherry pick 15355 #15503

Merged · 4 commits · Mar 8, 2024
Changes from 1 commit
9 changes: 6 additions & 3 deletions src/meta/src/barrier/command.rs
@@ -41,7 +41,7 @@ use thiserror_ext::AsReport;
use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
use super::trace::TracedEpoch;
use crate::barrier::GlobalBarrierManagerContext;
- use crate::manager::{DdlType, MetadataManager, WorkerId};
+ use crate::manager::{DdlType, LocalNotification, MetadataManager, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
use crate::MetaResult;
@@ -820,8 +820,11 @@ impl CommandContext {
let mut table_ids = table_fragments.internal_table_ids();
table_ids.push(table_id);
self.barrier_manager_context
- .hummock_manager
- .unregister_table_ids_fail_fast(&table_ids)
+ .env
+ .notification_manager()
+ .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+ table_ids,
+ ))
.await;

match &self.barrier_manager_context.metadata_manager {
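The hunk above shows the pattern this cherry-pick applies everywhere: instead of calling HummockManager::unregister_table_ids_fail_fast directly, the caller publishes a LocalNotification::UnregisterTablesFromHummock through the meta node's local notification manager, and the Hummock side performs the unregistration when it receives the notification (see the worker.rs hunk further down). Below is a minimal standalone sketch of that decoupling; the channel-based NotificationManager, the single-variant enum, and the worker loop are simplified stand-ins, not RisingWave's actual APIs.

use tokio::sync::mpsc;

// Simplified stand-in for meta's LocalNotification enum.
#[derive(Debug)]
enum LocalNotification {
    UnregisterTablesFromHummock(Vec<u32>),
}

// Simplified stand-in for the notification manager: it only forwards
// notifications to a single local subscriber over a channel.
struct NotificationManager {
    tx: mpsc::UnboundedSender<LocalNotification>,
}

impl NotificationManager {
    async fn notify_local_subscribers(&self, notification: LocalNotification) {
        let _ = self.tx.send(notification);
    }
}

// Simplified stand-in for the Hummock manager's local-notification worker.
async fn hummock_worker(mut rx: mpsc::UnboundedReceiver<LocalNotification>) {
    while let Some(notification) = rx.recv().await {
        match notification {
            LocalNotification::UnregisterTablesFromHummock(table_ids) => {
                // The real handler calls `unregister_table_ids_fail_fast` here.
                println!("unregister state tables from compaction groups: {table_ids:?}");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let manager = NotificationManager { tx };
    let worker = tokio::spawn(hummock_worker(rx));

    // A "drop table" code path publishes the IDs instead of touching Hummock directly.
    manager
        .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(vec![1, 2, 3]))
        .await;

    drop(manager); // closing the sender lets the worker loop exit
    worker.await.unwrap();
}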
30 changes: 21 additions & 9 deletions src/meta/src/barrier/recovery.rs
@@ -41,7 +41,7 @@ use crate::barrier::schedule::ScheduledBarriers;
use crate::barrier::state::BarrierManagerState;
use crate::barrier::{Command, GlobalBarrierManagerContext};
use crate::controller::catalog::ReleaseContext;
- use crate::manager::{MetadataManager, WorkerId};
+ use crate::manager::{LocalNotification, MetadataManager, WorkerId};
use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
use crate::MetaResult;
@@ -83,13 +83,17 @@ impl GlobalBarrierManagerContext {
.collect();
debug!("clean dirty table fragments: {:?}", to_drop_streaming_ids);

- mgr.fragment_manager
+ let unregister_table_ids = mgr
+ .fragment_manager
.drop_table_fragments_vec(&to_drop_streaming_ids)
.await?;

// unregister compaction group for dirty table fragments.
- self.hummock_manager
- .unregister_table_fragments_vec(&to_drop_table_fragments)
+ self.env
+ .notification_manager()
+ .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+ unregister_table_ids,
+ ))
.await;

// clean up source connector dirty changes.
@@ -105,13 +109,14 @@
} = mgr.catalog_controller.clean_dirty_creating_jobs().await?;

// unregister compaction group for cleaned state tables.
- self.hummock_manager
- .unregister_table_ids_fail_fast(
- &state_table_ids
+ self.env
+ .notification_manager()
+ .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+ state_table_ids
.into_iter()
.map(|id| id as StateTableId)
.collect_vec(),
- )
+ ))
.await;

// unregister cleaned sources.
@@ -307,9 +312,16 @@
if !cancelled.is_empty() {
match &self.metadata_manager {
MetadataManager::V1(mgr) => {
- mgr.fragment_manager
+ let unregister_table_ids = mgr
+ .fragment_manager
.drop_table_fragments_vec(&cancelled)
.await?;
+ self.env
+ .notification_manager()
+ .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+ unregister_table_ids,
+ ))
+ .await;
}
MetadataManager::V2(mgr) => {
for job_id in cancelled {
12 changes: 6 additions & 6 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
@@ -53,7 +53,7 @@ use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}
use crate::manager::MetaSrvEnv;
use crate::model::{
BTreeMapEntryTransaction, BTreeMapEntryTransactionWrapper, BTreeMapTransaction,
- BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, TableFragments, ValTransaction,
+ BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, ValTransaction,
};
use crate::storage::MetaStore;

@@ -142,8 +142,12 @@ impl HummockManager {
Ok(pairs.iter().map(|(table_id, ..)| *table_id).collect_vec())
}

+ #[cfg(test)]
/// Unregisters `table_fragments` from compaction groups
- pub async fn unregister_table_fragments_vec(&self, table_fragments: &[TableFragments]) {
+ pub async fn unregister_table_fragments_vec(
+ &self,
+ table_fragments: &[crate::model::TableFragments],
+ ) {
self.unregister_table_ids_fail_fast(
&table_fragments
.iter()
@@ -169,8 +173,6 @@
self.unregister_table_ids_fail_fast(&to_unregister).await;
}

- /// Prefer using `register_table_fragments`.
- /// Use `register_table_ids` only when [`TableFragments`] is unavailable.
/// The implementation acquires `versioning` lock.
#[named]
pub async fn register_table_ids(
Expand Down Expand Up @@ -392,8 +394,6 @@ impl HummockManager {
Ok(())
}

- /// Prefer using `unregister_table_fragments_vec`.
- /// Only use `unregister_table_ids_fail_fast` when [`TableFragments`] is unavailable.
/// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
pub async fn unregister_table_ids_fail_fast(&self, table_ids: &[StateTableId]) {
self.unregister_table_ids(table_ids)
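With production callers rerouted through the notification path, the fragment-based wrapper above is kept only for tests via #[cfg(test)], and the doc comments that steered callers toward it are dropped. A generic illustration of what such gating means for callers (illustrative type, not the RisingWave one):

pub struct CompactionGroupRegistry;

impl CompactionGroupRegistry {
    /// Always available: unregister by raw state-table IDs.
    pub fn unregister_table_ids(&self, table_ids: &[u32]) {
        println!("unregistering {table_ids:?}");
    }

    /// Compiled only under `cargo test`; non-test code must use
    /// `unregister_table_ids` (reached via the notification path) instead.
    #[cfg(test)]
    pub fn unregister_table_fragments(&self, fragment_table_ids: &[u32]) {
        self.unregister_table_ids(fragment_table_ids);
    }
}

fn main() {
    let registry = CompactionGroupRegistry;
    registry.unregister_table_ids(&[11, 12]);
    // registry.unregister_table_fragments(&[11, 12]); // only exists in test builds
}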
49 changes: 20 additions & 29 deletions src/meta/src/hummock/manager/worker.rs
@@ -12,16 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- use std::time::Duration;
-
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::common::WorkerType;
use sync_point::sync_point;
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;
- use tokio_retry::strategy::{jitter, ExponentialBackoff};

- use crate::hummock::utils::RetryableError;
use crate::hummock::{HummockManager, HummockManagerRef};
use crate::manager::LocalNotification;

@@ -96,32 +92,27 @@ impl HummockManager {
}

async fn handle_local_notification(&self, notification: LocalNotification) {
- let retry_strategy = ExponentialBackoff::from_millis(10)
- .max_delay(Duration::from_secs(60))
- .map(jitter);
- if let LocalNotification::WorkerNodeDeleted(worker_node) = notification {
- if worker_node.get_type().unwrap() == WorkerType::Compactor {
- self.compactor_manager.remove_compactor(worker_node.id);
- }
- tokio_retry::RetryIf::spawn(
- retry_strategy.clone(),
- || async {
- if let Err(err) = self.release_contexts(vec![worker_node.id]).await {
- tracing::warn!(
- error = %err.as_report(),
- "Failed to release hummock context {}, will retry",
+ match notification {
+ LocalNotification::WorkerNodeDeleted(worker_node) => {
+ if worker_node.get_type().unwrap() == WorkerType::Compactor {
+ self.compactor_manager.remove_compactor(worker_node.id);
+ }
+ self.release_contexts(vec![worker_node.id])
+ .await
+ .unwrap_or_else(|err| {
+ panic!(
+ "Failed to release hummock context {}, error={}",
worker_node.id,
- );
- return Err(err);
- }
- Ok(())
- },
- RetryableError::default(),
- )
- .await
- .expect("retry until success");
- tracing::info!("Released hummock context {}", worker_node.id);
- sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
+ err.as_report()
+ )
+ });
+ tracing::info!("Released hummock context {}", worker_node.id);
+ sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
+ }
+ LocalNotification::UnregisterTablesFromHummock(table_ids) => {
+ self.unregister_table_ids_fail_fast(&table_ids).await;
+ }
+ _ => {}
}
}
}
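Note the behavioural change in the WorkerNodeDeleted arm: the old code retried release_contexts with jittered exponential backoff until it succeeded, whereas the new code calls it once and panics on failure. The sketch below reconstructs the removed retry pattern in a standalone form, using tokio_retry as the deleted imports indicate; the release_context helper is a placeholder, and plain Retry stands in for the removed RetryIf/RetryableError pair.

use std::time::Duration;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

// Placeholder for `HummockManager::release_contexts`.
async fn release_context(worker_id: u32) -> Result<(), String> {
    println!("releasing hummock context {worker_id}");
    Ok(())
}

#[tokio::main]
async fn main() {
    // 10 ms base delay, capped at 60 s, with random jitter — mirrors the removed code.
    let retry_strategy = ExponentialBackoff::from_millis(10)
        .max_delay(Duration::from_secs(60))
        .map(jitter);

    // Re-invokes the closure until it returns Ok.
    Retry::spawn(retry_strategy, || release_context(42))
        .await
        .expect("retry until success");
}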
9 changes: 7 additions & 2 deletions src/meta/src/manager/catalog/fragment.rs
@@ -547,7 +547,10 @@ impl FragmentManager {
/// tables.
/// If table fragments already deleted, this should just be noop,
/// the delete function (`table_fragments.remove`) will not return an error.
- pub async fn drop_table_fragments_vec(&self, table_ids: &HashSet<TableId>) -> MetaResult<()> {
+ pub async fn drop_table_fragments_vec(
+ &self,
+ table_ids: &HashSet<TableId>,
+ ) -> MetaResult<Vec<u32>> {
let mut guard = self.core.write().await;
let current_revision = guard.table_revision;

@@ -559,7 +562,9 @@

let mut dirty_sink_into_table_upstream_fragment_id = HashSet::new();
let mut table_fragments = BTreeMapTransaction::new(map);
+ let mut table_ids_to_unregister_from_hummock = vec![];
for table_fragment in &to_delete_table_fragments {
+ table_ids_to_unregister_from_hummock.extend(table_fragment.all_table_ids());
table_fragments.remove(table_fragment.table_id());
let to_remove_actor_ids: HashSet<_> = table_fragment.actor_ids().into_iter().collect();
let dependent_table_ids = table_fragment.dependent_table_ids();
@@ -634,7 +639,7 @@
}
}

- Ok(())
+ Ok(table_ids_to_unregister_from_hummock)
}

// When dropping sink into a table, there could be an unexpected meta reboot. At this time, the sink’s catalog might have been deleted,
1 change: 1 addition & 0 deletions src/meta/src/manager/notification.rs
@@ -48,6 +48,7 @@ pub enum LocalNotification {
SystemParamsChange(SystemParamsReader),
FragmentMappingsUpsert(Vec<FragmentId>),
FragmentMappingsDelete(Vec<FragmentId>),
+ UnregisterTablesFromHummock(Vec<u32>),
}

#[derive(Debug)]
26 changes: 18 additions & 8 deletions src/meta/src/stream/stream_manager.rs
@@ -30,7 +30,7 @@ use tracing::Instrument;
use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy};
use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager};
use crate::hummock::HummockManagerRef;
- use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob};
+ use crate::manager::{DdlType, LocalNotification, MetaSrvEnv, MetadataManager, StreamingJob};
use crate::model::{ActorId, TableFragments, TableParallelism};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};
@@ -424,10 +424,13 @@ impl GlobalStreamManager {
registered_table_ids.len(),
table_fragments.internal_table_ids().len() + mv_table_id.map_or(0, |_| 1)
);
+ let notification_manager_ref = self.env.notification_manager_ref();
revert_funcs.push(Box::pin(async move {
if create_type == CreateType::Foreground {
- hummock_manager_ref
- .unregister_table_ids_fail_fast(&registered_table_ids)
+ notification_manager_ref
+ .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+ registered_table_ids,
+ ))
.await;
}
}));
@@ -568,8 +571,11 @@
tracing::error!(error = ?err.as_report(), "failed to run drop command");
});

- self.hummock_manager
- .unregister_table_ids_fail_fast(&state_table_ids)
+ self.env
+ .notification_manager()
+ .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+ state_table_ids,
+ ))
.await;
}
}
@@ -586,7 +592,8 @@
.await;

// Drop table fragments directly.
- mgr.fragment_manager
+ let unregister_table_ids = mgr
+ .fragment_manager
.drop_table_fragments_vec(&table_ids.into_iter().collect())
.await?;

@@ -604,8 +611,11 @@
});

// Unregister from compaction group afterwards.
- self.hummock_manager
- .unregister_table_fragments_vec(&table_fragments_vec)
+ self.env
+ .notification_manager()
+ .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+ unregister_table_ids,
+ ))
.await;

Ok(())
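The first stream_manager.rs hunk also captures the notification sender inside a revert closure (revert_funcs.push(Box::pin(async move { ... }))) so that the Hummock registration of a foreground job can be undone if creation later fails. A self-contained sketch of that deferred-revert pattern, with illustrative types and IDs rather than RisingWave's:

use std::future::Future;
use std::pin::Pin;

type RevertFunc = Pin<Box<dyn Future<Output = ()> + Send>>;

async fn create_job_with_reverts() -> Result<(), String> {
    let mut revert_funcs: Vec<RevertFunc> = vec![];

    // Step 1: register state tables (placeholder IDs).
    let registered_table_ids: Vec<u32> = vec![1, 2, 3];

    // Record how to undo step 1; everything the closure needs is moved in.
    let ids_for_revert = registered_table_ids.clone();
    revert_funcs.push(Box::pin(async move {
        // The real closure publishes LocalNotification::UnregisterTablesFromHummock here.
        println!("revert: unregister tables {ids_for_revert:?} from Hummock");
    }));

    // Step 2: pretend the rest of the creation fails.
    let result: Result<(), String> = Err("creation failed".into());

    if result.is_err() {
        // Run reverts in reverse registration order.
        while let Some(revert) = revert_funcs.pop() {
            revert.await;
        }
    }
    result
}

#[tokio::main]
async fn main() {
    let _ = create_job_with_reverts().await;
}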