From 632dfdac6b6d82e2166d3b373367ca09ac674328 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Wed, 27 Sep 2023 19:11:14 +0800
Subject: [PATCH] fmt

---
 ci/scripts/run-backfill-tests.sh             |  8 +--
 src/meta/src/barrier/command.rs              |  2 +-
 src/meta/src/barrier/mod.rs                  |  7 ++-
 src/meta/src/barrier/progress.rs             | 28 +++++-----
 src/meta/src/barrier/recovery.rs             |  4 +-
 src/meta/src/manager/catalog/database.rs     |  4 +-
 src/meta/src/manager/catalog/mod.rs          | 52 +++++--------------
 src/meta/src/rpc/ddl_controller.rs           | 24 ---------
 .../src/rpc/service/notification_service.rs  |  2 +-
 src/meta/src/stream/stream_manager.rs        |  8 ++-
 10 files changed, 43 insertions(+), 96 deletions(-)

diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh
index 5f8d9ff84307..0f6a03519d4f 100755
--- a/ci/scripts/run-backfill-tests.sh
+++ b/ci/scripts/run-backfill-tests.sh
@@ -185,10 +185,10 @@ test_foreground_ddl_no_recover() {
 
 main() {
   set -euo pipefail
-  # test_snapshot_and_upstream_read
-  # test_background_ddl_recovery
-  # test_background_ddl_cancel
-  # test_foreground_ddl_no_recover
+  test_snapshot_and_upstream_read
+  test_background_ddl_recovery
+  test_background_ddl_cancel
+  test_foreground_ddl_no_recover
   test_foreground_ddl_cancel
 }
 
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index fb7307304017..4c8235cbab2d 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -667,7 +667,7 @@ impl CommandContext {
                 let node_actors = table_fragments.worker_actor_ids();
                 self.clean_up(node_actors).await?;
                 self.catalog_manager
-                    .cancel_create_table_procedure_with_id(
+                    .cancel_create_table_procedure(
                         table_fragments.table_id().table_id,
                         self.fragment_manager.clone(),
                     )
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 046a5fd66601..1445b266da9e 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -56,12 +56,11 @@ use crate::barrier::BarrierEpochState::{Completed, InFlight};
 use crate::hummock::HummockManagerRef;
 use crate::manager::sink_coordination::SinkCoordinatorManager;
 use crate::manager::{
-    CatalogManager, CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification,
-    MetaSrvEnv, WorkerId,
+    CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, MetaSrvEnv,
+    WorkerId,
 };
 use crate::model::{ActorId, BarrierManagerState};
 use crate::rpc::metrics::MetaMetrics;
-use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY};
 use crate::stream::SourceManagerRef;
 use crate::{MetaError, MetaResult};
 
@@ -979,7 +978,7 @@ impl GlobalBarrierManager {
         drop(tracker);
         for (table, internal_tables, finished) in receivers {
             let catalog_manager = self.catalog_manager.clone();
-            let fragment_manager = self.fragment_manager.clone();
+            let _fragment_manager = self.fragment_manager.clone();
             tokio::spawn(async move {
                 let res: MetaResult<()> = try {
                     finished
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index 6d88001da785..e94f1eb59f85 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -14,13 +14,10 @@
 
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
-use std::hash::Hash;
 use std::sync::Arc;
 
 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::marker::Actor;
-use risingwave_common::row::Chain;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
@@ -31,7 +28,6 @@
 use super::notifier::Notifier;
 use crate::barrier::Command;
 use crate::model::ActorId;
-type CreateMviewEpoch = Epoch;
 type ConsumedRows = u64;
 
 #[derive(Clone, Copy, Debug)]
@@ -161,19 +157,19 @@ pub(super) struct TrackingCommand {
 /// called on registered notifiers.
 ///
 /// Tracking is done as follows:
-/// Several ActorIds constitute a StreamJob.
-/// A StreamJob is IDead by the Epoch of its initial barrier,
-/// i.e. CreateMviewEpoch.
+/// Several `ActorIds` constitute a `StreamJob`.
+/// A `StreamJob` is `IDead` by the Epoch of its initial barrier,
+/// i.e. `CreateMviewEpoch`.
 /// We can ID it that way because the initial barrier should ONLY
-/// be used for exactly one StreamJob.
+/// be used for exactly one `StreamJob`.
 /// We don't allow multiple stream jobs scheduled on the same barrier.
 ///
-/// With `actor_map` we can use any `ActorId` to find the ID of the StreamJob,
-/// and with `progress_map` we can use the ID of the StreamJob
+/// With `actor_map` we can use any `ActorId` to find the ID of the `StreamJob`,
+/// and with `progress_map` we can use the ID of the `StreamJob`
 /// to view its progress.
 ///
-/// We track the progress of each ActorId in a StreamJob,
-/// because ALL of their progress constitutes the progress of the StreamJob.
+/// We track the progress of each `ActorId` in a `StreamJob`,
+/// because ALL of their progress constitutes the progress of the `StreamJob`.
 pub(super) struct CreateMviewProgressTracker {
     // TODO(kwannoel): The real purpose of `CreateMviewEpoch`
     // Is to serve as a unique identifier for a stream job.
@@ -189,8 +185,8 @@ pub(super) struct CreateMviewProgressTracker {
 }
 
 impl CreateMviewProgressTracker {
-    /// Backfill progress and tracking_commands are the only dynamic parts of the state.
-    /// For BackfillProgress, it can also be derived from state_table.
+    /// Backfill progress and `tracking_commands` are the only dynamic parts of the state.
+    /// For `BackfillProgress`, it can also be derived from `state_table`.
     /// However, this requires the stream graph to init BEFORE meta
     /// recovers fully.
    /// To support that meta needs to recover in 2 parts:
@@ -206,7 +202,7 @@ impl CreateMviewProgressTracker {
     /// The `actor_map` contains the mapping from actor to its stream job identifier
     /// (the epoch where it was created).
     ///
-    /// Just use TableId to stream id.
+    /// Just use `TableId` to stream id.
     /// Report the status to local barrier manager.
     ///
     /// Need to add some extra fields in initialize barrier to include
@@ -239,7 +235,7 @@ impl CreateMviewProgressTracker {
     ) -> Self {
         let mut actor_map = HashMap::new();
         let mut progress_map = HashMap::new();
-        for (creating_table_id, actors) in table_map.into_iter() {
+        for (creating_table_id, actors) in table_map {
             let mut states = HashMap::new();
             for actor in actors {
                 actor_map.insert(actor, creating_table_id);
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 085dabb06671..0384255c1872 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -18,8 +18,6 @@ use std::time::{Duration, Instant};
 
 use futures::future::try_join_all;
 use itertools::Itertools;
-use risingwave_common::catalog::TableId;
-use risingwave_pb::catalog::{CreateType, Table};
 use risingwave_pb::common::ActorInfo;
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation};
@@ -220,7 +218,7 @@ impl GlobalBarrierManager {
             tokio::sync::mpsc::unbounded_channel();
         self.inject_barrier(command_ctx.clone(), &barrier_complete_tx)
             .await;
-        let res = match barrier_complete_rx.recv().await.unwrap().result {
+        let _res = match barrier_complete_rx.recv().await.unwrap().result {
             Ok(response) => {
                 if let Err(err) = command_ctx.post_collect().await {
                     warn!(err = ?err, "post_collect failed");
diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs
index 0cf610978722..7ceeb5c37a80 100644
--- a/src/meta/src/manager/catalog/database.rs
+++ b/src/meta/src/manager/catalog/database.rs
@@ -261,11 +261,11 @@ impl DatabaseManager {
     pub fn list_creating_tables(&self) -> Vec<Table> {
         self.tables
             .values()
-            .cloned()
-            .filter(|t| {
+            .filter(|&t| {
                 t.stream_job_status == PbStreamJobStatus::Creating as i32
                     && t.table_type == TableType::MaterializedView as i32
             })
+            .cloned()
             .collect_vec()
         // self.in_progress_creating_tables
         //     .values()
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 88872d7b1885..27d5d59c3a3b 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -700,7 +700,7 @@ impl CatalogManager {
 
     /// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`.
     /// TODO(kwannoel): Should both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`
-    /// commit_meta? Or it doesn't matter?
+    /// `commit_meta`? Or it doesn't matter?
     pub async fn start_create_table_procedure(
         &self,
         table: &Table,
@@ -742,14 +742,6 @@ impl CatalogManager {
         }
     }
 
-    // FIXME
-    pub async fn check_key(&self, table: &Table) -> bool {
-        let core = &mut *self.core.lock().await;
-        let database_core = &mut core.database;
-        let key = (table.database_id, table.schema_id, table.name.clone());
-        database_core.in_progress_creation_tracker.contains(&key)
-    }
-
     fn check_table_creating(tables: &BTreeMap<TableId, Table>, table: &Table) {
         if let Some(t) = tables.get(&table.id)
             && let Ok(StreamJobStatus::Creating) = t.get_stream_job_status()
@@ -842,48 +834,30 @@ impl CatalogManager {
         Ok(version)
     }
 
-    pub async fn cancel_create_table_procedure_with_id(
+    pub async fn cancel_create_table_procedure(
         &self,
         table_id: TableId,
         fragment_manager: FragmentManagerRef,
     ) -> MetaResult<()> {
-        {
-            let core = &mut *self.core.lock().await;
+        let core = &mut self.core.lock().await;
+        let table = {
             let database_core = &mut core.database;
             let tables = &mut database_core.tables;
-            if let Some(table) = tables.get(&table_id).cloned() {
-                self.cancel_create_table_procedure_inner(core, &table, fragment_manager)
-                    .await?;
-                return Ok(());
-            }
-        }
-        bail!("Table ID: {table_id} missing when attempting to cancel job")
-    }
+            let Some(table) = tables.get(&table_id).cloned() else {
+                bail!("Table ID: {table_id} missing when attempting to cancel job")
+            };
+            table
+        };
 
-    pub async fn cancel_create_table_procedure(
-        &self,
-        table: &Table,
-        fragment_manager: FragmentManagerRef,
-    ) -> MetaResult<()> {
-        let core = &mut *self.core.lock().await;
-        self.cancel_create_table_procedure_inner(core, table, fragment_manager)
-            .await
-    }
+        {
+            let user_core = &mut core.user;
+            user_core.decrease_ref(table.owner);
+        }
 
-    pub async fn cancel_create_table_procedure_inner(
-        &self,
-        core: &mut CatalogManagerCore,
-        table: &Table,
-        fragment_manager: FragmentManagerRef,
-    ) -> MetaResult<()> {
         let database_core = &mut core.database;
-        let user_core = &mut core.user;
-        Self::check_table_creating(&database_core.tables, &table);
-
         for &dependent_relation_id in &table.dependent_relations {
             database_core.decrease_ref_count(dependent_relation_id);
         }
-        user_core.decrease_ref(table.owner);
 
         let mut table_ids = vec![table.id];
         let fragment = fragment_manager
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 805d606a8b6f..511dcdd5f6de 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -402,11 +402,6 @@ impl DdlController {
         Ok(())
     }
 
-    // FIXME
-    async fn check_key(&self, table: &Table) -> bool {
-        self.catalog_manager.check_key(table).await
-    }
-
     async fn create_streaming_job(
         &self,
         mut stream_job: StreamingJob,
@@ -460,16 +455,6 @@ impl DdlController {
             }
         };
 
-        // At this point the key should be present in the creating tables.
-        match stream_job {
-            StreamingJob::Table(_, ref t) => {
-                if !self.check_key(t).await {
-                    println!("Key not present after building stream job for {t:#?}")
-                }
-            }
-            _ => {}
-        }
-
         match create_type {
             CreateType::Foreground | CreateType::Unspecified => {
                 self.create_streaming_job_inner(stream_job, table_fragments, ctx, internal_tables)
@@ -711,15 +696,6 @@ impl DdlController {
             .mark_creating_tables(&creating_tables)
             .await;
 
-        match stream_job {
-            StreamingJob::Table(_, ref t) => {
-                if !self.check_key(t).await {
-                    println!("Key not present within building stream job for {t:#?}")
-                }
-            }
-            _ => {}
-        }
-
         Ok((ctx, table_fragments))
     }
 
diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/src/rpc/service/notification_service.rs
index 0f6f857c7d3d..a8a32e3f06d6 100644
--- a/src/meta/src/rpc/service/notification_service.rs
+++ b/src/meta/src/rpc/service/notification_service.rs
@@ -14,7 +14,7 @@
 
 use itertools::Itertools;
 use risingwave_pb::backup_service::MetaBackupManifestId;
-use risingwave_pb::catalog::{PbStreamJobStatus, Table};
+use risingwave_pb::catalog::Table;
 use risingwave_pb::common::worker_node::State::Running;
 use risingwave_pb::common::{WorkerNode, WorkerType};
 use risingwave_pb::hummock::WriteLimits;
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 83661d2635eb..73cce030c596 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -112,7 +112,10 @@ impl CreatingStreamingJobInfo {
         jobs.remove(&job_id);
     }
 
-    async fn cancel_jobs(&self, job_ids: Vec<TableId>) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
+    async fn cancel_jobs(
+        &self,
+        job_ids: Vec<TableId>,
+    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
         let mut jobs = self.streaming_jobs.lock().await;
         let mut receivers = HashMap::new();
         let mut recovered_job_ids = vec![];
@@ -582,7 +585,8 @@ impl GlobalStreamManager {
             for fragment in fragments {
                 self.barrier_scheduler
                     .run_command(Command::CancelStreamingJob(fragment))
-                    .await.expect("should be able to cancel recovered stream job");
+                    .await
+                    .expect("should be able to cancel recovered stream job");
             }
             cancelled_ids.extend(recovered_job_ids);
             cancelled_ids