diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 3277f255ebe8..3c9dacd790b6 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -30,12 +30,11 @@ use risingwave_meta::hummock::compaction::CompactStatus; use risingwave_meta::hummock::model::CompactionGroup; use risingwave_meta::manager::model::SystemParamsModel; use risingwave_meta::model; -use risingwave_meta::model::{ClusterId, MetadataModel, NotificationVersion, TableParallelism}; +use risingwave_meta::model::{ClusterId, MetadataModel, TableParallelism}; use risingwave_meta::storage::{ - EtcdMetaStore, MetaStore, MetaStoreBoxExt, MetaStoreError, MetaStoreRef, + EtcdMetaStore, EtcdSnapshot, MetaStore, MetaStoreError, Snapshot, WrappedEtcdClient as EtcdClient, DEFAULT_COLUMN_FAMILY, }; -use risingwave_meta::stream::TableRevision; use risingwave_meta_model_migration::{Migrator, MigratorTrait}; use risingwave_meta_model_v2::catalog_version::VersionCategory; use risingwave_meta_model_v2::compaction_status::LevelHandlers; @@ -93,7 +92,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an let client = EtcdClient::connect(from.endpoints.clone(), Some(options.clone()), auth_enabled) .await .context("failed to connect etcd")?; - let meta_store = EtcdMetaStore::new(client).into_ref(); + let meta_store = EtcdMetaStore::new(client); + let snapshot = meta_store.snapshot().await; // 2. init sql meta store. let mut options = sea_orm::ConnectOptions::new(target); @@ -114,7 +114,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an .expect("failed to init sql backend"); // cluster Id. - let cluster_id: Uuid = ClusterId::from_meta_store(&meta_store) + let cluster_id: Uuid = ClusterId::from_snapshot::(&snapshot) .await? .expect("cluster id not found") .parse()?; @@ -135,7 +135,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("cluster id updated to {}", cluster_id); // system parameters. - let system_parameters = PbSystemParams::get(&meta_store) + let system_parameters = PbSystemParams::get_at_snapshot::(&snapshot) .await? .expect("system parameters not found"); SystemParameter::insert_many(system_params_to_model(&system_parameters)?) @@ -144,7 +144,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("system parameters migrated"); // workers. - let workers = model::Worker::list(&meta_store).await?; + let workers = model::Worker::list_at_snapshot::(&snapshot).await?; let next_worker_id = workers .iter() .map(|w| w.worker_node.id + 1) @@ -174,18 +174,18 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("worker nodes migrated"); // catalogs. - let databases = PbDatabase::list(&meta_store).await?; - let schemas = PbSchema::list(&meta_store).await?; - let users = PbUserInfo::list(&meta_store).await?; - let tables = PbTable::list(&meta_store).await?; - let sources = PbSource::list(&meta_store).await?; - let sinks = PbSink::list(&meta_store).await?; - let indexes = PbIndex::list(&meta_store).await?; - let views = PbView::list(&meta_store).await?; - let functions = PbFunction::list(&meta_store).await?; - let connections = PbConnection::list(&meta_store).await?; - let subscriptions = PbSubscription::list(&meta_store).await?; - let secrets = PbSecret::list(&meta_store).await?; + let databases = PbDatabase::list_at_snapshot::(&snapshot).await?; + let schemas = PbSchema::list_at_snapshot::(&snapshot).await?; + let users = PbUserInfo::list_at_snapshot::(&snapshot).await?; + let tables = PbTable::list_at_snapshot::(&snapshot).await?; + let sources = PbSource::list_at_snapshot::(&snapshot).await?; + let sinks = PbSink::list_at_snapshot::(&snapshot).await?; + let indexes = PbIndex::list_at_snapshot::(&snapshot).await?; + let views = PbView::list_at_snapshot::(&snapshot).await?; + let functions = PbFunction::list_at_snapshot::(&snapshot).await?; + let connections = PbConnection::list_at_snapshot::(&snapshot).await?; + let subscriptions = PbSubscription::list_at_snapshot::(&snapshot).await?; + let secrets = PbSecret::list_at_snapshot::(&snapshot).await?; // inuse object ids. let mut inuse_obj_ids = tables @@ -415,7 +415,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an } // table fragments. - let table_fragments = model::TableFragments::list(&meta_store).await?; + let table_fragments = + model::TableFragments::list_at_snapshot::(&snapshot).await?; let mut fragment_job_map = HashMap::new(); let mut fragments = vec![]; let mut actors = vec![]; @@ -681,20 +682,20 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("user privileges migrated"); // notification. - let notification_version = NotificationVersion::new(&meta_store).await; + let notification_version = load_current_id(&snapshot, "notification_version", None).await; CatalogVersion::insert(catalog_version::ActiveModel { name: Set(VersionCategory::Notification), - version: Set(notification_version.version() as _), + version: Set(notification_version as _), }) .exec(&meta_store_sql.conn) .await?; println!("notification version migrated"); // table revision. - let table_revision = TableRevision::get(&meta_store).await?; + let table_revision = load_current_id(&snapshot, "table_revision", None).await; CatalogVersion::insert(catalog_version::ActiveModel { name: Set(VersionCategory::TableRevision), - version: Set(table_revision.inner() as _), + version: Set(table_revision as _), }) .exec(&meta_store_sql.conn) .await?; @@ -702,7 +703,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an // hummock. // hummock pinned snapshots - let pinned_snapshots = HummockPinnedSnapshot::list(&meta_store).await?; + let pinned_snapshots = + HummockPinnedSnapshot::list_at_snapshot::(&snapshot).await?; if !pinned_snapshots.is_empty() { hummock_pinned_snapshot::Entity::insert_many( pinned_snapshots @@ -719,7 +721,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("hummock pinned snapshots migrated"); // hummock pinned version - let pinned_version = HummockPinnedVersion::list(&meta_store).await?; + let pinned_version = HummockPinnedVersion::list_at_snapshot::(&snapshot).await?; if !pinned_version.is_empty() { hummock_pinned_version::Entity::insert_many( pinned_version @@ -736,7 +738,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("hummock pinned version migrated"); // hummock version delta - let version_delta = HummockVersionDelta::list(&meta_store).await?; + let version_delta = HummockVersionDelta::list_at_snapshot::(&snapshot).await?; if !version_delta.is_empty() { hummock_version_delta::Entity::insert_many( version_delta @@ -757,7 +759,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("hummock version delta migrated"); // hummock version stat - let version_stats = HummockVersionStats::list(&meta_store) + let version_stats = HummockVersionStats::list_at_snapshot::(&snapshot) .await? .into_iter() .next(); @@ -773,7 +775,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an // compaction // compaction config - let compaction_groups = CompactionGroup::list(&meta_store).await?; + let compaction_groups = CompactionGroup::list_at_snapshot::(&snapshot).await?; if !compaction_groups.is_empty() { compaction_config::Entity::insert_many( compaction_groups @@ -790,7 +792,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("compaction config migrated"); // compaction status - let compaction_statuses = CompactStatus::list(&meta_store).await?; + let compaction_statuses = CompactStatus::list_at_snapshot::(&snapshot).await?; if !compaction_statuses.is_empty() { compaction_status::Entity::insert_many( compaction_statuses @@ -809,7 +811,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("compaction status migrated"); // compaction task - let compaction_tasks = CompactTaskAssignment::list(&meta_store).await?; + let compaction_tasks = + CompactTaskAssignment::list_at_snapshot::(&snapshot).await?; if !compaction_tasks.is_empty() { compaction_task::Entity::insert_many(compaction_tasks.into_iter().map(|ct| { let context_id = ct.context_id; @@ -826,15 +829,15 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an println!("compaction task migrated"); // hummock sequence - let sst_obj_id = load_current_id(&meta_store, "hummock_ss_table_id", Some(1)).await; - let compaction_task_id = load_current_id(&meta_store, "hummock_compaction_task", Some(1)).await; + let sst_obj_id = load_current_id(&snapshot, "hummock_ss_table_id", Some(1)).await; + let compaction_task_id = load_current_id(&snapshot, "hummock_compaction_task", Some(1)).await; let compaction_group_id = load_current_id( - &meta_store, + &snapshot, "compaction_group", Some(StaticCompactionGroupId::End as u64 + 1), ) .await; - let backup_id = load_current_id(&meta_store, "backup", Some(1)).await; + let backup_id = load_current_id(&snapshot, "backup", Some(1)).await; hummock_sequence::Entity::insert_many(vec![ hummock_sequence::ActiveModel { name: Set(SSTABLE_OBJECT_ID.into()), @@ -912,9 +915,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an Ok(()) } -async fn load_current_id(meta_store: &MetaStoreRef, category: &str, start: Option) -> u64 { +async fn load_current_id(snapshot: &EtcdSnapshot, category: &str, start: Option) -> u64 { let category_gen_key = format!("{}_id_next_generator", category); - let res = meta_store + let res = snapshot .get_cf(DEFAULT_COLUMN_FAMILY, category_gen_key.as_bytes()) .await; match res {