Skip to content

Commit

Permalink
feat: read snapshot instead for migration from etcd to sql backend
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Aug 27, 2024
1 parent e3a9d37 commit 60b1541
Showing 1 changed file with 40 additions and 37 deletions.
77 changes: 40 additions & 37 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ use risingwave_meta::hummock::compaction::CompactStatus;
use risingwave_meta::hummock::model::CompactionGroup;
use risingwave_meta::manager::model::SystemParamsModel;
use risingwave_meta::model;
use risingwave_meta::model::{ClusterId, MetadataModel, NotificationVersion, TableParallelism};
use risingwave_meta::model::{ClusterId, MetadataModel, TableParallelism};
use risingwave_meta::storage::{
EtcdMetaStore, MetaStore, MetaStoreBoxExt, MetaStoreError, MetaStoreRef,
EtcdMetaStore, EtcdSnapshot, MetaStore, MetaStoreError, Snapshot,
WrappedEtcdClient as EtcdClient, DEFAULT_COLUMN_FAMILY,
};
use risingwave_meta::stream::TableRevision;
use risingwave_meta_model_migration::{Migrator, MigratorTrait};
use risingwave_meta_model_v2::catalog_version::VersionCategory;
use risingwave_meta_model_v2::compaction_status::LevelHandlers;
Expand Down Expand Up @@ -93,7 +92,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
let client = EtcdClient::connect(from.endpoints.clone(), Some(options.clone()), auth_enabled)
.await
.context("failed to connect etcd")?;
let meta_store = EtcdMetaStore::new(client).into_ref();
let meta_store = EtcdMetaStore::new(client);
let snapshot = meta_store.snapshot().await;

// 2. init sql meta store.
let mut options = sea_orm::ConnectOptions::new(target);
Expand All @@ -114,7 +114,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.expect("failed to init sql backend");

// cluster Id.
let cluster_id: Uuid = ClusterId::from_meta_store(&meta_store)
let cluster_id: Uuid = ClusterId::from_snapshot::<EtcdMetaStore>(&snapshot)
.await?
.expect("cluster id not found")
.parse()?;
Expand All @@ -135,7 +135,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("cluster id updated to {}", cluster_id);

// system parameters.
let system_parameters = PbSystemParams::get(&meta_store)
let system_parameters = PbSystemParams::get_at_snapshot::<EtcdMetaStore>(&snapshot)
.await?
.expect("system parameters not found");
SystemParameter::insert_many(system_params_to_model(&system_parameters)?)
Expand All @@ -144,7 +144,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("system parameters migrated");

// workers.
let workers = model::Worker::list(&meta_store).await?;
let workers = model::Worker::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let next_worker_id = workers
.iter()
.map(|w| w.worker_node.id + 1)
Expand Down Expand Up @@ -174,18 +174,18 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("worker nodes migrated");

// catalogs.
let databases = PbDatabase::list(&meta_store).await?;
let schemas = PbSchema::list(&meta_store).await?;
let users = PbUserInfo::list(&meta_store).await?;
let tables = PbTable::list(&meta_store).await?;
let sources = PbSource::list(&meta_store).await?;
let sinks = PbSink::list(&meta_store).await?;
let indexes = PbIndex::list(&meta_store).await?;
let views = PbView::list(&meta_store).await?;
let functions = PbFunction::list(&meta_store).await?;
let connections = PbConnection::list(&meta_store).await?;
let subscriptions = PbSubscription::list(&meta_store).await?;
let secrets = PbSecret::list(&meta_store).await?;
let databases = PbDatabase::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let schemas = PbSchema::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let users = PbUserInfo::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let tables = PbTable::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let sources = PbSource::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let sinks = PbSink::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let indexes = PbIndex::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let views = PbView::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let functions = PbFunction::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let connections = PbConnection::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let subscriptions = PbSubscription::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let secrets = PbSecret::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;

// inuse object ids.
let mut inuse_obj_ids = tables
Expand Down Expand Up @@ -415,7 +415,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
}

// table fragments.
let table_fragments = model::TableFragments::list(&meta_store).await?;
let table_fragments =
model::TableFragments::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
let mut fragment_job_map = HashMap::new();
let mut fragments = vec![];
let mut actors = vec![];
Expand Down Expand Up @@ -681,28 +682,29 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("user privileges migrated");

// notification.
let notification_version = NotificationVersion::new(&meta_store).await;
let notification_version = load_current_id(&snapshot, "notification_version", None).await;
CatalogVersion::insert(catalog_version::ActiveModel {
name: Set(VersionCategory::Notification),
version: Set(notification_version.version() as _),
version: Set(notification_version as _),
})
.exec(&meta_store_sql.conn)
.await?;
println!("notification version migrated");

