From 664d56a6daa39e5da55e774821b7b019385aca68 Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Thu, 28 Dec 2023 17:07:47 +0800
Subject: [PATCH] refactor(meta): simplify Hummock metadata initialization

---
 src/meta/src/hummock/manager/checkpoint.rs | 40 +++-------------------
 src/meta/src/hummock/manager/mod.rs        | 13 +++----
 2 files changed, 11 insertions(+), 42 deletions(-)

diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs
index 523c0d35f9cd5..1134b0e1246d6 100644
--- a/src/meta/src/hummock/manager/checkpoint.rs
+++ b/src/meta/src/hummock/manager/checkpoint.rs
@@ -30,9 +30,6 @@ use crate::hummock::error::Result;
 use crate::hummock::manager::{read_lock, write_lock};
 use crate::hummock::metrics_utils::trigger_gc_stat;
 use crate::hummock::HummockManager;
-use crate::storage::{MetaStore, MetaStoreError, DEFAULT_COLUMN_FAMILY};
-
-const HUMMOCK_INIT_FLAG_KEY: &[u8] = b"hummock_init_flag";
 
 #[derive(Default)]
 pub struct HummockVersionCheckpoint {
@@ -63,9 +60,8 @@ impl HummockVersionCheckpoint {
 /// A hummock version checkpoint compacts previous hummock version delta logs, and stores stale
 /// objects from those delta logs.
 impl HummockManager {
-    /// # Panics
-    /// if checkpoint is not found.
-    pub async fn read_checkpoint(&self) -> Result<HummockVersionCheckpoint> {
+    /// Returns `Ok(None)` if the checkpoint is not found in the object store.
+    pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
         use prost::Message;
         let data = match self
             .object_store
@@ -75,16 +71,13 @@ impl HummockManager {
             Ok(data) => data,
             Err(e) => {
                 if e.is_object_not_found_error() {
-                    panic!(
-                        "Hummock version checkpoints do not exist in object store, path: {}",
-                        self.version_checkpoint_path
-                    );
+                    return Ok(None);
                 }
                 return Err(e.into());
             }
         };
         let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
-        Ok(HummockVersionCheckpoint::from_protobuf(&ckpt))
+        Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
     }
 
     pub(super) async fn write_checkpoint(
@@ -173,31 +166,6 @@ impl HummockManager {
         Ok(new_checkpoint_id - old_checkpoint_id)
     }
 
-    pub(super) async fn need_init(&self) -> Result<bool> {
-        match self
-            .env
-            .meta_store()
-            .get_cf(DEFAULT_COLUMN_FAMILY, HUMMOCK_INIT_FLAG_KEY)
-            .await
-        {
-            Ok(_) => Ok(false),
-            Err(MetaStoreError::ItemNotFound(_)) => Ok(true),
-            Err(e) => Err(e.into()),
-        }
-    }
-
-    pub(super) async fn mark_init(&self) -> Result<()> {
-        self.env
-            .meta_store()
-            .put_cf(
-                DEFAULT_COLUMN_FAMILY,
-                HUMMOCK_INIT_FLAG_KEY.to_vec(),
-                memcomparable::to_vec(&0).unwrap(),
-            )
-            .await
-            .map_err(Into::into)
-    }
-
     pub fn pause_version_checkpoint(&self) {
         self.pause_version_checkpoint.store(true, Ordering::Relaxed);
         tracing::info!("hummock version checkpoint is paused.");
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index db4c1c5d50b87..eabd4db509c1e 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -488,7 +488,11 @@ impl HummockManager {
                 .map(|version_delta| (version_delta.id, version_delta))
                 .collect();
 
-        let mut redo_state = if self.need_init().await? {
+        let checkpoint = self.try_read_checkpoint().await?;
+        let mut redo_state = if let Some(c) = checkpoint {
+            versioning_guard.checkpoint = c;
+            versioning_guard.checkpoint.version.clone()
+        } else {
             let default_compaction_config = self
                 .compaction_group_manager
                 .read()
@@ -496,6 +500,7 @@ impl HummockManager {
                 .default_compaction_config();
             let checkpoint_version = create_init_version(default_compaction_config);
             tracing::info!("init hummock version checkpoint");
+            // This write to the meta store is idempotent, so if `write_checkpoint` fails, restarting the meta node is fine.
             HummockVersionStats::default()
                 .insert(self.env.meta_store())
                 .await?;
@@ -504,13 +509,9 @@ impl HummockManager {
                 stale_objects: Default::default(),
             };
             self.write_checkpoint(&versioning_guard.checkpoint).await?;
-            self.mark_init().await?;
             checkpoint_version
-        } else {
-            // Read checkpoint from object store.
-            versioning_guard.checkpoint = self.read_checkpoint().await?;
-            versioning_guard.checkpoint.version.clone()
         };
+
         versioning_guard.version_stats = HummockVersionStats::list(self.env.meta_store())
             .await?
             .into_iter()