From 93ea4c4e6f477092f0e381d11cedb9e9fc9f20de Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 29 Apr 2024 17:27:43 +0800 Subject: [PATCH 01/23] add prefix for azblob --- .../object/opendal_engine/opendal_object_store.rs | 2 +- src/object_store/src/object/prefix.rs | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index d7ba829dda63..c77607211d71 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -73,7 +73,7 @@ impl ObjectStore for OpendalObjectStore { EngineType::Obs => String::default(), EngineType::Oss => String::default(), EngineType::Webhdfs => String::default(), - EngineType::Azblob => String::default(), + EngineType::Azblob => prefix::azblob::get_object_prefix(obj_id), EngineType::Fs => String::default(), } } diff --git a/src/object_store/src/object/prefix.rs b/src/object_store/src/object/prefix.rs index e29729bf752f..f8e98b6af880 100644 --- a/src/object_store/src/object/prefix.rs +++ b/src/object_store/src/object/prefix.rs @@ -23,3 +23,15 @@ pub(crate) mod s3 { obj_prefix } } + +pub(crate) mod azblob { + /// The number of Azblob bucket prefixes + pub(crate) const NUM_BUCKET_PREFIXES_AZBLOB: u32 = 256; + + pub(crate) fn get_object_prefix(obj_id: u64) -> String { + let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES_AZBLOB; + let mut obj_prefix = prefix.to_string(); + obj_prefix.push('/'); + obj_prefix + } +} From c9da117ae1209b9a21b5e56f608d7a87602672ac Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 23 May 2024 18:29:16 +0800 Subject: [PATCH 02/23] save work --- proto/meta.proto | 1 + src/meta/src/error.rs | 1 + src/meta/src/hummock/manager/mod.rs | 28 +++++++++++++++++++--------- 3 files changed, 21 insertions(+), 9 
deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index d6c3ffe709d8..3ede5fceb5b9 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -599,6 +599,7 @@ message SystemParams { optional bool pause_on_next_bootstrap = 13; optional string wasm_storage_url = 14 [deprecated = true]; optional bool enable_tracing = 15; + optional bool is_new_cluster = 16; } message GetSystemParamsRequest {} diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8aeaed2f9c5a..d16282587909 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -197,3 +197,4 @@ impl From for MetaError { } } } + diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 92d99d738af7..8b75defb6b25 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -18,7 +18,6 @@ use std::ops::{Deref, DerefMut}; use std::sync::atomic::AtomicBool; use std::sync::{Arc, LazyLock}; use std::time::{Duration, Instant, SystemTime}; - use anyhow::Context; use arc_swap::ArcSwap; use bytes::Bytes; @@ -49,7 +48,7 @@ use risingwave_hummock_sdk::{ }; use risingwave_meta_model_v2::{ compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version, - hummock_version_delta, hummock_version_stats, + hummock_version_delta, hummock_version_stats, object, }; use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; use risingwave_pb::hummock::group_delta::DeltaType; @@ -74,10 +73,10 @@ use tokio_stream::wrappers::IntervalStream; use tonic::Streaming; use tracing::warn; -use crate::hummock::compaction::selector::{ +use crate::{hummock::compaction::selector::{ DynamicLevelSelector, LocalSelectorStatistic, ManualCompactionOption, ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector, TtlCompactionSelector, -}; +}, manager::SystemParamsManagerImpl}; use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig}; use crate::hummock::error::{Error, Result}; use 
crate::hummock::metrics_utils::{ @@ -386,13 +385,24 @@ impl HummockManager { // Skip this check in e2e compaction test, which needs to start a secondary cluster with // same bucket if !deterministic_mode { - write_exclusive_cluster_id( + let is_new_cluster = write_exclusive_cluster_id( state_store_dir, env.cluster_id().clone(), object_store.clone(), ) .await?; - + if is_new_cluster{ + match env.system_params_manager_impl_ref() { + SystemParamsManagerImpl::Kv(mgr) => { + mgr.set_param("is_new_cluster", Some("true".to_owned())) + .await.unwrap(); + } + SystemParamsManagerImpl::Sql(mgr) => { + mgr.set_param("is_new_cluster", Some("true".to_owned())) + .await.unwrap(); + } + }; + } // config bucket lifecycle for new cluster. if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref() && !env.opts.do_not_config_object_storage_lifecycle @@ -3426,7 +3436,7 @@ async fn write_exclusive_cluster_id( state_store_dir: &str, cluster_id: ClusterId, object_store: ObjectStoreRef, -) -> Result<()> { +) -> Result { const CLUSTER_ID_DIR: &str = "cluster_id"; const CLUSTER_ID_NAME: &str = "0"; let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR); @@ -3435,7 +3445,7 @@ async fn write_exclusive_cluster_id( Ok(stored_cluster_id) => { let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap(); if cluster_id.deref() == stored_cluster_id { - return Ok(()); + return Ok(false); } Err(ObjectError::internal(format!( @@ -3449,7 +3459,7 @@ async fn write_exclusive_cluster_id( object_store .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id))) .await?; - return Ok(()); + return Ok(true); } Err(e.into()) } From 7ad7a1a940767daaef200bd4a95b712fa4bca68a Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 24 May 2024 18:13:35 +0800 Subject: [PATCH 03/23] save work --- src/common/src/system_param/mod.rs | 1 + src/common/src/system_param/reader.rs | 4 +++ src/compute/src/server.rs | 4 +-- 
src/ctl/src/cmd_impl/hummock/sst_dump.rs | 6 +++- src/ctl/src/common/hummock_service.rs | 17 ++++++++- src/jni_core/src/hummock_iterator.rs | 1 + src/meta/src/error.rs | 1 - src/meta/src/hummock/manager/mod.rs | 35 +++++++++++-------- src/object_store/src/object/mem.rs | 2 +- src/object_store/src/object/mod.rs | 12 +++---- .../opendal_engine/opendal_object_store.rs | 4 +-- src/object_store/src/object/prefix.rs | 19 +++++++--- src/object_store/src/object/s3.rs | 2 +- src/storage/compactor/src/server.rs | 1 + .../hummock_test/src/bin/replay/main.rs | 1 + .../src/hummock/iterator/test_utils.rs | 1 + src/storage/src/hummock/sstable_store.rs | 7 +++- src/storage/src/store_impl.rs | 2 ++ .../src/compaction_test_runner.rs | 1 + .../src/delete_range_runner.rs | 1 + 20 files changed, 86 insertions(+), 36 deletions(-) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 06d4cce2e4e6..87e019469b1f 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -87,6 +87,7 @@ macro_rules! 
for_all_params { { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", }, { pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", }, { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", }, + { is_new_cluster, bool, Some(false), true, "Whether to devide object prefix", }, } }; } diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 3374e7212023..9e7f0f745e24 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -137,6 +137,10 @@ where self.inner().data_directory.as_ref().unwrap() } + fn is_new_cluster(&self) -> bool { + self.inner().is_new_cluster.unwrap() + } + fn backup_storage_url(&self) -> &str { self.inner().backup_storage_url.as_ref().unwrap() } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 9f1c10a0c15b..05ee53518ff8 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -33,7 +33,7 @@ use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::pretty_bytes::convert; -use risingwave_common::{GIT_SHA, RW_VERSION}; +use risingwave_common::{system_param, GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_common_service::observer_manager::ObserverManager; @@ -88,7 +88,6 @@ pub async fn compute_node_serve( ) -> (Vec>, Sender<()>) { // Load the configuration. 
let config = load_config(&opts.config_path, &opts); - info!("Starting compute node",); info!("> config: {:?}", config); info!( @@ -194,6 +193,7 @@ pub async fn compute_node_serve( storage_metrics.clone(), compactor_metrics.clone(), await_tree_config.clone(), + system_params.is_new_cluster(), ) .await .unwrap(); diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 0fc65054b51e..cb7544f2d022 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -59,6 +59,8 @@ pub struct SstDumpArgs { print_table: bool, #[clap(short = 'd')] data_dir: Option, + #[clap(short, long = "devide_prefix")] + devide_prefix: Option, } pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> { @@ -109,7 +111,9 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result } else { // Object information is retrieved from object store. Meta service is not required. let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?; - let sstable_store = hummock_service_opts.create_sstable_store().await?; + let sstable_store = hummock_service_opts + .create_sstable_store(args.devide_prefix.unwrap_or_default()) + .await?; if let Some(obj_id) = &args.object_id { let obj_store = sstable_store.store(); let obj_path = sstable_store.get_sst_data_path(*obj_id); diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index e10d3669af60..4e17f8017a71 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -35,6 +35,8 @@ pub struct HummockServiceOpts { pub hummock_url: String, pub data_dir: Option, + devide_prefix: bool, + heartbeat_handle: Option>, heartbeat_shutdown_sender: Option>, } @@ -79,11 +81,22 @@ impl HummockServiceOpts { bail!(MESSAGE); } }; + let devide_prefix = match env::var("RW_OBJECT_STORE_DEVIDE_PREFIX") { + Ok(devide_prefix) => devide_prefix == "true", + 
Err(_) => { + const MESSAGE: &str = "env variable `RW_OBJECT_STORE_DEVIDE_PREFIX` not found. + + "; + bail!(MESSAGE); + } + }; + Ok(Self { hummock_url, data_dir, heartbeat_handle: None, heartbeat_shutdown_sender: None, + devide_prefix, }) } @@ -141,6 +154,7 @@ impl HummockServiceOpts { metrics.storage_metrics.clone(), metrics.compactor_metrics.clone(), None, + self.devide_prefix, ) .await?; @@ -156,7 +170,7 @@ impl HummockServiceOpts { } } - pub async fn create_sstable_store(&self) -> Result> { + pub async fn create_sstable_store(&self, devide_prefix: bool) -> Result> { let object_store = build_remote_object_store( self.hummock_url.strip_prefix("hummock+").unwrap(), Arc::new(ObjectStoreMetrics::unused()), @@ -184,6 +198,7 @@ impl HummockServiceOpts { state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, )), + devide_prefix, }))) } } diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 42e584fb3820..2e87e21c343f 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -94,6 +94,7 @@ impl HummockJavaBindingIterator { state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, )), + devide_prefix: false, })); let reader = HummockVersionReader::new( sstable_store, diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index d16282587909..8aeaed2f9c5a 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -197,4 +197,3 @@ impl From for MetaError { } } } - diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 8b75defb6b25..4d12bc43496d 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -18,6 +18,7 @@ use std::ops::{Deref, DerefMut}; use std::sync::atomic::AtomicBool; use std::sync::{Arc, LazyLock}; use std::time::{Duration, Instant, SystemTime}; + use anyhow::Context; use arc_swap::ArcSwap; use bytes::Bytes; @@ -73,10 +74,10 @@ use 
tokio_stream::wrappers::IntervalStream; use tonic::Streaming; use tracing::warn; -use crate::{hummock::compaction::selector::{ +use crate::hummock::compaction::selector::{ DynamicLevelSelector, LocalSelectorStatistic, ManualCompactionOption, ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector, TtlCompactionSelector, -}, manager::SystemParamsManagerImpl}; +}; use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig}; use crate::hummock::error::{Error, Result}; use crate::hummock::metrics_utils::{ @@ -89,7 +90,9 @@ use crate::hummock::sequence::next_compaction_task_id; use crate::hummock::{CompactorManagerRef, TASK_NORMAL}; #[cfg(any(test, feature = "test"))] use crate::manager::{ClusterManagerRef, FragmentManagerRef}; -use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager, META_NODE_ID}; +use crate::manager::{ + MetaSrvEnv, MetaStoreImpl, MetadataManager, SystemParamsManagerImpl, META_NODE_ID, +}; use crate::model::{ BTreeMapEntryTransaction, BTreeMapEntryTransactionWrapper, BTreeMapTransaction, BTreeMapTransactionWrapper, ClusterId, MetadataModel, MetadataModelError, ValTransaction, @@ -391,18 +394,20 @@ impl HummockManager { object_store.clone(), ) .await?; - if is_new_cluster{ - match env.system_params_manager_impl_ref() { - SystemParamsManagerImpl::Kv(mgr) => { - mgr.set_param("is_new_cluster", Some("true".to_owned())) - .await.unwrap(); - } - SystemParamsManagerImpl::Sql(mgr) => { - mgr.set_param("is_new_cluster", Some("true".to_owned())) - .await.unwrap(); - } - }; - } + if is_new_cluster { + match env.system_params_manager_impl_ref() { + SystemParamsManagerImpl::Kv(mgr) => { + mgr.set_param("is_new_cluster", Some("true".to_owned())) + .await + .unwrap(); + } + SystemParamsManagerImpl::Sql(mgr) => { + mgr.set_param("is_new_cluster", Some("true".to_owned())) + .await + .unwrap(); + } + }; + } // config bucket lifecycle for new cluster. 
if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref() && !env.opts.do_not_config_object_storage_lifecycle diff --git a/src/object_store/src/object/mem.rs b/src/object_store/src/object/mem.rs index 3a1a7ed655e8..fbe8e7830604 100644 --- a/src/object_store/src/object/mem.rs +++ b/src/object_store/src/object/mem.rs @@ -101,7 +101,7 @@ pub struct InMemObjectStore { #[async_trait::async_trait] impl ObjectStore for InMemObjectStore { - fn get_object_prefix(&self, _obj_id: u64) -> String { + fn get_object_prefix(&self, _obj_id: u64, devide_prefix: bool) -> String { String::default() } diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index a623e4b116fd..48119fc2559b 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -89,7 +89,7 @@ pub trait StreamingUploader: Send { #[async_trait::async_trait] pub trait ObjectStore: Send + Sync { /// Get the key prefix for object - fn get_object_prefix(&self, obj_id: u64) -> String; + fn get_object_prefix(&self, obj_id: u64, devide_prefix: bool) -> String; /// Uploads the object to `ObjectStore`. async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>; @@ -252,16 +252,16 @@ impl ObjectStoreImpl { object_store_impl_method_body!(self, list, dispatch_async, prefix) } - pub fn get_object_prefix(&self, obj_id: u64) -> String { + pub fn get_object_prefix(&self, obj_id: u64, devide_prefix: bool) -> String { // FIXME: ObjectStoreImpl lacks flexibility for adding new interface to ObjectStore // trait. 
Macro object_store_impl_method_body routes to local or remote only depending on // the path match self { - ObjectStoreImpl::InMem(store) => store.inner.get_object_prefix(obj_id), - ObjectStoreImpl::Opendal(store) => store.inner.get_object_prefix(obj_id), - ObjectStoreImpl::S3(store) => store.inner.get_object_prefix(obj_id), + ObjectStoreImpl::InMem(store) => store.inner.get_object_prefix(obj_id, devide_prefix), + ObjectStoreImpl::Opendal(store) => store.inner.get_object_prefix(obj_id, devide_prefix), + ObjectStoreImpl::S3(store) => store.inner.get_object_prefix(obj_id, devide_prefix), #[cfg(madsim)] - ObjectStoreImpl::Sim(store) => store.inner.get_object_prefix(obj_id), + ObjectStoreImpl::Sim(store) => store.inner.get_object_prefix(obj_id, devide_prefix), } } diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index c77607211d71..f6a9e0491f86 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -63,7 +63,7 @@ impl OpendalObjectStore { #[async_trait::async_trait] impl ObjectStore for OpendalObjectStore { - fn get_object_prefix(&self, obj_id: u64) -> String { + fn get_object_prefix(&self, obj_id: u64, devide_prefix: bool) -> String { match self.engine_type { EngineType::S3 => prefix::s3::get_object_prefix(obj_id), EngineType::Minio => prefix::s3::get_object_prefix(obj_id), @@ -73,7 +73,7 @@ impl ObjectStore for OpendalObjectStore { EngineType::Obs => String::default(), EngineType::Oss => String::default(), EngineType::Webhdfs => String::default(), - EngineType::Azblob => prefix::azblob::get_object_prefix(obj_id), + EngineType::Azblob => prefix::azblob::get_object_prefix(obj_id, devide_prefix), EngineType::Fs => String::default(), } } diff --git a/src/object_store/src/object/prefix.rs b/src/object_store/src/object/prefix.rs index f8e98b6af880..e3dc82a622d3 100644 --- 
a/src/object_store/src/object/prefix.rs +++ b/src/object_store/src/object/prefix.rs @@ -28,10 +28,19 @@ pub(crate) mod azblob { /// The number of Azblob bucket prefixes pub(crate) const NUM_BUCKET_PREFIXES_AZBLOB: u32 = 256; - pub(crate) fn get_object_prefix(obj_id: u64) -> String { - let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES_AZBLOB; - let mut obj_prefix = prefix.to_string(); - obj_prefix.push('/'); - obj_prefix + pub(crate) fn get_object_prefix(obj_id: u64, devide_prefix: bool) -> String { + match devide_prefix { + true => { + let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES_AZBLOB; + let mut obj_prefix = prefix.to_string(); + obj_prefix.push('/'); + obj_prefix + } + false => { + let mut obj_prefix = (NUM_BUCKET_PREFIXES_AZBLOB + 1).to_string(); + obj_prefix.push('/'); + obj_prefix + } + } } } diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 9d48f1175976..3e173cf04b09 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -314,7 +314,7 @@ pub struct S3ObjectStore { #[async_trait::async_trait] impl ObjectStore for S3ObjectStore { - fn get_object_prefix(&self, obj_id: u64) -> String { + fn get_object_prefix(&self, obj_id: u64, devide_prefix: bool) -> String { // Delegate to static method to avoid creating an `S3ObjectStore` in unit test. prefix::s3::get_object_prefix(obj_id) } diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 95ee16ca3862..9889a5cddb87 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -132,6 +132,7 @@ pub async fn prepare_start_parameters( storage_opts.data_directory.to_string(), 1 << 20, // set 1MB memory to avoid panic. 
meta_cache_capacity_bytes, + system_params_reader.is_new_cluster(), )); let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes)); diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index 693a6448a4dd..08ea5c06d9c1 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -126,6 +126,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result SstableSto meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), + devide_prefix: false, })) } diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 3d4a4d1e642a..e1df4645898d 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -202,6 +202,7 @@ pub struct SstableStoreConfig { pub meta_file_cache: FileCache, pub recent_filter: Option>>, pub state_store_metrics: Arc, + pub devide_prefix: bool, } pub struct SstableStore { @@ -220,6 +221,7 @@ pub struct SstableStore { prefetch_buffer_usage: Arc, prefetch_buffer_capacity: usize, max_prefetch_block_number: usize, + devide_prefix: bool, } impl SstableStore { @@ -286,6 +288,7 @@ impl SstableStore { prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)), prefetch_buffer_capacity: config.prefetch_buffer_capacity, max_prefetch_block_number: config.max_prefetch_block_number, + devide_prefix: config.devide_prefix, } } @@ -296,6 +299,7 @@ impl SstableStore { path: String, block_cache_capacity: usize, meta_cache_capacity: usize, + devide_prefix: bool, ) -> Self { let meta_cache = Arc::new(Cache::lru(LruCacheConfig { capacity: meta_cache_capacity, @@ -328,6 +332,7 @@ impl SstableStore { prefetch_buffer_capacity: block_cache_capacity, max_prefetch_block_number: 16, /* compactor won't use this parameter, so just assign a default value. 
*/ recent_filter: None, + devide_prefix, } } @@ -621,7 +626,7 @@ impl SstableStore { } pub fn get_sst_data_path(&self, object_id: HummockSstableObjectId) -> String { - let obj_prefix = self.store.get_object_prefix(object_id); + let obj_prefix = self.store.get_object_prefix(object_id, self.devide_prefix); format!( "{}/{}{}.{}", self.path, obj_prefix, object_id, OBJECT_SUFFIX diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index e7c8c6e45f7e..ef2617c8594f 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -565,6 +565,7 @@ impl StateStoreImpl { storage_metrics: Arc, compactor_metrics: Arc, await_tree_config: Option, + devide_prefix: bool, ) -> StorageResult { set_foyer_metrics_registry(GLOBAL_METRICS_REGISTRY.clone()); @@ -662,6 +663,7 @@ impl StateStoreImpl { meta_file_cache, recent_filter, state_store_metrics: state_store_metrics.clone(), + devide_prefix, })); let notification_client = RpcNotificationClient::new(hummock_meta_client.get_inner().clone()); diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 48e8311816d0..d81117059309 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -711,6 +711,7 @@ pub async fn create_hummock_store_with_metrics( metrics.storage_metrics.clone(), metrics.compactor_metrics.clone(), None, + false, ) .await?; diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 341e89a58af8..0a57ee655560 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -225,6 +225,7 @@ async fn compaction_test( meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: state_store_metrics.clone(), + devide_prefix: system_params.is_new_cluster(), })); let store = HummockStorage::new( 
From 5798276af6ad9d1e3900b720964ec730da36ef11 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 27 May 2024 11:02:53 +0800 Subject: [PATCH 04/23] todo, handle ctl --- src/compute/src/server.rs | 2 +- src/ctl/src/cmd_impl/hummock/sst_dump.rs | 6 ++--- src/ctl/src/common/hummock_service.rs | 20 ++++++++++------- src/jni_core/src/hummock_iterator.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 2 +- src/object_store/src/object/mem.rs | 2 +- src/object_store/src/object/mod.rs | 20 ++++++++++++----- .../opendal_engine/opendal_object_store.rs | 4 ++-- src/object_store/src/object/prefix.rs | 8 +++++-- src/object_store/src/object/s3.rs | 3 ++- src/storage/benches/bench_compactor.rs | 1 + src/storage/benches/bench_multi_builder.rs | 1 + .../hummock_test/src/bin/replay/main.rs | 2 +- .../src/hummock/iterator/test_utils.rs | 2 +- src/storage/src/hummock/sstable_store.rs | 22 ++++++++++++++----- src/storage/src/store_impl.rs | 4 ++-- .../src/delete_range_runner.rs | 2 +- 17 files changed, 66 insertions(+), 37 deletions(-) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 05ee53518ff8..3a7700b20bd7 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -33,7 +33,7 @@ use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::pretty_bytes::convert; -use risingwave_common::{system_param, GIT_SHA, RW_VERSION}; +use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_common_service::observer_manager::ObserverManager; diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index cb7544f2d022..0fc35570a99c 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -59,8 +59,8 @@ pub struct 
SstDumpArgs { print_table: bool, #[clap(short = 'd')] data_dir: Option, - #[clap(short, long = "devide_prefix")] - devide_prefix: Option, + #[clap(short, long = "devide_object_prefix")] + devide_object_prefix: Option, } pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> { @@ -112,7 +112,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result // Object information is retrieved from object store. Meta service is not required. let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?; let sstable_store = hummock_service_opts - .create_sstable_store(args.devide_prefix.unwrap_or_default()) + .create_sstable_store(args.devide_object_prefix.unwrap_or_default()) .await?; if let Some(obj_id) = &args.object_id { let obj_store = sstable_store.store(); diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 4e17f8017a71..6839c493e6ca 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -35,7 +35,7 @@ pub struct HummockServiceOpts { pub hummock_url: String, pub data_dir: Option, - devide_prefix: bool, + devide_object_prefix: bool, heartbeat_handle: Option>, heartbeat_shutdown_sender: Option>, @@ -81,10 +81,11 @@ impl HummockServiceOpts { bail!(MESSAGE); } }; - let devide_prefix = match env::var("RW_OBJECT_STORE_DEVIDE_PREFIX") { - Ok(devide_prefix) => devide_prefix == "true", + let devide_object_prefix = match env::var("RW_OBJECT_STORE_devide_object_prefix") { + Ok(devide_object_prefix) => devide_object_prefix == "true", Err(_) => { - const MESSAGE: &str = "env variable `RW_OBJECT_STORE_DEVIDE_PREFIX` not found. + const MESSAGE: &str = + "env variable `RW_OBJECT_STORE_devide_object_prefix` not found. 
"; bail!(MESSAGE); @@ -96,7 +97,7 @@ impl HummockServiceOpts { data_dir, heartbeat_handle: None, heartbeat_shutdown_sender: None, - devide_prefix, + devide_object_prefix, }) } @@ -154,7 +155,7 @@ impl HummockServiceOpts { metrics.storage_metrics.clone(), metrics.compactor_metrics.clone(), None, - self.devide_prefix, + self.devide_object_prefix, ) .await?; @@ -170,7 +171,10 @@ impl HummockServiceOpts { } } - pub async fn create_sstable_store(&self, devide_prefix: bool) -> Result> { + pub async fn create_sstable_store( + &self, + devide_object_prefix: bool, + ) -> Result> { let object_store = build_remote_object_store( self.hummock_url.strip_prefix("hummock+").unwrap(), Arc::new(ObjectStoreMetrics::unused()), @@ -198,7 +202,7 @@ impl HummockServiceOpts { state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, )), - devide_prefix, + devide_object_prefix, }))) } } diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 2e87e21c343f..bbc7a862ffcc 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -94,7 +94,7 @@ impl HummockJavaBindingIterator { state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, )), - devide_prefix: false, + devide_object_prefix: false, })); let reader = HummockVersionReader::new( sstable_store, diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 4d12bc43496d..604811c2cc90 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -49,7 +49,7 @@ use risingwave_hummock_sdk::{ }; use risingwave_meta_model_v2::{ compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version, - hummock_version_delta, hummock_version_stats, object, + hummock_version_delta, hummock_version_stats, }; use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; use risingwave_pb::hummock::group_delta::DeltaType; diff 
--git a/src/object_store/src/object/mem.rs b/src/object_store/src/object/mem.rs index fbe8e7830604..12dd5b246e26 100644 --- a/src/object_store/src/object/mem.rs +++ b/src/object_store/src/object/mem.rs @@ -101,7 +101,7 @@ pub struct InMemObjectStore { #[async_trait::async_trait] impl ObjectStore for InMemObjectStore { - fn get_object_prefix(&self, _obj_id: u64, devide_prefix: bool) -> String { + fn get_object_prefix(&self, _obj_id: u64, _devide_object_prefix: bool) -> String { String::default() } diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 48119fc2559b..184feb1a3dbf 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -89,7 +89,7 @@ pub trait StreamingUploader: Send { #[async_trait::async_trait] pub trait ObjectStore: Send + Sync { /// Get the key prefix for object - fn get_object_prefix(&self, obj_id: u64, devide_prefix: bool) -> String; + fn get_object_prefix(&self, obj_id: u64, devide_object_prefix: bool) -> String; /// Uploads the object to `ObjectStore`. async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>; @@ -252,16 +252,24 @@ impl ObjectStoreImpl { object_store_impl_method_body!(self, list, dispatch_async, prefix) } - pub fn get_object_prefix(&self, obj_id: u64, devide_prefix: bool) -> String { + pub fn get_object_prefix(&self, obj_id: u64, devide_object_prefix: bool) -> String { // FIXME: ObjectStoreImpl lacks flexibility for adding new interface to ObjectStore // trait. 
Macro object_store_impl_method_body routes to local or remote only depending on // the path match self { - ObjectStoreImpl::InMem(store) => store.inner.get_object_prefix(obj_id, devide_prefix), - ObjectStoreImpl::Opendal(store) => store.inner.get_object_prefix(obj_id, devide_prefix), - ObjectStoreImpl::S3(store) => store.inner.get_object_prefix(obj_id, devide_prefix), + ObjectStoreImpl::InMem(store) => { + store.inner.get_object_prefix(obj_id, devide_object_prefix) + } + ObjectStoreImpl::Opendal(store) => { + store.inner.get_object_prefix(obj_id, devide_object_prefix) + } + ObjectStoreImpl::S3(store) => { + store.inner.get_object_prefix(obj_id, devide_object_prefix) + } #[cfg(madsim)] - ObjectStoreImpl::Sim(store) => store.inner.get_object_prefix(obj_id, devide_prefix), + ObjectStoreImpl::Sim(store) => { + store.inner.get_object_prefix(obj_id, devide_object_prefix) + } } } diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index f6a9e0491f86..78fd0f18b367 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -63,7 +63,7 @@ impl OpendalObjectStore { #[async_trait::async_trait] impl ObjectStore for OpendalObjectStore { - fn get_object_prefix(&self, obj_id: u64, devide_prefix: bool) -> String { + fn get_object_prefix(&self, obj_id: u64, devide_object_prefix: bool) -> String { match self.engine_type { EngineType::S3 => prefix::s3::get_object_prefix(obj_id), EngineType::Minio => prefix::s3::get_object_prefix(obj_id), @@ -73,7 +73,7 @@ impl ObjectStore for OpendalObjectStore { EngineType::Obs => String::default(), EngineType::Oss => String::default(), EngineType::Webhdfs => String::default(), - EngineType::Azblob => prefix::azblob::get_object_prefix(obj_id, devide_prefix), + EngineType::Azblob => prefix::azblob::get_object_prefix(obj_id, devide_object_prefix), EngineType::Fs 
=> String::default(), } } diff --git a/src/object_store/src/object/prefix.rs b/src/object_store/src/object/prefix.rs index e3dc82a622d3..ab50c3847ba5 100644 --- a/src/object_store/src/object/prefix.rs +++ b/src/object_store/src/object/prefix.rs @@ -28,8 +28,12 @@ pub(crate) mod azblob { /// The number of Azblob bucket prefixes pub(crate) const NUM_BUCKET_PREFIXES_AZBLOB: u32 = 256; - pub(crate) fn get_object_prefix(obj_id: u64, devide_prefix: bool) -> String { - match devide_prefix { + pub(crate) fn get_object_prefix(obj_id: u64, devide_object_prefix: bool) -> String { + // For Azure Blob Storage, whether objects are divided by prefixes depends on whether it is a new cluster. + // If it is a new cluster, objects will be divided into NUM_BUCKET_PREFIXES_AZBLOB prefixes. + // If it is an old cluster, prefixes are not used due to the need to read and write old data. + // The decision of whether it is a new or old cluster is determined by the input parameter 'devide_object_prefix'. + match devide_object_prefix { true => { let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES_AZBLOB; let mut obj_prefix = prefix.to_string(); diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 3e173cf04b09..2d2aad4c6b2f 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -314,8 +314,9 @@ pub struct S3ObjectStore { #[async_trait::async_trait] impl ObjectStore for S3ObjectStore { - fn get_object_prefix(&self, obj_id: u64, devide_prefix: bool) -> String { + fn get_object_prefix(&self, obj_id: u64, _devide_object_prefix: bool) -> String { // Delegate to static method to avoid creating an `S3ObjectStore` in unit test. + // Using aws s3 sdk as object storage, the object prefix will be devised by default. 
prefix::s3::get_object_prefix(obj_id) } diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 02a0de8c7e23..f944aad4c4a7 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -73,6 +73,7 @@ pub fn mock_sstable_store() -> SstableStoreRef { meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), + devide_object_prefix: false, })) } diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index ef07fd107986..89c63718a29a 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -156,6 +156,7 @@ fn bench_builder( meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), + devide_object_prefix: false, })); let mut group = c.benchmark_group("bench_multi_builder"); diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index 08ea5c06d9c1..e3cde61feb4e 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -126,7 +126,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result SstableSto meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), - devide_prefix: false, + devide_object_prefix: false, })) } diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index e1df4645898d..f0cadc8bf32e 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -202,7 +202,7 @@ pub struct SstableStoreConfig { pub meta_file_cache: FileCache, pub recent_filter: Option>>, pub state_store_metrics: Arc, - pub devide_prefix: bool, + 
pub devide_object_prefix: bool, } pub struct SstableStore { @@ -221,7 +221,15 @@ pub struct SstableStore { prefetch_buffer_usage: Arc, prefetch_buffer_capacity: usize, max_prefetch_block_number: usize, - devide_prefix: bool, + /// Whether the object store is divided into prefixes depends on two factors: + /// 1. The specific object store type. + /// 2. Whether the existing cluster is a new cluster. + /// + /// The value of 'devide_object_prefix' is determined by the 'is_new_cluster' field in the system parameters. + /// For a new cluster, 'devide_object_prefix' is set to True. + /// For an old cluster, 'devide_object_prefix' is set to False. + /// The final decision of whether to divide prefixes is based on this field and the specific object store type, this approach is implemented to ensure backward compatibility. + devide_object_prefix: bool, } impl SstableStore { @@ -288,7 +296,7 @@ impl SstableStore { prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)), prefetch_buffer_capacity: config.prefetch_buffer_capacity, max_prefetch_block_number: config.max_prefetch_block_number, - devide_prefix: config.devide_prefix, + devide_object_prefix: config.devide_object_prefix, } } @@ -299,7 +307,7 @@ impl SstableStore { path: String, block_cache_capacity: usize, meta_cache_capacity: usize, - devide_prefix: bool, + devide_object_prefix: bool, ) -> Self { let meta_cache = Arc::new(Cache::lru(LruCacheConfig { capacity: meta_cache_capacity, @@ -332,7 +340,7 @@ impl SstableStore { prefetch_buffer_capacity: block_cache_capacity, max_prefetch_block_number: 16, /* compactor won't use this parameter, so just assign a default value. 
*/ recent_filter: None, - devide_prefix, + devide_object_prefix, } } @@ -626,7 +634,9 @@ impl SstableStore { } pub fn get_sst_data_path(&self, object_id: HummockSstableObjectId) -> String { - let obj_prefix = self.store.get_object_prefix(object_id, self.devide_prefix); + let obj_prefix = self + .store + .get_object_prefix(object_id, self.devide_object_prefix); format!( "{}/{}{}.{}", self.path, obj_prefix, object_id, OBJECT_SUFFIX diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index ef2617c8594f..234df17eca00 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -565,7 +565,7 @@ impl StateStoreImpl { storage_metrics: Arc, compactor_metrics: Arc, await_tree_config: Option, - devide_prefix: bool, + devide_object_prefix: bool, ) -> StorageResult { set_foyer_metrics_registry(GLOBAL_METRICS_REGISTRY.clone()); @@ -663,7 +663,7 @@ impl StateStoreImpl { meta_file_cache, recent_filter, state_store_metrics: state_store_metrics.clone(), - devide_prefix, + devide_object_prefix, })); let notification_client = RpcNotificationClient::new(hummock_meta_client.get_inner().clone()); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 0a57ee655560..1bca47ef3979 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -225,7 +225,7 @@ async fn compaction_test( meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: state_store_metrics.clone(), - devide_prefix: system_params.is_new_cluster(), + devide_object_prefix: system_params.is_new_cluster(), })); let store = HummockStorage::new( From 800cc64a1628b6fa68c6399249f1f1c975740c65 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 27 May 2024 14:27:39 +0800 Subject: [PATCH 05/23] add some comments --- src/common/src/system_param/reader.rs | 2 +- src/ctl/src/cmd_impl/hummock/sst_dump.rs | 2 +- 
src/ctl/src/common/hummock_service.rs | 10 ++-------- src/meta/src/hummock/manager/mod.rs | 5 +++++ src/object_store/src/object/mod.rs | 2 +- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 9e7f0f745e24..6d847f476256 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -138,7 +138,7 @@ where } fn is_new_cluster(&self) -> bool { - self.inner().is_new_cluster.unwrap() + self.inner().is_new_cluster.unwrap_or(true) } fn backup_storage_url(&self) -> &str { diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 0fc35570a99c..8cd01ea5f07f 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -112,7 +112,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result // Object information is retrieved from object store. Meta service is not required. let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?; let sstable_store = hummock_service_opts - .create_sstable_store(args.devide_object_prefix.unwrap_or_default()) + .create_sstable_store(args.devide_object_prefix.unwrap_or(true)) .await?; if let Some(obj_id) = &args.object_id { let obj_store = sstable_store.store(); diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 6839c493e6ca..4410dda61d81 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -81,15 +81,9 @@ impl HummockServiceOpts { bail!(MESSAGE); } }; - let devide_object_prefix = match env::var("RW_OBJECT_STORE_devide_object_prefix") { + let devide_object_prefix = match env::var("RW_OBJECT_STORE_DEVIDE_OBJECT_PREFIX") { Ok(devide_object_prefix) => devide_object_prefix == "true", - Err(_) => { - const MESSAGE: &str = - "env variable `RW_OBJECT_STORE_devide_object_prefix` not found. 
- - "; - bail!(MESSAGE); - } + _ => true, }; Ok(Self { diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 604811c2cc90..4e2143b7f9c0 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -3437,6 +3437,11 @@ fn gen_version_delta<'a>( version_delta } +/// This function, `write_exclusive_cluster_id`, is used to check if it is a new cluster during meta startup. +/// +/// The determination of a new or old cluster is based on whether the file "0.data" in the `data_directory/cluster_id` path has been written. +/// +/// In all other cases, the function returns a boolean variable indicating whether it is a new cluster. async fn write_exclusive_cluster_id( state_store_dir: &str, cluster_id: ClusterId, diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 184feb1a3dbf..b28f7ff049c5 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -88,7 +88,7 @@ pub trait StreamingUploader: Send { /// The implementation must be thread-safe. #[async_trait::async_trait] pub trait ObjectStore: Send + Sync { - /// Get the key prefix for object + /// Get the key prefix for object, the prefix is determined by the type of object store and `devise_object_prefix`. fn get_object_prefix(&self, obj_id: u64, devide_object_prefix: bool) -> String; /// Uploads the object to `ObjectStore`. 
From 912f913dc6da704cd972012fbe39116957e13f21 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 27 May 2024 17:24:10 +0800 Subject: [PATCH 06/23] refactor --- proto/meta.proto | 2 +- src/common/src/system_param/mod.rs | 2 +- src/common/src/system_param/reader.rs | 4 +-- src/compute/src/server.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 29 ++++++++++++++----- .../opendal_engine/opendal_object_store.rs | 16 +++++----- src/object_store/src/object/prefix.rs | 11 ++----- src/storage/compactor/src/server.rs | 2 +- src/storage/src/hummock/sstable_store.rs | 2 +- .../src/delete_range_runner.rs | 2 +- 10 files changed, 41 insertions(+), 31 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 3ede5fceb5b9..ce1b945fb905 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -599,7 +599,7 @@ message SystemParams { optional bool pause_on_next_bootstrap = 13; optional string wasm_storage_url = 14 [deprecated = true]; optional bool enable_tracing = 15; - optional bool is_new_cluster = 16; + optional bool use_new_object_prefix_strategy = 16; } message GetSystemParamsRequest {} diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 87e019469b1f..3e9e4bc3b768 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -87,7 +87,7 @@ macro_rules! 
for_all_params { { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", }, { pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", }, { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", }, - { is_new_cluster, bool, Some(false), true, "Whether to devide object prefix", }, + { use_new_object_prefix_strategy, bool, Some(false), true, "Whether to devide object prefix.", }, } }; } diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 6d847f476256..a55b5b33df1b 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -137,8 +137,8 @@ where self.inner().data_directory.as_ref().unwrap() } - fn is_new_cluster(&self) -> bool { - self.inner().is_new_cluster.unwrap_or(true) + fn use_new_object_prefix_strategy(&self) -> bool { + self.inner().use_new_object_prefix_strategy.unwrap_or(true) } fn backup_storage_url(&self) -> &str { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 3a7700b20bd7..59374a33f915 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -193,7 +193,7 @@ pub async fn compute_node_serve( storage_metrics.clone(), compactor_metrics.clone(), await_tree_config.clone(), - system_params.is_new_cluster(), + system_params.use_new_object_prefix_strategy(), ) .await .unwrap(); diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 4e2143b7f9c0..cb6a41dd1fdb 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -388,21 +388,34 @@ impl HummockManager { // Skip this check in e2e compaction test, which needs to start a secondary cluster with // same bucket if !deterministic_mode { - let is_new_cluster = write_exclusive_cluster_id( + let use_new_object_prefix_strategy = write_exclusive_cluster_id( state_store_dir, 
env.cluster_id().clone(), object_store.clone(), ) .await?; - if is_new_cluster { + if use_new_object_prefix_strategy { match env.system_params_manager_impl_ref() { SystemParamsManagerImpl::Kv(mgr) => { - mgr.set_param("is_new_cluster", Some("true".to_owned())) + mgr.set_param("use_new_object_prefix_strategy", Some("true".to_owned())) .await .unwrap(); } SystemParamsManagerImpl::Sql(mgr) => { - mgr.set_param("is_new_cluster", Some("true".to_owned())) + mgr.set_param("use_new_object_prefix_strategy", Some("true".to_owned())) + .await + .unwrap(); + } + }; + } else { + match env.system_params_manager_impl_ref() { + SystemParamsManagerImpl::Kv(mgr) => { + mgr.set_param("use_new_object_prefix_strategy", Some("false".to_owned())) + .await + .unwrap(); + } + SystemParamsManagerImpl::Sql(mgr) => { + mgr.set_param("use_new_object_prefix_strategy", Some("false".to_owned())) .await .unwrap(); } @@ -3437,11 +3450,11 @@ fn gen_version_delta<'a>( version_delta } -/// This function, `write_exclusive_cluster_id`, is used to check if it is a new cluster during meta startup. -/// -/// The determination of a new or old cluster is based on whether the file "0.data" in the `data_directory/cluster_id` path has been written. +/// This function, `write_exclusive_cluster_id`, is used to check if it is a new cluster during meta startup: +/// For new clusters, the name of the object store needs to be prefixed according to the object id. +/// For old clusters, the prefix is ​​not divided for the sake of compatibility. /// -/// In all other cases, the function returns a boolean variable indicating whether it is a new cluster. +/// The return value of this function represents whether to adopt the new object prefix strategy. 
async fn write_exclusive_cluster_id( state_store_dir: &str, cluster_id: ClusterId, diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 78fd0f18b367..e8b25ff48573 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -68,13 +68,15 @@ impl ObjectStore for OpendalObjectStore { EngineType::S3 => prefix::s3::get_object_prefix(obj_id), EngineType::Minio => prefix::s3::get_object_prefix(obj_id), EngineType::Memory => String::default(), - EngineType::Hdfs => String::default(), - EngineType::Gcs => String::default(), - EngineType::Obs => String::default(), - EngineType::Oss => String::default(), - EngineType::Webhdfs => String::default(), - EngineType::Azblob => prefix::azblob::get_object_prefix(obj_id, devide_object_prefix), - EngineType::Fs => String::default(), + EngineType::Hdfs + | EngineType::Gcs + | EngineType::Obs + | EngineType::Oss + | EngineType::Webhdfs + | EngineType::Azblob + | EngineType::Fs => { + prefix::opendal_engine::get_object_prefix(obj_id, devide_object_prefix) + } } } diff --git a/src/object_store/src/object/prefix.rs b/src/object_store/src/object/prefix.rs index ab50c3847ba5..ba0f5aa039cb 100644 --- a/src/object_store/src/object/prefix.rs +++ b/src/object_store/src/object/prefix.rs @@ -24,15 +24,14 @@ pub(crate) mod s3 { } } -pub(crate) mod azblob { +pub(crate) mod opendal_engine { /// The number of Azblob bucket prefixes pub(crate) const NUM_BUCKET_PREFIXES_AZBLOB: u32 = 256; pub(crate) fn get_object_prefix(obj_id: u64, devide_object_prefix: bool) -> String { - // For Azure Blob Storage, whether objects are divided by prefixes depends on whether it is a new cluster. 
+ // For OpenDAL object storage, whether objects are divided by prefixes depends on whether it is a new cluster: // If it is a new cluster, objects will be divided into NUM_BUCKET_PREFIXES_AZBLOB prefixes. // If it is an old cluster, prefixes are not used due to the need to read and write old data. - // The decision of whether it is a new or old cluster is determined by the input parameter 'devide_object_prefix'. match devide_object_prefix { true => { let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES_AZBLOB; @@ -40,11 +39,7 @@ pub(crate) mod azblob { obj_prefix.push('/'); obj_prefix } - false => { - let mut obj_prefix = (NUM_BUCKET_PREFIXES_AZBLOB + 1).to_string(); - obj_prefix.push('/'); - obj_prefix - } + false => String::default(), } } } diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 9889a5cddb87..12ea5b4ee454 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -132,7 +132,7 @@ pub async fn prepare_start_parameters( storage_opts.data_directory.to_string(), 1 << 20, // set 1MB memory to avoid panic. meta_cache_capacity_bytes, - system_params_reader.is_new_cluster(), + system_params_reader.use_new_object_prefix_strategy(), )); let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes)); diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index f0cadc8bf32e..8ea7664533c7 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -225,7 +225,7 @@ pub struct SstableStore { /// 1. The specific object store type. /// 2. Whether the existing cluster is a new cluster. /// - /// The value of 'devide_object_prefix' is determined by the 'is_new_cluster' field in the system parameters. + /// The value of 'devide_object_prefix' is determined by the 'use_new_object_prefix_strategy' field in the system parameters. 
/// For a new cluster, 'devide_object_prefix' is set to True. /// For an old cluster, 'devide_object_prefix' is set to False. /// The final decision of whether to divide prefixes is based on this field and the specific object store type, this approach is implemented to ensure backward compatibility. diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 1bca47ef3979..5aefff820891 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -225,7 +225,7 @@ async fn compaction_test( meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: state_store_metrics.clone(), - devide_object_prefix: system_params.is_new_cluster(), + devide_object_prefix: system_params.use_new_object_prefix_strategy(), })); let store = HummockStorage::new( From 7acef063da0edb6cead3cd3eb4b23b6204616d85 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 28 May 2024 15:14:33 +0800 Subject: [PATCH 07/23] some rename --- src/ctl/src/cmd_impl/hummock/sst_dump.rs | 6 ++-- src/ctl/src/common/hummock_service.rs | 16 +++++------ src/jni_core/src/hummock_iterator.rs | 2 +- src/object_store/src/object/mem.rs | 2 +- src/object_store/src/object/mod.rs | 28 +++++++++---------- .../opendal_engine/opendal_object_store.rs | 4 +-- src/object_store/src/object/prefix.rs | 6 ++-- src/object_store/src/object/s3.rs | 2 +- src/storage/benches/bench_compactor.rs | 2 +- src/storage/benches/bench_multi_builder.rs | 2 +- .../hummock_test/src/bin/replay/main.rs | 2 +- .../src/hummock/iterator/test_utils.rs | 2 +- src/storage/src/hummock/sstable_store.rs | 18 ++++++------ src/storage/src/store_impl.rs | 4 +-- .../src/delete_range_runner.rs | 2 +- 15 files changed, 49 insertions(+), 49 deletions(-) diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 8cd01ea5f07f..cd4e3b1c85ab 100644 --- 
a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -59,8 +59,8 @@ pub struct SstDumpArgs { print_table: bool, #[clap(short = 'd')] data_dir: Option, - #[clap(short, long = "devide_object_prefix")] - devide_object_prefix: Option, + #[clap(short, long = "use_new_object_prefix_strategy")] + use_new_object_prefix_strategy: Option, } pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> { @@ -112,7 +112,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result // Object information is retrieved from object store. Meta service is not required. let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?; let sstable_store = hummock_service_opts - .create_sstable_store(args.devide_object_prefix.unwrap_or(true)) + .create_sstable_store(args.use_new_object_prefix_strategy.unwrap_or(true)) .await?; if let Some(obj_id) = &args.object_id { let obj_store = sstable_store.store(); diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 4410dda61d81..68ef012ca661 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -35,7 +35,7 @@ pub struct HummockServiceOpts { pub hummock_url: String, pub data_dir: Option, - devide_object_prefix: bool, + use_new_object_prefix_strategy: bool, heartbeat_handle: Option>, heartbeat_shutdown_sender: Option>, @@ -81,9 +81,9 @@ impl HummockServiceOpts { bail!(MESSAGE); } }; - let devide_object_prefix = match env::var("RW_OBJECT_STORE_DEVIDE_OBJECT_PREFIX") { - Ok(devide_object_prefix) => devide_object_prefix == "true", - _ => true, + let use_new_object_prefix_strategy = match env::var("RW_USE_NEW_OBJECT_PREFIX_STRATEGY") { + Ok(use_new_object_prefix_strategy) => use_new_object_prefix_strategy == "true", + _ => false, }; Ok(Self { @@ -91,7 +91,7 @@ impl HummockServiceOpts { data_dir, heartbeat_handle: None, heartbeat_shutdown_sender: None, - 
devide_object_prefix, + use_new_object_prefix_strategy, }) } @@ -149,7 +149,7 @@ impl HummockServiceOpts { metrics.storage_metrics.clone(), metrics.compactor_metrics.clone(), None, - self.devide_object_prefix, + self.use_new_object_prefix_strategy, ) .await?; @@ -167,7 +167,7 @@ impl HummockServiceOpts { pub async fn create_sstable_store( &self, - devide_object_prefix: bool, + use_new_object_prefix_strategy: bool, ) -> Result> { let object_store = build_remote_object_store( self.hummock_url.strip_prefix("hummock+").unwrap(), @@ -196,7 +196,7 @@ impl HummockServiceOpts { state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, )), - devide_object_prefix, + use_new_object_prefix_strategy, }))) } } diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index bbc7a862ffcc..93b6eb5a7581 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -94,7 +94,7 @@ impl HummockJavaBindingIterator { state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, )), - devide_object_prefix: false, + use_new_object_prefix_strategy: false, })); let reader = HummockVersionReader::new( sstable_store, diff --git a/src/object_store/src/object/mem.rs b/src/object_store/src/object/mem.rs index 12dd5b246e26..f60459c33758 100644 --- a/src/object_store/src/object/mem.rs +++ b/src/object_store/src/object/mem.rs @@ -101,7 +101,7 @@ pub struct InMemObjectStore { #[async_trait::async_trait] impl ObjectStore for InMemObjectStore { - fn get_object_prefix(&self, _obj_id: u64, _devide_object_prefix: bool) -> String { + fn get_object_prefix(&self, _obj_id: u64, _use_new_object_prefix_strategy: bool) -> String { String::default() } diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index b28f7ff049c5..2b618e42f968 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -89,7 +89,7 @@ pub trait 
StreamingUploader: Send { #[async_trait::async_trait] pub trait ObjectStore: Send + Sync { /// Get the key prefix for object, the prefix is determined by the type of object store and `devise_object_prefix`. - fn get_object_prefix(&self, obj_id: u64, devide_object_prefix: bool) -> String; + fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String; /// Uploads the object to `ObjectStore`. async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>; @@ -252,24 +252,24 @@ impl ObjectStoreImpl { object_store_impl_method_body!(self, list, dispatch_async, prefix) } - pub fn get_object_prefix(&self, obj_id: u64, devide_object_prefix: bool) -> String { + pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String { // FIXME: ObjectStoreImpl lacks flexibility for adding new interface to ObjectStore // trait. Macro object_store_impl_method_body routes to local or remote only depending on // the path match self { - ObjectStoreImpl::InMem(store) => { - store.inner.get_object_prefix(obj_id, devide_object_prefix) - } - ObjectStoreImpl::Opendal(store) => { - store.inner.get_object_prefix(obj_id, devide_object_prefix) - } - ObjectStoreImpl::S3(store) => { - store.inner.get_object_prefix(obj_id, devide_object_prefix) - } + ObjectStoreImpl::InMem(store) => store + .inner + .get_object_prefix(obj_id, use_new_object_prefix_strategy), + ObjectStoreImpl::Opendal(store) => store + .inner + .get_object_prefix(obj_id, use_new_object_prefix_strategy), + ObjectStoreImpl::S3(store) => store + .inner + .get_object_prefix(obj_id, use_new_object_prefix_strategy), #[cfg(madsim)] - ObjectStoreImpl::Sim(store) => { - store.inner.get_object_prefix(obj_id, devide_object_prefix) - } + ObjectStoreImpl::Sim(store) => store + .inner + .get_object_prefix(obj_id, use_new_object_prefix_strategy), } } diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs 
b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index e8b25ff48573..39752e89fee1 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -63,7 +63,7 @@ impl OpendalObjectStore { #[async_trait::async_trait] impl ObjectStore for OpendalObjectStore { - fn get_object_prefix(&self, obj_id: u64, devide_object_prefix: bool) -> String { + fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String { match self.engine_type { EngineType::S3 => prefix::s3::get_object_prefix(obj_id), EngineType::Minio => prefix::s3::get_object_prefix(obj_id), @@ -75,7 +75,7 @@ impl ObjectStore for OpendalObjectStore { | EngineType::Webhdfs | EngineType::Azblob | EngineType::Fs => { - prefix::opendal_engine::get_object_prefix(obj_id, devide_object_prefix) + prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy) } } } diff --git a/src/object_store/src/object/prefix.rs b/src/object_store/src/object/prefix.rs index ba0f5aa039cb..5229d900a8b6 100644 --- a/src/object_store/src/object/prefix.rs +++ b/src/object_store/src/object/prefix.rs @@ -28,11 +28,11 @@ pub(crate) mod opendal_engine { /// The number of Azblob bucket prefixes pub(crate) const NUM_BUCKET_PREFIXES_AZBLOB: u32 = 256; - pub(crate) fn get_object_prefix(obj_id: u64, devide_object_prefix: bool) -> String { + pub(crate) fn get_object_prefix(obj_id: u64, use_new_object_prefix_strategy: bool) -> String { // For OpenDAL object storage, whether objects are divided by prefixes depends on whether it is a new cluster: - // If it is a new cluster, objects will be divided into NUM_BUCKET_PREFIXES_AZBLOB prefixes. + // If it is a new cluster, objects will be divided into `NUM_BUCKET_PREFIXES_AZBLOB` prefixes. // If it is an old cluster, prefixes are not used due to the need to read and write old data. 
- match devide_object_prefix { + match use_new_object_prefix_strategy { true => { let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES_AZBLOB; let mut obj_prefix = prefix.to_string(); diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 2d2aad4c6b2f..e421c717e676 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -314,7 +314,7 @@ pub struct S3ObjectStore { #[async_trait::async_trait] impl ObjectStore for S3ObjectStore { - fn get_object_prefix(&self, obj_id: u64, _devide_object_prefix: bool) -> String { + fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String { // Delegate to static method to avoid creating an `S3ObjectStore` in unit test. // Using aws s3 sdk as object storage, the object prefix will be devised by default. prefix::s3::get_object_prefix(obj_id) diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index f944aad4c4a7..02635397d569 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -73,7 +73,7 @@ pub fn mock_sstable_store() -> SstableStoreRef { meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), - devide_object_prefix: false, + use_new_object_prefix_strategy: false, })) } diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index 89c63718a29a..b519611abe15 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -156,7 +156,7 @@ fn bench_builder( meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), - devide_object_prefix: false, + use_new_object_prefix_strategy: false, })); let mut group = c.benchmark_group("bench_multi_builder"); diff --git 
a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index e3cde61feb4e..c43b8901da67 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -126,7 +126,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result SstableSto meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), - devide_object_prefix: false, + use_new_object_prefix_strategy: false, })) } diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 8ea7664533c7..e094c6708bae 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -202,7 +202,7 @@ pub struct SstableStoreConfig { pub meta_file_cache: FileCache, pub recent_filter: Option>>, pub state_store_metrics: Arc, - pub devide_object_prefix: bool, + pub use_new_object_prefix_strategy: bool, } pub struct SstableStore { @@ -225,11 +225,11 @@ pub struct SstableStore { /// 1. The specific object store type. /// 2. Whether the existing cluster is a new cluster. /// - /// The value of 'devide_object_prefix' is determined by the 'use_new_object_prefix_strategy' field in the system parameters. - /// For a new cluster, 'devide_object_prefix' is set to True. - /// For an old cluster, 'devide_object_prefix' is set to False. + /// The value of 'use_new_object_prefix_strategy' is determined by the 'use_new_object_prefix_strategy' field in the system parameters. + /// For a new cluster, 'use_new_object_prefix_strategy' is set to True. + /// For an old cluster, 'use_new_object_prefix_strategy' is set to False. /// The final decision of whether to divide prefixes is based on this field and the specific object store type, this approach is implemented to ensure backward compatibility. 
- devide_object_prefix: bool, + use_new_object_prefix_strategy: bool, } impl SstableStore { @@ -296,7 +296,7 @@ impl SstableStore { prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)), prefetch_buffer_capacity: config.prefetch_buffer_capacity, max_prefetch_block_number: config.max_prefetch_block_number, - devide_object_prefix: config.devide_object_prefix, + use_new_object_prefix_strategy: config.use_new_object_prefix_strategy, } } @@ -307,7 +307,7 @@ impl SstableStore { path: String, block_cache_capacity: usize, meta_cache_capacity: usize, - devide_object_prefix: bool, + use_new_object_prefix_strategy: bool, ) -> Self { let meta_cache = Arc::new(Cache::lru(LruCacheConfig { capacity: meta_cache_capacity, @@ -340,7 +340,7 @@ impl SstableStore { prefetch_buffer_capacity: block_cache_capacity, max_prefetch_block_number: 16, /* compactor won't use this parameter, so just assign a default value. */ recent_filter: None, - devide_object_prefix, + use_new_object_prefix_strategy, } } @@ -636,7 +636,7 @@ impl SstableStore { pub fn get_sst_data_path(&self, object_id: HummockSstableObjectId) -> String { let obj_prefix = self .store - .get_object_prefix(object_id, self.devide_object_prefix); + .get_object_prefix(object_id, self.use_new_object_prefix_strategy); format!( "{}/{}{}.{}", self.path, obj_prefix, object_id, OBJECT_SUFFIX diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 234df17eca00..2bd2c9a0fb86 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -565,7 +565,7 @@ impl StateStoreImpl { storage_metrics: Arc, compactor_metrics: Arc, await_tree_config: Option, - devide_object_prefix: bool, + use_new_object_prefix_strategy: bool, ) -> StorageResult { set_foyer_metrics_registry(GLOBAL_METRICS_REGISTRY.clone()); @@ -663,7 +663,7 @@ impl StateStoreImpl { meta_file_cache, recent_filter, state_store_metrics: state_store_metrics.clone(), - devide_object_prefix, + use_new_object_prefix_strategy, })); let 
notification_client = RpcNotificationClient::new(hummock_meta_client.get_inner().clone()); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 5aefff820891..9171486229e4 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -225,7 +225,7 @@ async fn compaction_test( meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: state_store_metrics.clone(), - devide_object_prefix: system_params.use_new_object_prefix_strategy(), + use_new_object_prefix_strategy: system_params.use_new_object_prefix_strategy(), })); let store = HummockStorage::new( From b71f9b2a10d5c1a27db21f9edefc6edf0d46f620 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 28 May 2024 15:40:09 +0800 Subject: [PATCH 08/23] make clippy happy --- src/object_store/src/object/sim/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs index 31f34cbcd326..8b68a79a3825 100644 --- a/src/object_store/src/object/sim/mod.rs +++ b/src/object_store/src/object/sim/mod.rs @@ -115,7 +115,7 @@ pub struct SimObjectStore { #[async_trait::async_trait] impl ObjectStore for SimObjectStore { - fn get_object_prefix(&self, _obj_id: u64) -> String { + fn get_object_prefix(&self, _obj_id: u64, _use_new_object_prefix_strategy: bool) -> String { String::default() } From a7dcb7130c99c9603aecca83a4f75880d908785c Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 28 May 2024 16:10:36 +0800 Subject: [PATCH 09/23] typo fix --- src/common/src/system_param/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 8bd306190189..cbbcbf139240 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -87,7 +87,7 @@ 
macro_rules! for_all_params { { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", }, { pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", }, { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", }, - { use_new_object_prefix_strategy, bool, Some(false), true, "Whether to devide object prefix.", }, + { use_new_object_prefix_strategy, bool, Some(false), true, "Whether to split object prefix.", }, } }; } From dcdea1b726998e2ee5db9098f1ff29eee54a4367 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 28 May 2024 18:16:43 +0800 Subject: [PATCH 10/23] fix ut --- src/common/src/system_param/mod.rs | 1 + src/config/docs.md | 1 + src/config/example.toml | 1 + 3 files changed, 3 insertions(+) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index cbbcbf139240..226637d60c71 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -442,6 +442,7 @@ mod tests { (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"), (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"), (ENABLE_TRACING_KEY, "true"), + (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "true"), ("a_deprecated_param", "foo"), ]; diff --git a/src/config/docs.md b/src/config/docs.md index aea210f5235a..303be850200e 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -163,3 +163,4 @@ This page is automatically generated by `./risedev generate-example-config` | pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false | | sstable_size_mb | Target size of the Sstable. | 256 | | state_store | URL for the state store | | +| use_new_object_prefix_strategy | Whether to split object prefix. 
| false | diff --git a/src/config/example.toml b/src/config/example.toml index fc70258788bb..b8b3c2a281df 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -239,3 +239,4 @@ bloom_false_positive = 0.001 max_concurrent_creating_streaming_jobs = 1 pause_on_next_bootstrap = false enable_tracing = false +use_new_object_prefix_strategy = false From c332de6571fdeef623955bf1982190823ab96c8f Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 28 May 2024 19:16:37 +0800 Subject: [PATCH 11/23] fix e2e --- e2e_test/batch/catalog/pg_settings.slt.part | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 2a130de04c19..00a6d35b2e64 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -14,6 +14,7 @@ postmaster checkpoint_frequency postmaster enable_tracing postmaster max_concurrent_creating_streaming_jobs postmaster pause_on_next_bootstrap +postmaster use_new_object_prefix_strategy user application_name user background_ddl user batch_enable_distributed_dml From c63dc0946fe84780151ffc4d8cc209eed4951264 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 3 Jun 2024 15:38:39 +0800 Subject: [PATCH 12/23] resolve some comments --- e2e_test/batch/catalog/pg_settings.slt.part | 2 +- proto/java_binding.proto | 1 + src/common/src/system_param/mod.rs | 4 ++-- src/jni_core/src/hummock_iterator.rs | 2 +- src/object_store/src/object/s3.rs | 2 +- src/storage/benches/bench_compactor.rs | 2 +- src/storage/hummock_test/src/bin/replay/main.rs | 5 ++++- src/storage/src/hummock/iterator/test_utils.rs | 2 +- 8 files changed, 12 insertions(+), 8 deletions(-) diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 00a6d35b2e64..a3fd0edc6522 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ 
-7,6 +7,7 @@ internal data_directory internal parallel_compact_size_mb internal sstable_size_mb internal state_store +internal use_new_object_prefix_strategy postmaster backup_storage_directory postmaster backup_storage_url postmaster barrier_interval_ms @@ -14,7 +15,6 @@ postmaster checkpoint_frequency postmaster enable_tracing postmaster max_concurrent_creating_streaming_jobs postmaster pause_on_next_bootstrap -postmaster use_new_object_prefix_strategy user application_name user background_ddl user batch_enable_distributed_dml diff --git a/proto/java_binding.proto b/proto/java_binding.proto index 32ed2f5df199..72558438d176 100644 --- a/proto/java_binding.proto +++ b/proto/java_binding.proto @@ -34,4 +34,5 @@ message ReadPlan { catalog.Table table_catalog = 7; repeated uint32 vnode_ids = 8; + bool use_new_object_prefix_strategy = 9; } diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 226637d60c71..f9bb38a6e0f7 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -87,7 +87,7 @@ macro_rules! 
for_all_params { { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", }, { pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", }, { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", }, - { use_new_object_prefix_strategy, bool, Some(false), true, "Whether to split object prefix.", }, + { use_new_object_prefix_strategy, bool, Some(false), false, "Whether to split object prefix.", }, } }; } @@ -442,7 +442,7 @@ mod tests { (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"), (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"), (ENABLE_TRACING_KEY, "true"), - (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "true"), + (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"), ("a_deprecated_param", "foo"), ]; diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 4752ca7ab156..2d26e7ea7195 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -105,7 +105,7 @@ impl HummockJavaBindingIterator { state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, )), - use_new_object_prefix_strategy: false, + use_new_object_prefix_strategy: read_plan.use_new_object_prefix_strategy, meta_cache_v2, block_cache_v2, })); diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 4d1849eecf91..f1f569cb7d36 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -404,7 +404,7 @@ impl ObjectStore for S3ObjectStore { fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String { // Delegate to static method to avoid creating an `S3ObjectStore` in unit test. - // Using aws s3 sdk as object storage, the object prefix will be devised by default. + // Using aws s3 sdk as object storage, the object prefix will be divided by default. 
prefix::s3::get_object_prefix(obj_id) } diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 9bdc36738d80..3f2767e2efd5 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -80,7 +80,7 @@ pub async fn mock_sstable_store() -> SstableStoreRef { max_prefetch_block_number: 16, recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), - use_new_object_prefix_strategy: false, + use_new_object_prefix_strategy: true, meta_cache_v2, block_cache_v2, diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index cf14a6b8318e..4283fd6a383a 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -62,6 +62,9 @@ struct Args { #[arg(short, long)] object_storage: String, + + #[arg(short, long)] + use_new_object_prefix_strategy: bool, } #[tokio::main(flavor = "multi_thread")] @@ -132,7 +135,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result Ssta recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), - use_new_object_prefix_strategy: false, + use_new_object_prefix_strategy: true, meta_cache_v2, block_cache_v2, From 260fac6deeccc8d3f63a45f3d83d14bb87cc1f7c Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 3 Jun 2024 17:09:31 +0800 Subject: [PATCH 13/23] resolve some comments --- src/meta/model_v2/migration/src/lib.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 43 +++---------------- src/meta/src/manager/env.rs | 17 +++++++- src/meta/src/manager/system_param/mod.rs | 6 ++- src/object_store/src/object/mod.rs | 21 ++------- src/storage/benches/bench_multi_builder.rs | 2 +- .../src/compaction_test_runner.rs | 2 +- 7 files changed, 33 insertions(+), 60 deletions(-) diff --git a/src/meta/model_v2/migration/src/lib.rs 
b/src/meta/model_v2/migration/src/lib.rs index 74968d2e3a11..c6c8d14fa58c 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -1,7 +1,7 @@ #![allow(clippy::enum_variant_names)] pub use sea_orm_migration::prelude::*; - +pub use sea_orm_migration::MigrationStatus; mod m20230908_072257_init; mod m20231008_020431_hummock; mod m20240304_074901_subscription; diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index fdd003df1ac4..b42a9d091071 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -46,7 +46,7 @@ use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::context::ContextInfo; use crate::hummock::manager::gc::DeleteObjectTracker; use crate::hummock::CompactorManagerRef; -use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager, SystemParamsManagerImpl}; +use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager}; use crate::model::{ClusterId, MetadataModel, MetadataModelError}; use crate::rpc::metrics::MetaMetrics; @@ -224,39 +224,13 @@ impl HummockManager { // Skip this check in e2e compaction test, which needs to start a secondary cluster with // same bucket if !deterministic_mode { - let use_new_object_prefix_strategy = write_exclusive_cluster_id( + write_exclusive_cluster_id( state_store_dir, env.cluster_id().clone(), object_store.clone(), ) .await?; - if use_new_object_prefix_strategy { - match env.system_params_manager_impl_ref() { - SystemParamsManagerImpl::Kv(mgr) => { - mgr.set_param("use_new_object_prefix_strategy", Some("true".to_owned())) - .await - .unwrap(); - } - SystemParamsManagerImpl::Sql(mgr) => { - mgr.set_param("use_new_object_prefix_strategy", Some("true".to_owned())) - .await - .unwrap(); - } - }; - } else { - match env.system_params_manager_impl_ref() { - SystemParamsManagerImpl::Kv(mgr) => { - mgr.set_param("use_new_object_prefix_strategy", 
Some("false".to_owned())) - .await - .unwrap(); - } - SystemParamsManagerImpl::Sql(mgr) => { - mgr.set_param("use_new_object_prefix_strategy", Some("false".to_owned())) - .await - .unwrap(); - } - }; - } + // config bucket lifecycle for new cluster. if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref() && !env.opts.do_not_config_object_storage_lifecycle @@ -587,16 +561,11 @@ impl HummockManager { } } -/// This function, `write_exclusive_cluster_id`, is used to check if it is a new cluster during meta startup: -/// For new clusters, the name of the object store needs to be prefixed according to the object id. -/// For old clusters, the prefix is ​​not divided for the sake of compatibility. -/// -/// The return value of this function represents whether to adopt the new object prefix strategy. async fn write_exclusive_cluster_id( state_store_dir: &str, cluster_id: ClusterId, object_store: ObjectStoreRef, -) -> Result { +) -> Result<()> { const CLUSTER_ID_DIR: &str = "cluster_id"; const CLUSTER_ID_NAME: &str = "0"; let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR); @@ -605,7 +574,7 @@ async fn write_exclusive_cluster_id( Ok(stored_cluster_id) => { let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap(); if cluster_id.deref() == stored_cluster_id { - return Ok(false); + return Ok(()); } Err(ObjectError::internal(format!( @@ -619,7 +588,7 @@ async fn write_exclusive_cluster_id( object_store .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id))) .await?; - return Ok(true); + return Ok(()); } Err(e.into()) } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index a8e2909a11bf..6b80aada9f38 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -20,6 +20,7 @@ use risingwave_common::config::{ }; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; +use 
risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::{StreamClientPool, StreamClientPoolRef}; @@ -416,11 +417,25 @@ impl MetaSrvEnv { .await? .map(|c| c.cluster_id.to_string().into()) .unwrap(); + let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?; + let mut cluster_first_launch = true; + // If `m20230908_072257_init` has been applied, it is the old cluster + for migration in migrations { + if migration.name() == "m20230908_072257_init" + && migration.status() == MigrationStatus::Applied + { + cluster_first_launch = false; + } + } + let mut system_params = init_system_params; + // For new clusters, the name of the object store needs to be prefixed according to the object id. + // For old clusters, the prefix is ​​not divided for the sake of compatibility. + system_params.use_new_object_prefix_strategy = Some(cluster_first_launch); let system_param_controller = Arc::new( SystemParamsController::new( sql_meta_store.clone(), notification_manager.clone(), - init_system_params, + system_params, ) .await?, ); diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index 0030d3c2d8f8..c021ba5e4a47 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -56,7 +56,7 @@ impl SystemParamsManager { init_params: SystemParams, cluster_first_launch: bool, ) -> MetaResult { - let params = if cluster_first_launch { + let mut params = if cluster_first_launch { init_params } else if let Some(persisted) = SystemParams::get(&meta_store).await? { merge_params(persisted, init_params) @@ -66,6 +66,10 @@ impl SystemParamsManager { )); }; + // For new clusters, the name of the object store needs to be prefixed according to the object id. + // For old clusters, the prefix is ​​not divided for the sake of compatibility. 
+ params.use_new_object_prefix_strategy = Some(cluster_first_launch); + info!("system parameters: {:?}", params); check_missing_params(¶ms).map_err(|e| anyhow!(e))?; diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index ab47f050a26b..fb678416f264 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -326,24 +326,9 @@ impl ObjectStoreImpl { } pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String { - // FIXME: ObjectStoreImpl lacks flexibility for adding new interface to ObjectStore - // trait. Macro object_store_impl_method_body routes to local or remote only depending on - // the path - match self { - ObjectStoreImpl::InMem(store) => store - .inner - .get_object_prefix(obj_id, use_new_object_prefix_strategy), - ObjectStoreImpl::Opendal(store) => store - .inner - .get_object_prefix(obj_id, use_new_object_prefix_strategy), - ObjectStoreImpl::S3(store) => store - .inner - .get_object_prefix(obj_id, use_new_object_prefix_strategy), - #[cfg(madsim)] - ObjectStoreImpl::Sim(store) => store - .inner - .get_object_prefix(obj_id, use_new_object_prefix_strategy), - } + dispatch_object_store_enum!(self, |store| store + .inner + .get_object_prefix(obj_id, use_new_object_prefix_strategy)) } pub fn support_streaming_upload(&self) -> bool { diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index ab3b007a531b..66b8a24877e2 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -153,7 +153,7 @@ async fn generate_sstable_store(object_store: Arc) -> Arc Date: Mon, 3 Jun 2024 18:59:17 +0800 Subject: [PATCH 14/23] judge is new cluster before Migrator::up and fix all ctl command --- src/ctl/src/cmd_impl/bench.rs | 8 ++- src/ctl/src/cmd_impl/hummock/list_kv.rs | 6 ++- src/ctl/src/cmd_impl/hummock/sst_dump.rs | 16 ++++-- .../src/cmd_impl/hummock/validate_version.rs | 8 ++- 
src/ctl/src/cmd_impl/table/scan.rs | 24 +++++++-- src/ctl/src/common/hummock_service.rs | 9 ++-- src/ctl/src/lib.rs | 53 ++++++++++++++++--- src/meta/node/src/server.rs | 26 ++++++++- src/meta/src/manager/env.rs | 18 +++---- 9 files changed, 130 insertions(+), 38 deletions(-) diff --git a/src/ctl/src/cmd_impl/bench.rs b/src/ctl/src/cmd_impl/bench.rs index d3c0cde6d20e..dce4a21115d6 100644 --- a/src/ctl/src/cmd_impl/bench.rs +++ b/src/ctl/src/cmd_impl/bench.rs @@ -42,6 +42,8 @@ pub enum BenchCommands { #[clap(long, default_value_t = 1)] threads: usize, data_dir: Option, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, } @@ -86,9 +88,13 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> { mv_name, threads, data_dir, + use_new_object_prefix_strategy, } => { let (hummock, metrics) = context - .hummock_store_with_metrics(HummockServiceOpts::from_env(data_dir)?) + .hummock_store_with_metrics(HummockServiceOpts::from_env( + data_dir, + use_new_object_prefix_strategy, + )?) .await?; let table = get_table_catalog(meta.clone(), mv_name).await?; let mut handlers = vec![]; diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs index 2eb54362b413..f90712a02505 100644 --- a/src/ctl/src/cmd_impl/hummock/list_kv.rs +++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs @@ -27,9 +27,13 @@ pub async fn list_kv( epoch: u64, table_id: u32, data_dir: Option, + use_new_object_prefix_strategy: bool, ) -> anyhow::Result<()> { let hummock = context - .hummock_store(HummockServiceOpts::from_env(data_dir)?) + .hummock_store(HummockServiceOpts::from_env( + data_dir, + use_new_object_prefix_strategy, + )?) 
.await?; if is_max_epoch(epoch) { tracing::info!("using MAX EPOCH as epoch"); diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index cd4e3b1c85ab..ce69ab87f1b7 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -59,8 +59,8 @@ pub struct SstDumpArgs { print_table: bool, #[clap(short = 'd')] data_dir: Option, - #[clap(short, long = "use_new_object_prefix_strategy")] - use_new_object_prefix_strategy: Option, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, } pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> { @@ -74,7 +74,10 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result if args.print_level { // Level information is retrieved from meta service let hummock = context - .hummock_store(HummockServiceOpts::from_env(args.data_dir.clone())?) + .hummock_store(HummockServiceOpts::from_env( + args.data_dir.clone(), + args.use_new_object_prefix_strategy, + )?) .await?; let version = hummock.inner().get_pinned_version().version().clone(); let sstable_store = hummock.sstable_store(); @@ -110,9 +113,12 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result } } else { // Object information is retrieved from object store. Meta service is not required. 
- let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?; + let hummock_service_opts = HummockServiceOpts::from_env( + args.data_dir.clone(), + args.use_new_object_prefix_strategy, + )?; let sstable_store = hummock_service_opts - .create_sstable_store(args.use_new_object_prefix_strategy.unwrap_or(true)) + .create_sstable_store(args.use_new_object_prefix_strategy) .await?; if let Some(obj_id) = &args.object_id { let obj_store = sstable_store.store(); diff --git a/src/ctl/src/cmd_impl/hummock/validate_version.rs b/src/ctl/src/cmd_impl/hummock/validate_version.rs index b2ae1c22f66c..e8f61a8c9835 100644 --- a/src/ctl/src/cmd_impl/hummock/validate_version.rs +++ b/src/ctl/src/cmd_impl/hummock/validate_version.rs @@ -65,6 +65,7 @@ pub async fn print_user_key_in_archive( archive_ids: Vec, data_dir: String, user_key: String, + use_new_object_prefix_strategy: bool, ) -> anyhow::Result<()> { let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| { panic!("cannot decode user key {} into raw bytes", user_key); @@ -72,7 +73,8 @@ pub async fn print_user_key_in_archive( let user_key = UserKey::decode(&user_key_bytes); println!("user key: {user_key:?}"); - let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?; + let hummock_opts = + HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?; let hummock = context.hummock_store(hummock_opts).await?; let sstable_store = hummock.sstable_store(); let archive_object_store = sstable_store.store(); @@ -178,8 +180,10 @@ pub async fn print_version_delta_in_archive( archive_ids: Vec, data_dir: String, sst_id: HummockSstableObjectId, + use_new_object_prefix_strategy: bool, ) -> anyhow::Result<()> { - let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?; + let hummock_opts = + HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?; let hummock = context.hummock_store(hummock_opts).await?; let sstable_store = 
hummock.sstable_store(); let archive_object_store = sstable_store.store(); diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs index 8c21d975009f..0689e315f74c 100644 --- a/src/ctl/src/cmd_impl/table/scan.rs +++ b/src/ctl/src/cmd_impl/table/scan.rs @@ -86,19 +86,35 @@ pub fn make_storage_table( )) } -pub async fn scan(context: &CtlContext, mv_name: String, data_dir: Option) -> Result<()> { +pub async fn scan( + context: &CtlContext, + mv_name: String, + data_dir: Option, + use_new_object_prefix_strategy: bool, +) -> Result<()> { let meta_client = context.meta_client().await?; let hummock = context - .hummock_store(HummockServiceOpts::from_env(data_dir)?) + .hummock_store(HummockServiceOpts::from_env( + data_dir, + use_new_object_prefix_strategy, + )?) .await?; let table = get_table_catalog(meta_client, mv_name).await?; do_scan(table, hummock).await } -pub async fn scan_id(context: &CtlContext, table_id: u32, data_dir: Option) -> Result<()> { +pub async fn scan_id( + context: &CtlContext, + table_id: u32, + data_dir: Option, + use_new_object_prefix_strategy: bool, +) -> Result<()> { let meta_client = context.meta_client().await?; let hummock = context - .hummock_store(HummockServiceOpts::from_env(data_dir)?) + .hummock_store(HummockServiceOpts::from_env( + data_dir, + use_new_object_prefix_strategy, + )?) 
.await?; let table = get_table_catalog_by_id(meta_client, table_id).await?; do_scan(table, hummock).await diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 6724be46a347..0d42b77c1361 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -57,7 +57,10 @@ impl HummockServiceOpts { /// Currently, we will read these variables for meta: /// /// * `RW_HUMMOCK_URL`: hummock store address - pub fn from_env(data_dir: Option) -> Result { + pub fn from_env( + data_dir: Option, + use_new_object_prefix_strategy: bool, + ) -> Result { let hummock_url = match env::var("RW_HUMMOCK_URL") { Ok(url) => { if !url.starts_with("hummock+") { @@ -82,10 +85,6 @@ impl HummockServiceOpts { bail!(MESSAGE); } }; - let use_new_object_prefix_strategy = match env::var("RW_USE_NEW_OBJECT_PREFIX_STRATEGY") { - Ok(use_new_object_prefix_strategy) => use_new_object_prefix_strategy == "true", - _ => false, - }; Ok(Self { hummock_url, diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index a1aaa8f48c5f..635912ede3dc 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -192,6 +192,9 @@ enum HummockCommands { // data directory for hummock state store. None: use default data_dir: Option, + + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, SstDump(SstDumpArgs), /// trigger a targeted compaction through `compaction_group_id` @@ -295,6 +298,8 @@ enum HummockCommands { /// KVs that are matched with the user key are printed. #[clap(long)] user_key: String, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, PrintVersionDeltaInArchive { /// The ident of the archive file in object store. It's also the first Hummock version id of this archive. @@ -306,6 +311,8 @@ enum HummockCommands { /// Version deltas that are related to the SST id are printed. 
#[clap(long)] sst_id: u64, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, } @@ -317,6 +324,9 @@ enum TableCommands { mv_name: String, // data directory for hummock state store. None: use default data_dir: Option, + + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, /// scan a state table using Id ScanById { @@ -324,6 +334,8 @@ enum TableCommands { table_id: u32, // data directory for hummock state store. None: use default data_dir: Option, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, /// list all state tables List, @@ -633,8 +645,16 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { epoch, table_id, data_dir, + use_new_object_prefix_strategy, }) => { - cmd_impl::hummock::list_kv(context, epoch, table_id, data_dir).await?; + cmd_impl::hummock::list_kv( + context, + epoch, + table_id, + data_dir, + use_new_object_prefix_strategy, + ) + .await?; } Commands::Hummock(HummockCommands::SstDump(args)) => { cmd_impl::hummock::sst_dump(context, args).await.unwrap() @@ -744,12 +764,14 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { archive_ids, data_dir, sst_id, + use_new_object_prefix_strategy, }) => { cmd_impl::hummock::print_version_delta_in_archive( context, archive_ids, data_dir, sst_id, + use_new_object_prefix_strategy, ) .await?; } @@ -757,15 +779,32 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { archive_ids, data_dir, user_key, + use_new_object_prefix_strategy, }) => { - cmd_impl::hummock::print_user_key_in_archive(context, archive_ids, data_dir, user_key) - .await?; + cmd_impl::hummock::print_user_key_in_archive( + context, + archive_ids, + data_dir, + user_key, + use_new_object_prefix_strategy, + ) + .await?; } - Commands::Table(TableCommands::Scan { 
mv_name, data_dir }) => { - cmd_impl::table::scan(context, mv_name, data_dir).await? + Commands::Table(TableCommands::Scan { + mv_name, + data_dir, + use_new_object_prefix_strategy, + }) => { + cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy) + .await? } - Commands::Table(TableCommands::ScanById { table_id, data_dir }) => { - cmd_impl::table::scan_id(context, table_id, data_dir).await? + Commands::Table(TableCommands::ScanById { + table_id, + data_dir, + use_new_object_prefix_strategy, + }) => { + cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy) + .await? } Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?, Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?, diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index bf5bb72ed731..bfa3a5a52ba8 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -36,7 +36,7 @@ use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; use risingwave_meta::rpc::ElectionClientRef; use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; -use risingwave_meta_model_migration::{Migrator, MigratorTrait}; +use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_service::backup_service::BackupServiceImpl; use risingwave_meta_service::cloud_service::CloudServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; @@ -391,6 +391,26 @@ pub async fn start_service_as_election_follower( server.await; } +/// This function `is_first_launch_for_sql_backend_cluster` is used to check whether the cluster, which uses SQL as the backend, is a new cluster. +/// It determines this by inspecting the applied migrations. If the migration `m20230908_072257_init` has been applied, +/// then it is considered an old cluster. +/// +/// Note: this check should be performed before `Migrator::up()`. 
+pub async fn is_first_launch_for_sql_backend_cluster( + sql_meta_store: &SqlMetaStore, +) -> MetaResult { + let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?; + let mut cluster_first_launch = true; + for migration in migrations { + if migration.name() == "m20230908_072257_init" + && migration.status() == MigrationStatus::Applied + { + cluster_first_launch = false; + } + } + Ok(cluster_first_launch) +} + /// Starts all services needed for the meta leader node /// Only call this function once, since initializing the services multiple times will result in an /// inconsistent state @@ -408,7 +428,10 @@ pub async fn start_service_as_election_leader( mut svc_shutdown_rx: WatchReceiver<()>, ) -> MetaResult<()> { tracing::info!("Defining leader services"); + let mut is_sql_backend_cluster_first_launch = true; if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl { + is_sql_backend_cluster_first_launch = + is_first_launch_for_sql_backend_cluster(sql_store).await?; // Try to upgrade if any new model changes are added. 
Migrator::up(&sql_store.conn, None) .await @@ -420,6 +443,7 @@ pub async fn start_service_as_election_leader( init_system_params, init_session_config, meta_store_impl, + is_sql_backend_cluster_first_launch, ) .await?; let system_params_reader = env.system_params_reader().await; diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 6b80aada9f38..5da6d7f8b4d7 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -20,7 +20,6 @@ use risingwave_common::config::{ }; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; -use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::{StreamClientPool, StreamClientPoolRef}; @@ -351,6 +350,7 @@ impl MetaSrvEnv { init_system_params: SystemParams, init_session_config: SessionConfig, meta_store_impl: MetaStoreImpl, + is_sql_backend_cluster_first_launch: bool, ) -> MetaResult { let notification_manager = Arc::new(NotificationManager::new(meta_store_impl.clone()).await); @@ -417,20 +417,12 @@ impl MetaSrvEnv { .await? .map(|c| c.cluster_id.to_string().into()) .unwrap(); - let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?; - let mut cluster_first_launch = true; - // If `m20230908_072257_init` has been applied, it is the old cluster - for migration in migrations { - if migration.name() == "m20230908_072257_init" - && migration.status() == MigrationStatus::Applied - { - cluster_first_launch = false; - } - } + let mut system_params = init_system_params; // For new clusters, the name of the object store needs to be prefixed according to the object id. // For old clusters, the prefix is ​​not divided for the sake of compatibility. 
- system_params.use_new_object_prefix_strategy = Some(cluster_first_launch); + system_params.use_new_object_prefix_strategy = + Some(is_sql_backend_cluster_first_launch); let system_param_controller = Arc::new( SystemParamsController::new( sql_meta_store.clone(), @@ -548,6 +540,7 @@ impl MetaSrvEnv { risingwave_common::system_param::system_params_for_test(), Default::default(), MetaStoreImpl::Sql(SqlMetaStore::for_test().await), + true, ) .await .unwrap() @@ -561,6 +554,7 @@ impl MetaSrvEnv { risingwave_common::system_param::system_params_for_test(), Default::default(), MetaStoreImpl::Kv(MemStore::default().into_ref()), + true, ) .await .unwrap() From ac1b97dce5f2fb51e08f06746480169e30061a08 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 3 Jun 2024 20:43:10 +0800 Subject: [PATCH 15/23] fix ut --- src/common/src/system_param/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index f9bb38a6e0f7..3c003f48bbad 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -377,6 +377,7 @@ macro_rules! 
impl_system_params_for_test { ret.state_store = Some("hummock+memory".to_string()); ret.backup_storage_url = Some("memory".into()); ret.backup_storage_directory = Some("backup".into()); + ret.use_new_object_prefix_strategy = Some(true); ret } }; From 7c5e8c3263837239ed6c11f66337e5ab9dc01992 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 4 Jun 2024 14:29:33 +0800 Subject: [PATCH 16/23] empty commit for retry From 1f2dfe332c4918f2f25d8199c88b29fa460cbcdf Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 12 Jun 2024 16:28:57 +0800 Subject: [PATCH 17/23] resolve some comment --- src/common/src/system_param/mod.rs | 2 +- src/common/src/system_param/reader.rs | 5 ++++- src/config/example.toml | 1 - src/meta/src/backup_restore/restore.rs | 1 + 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 3c003f48bbad..84ed4b8a5a32 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -87,7 +87,7 @@ macro_rules! 
for_all_params { { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", }, { pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", }, { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", }, - { use_new_object_prefix_strategy, bool, Some(false), false, "Whether to split object prefix.", }, + { use_new_object_prefix_strategy, bool, None, false, "Whether to split object prefix.", }, } }; } diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index a55b5b33df1b..982ef51740dc 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -138,7 +138,10 @@ where } fn use_new_object_prefix_strategy(&self) -> bool { - self.inner().use_new_object_prefix_strategy.unwrap_or(true) + *self.inner() + .use_new_object_prefix_strategy + .as_ref() + .unwrap() } fn backup_storage_url(&self) -> &str { diff --git a/src/config/example.toml b/src/config/example.toml index b8b3c2a281df..fc70258788bb 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -239,4 +239,3 @@ bloom_false_positive = 0.001 max_concurrent_creating_streaming_jobs = 1 pause_on_next_bootstrap = false enable_tracing = false -use_new_object_prefix_strategy = false diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index f94737a1bd49..7ef948c30da6 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -246,6 +246,7 @@ mod tests { SystemParams { state_store: Some("state_store".into()), data_directory: Some("data_directory".into()), + use_new_object_prefix_strategy: Some(true), backup_storage_url: Some("backup_storage_url".into()), backup_storage_directory: Some("backup_storage_directory".into()), ..SystemConfig::default().into_init_system_params() From eb628fcd368cf8b8b2a9557dad65a373c95ba87c Mon Sep 17 
00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 12 Jun 2024 17:10:41 +0800 Subject: [PATCH 18/23] minor --- src/config/docs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config/docs.md b/src/config/docs.md index 303be850200e..cc5c039f5946 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -163,4 +163,4 @@ This page is automatically generated by `./risedev generate-example-config` | pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false | | sstable_size_mb | Target size of the Sstable. | 256 | | state_store | URL for the state store | | -| use_new_object_prefix_strategy | Whether to split object prefix. | false | +| use_new_object_prefix_strategy | Whether to split object prefix. | | From 04201449d1293cf638808d4a4a749c8c0d7099b3 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 12 Jun 2024 19:56:32 +0800 Subject: [PATCH 19/23] fmrt --- src/common/src/system_param/reader.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 982ef51740dc..9a2c6e49534a 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -138,7 +138,8 @@ where } fn use_new_object_prefix_strategy(&self) -> bool { - *self.inner() + *self + .inner() .use_new_object_prefix_strategy .as_ref() .unwrap() From 58581cfd2c50e1924ca0c0de874e0a2ca78dac1b Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 14 Jun 2024 13:14:22 +0800 Subject: [PATCH 20/23] resolve comment --- src/meta/node/src/server.rs | 31 ------------------------------- src/meta/src/manager/env.rs | 33 ++++++++++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 34 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index bfa3a5a52ba8..aeb2a4bcf5b4 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -36,7 +36,6 @@ use 
risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; use risingwave_meta::rpc::ElectionClientRef; use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; -use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_service::backup_service::BackupServiceImpl; use risingwave_meta_service::cloud_service::CloudServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; @@ -391,26 +390,6 @@ pub async fn start_service_as_election_follower( server.await; } -/// This function `is_first_launch_for_sql_backend_cluster` is used to check whether the cluster, which uses SQL as the backend, is a new cluster. -/// It determines this by inspecting the applied migrations. If the migration `m20230908_072257_init` has been applied, -/// then it is considered an old cluster. -/// -/// Note: this check should be performed before `Migrator::up()`. -pub async fn is_first_launch_for_sql_backend_cluster( - sql_meta_store: &SqlMetaStore, -) -> MetaResult { - let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?; - let mut cluster_first_launch = true; - for migration in migrations { - if migration.name() == "m20230908_072257_init" - && migration.status() == MigrationStatus::Applied - { - cluster_first_launch = false; - } - } - Ok(cluster_first_launch) -} - /// Starts all services needed for the meta leader node /// Only call this function once, since initializing the services multiple times will result in an /// inconsistent state @@ -428,22 +407,12 @@ pub async fn start_service_as_election_leader( mut svc_shutdown_rx: WatchReceiver<()>, ) -> MetaResult<()> { tracing::info!("Defining leader services"); - let mut is_sql_backend_cluster_first_launch = true; - if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl { - is_sql_backend_cluster_first_launch = - is_first_launch_for_sql_backend_cluster(sql_store).await?; - // Try to upgrade if any new model changes are added. 
- Migrator::up(&sql_store.conn, None) - .await - .expect("Failed to upgrade models in meta store"); - } let env = MetaSrvEnv::new( opts.clone(), init_system_params, init_session_config, meta_store_impl, - is_sql_backend_cluster_first_launch, ) .await?; let system_params_reader = env.system_params_reader().await; diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 5da6d7f8b4d7..7332b23a4c2e 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -20,6 +20,7 @@ use risingwave_common::config::{ }; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; +use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::{StreamClientPool, StreamClientPoolRef}; @@ -344,13 +345,32 @@ impl MetaOpts { } } +/// This function `is_first_launch_for_sql_backend_cluster` is used to check whether the cluster, which uses SQL as the backend, is a new cluster. +/// It determines this by inspecting the applied migrations. If the migration `m20230908_072257_init` has been applied, +/// then it is considered an old cluster. +/// +/// Note: this check should be performed before `Migrator::up()`. 
+pub async fn is_first_launch_for_sql_backend_cluster( + sql_meta_store: &SqlMetaStore, +) -> MetaResult { + let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?; + let mut cluster_first_launch = true; + for migration in migrations { + if migration.name() == "m20230908_072257_init" + && migration.status() == MigrationStatus::Applied + { + cluster_first_launch = false; + } + } + Ok(cluster_first_launch) +} + impl MetaSrvEnv { pub async fn new( opts: MetaOpts, init_system_params: SystemParams, init_session_config: SessionConfig, meta_store_impl: MetaStoreImpl, - is_sql_backend_cluster_first_launch: bool, ) -> MetaResult { let notification_manager = Arc::new(NotificationManager::new(meta_store_impl.clone()).await); @@ -412,6 +432,15 @@ impl MetaSrvEnv { } } MetaStoreImpl::Sql(sql_meta_store) => { + let mut is_sql_backend_cluster_first_launch = true; + if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl { + is_sql_backend_cluster_first_launch = + is_first_launch_for_sql_backend_cluster(sql_store).await?; + // Try to upgrade if any new model changes are added. + Migrator::up(&sql_store.conn, None) + .await + .expect("Failed to upgrade models in meta store"); + } let cluster_id = Cluster::find() .one(&sql_meta_store.conn) .await? 
@@ -540,7 +569,6 @@ impl MetaSrvEnv { risingwave_common::system_param::system_params_for_test(), Default::default(), MetaStoreImpl::Sql(SqlMetaStore::for_test().await), - true, ) .await .unwrap() @@ -554,7 +582,6 @@ impl MetaSrvEnv { risingwave_common::system_param::system_params_for_test(), Default::default(), MetaStoreImpl::Kv(MemStore::default().into_ref()), - true, ) .await .unwrap() From 87ab951b0bb865c4cd5fb1eb69ef3652b21f5e79 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 17 Jun 2024 09:29:14 +0800 Subject: [PATCH 21/23] fix ci --- src/common/src/system_param/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 84ed4b8a5a32..c8382d35fee8 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -377,7 +377,7 @@ macro_rules! impl_system_params_for_test { ret.state_store = Some("hummock+memory".to_string()); ret.backup_storage_url = Some("memory".into()); ret.backup_storage_directory = Some("backup".into()); - ret.use_new_object_prefix_strategy = Some(true); + ret.use_new_object_prefix_strategy = Some(false); ret } }; From a028c13d467f5850dfae4636732f88abbbec7567 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 17 Jun 2024 14:32:23 +0800 Subject: [PATCH 22/23] fix --- src/meta/node/src/server.rs | 1 - src/meta/src/manager/env.rs | 38 +++++++++++++----------- src/meta/src/manager/system_param/mod.rs | 6 +--- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index aeb2a4bcf5b4..98ac644bdf09 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -407,7 +407,6 @@ pub async fn start_service_as_election_leader( mut svc_shutdown_rx: WatchReceiver<()>, ) -> MetaResult<()> { tracing::info!("Defining leader services"); - let env = MetaSrvEnv::new( opts.clone(), init_system_params, diff --git 
a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 7332b23a4c2e..fec674057b0f 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -354,24 +354,33 @@ pub async fn is_first_launch_for_sql_backend_cluster( sql_meta_store: &SqlMetaStore, ) -> MetaResult { let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?; - let mut cluster_first_launch = true; for migration in migrations { if migration.name() == "m20230908_072257_init" && migration.status() == MigrationStatus::Applied { - cluster_first_launch = false; + return Ok(false); } } - Ok(cluster_first_launch) + Ok(true) } impl MetaSrvEnv { pub async fn new( opts: MetaOpts, - init_system_params: SystemParams, + mut init_system_params: SystemParams, init_session_config: SessionConfig, meta_store_impl: MetaStoreImpl, ) -> MetaResult { + let mut is_sql_backend_cluster_first_launch = true; + if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl { + is_sql_backend_cluster_first_launch = + is_first_launch_for_sql_backend_cluster(sql_store).await?; + // Try to upgrade if any new model changes are added. + Migrator::up(&sql_store.conn, None) + .await + .expect("Failed to upgrade models in meta store"); + } + let notification_manager = Arc::new(NotificationManager::new(meta_store_impl.clone()).await); let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms)); @@ -390,6 +399,10 @@ impl MetaSrvEnv { } else { (ClusterId::new(), true) }; + // For new clusters, the name of the object store needs to be prefixed according to the object id. + // For old clusters, the prefix is ​​not divided for the sake of compatibility. 
+ + init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch); let system_params_manager = Arc::new( SystemParamsManager::new( meta_store.clone(), @@ -432,31 +445,20 @@ impl MetaSrvEnv { } } MetaStoreImpl::Sql(sql_meta_store) => { - let mut is_sql_backend_cluster_first_launch = true; - if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl { - is_sql_backend_cluster_first_launch = - is_first_launch_for_sql_backend_cluster(sql_store).await?; - // Try to upgrade if any new model changes are added. - Migrator::up(&sql_store.conn, None) - .await - .expect("Failed to upgrade models in meta store"); - } let cluster_id = Cluster::find() .one(&sql_meta_store.conn) .await? .map(|c| c.cluster_id.to_string().into()) .unwrap(); - - let mut system_params = init_system_params; + init_system_params.use_new_object_prefix_strategy = + Some(is_sql_backend_cluster_first_launch); // For new clusters, the name of the object store needs to be prefixed according to the object id. // For old clusters, the prefix is ​​not divided for the sake of compatibility. - system_params.use_new_object_prefix_strategy = - Some(is_sql_backend_cluster_first_launch); let system_param_controller = Arc::new( SystemParamsController::new( sql_meta_store.clone(), notification_manager.clone(), - system_params, + init_system_params, ) .await?, ); diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index c021ba5e4a47..0030d3c2d8f8 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -56,7 +56,7 @@ impl SystemParamsManager { init_params: SystemParams, cluster_first_launch: bool, ) -> MetaResult { - let mut params = if cluster_first_launch { + let params = if cluster_first_launch { init_params } else if let Some(persisted) = SystemParams::get(&meta_store).await? 
{ merge_params(persisted, init_params) @@ -66,10 +66,6 @@ impl SystemParamsManager { )); }; - // For new clusters, the name of the object store needs to be prefixed according to the object id. - // For old clusters, the prefix is not divided for the sake of compatibility. - params.use_new_object_prefix_strategy = Some(cluster_first_launch); - info!("system parameters: {:?}", params); check_missing_params(&params).map_err(|e| anyhow!(e))?; From ea9befec4a8c84c2e758e82cd9d37d2ca0bd745e Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 17 Jun 2024 15:55:35 +0800 Subject: [PATCH 23/23] style change --- src/meta/src/manager/env.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index fec674057b0f..d58fa70fa18a 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -371,18 +371,6 @@ impl MetaSrvEnv { init_session_config: SessionConfig, meta_store_impl: MetaStoreImpl, ) -> MetaResult { - let mut is_sql_backend_cluster_first_launch = true; - if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl { - is_sql_backend_cluster_first_launch = - is_first_launch_for_sql_backend_cluster(sql_store).await?; - // Try to upgrade if any new model changes are added. 
- Migrator::up(&sql_store.conn, None) - .await - .expect("Failed to upgrade models in meta store"); - } - - let notification_manager = - Arc::new(NotificationManager::new(meta_store_impl.clone()).await); let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms)); let stream_client_pool = Arc::new(StreamClientPool::default()); let event_log_manager = Arc::new(start_event_log_manager( @@ -392,6 +380,8 @@ impl MetaSrvEnv { let env = match &meta_store_impl { MetaStoreImpl::Kv(meta_store) => { + let notification_manager = + Arc::new(NotificationManager::new(meta_store_impl.clone()).await); let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await); let (cluster_id, cluster_first_launch) = if let Some(id) = ClusterId::from_meta_store(meta_store).await? { @@ -399,6 +389,7 @@ impl MetaSrvEnv { } else { (ClusterId::new(), true) }; + // For new clusters, the name of the object store needs to be prefixed according to the object id. // For old clusters, the prefix is ​​not divided for the sake of compatibility. @@ -445,6 +436,15 @@ impl MetaSrvEnv { } } MetaStoreImpl::Sql(sql_meta_store) => { + let is_sql_backend_cluster_first_launch = + is_first_launch_for_sql_backend_cluster(sql_meta_store).await?; + // Try to upgrade if any new model changes are added. + Migrator::up(&sql_meta_store.conn, None) + .await + .expect("Failed to upgrade models in meta store"); + + let notification_manager = + Arc::new(NotificationManager::new(meta_store_impl.clone()).await); let cluster_id = Cluster::find() .one(&sql_meta_store.conn) .await?