// table revision.
let table_revision = TableRevision::get(&meta_store).await?;
let table_revision = load_current_id(&snapshot, "table_revision", None).await;
CatalogVersion::insert(catalog_version::ActiveModel {
name: Set(VersionCategory::TableRevision),
version: Set(table_revision.inner() as _),
version: Set(table_revision as _),
})
.exec(&meta_store_sql.conn)
.await?;
println!("table revision migrated");

// hummock.
// hummock pinned snapshots
let pinned_snapshots = HummockPinnedSnapshot::list(&meta_store).await?;
let pinned_snapshots =
HummockPinnedSnapshot::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
if !pinned_snapshots.is_empty() {
hummock_pinned_snapshot::Entity::insert_many(
pinned_snapshots
Expand All @@ -719,7 +721,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("hummock pinned snapshots migrated");

// hummock pinned version
let pinned_version = HummockPinnedVersion::list(&meta_store).await?;
let pinned_version = HummockPinnedVersion::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
if !pinned_version.is_empty() {
hummock_pinned_version::Entity::insert_many(
pinned_version
Expand All @@ -736,7 +738,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("hummock pinned version migrated");

// hummock version delta
let version_delta = HummockVersionDelta::list(&meta_store).await?;
let version_delta = HummockVersionDelta::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
if !version_delta.is_empty() {
hummock_version_delta::Entity::insert_many(
version_delta
Expand All @@ -757,7 +759,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("hummock version delta migrated");

// hummock version stat
let version_stats = HummockVersionStats::list(&meta_store)
let version_stats = HummockVersionStats::list_at_snapshot::<EtcdMetaStore>(&snapshot)
.await?
.into_iter()
.next();
Expand All @@ -773,7 +775,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an

// compaction
// compaction config
let compaction_groups = CompactionGroup::list(&meta_store).await?;
let compaction_groups = CompactionGroup::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
if !compaction_groups.is_empty() {
compaction_config::Entity::insert_many(
compaction_groups
Expand All @@ -790,7 +792,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("compaction config migrated");

// compaction status
let compaction_statuses = CompactStatus::list(&meta_store).await?;
let compaction_statuses = CompactStatus::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
if !compaction_statuses.is_empty() {
compaction_status::Entity::insert_many(
compaction_statuses
Expand All @@ -809,7 +811,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("compaction status migrated");

// compaction task
let compaction_tasks = CompactTaskAssignment::list(&meta_store).await?;
let compaction_tasks =
CompactTaskAssignment::list_at_snapshot::<EtcdMetaStore>(&snapshot).await?;
if !compaction_tasks.is_empty() {
compaction_task::Entity::insert_many(compaction_tasks.into_iter().map(|ct| {
let context_id = ct.context_id;
Expand All @@ -826,15 +829,15 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
println!("compaction task migrated");

// hummock sequence
let sst_obj_id = load_current_id(&meta_store, "hummock_ss_table_id", Some(1)).await;
let compaction_task_id = load_current_id(&meta_store, "hummock_compaction_task", Some(1)).await;
let sst_obj_id = load_current_id(&snapshot, "hummock_ss_table_id", Some(1)).await;
let compaction_task_id = load_current_id(&snapshot, "hummock_compaction_task", Some(1)).await;
let compaction_group_id = load_current_id(
&meta_store,
&snapshot,
"compaction_group",
Some(StaticCompactionGroupId::End as u64 + 1),
)
.await;
let backup_id = load_current_id(&meta_store, "backup", Some(1)).await;
let backup_id = load_current_id(&snapshot, "backup", Some(1)).await;
hummock_sequence::Entity::insert_many(vec![
hummock_sequence::ActiveModel {
name: Set(SSTABLE_OBJECT_ID.into()),
Expand Down Expand Up @@ -912,9 +915,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
Ok(())
}

async fn load_current_id(meta_store: &MetaStoreRef, category: &str, start: Option<u64>) -> u64 {
async fn load_current_id(snapshot: &EtcdSnapshot, category: &str, start: Option<u64>) -> u64 {
let category_gen_key = format!("{}_id_next_generator", category);
let res = meta_store
let res = snapshot
.get_cf(DEFAULT_COLUMN_FAMILY, category_gen_key.as_bytes())
.await;
match res {
Expand Down

0 comments on commit 60b1541

Please sign in to comment.