diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part
index 2a130de04c19..a3fd0edc6522 100644
--- a/e2e_test/batch/catalog/pg_settings.slt.part
+++ b/e2e_test/batch/catalog/pg_settings.slt.part
@@ -7,6 +7,7 @@ internal data_directory
 internal parallel_compact_size_mb
 internal sstable_size_mb
 internal state_store
+internal use_new_object_prefix_strategy
 postmaster backup_storage_directory
 postmaster backup_storage_url
 postmaster barrier_interval_ms
diff --git a/proto/java_binding.proto b/proto/java_binding.proto
index 32ed2f5df199..72558438d176 100644
--- a/proto/java_binding.proto
+++ b/proto/java_binding.proto
@@ -34,4 +34,5 @@ message ReadPlan {
   catalog.Table table_catalog = 7;
   repeated uint32 vnode_ids = 8;
+  bool use_new_object_prefix_strategy = 9;
 }
diff --git a/proto/meta.proto b/proto/meta.proto
index 9ad18cb3df7d..4a67dd0455e5 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -602,6 +602,7 @@ message SystemParams {
   optional bool pause_on_next_bootstrap = 13;
   optional string wasm_storage_url = 14 [deprecated = true];
   optional bool enable_tracing = 15;
+  optional bool use_new_object_prefix_strategy = 16;
 }
 
 message GetSystemParamsRequest {}
diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs
index f71324cb4e55..c8382d35fee8 100644
--- a/src/common/src/system_param/mod.rs
+++ b/src/common/src/system_param/mod.rs
@@ -87,6 +87,7 @@ macro_rules! for_all_params {
     { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
     { pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
     { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
+    { use_new_object_prefix_strategy, bool, None, false, "Whether to split objects into multiple prefixes.", },
     }
 };
 }
@@ -376,6 +377,7 @@ macro_rules! impl_system_params_for_test {
         ret.state_store = Some("hummock+memory".to_string());
         ret.backup_storage_url = Some("memory".into());
         ret.backup_storage_directory = Some("backup".into());
+        ret.use_new_object_prefix_strategy = Some(false);
         ret
     }
 };
}
@@ -441,6 +443,7 @@ mod tests {
         (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
         (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
         (ENABLE_TRACING_KEY, "true"),
+        (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
         ("a_deprecated_param", "foo"),
     ];
diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs
index 3374e7212023..9a2c6e49534a 100644
--- a/src/common/src/system_param/reader.rs
+++ b/src/common/src/system_param/reader.rs
@@ -137,6 +137,14 @@ where
         self.inner().data_directory.as_ref().unwrap()
     }
 
+    fn use_new_object_prefix_strategy(&self) -> bool {
+        *self
+            .inner()
+            .use_new_object_prefix_strategy
+            .as_ref()
+            .unwrap()
+    }
+
     fn backup_storage_url(&self) -> &str {
         self.inner().backup_storage_url.as_ref().unwrap()
     }
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index 61f62fbc7e00..a28631a29bea 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -91,7 +91,6 @@ pub async fn compute_node_serve(
 ) -> (Vec<JoinHandle<()>>, Sender<()>) {
     // Load the configuration.
     let config = load_config(&opts.config_path, &opts);
-    info!("Starting compute node",);
     info!("> config: {:?}", config);
     info!(
@@ -211,6 +210,7 @@ pub async fn compute_node_serve(
         storage_metrics.clone(),
         compactor_metrics.clone(),
         await_tree_config.clone(),
+        system_params.use_new_object_prefix_strategy(),
     )
     .await
     .unwrap();
diff --git a/src/config/docs.md b/src/config/docs.md
index ab3355926016..59c8961a15be 100644
--- a/src/config/docs.md
+++ b/src/config/docs.md
@@ -167,3 +167,4 @@ This page is automatically generated by `./risedev generate-example-config`
 | pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false |
 | sstable_size_mb | Target size of the Sstable. | 256 |
 | state_store | URL for the state store | |
+| use_new_object_prefix_strategy | Whether to split objects into multiple prefixes. | |
diff --git a/src/ctl/src/cmd_impl/bench.rs b/src/ctl/src/cmd_impl/bench.rs
index d3c0cde6d20e..dce4a21115d6 100644
--- a/src/ctl/src/cmd_impl/bench.rs
+++ b/src/ctl/src/cmd_impl/bench.rs
@@ -42,6 +42,8 @@ pub enum BenchCommands {
         #[clap(long, default_value_t = 1)]
         threads: usize,
         data_dir: Option<String>,
+        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
+        use_new_object_prefix_strategy: bool,
     },
 }
@@ -86,9 +88,13 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
             mv_name,
             threads,
             data_dir,
+            use_new_object_prefix_strategy,
         } => {
             let (hummock, metrics) = context
-                .hummock_store_with_metrics(HummockServiceOpts::from_env(data_dir)?)
+                .hummock_store_with_metrics(HummockServiceOpts::from_env(
+                    data_dir,
+                    use_new_object_prefix_strategy,
+                )?)
                 .await?;
             let table = get_table_catalog(meta.clone(), mv_name).await?;
             let mut handlers = vec![];
diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs
index 2eb54362b413..f90712a02505 100644
--- a/src/ctl/src/cmd_impl/hummock/list_kv.rs
+++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs
@@ -27,9 +27,13 @@ pub async fn list_kv(
     epoch: u64,
     table_id: u32,
     data_dir: Option<String>,
+    use_new_object_prefix_strategy: bool,
 ) -> anyhow::Result<()> {
     let hummock = context
-        .hummock_store(HummockServiceOpts::from_env(data_dir)?)
+        .hummock_store(HummockServiceOpts::from_env(
+            data_dir,
+            use_new_object_prefix_strategy,
+        )?)
         .await?;
     if is_max_epoch(epoch) {
         tracing::info!("using MAX EPOCH as epoch");
diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs
index 0fc65054b51e..ce69ab87f1b7 100644
--- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs
+++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs
@@ -59,6 +59,8 @@ pub struct SstDumpArgs {
     print_table: bool,
     #[clap(short = 'd')]
     data_dir: Option<String>,
+    #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
+    use_new_object_prefix_strategy: bool,
 }
 
 pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
@@ -72,7 +74,10 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
     if args.print_level {
         // Level information is retrieved from meta service
         let hummock = context
-            .hummock_store(HummockServiceOpts::from_env(args.data_dir.clone())?)
+            .hummock_store(HummockServiceOpts::from_env(
+                args.data_dir.clone(),
+                args.use_new_object_prefix_strategy,
+            )?)
             .await?;
         let version = hummock.inner().get_pinned_version().version().clone();
         let sstable_store = hummock.sstable_store();
@@ -108,8 +113,13 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
         }
     } else {
         // Object information is retrieved from object store. Meta service is not required.
-        let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?;
-        let sstable_store = hummock_service_opts.create_sstable_store().await?;
+        let hummock_service_opts = HummockServiceOpts::from_env(
+            args.data_dir.clone(),
+            args.use_new_object_prefix_strategy,
+        )?;
+        let sstable_store = hummock_service_opts
+            .create_sstable_store(args.use_new_object_prefix_strategy)
+            .await?;
         if let Some(obj_id) = &args.object_id {
             let obj_store = sstable_store.store();
             let obj_path = sstable_store.get_sst_data_path(*obj_id);
diff --git a/src/ctl/src/cmd_impl/hummock/validate_version.rs b/src/ctl/src/cmd_impl/hummock/validate_version.rs
index b2ae1c22f66c..e8f61a8c9835 100644
--- a/src/ctl/src/cmd_impl/hummock/validate_version.rs
+++ b/src/ctl/src/cmd_impl/hummock/validate_version.rs
@@ -65,6 +65,7 @@ pub async fn print_user_key_in_archive(
     archive_ids: Vec<HummockVersionId>,
     data_dir: String,
     user_key: String,
+    use_new_object_prefix_strategy: bool,
 ) -> anyhow::Result<()> {
     let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| {
         panic!("cannot decode user key {} into raw bytes", user_key);
@@ -72,7 +73,8 @@ pub async fn print_user_key_in_archive(
     let user_key = UserKey::decode(&user_key_bytes);
     println!("user key: {user_key:?}");
 
-    let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
+    let hummock_opts =
+        HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
     let hummock = context.hummock_store(hummock_opts).await?;
     let sstable_store = hummock.sstable_store();
     let archive_object_store = sstable_store.store();
@@ -178,8 +180,10 @@ pub async fn print_version_delta_in_archive(
     archive_ids: Vec<HummockVersionId>,
     data_dir: String,
     sst_id: HummockSstableObjectId,
+    use_new_object_prefix_strategy: bool,
 ) -> anyhow::Result<()> {
-    let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
+    let hummock_opts =
+        HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
     let hummock = context.hummock_store(hummock_opts).await?;
     let sstable_store = hummock.sstable_store();
     let archive_object_store = sstable_store.store();
diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs
index 8c21d975009f..0689e315f74c 100644
--- a/src/ctl/src/cmd_impl/table/scan.rs
+++ b/src/ctl/src/cmd_impl/table/scan.rs
@@ -86,19 +86,35 @@ pub fn make_storage_table(
     ))
 }
 
-pub async fn scan(context: &CtlContext, mv_name: String, data_dir: Option<String>) -> Result<()> {
+pub async fn scan(
+    context: &CtlContext,
+    mv_name: String,
+    data_dir: Option<String>,
+    use_new_object_prefix_strategy: bool,
+) -> Result<()> {
     let meta_client = context.meta_client().await?;
     let hummock = context
-        .hummock_store(HummockServiceOpts::from_env(data_dir)?)
+        .hummock_store(HummockServiceOpts::from_env(
+            data_dir,
+            use_new_object_prefix_strategy,
+        )?)
         .await?;
     let table = get_table_catalog(meta_client, mv_name).await?;
     do_scan(table, hummock).await
 }
 
-pub async fn scan_id(context: &CtlContext, table_id: u32, data_dir: Option<String>) -> Result<()> {
+pub async fn scan_id(
+    context: &CtlContext,
+    table_id: u32,
+    data_dir: Option<String>,
+    use_new_object_prefix_strategy: bool,
+) -> Result<()> {
     let meta_client = context.meta_client().await?;
     let hummock = context
-        .hummock_store(HummockServiceOpts::from_env(data_dir)?)
+        .hummock_store(HummockServiceOpts::from_env(
+            data_dir,
+            use_new_object_prefix_strategy,
+        )?)
         .await?;
     let table = get_table_catalog_by_id(meta_client, table_id).await?;
     do_scan(table, hummock).await
diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs
index 59d272c3a27f..e885548d5a1e 100644
--- a/src/ctl/src/common/hummock_service.rs
+++ b/src/ctl/src/common/hummock_service.rs
@@ -36,6 +36,8 @@ pub struct HummockServiceOpts {
     pub hummock_url: String,
     pub data_dir: Option<String>,
 
+    use_new_object_prefix_strategy: bool,
+
     heartbeat_handle: Option<JoinHandle<()>>,
     heartbeat_shutdown_sender: Option<Sender<()>>,
 }
@@ -55,7 +57,10 @@ impl HummockServiceOpts {
     /// Currently, we will read these variables for meta:
     ///
     /// * `RW_HUMMOCK_URL`: hummock store address
-    pub fn from_env(data_dir: Option<String>) -> Result<Self> {
+    pub fn from_env(
+        data_dir: Option<String>,
+        use_new_object_prefix_strategy: bool,
+    ) -> Result<Self> {
         let hummock_url = match env::var("RW_HUMMOCK_URL") {
             Ok(url) => {
                 if !url.starts_with("hummock+") {
@@ -80,11 +85,13 @@ impl HummockServiceOpts {
                 bail!(MESSAGE);
             }
         };
+
         Ok(Self {
             hummock_url,
             data_dir,
             heartbeat_handle: None,
             heartbeat_shutdown_sender: None,
+            use_new_object_prefix_strategy,
         })
     }
@@ -142,6 +149,7 @@ impl HummockServiceOpts {
             metrics.storage_metrics.clone(),
             metrics.compactor_metrics.clone(),
             None,
+            self.use_new_object_prefix_strategy,
         )
         .await?;
@@ -157,7 +165,10 @@ impl HummockServiceOpts {
         }
     }
 
-    pub async fn create_sstable_store(&self) -> Result<Arc<SstableStore>> {
+    pub async fn create_sstable_store(
+        &self,
+        use_new_object_prefix_strategy: bool,
+    ) -> Result<Arc<SstableStore>> {
         let object_store = build_remote_object_store(
             self.hummock_url.strip_prefix("hummock+").unwrap(),
             Arc::new(ObjectStoreMetrics::unused()),
@@ -190,6 +201,7 @@ impl HummockServiceOpts {
             state_store_metrics: Arc::new(global_hummock_state_store_metrics(
                 MetricLevel::Disabled,
             )),
+            use_new_object_prefix_strategy,
             meta_cache,
             block_cache,
         })))
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index 1f50250276d6..002817993d85 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -193,6 +193,9 @@ enum HummockCommands {
         // data directory for hummock state store. None: use default
         data_dir: Option<String>,
+
+        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
+        use_new_object_prefix_strategy: bool,
     },
     SstDump(SstDumpArgs),
     /// trigger a targeted compaction through `compaction_group_id`
@@ -302,6 +305,8 @@ enum HummockCommands {
         /// KVs that are matched with the user key are printed.
         #[clap(long)]
         user_key: String,
+        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
+        use_new_object_prefix_strategy: bool,
     },
     PrintVersionDeltaInArchive {
         /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
@@ -313,6 +318,8 @@ enum HummockCommands {
         /// Version deltas that are related to the SST id are printed.
         #[clap(long)]
         sst_id: u64,
+        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
+        use_new_object_prefix_strategy: bool,
     },
 }
 
@@ -324,6 +331,9 @@ enum TableCommands {
         mv_name: String,
         // data directory for hummock state store. None: use default
         data_dir: Option<String>,
+
+        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
+        use_new_object_prefix_strategy: bool,
     },
     /// scan a state table using Id
     ScanById {
@@ -331,6 +341,8 @@ enum TableCommands {
         table_id: u32,
         // data directory for hummock state store. None: use default
         data_dir: Option<String>,
+        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
+        use_new_object_prefix_strategy: bool,
     },
     /// list all state tables
     List,
@@ -640,8 +652,16 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
             epoch,
             table_id,
             data_dir,
+            use_new_object_prefix_strategy,
         }) => {
-            cmd_impl::hummock::list_kv(context, epoch, table_id, data_dir).await?;
+            cmd_impl::hummock::list_kv(
+                context,
+                epoch,
+                table_id,
+                data_dir,
+                use_new_object_prefix_strategy,
+            )
+            .await?;
         }
         Commands::Hummock(HummockCommands::SstDump(args)) => {
             cmd_impl::hummock::sst_dump(context, args).await.unwrap()
@@ -764,12 +784,14 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
             archive_ids,
             data_dir,
             sst_id,
+            use_new_object_prefix_strategy,
         }) => {
             cmd_impl::hummock::print_version_delta_in_archive(
                 context,
                 archive_ids,
                 data_dir,
                 sst_id,
+                use_new_object_prefix_strategy,
             )
             .await?;
         }
@@ -777,15 +799,32 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
             archive_ids,
             data_dir,
             user_key,
+            use_new_object_prefix_strategy,
         }) => {
-            cmd_impl::hummock::print_user_key_in_archive(context, archive_ids, data_dir, user_key)
-                .await?;
+            cmd_impl::hummock::print_user_key_in_archive(
+                context,
+                archive_ids,
+                data_dir,
+                user_key,
+                use_new_object_prefix_strategy,
+            )
+            .await?;
         }
-        Commands::Table(TableCommands::Scan { mv_name, data_dir }) => {
-            cmd_impl::table::scan(context, mv_name, data_dir).await?
+        Commands::Table(TableCommands::Scan {
+            mv_name,
+            data_dir,
+            use_new_object_prefix_strategy,
+        }) => {
+            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
+                .await?
         }
-        Commands::Table(TableCommands::ScanById { table_id, data_dir }) => {
-            cmd_impl::table::scan_id(context, table_id, data_dir).await?
+        Commands::Table(TableCommands::ScanById {
+            table_id,
+            data_dir,
+            use_new_object_prefix_strategy,
+        }) => {
+            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
+                .await?
         }
         Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
         Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
diff --git a/src/java_binding/src/hummock_iterator.rs b/src/java_binding/src/hummock_iterator.rs
index 4b6fc5b01742..5cfeb0ecf7e0 100644
--- a/src/java_binding/src/hummock_iterator.rs
+++ b/src/java_binding/src/hummock_iterator.rs
@@ -106,6 +106,7 @@ pub(crate) async fn new_hummock_java_binding_iter(
         state_store_metrics: Arc::new(global_hummock_state_store_metrics(
             MetricLevel::Disabled,
         )),
+        use_new_object_prefix_strategy: read_plan.use_new_object_prefix_strategy,
         meta_cache,
         block_cache,
     }));
diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs
index 66f136b6159d..d0d732533703 100644
--- a/src/meta/model_v2/migration/src/lib.rs
+++ b/src/meta/model_v2/migration/src/lib.rs
@@ -1,7 +1,7 @@
 #![allow(clippy::enum_variant_names)]
 
 pub use sea_orm_migration::prelude::*;
-
+pub use sea_orm_migration::MigrationStatus;
 mod m20230908_072257_init;
 mod m20231008_020431_hummock;
 mod m20240304_074901_subscription;
diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index e8b738305dce..43b6342e22ed 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -36,7 +36,6 @@ use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
 use risingwave_meta::rpc::ElectionClientRef;
 use risingwave_meta::stream::ScaleController;
 use risingwave_meta::MetaStoreBackend;
-use risingwave_meta_model_migration::{Migrator, MigratorTrait};
 use risingwave_meta_service::backup_service::BackupServiceImpl;
 use risingwave_meta_service::cloud_service::CloudServiceImpl;
 use risingwave_meta_service::cluster_service::ClusterServiceImpl;
@@ -408,13 +407,6 @@ pub async fn start_service_as_election_leader(
     mut svc_shutdown_rx: WatchReceiver<()>,
 ) -> MetaResult<()> {
     tracing::info!("Defining leader services");
-    if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl {
-        // Try to upgrade if any new model changes are added.
-        Migrator::up(&sql_store.conn, None)
-            .await
-            .expect("Failed to upgrade models in meta store");
-    }
-
     let env = MetaSrvEnv::new(
         opts.clone(),
         init_system_params,
diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs
index 1ba13c8cbf8b..e24abb68def1 100644
--- a/src/meta/src/backup_restore/restore.rs
+++ b/src/meta/src/backup_restore/restore.rs
@@ -246,6 +246,7 @@ mod tests {
         SystemParams {
             state_store: Some("state_store".into()),
             data_directory: Some("data_directory".into()),
+            use_new_object_prefix_strategy: Some(true),
             backup_storage_url: Some("backup_storage_url".into()),
             backup_storage_directory: Some("backup_storage_directory".into()),
             ..SystemConfig::default().into_init_system_params()
diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs
index b623e441c0c2..7a284ca2fccd 100644
--- a/src/meta/src/manager/env.rs
+++ b/src/meta/src/manager/env.rs
@@ -20,6 +20,7 @@ use risingwave_common::config::{
 };
 use risingwave_common::session_config::SessionConfig;
 use risingwave_common::system_param::reader::SystemParamsReader;
+use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
 use risingwave_meta_model_v2::prelude::Cluster;
 use risingwave_pb::meta::SystemParams;
 use risingwave_rpc_client::{StreamClientPool, StreamClientPoolRef};
@@ -348,15 +349,32 @@ impl MetaOpts {
     }
 }
 
+/// Checks whether a cluster that uses SQL as the meta store backend is being launched for
+/// the first time, by inspecting the applied migrations: if the migration
+/// `m20230908_072257_init` has already been applied, the cluster is considered an existing one.
+///
+/// Note: this check must be performed before `Migrator::up()`.
+pub async fn is_first_launch_for_sql_backend_cluster(
+    sql_meta_store: &SqlMetaStore,
+) -> MetaResult<bool> {
+    let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?;
+    for migration in migrations {
+        if migration.name() == "m20230908_072257_init"
+            && migration.status() == MigrationStatus::Applied
+        {
+            return Ok(false);
+        }
+    }
+    Ok(true)
+}
+
 impl MetaSrvEnv {
     pub async fn new(
         opts: MetaOpts,
-        init_system_params: SystemParams,
+        mut init_system_params: SystemParams,
         init_session_config: SessionConfig,
         meta_store_impl: MetaStoreImpl,
     ) -> MetaResult<Self> {
-        let notification_manager =
-            Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
         let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
         let stream_client_pool = Arc::new(StreamClientPool::default());
         let event_log_manager = Arc::new(start_event_log_manager(
@@ -366,6 +384,8 @@ impl MetaSrvEnv {
 
         let env = match &meta_store_impl {
             MetaStoreImpl::Kv(meta_store) => {
+                let notification_manager =
+                    Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
                 let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await);
                 let (cluster_id, cluster_first_launch) =
                     if let Some(id) = ClusterId::from_meta_store(meta_store).await? {
@@ -373,6 +393,11 @@ impl MetaSrvEnv {
                     } else {
                         (ClusterId::new(), true)
                     };
+
+                // For new clusters, object names are prefixed according to the object id.
+                // For old clusters, no prefix is applied, for the sake of compatibility.
+
+                init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
                 let system_params_manager = Arc::new(
                     SystemParamsManager::new(
                         meta_store.clone(),
@@ -415,11 +440,24 @@ impl MetaSrvEnv {
                 }
             }
             MetaStoreImpl::Sql(sql_meta_store) => {
+                let is_sql_backend_cluster_first_launch =
+                    is_first_launch_for_sql_backend_cluster(sql_meta_store).await?;
+                // Try to upgrade if any new model changes are added.
+                Migrator::up(&sql_meta_store.conn, None)
+                    .await
+                    .expect("Failed to upgrade models in meta store");
+
+                let notification_manager =
+                    Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
                 let cluster_id = Cluster::find()
                     .one(&sql_meta_store.conn)
                     .await?
                     .map(|c| c.cluster_id.to_string().into())
                     .unwrap();
+                init_system_params.use_new_object_prefix_strategy =
+                    Some(is_sql_backend_cluster_first_launch);
+                // For new clusters, object names are prefixed according to the object id.
+                // For old clusters, no prefix is applied, for the sake of compatibility.
                 let system_param_controller = Arc::new(
                     SystemParamsController::new(
                         sql_meta_store.clone(),
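Reviewer note: the first-launch check above reduces to a small predicate over the set of applied migrations. Below is a minimal, self-contained model of that logic; the `Migration` and `MigrationStatus` types here are hypothetical stand-ins for `sea_orm_migration`'s metadata, which the real code reads via `Migrator::get_applied_migrations`.

```rust
/// Hypothetical stand-ins for `sea_orm_migration`'s migration metadata.
#[derive(PartialEq)]
#[allow(dead_code)]
enum MigrationStatus {
    Pending,
    Applied,
}

struct Migration {
    name: &'static str,
    status: MigrationStatus,
}

/// Mirrors `is_first_launch_for_sql_backend_cluster`: the cluster is an existing
/// one iff the init migration `m20230908_072257_init` has already been applied.
fn is_first_launch(applied_migrations: &[Migration]) -> bool {
    !applied_migrations
        .iter()
        .any(|m| m.name == "m20230908_072257_init" && m.status == MigrationStatus::Applied)
}

fn main() {
    // A brand-new cluster has no applied migrations yet.
    let fresh: [Migration; 0] = [];
    assert!(is_first_launch(&fresh));

    // An existing cluster has already applied the init migration.
    let existing = [Migration {
        name: "m20230908_072257_init",
        status: MigrationStatus::Applied,
    }];
    assert!(!is_first_launch(&existing));
}
```

This also makes clear why the doc comment insists the check run before `Migrator::up()`: once the upgrade runs, a brand-new cluster has just applied the init migration and would be misclassified as an old one.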
diff --git a/src/object_store/src/object/mem.rs b/src/object_store/src/object/mem.rs
index 6bc78ae8b198..270cac3719d6 100644
--- a/src/object_store/src/object/mem.rs
+++ b/src/object_store/src/object/mem.rs
@@ -101,7 +101,7 @@ pub struct InMemObjectStore {
 impl ObjectStore for InMemObjectStore {
     type StreamingUploader = InMemStreamingUploader;
 
-    fn get_object_prefix(&self, _obj_id: u64) -> String {
+    fn get_object_prefix(&self, _obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
         String::default()
     }
diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs
index 5369d914a751..e5e9cb966126 100644
--- a/src/object_store/src/object/mod.rs
+++ b/src/object_store/src/object/mod.rs
@@ -73,8 +73,8 @@ pub trait StreamingUploader: Send {
 #[async_trait::async_trait]
 pub trait ObjectStore: Send + Sync {
     type StreamingUploader: StreamingUploader;
-    /// Get the key prefix for object
-    fn get_object_prefix(&self, obj_id: u64) -> String;
+    /// Get the key prefix for an object; the prefix is determined by the object store type and `use_new_object_prefix_strategy`.
+    fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String;
 
     /// Uploads the object to `ObjectStore`.
     async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>;
@@ -325,8 +325,10 @@ impl ObjectStoreImpl {
         object_store_impl_method_body!(self, list(prefix).await)
     }
 
-    pub fn get_object_prefix(&self, obj_id: u64) -> String {
-        dispatch_object_store_enum!(self, |store| store.inner.get_object_prefix(obj_id))
+    pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
+        dispatch_object_store_enum!(self, |store| store
+            .inner
+            .get_object_prefix(obj_id, use_new_object_prefix_strategy))
     }
 
     pub fn support_streaming_upload(&self) -> bool {
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index 47ca4f362702..0d946e95d43f 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -72,18 +72,20 @@ impl OpendalObjectStore {
 impl ObjectStore for OpendalObjectStore {
     type StreamingUploader = OpendalStreamingUploader;
 
-    fn get_object_prefix(&self, obj_id: u64) -> String {
+    fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
         match self.engine_type {
             EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
             EngineType::Minio => prefix::s3::get_object_prefix(obj_id),
             EngineType::Memory => String::default(),
-            EngineType::Hdfs => String::default(),
-            EngineType::Gcs => String::default(),
-            EngineType::Obs => String::default(),
-            EngineType::Oss => String::default(),
-            EngineType::Webhdfs => String::default(),
-            EngineType::Azblob => String::default(),
-            EngineType::Fs => String::default(),
+            EngineType::Hdfs
+            | EngineType::Gcs
+            | EngineType::Obs
+            | EngineType::Oss
+            | EngineType::Webhdfs
+            | EngineType::Azblob
+            | EngineType::Fs => {
+                prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
+            }
         }
     }
diff --git a/src/object_store/src/object/prefix.rs b/src/object_store/src/object/prefix.rs
index e29729bf752f..5229d900a8b6 100644
--- a/src/object_store/src/object/prefix.rs
+++ b/src/object_store/src/object/prefix.rs
@@ -23,3 +23,23 @@ pub(crate) mod s3 {
         obj_prefix
     }
 }
+
+pub(crate) mod opendal_engine {
+    /// The number of bucket prefixes, shared by all OpenDAL-backed object stores (e.g. Azblob).
+    pub(crate) const NUM_BUCKET_PREFIXES_AZBLOB: u32 = 256;
+
+    pub(crate) fn get_object_prefix(obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
+        // For OpenDAL object stores, whether objects are split into prefixes depends on whether this is a new cluster:
+        // a new cluster spreads objects across `NUM_BUCKET_PREFIXES_AZBLOB` prefixes,
+        // while an old cluster stays unprefixed, since its existing data must remain readable and writable.
+        match use_new_object_prefix_strategy {
+            true => {
+                let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES_AZBLOB;
+                let mut obj_prefix = prefix.to_string();
+                obj_prefix.push('/');
+                obj_prefix
+            }
+            false => String::default(),
+        }
+    }
+}
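Reviewer note: to make the new OpenDAL strategy concrete, here is a runnable sketch of the prefix computation added in `prefix.rs` above. It uses the `crc32fast` crate, which the hunk already depends on; the constant 256 is `NUM_BUCKET_PREFIXES_AZBLOB` from the diff.

```rust
const NUM_BUCKET_PREFIXES: u32 = 256; // `NUM_BUCKET_PREFIXES_AZBLOB` in the diff

/// Mirrors `prefix::opendal_engine::get_object_prefix`.
fn get_object_prefix(obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
    if use_new_object_prefix_strategy {
        // New cluster: hash the object id and spread objects across 256 prefixes.
        let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES;
        format!("{}/", prefix)
    } else {
        // Old cluster: flat namespace, so existing objects stay addressable.
        String::new()
    }
}

fn main() {
    for obj_id in [1u64, 2, 3] {
        // The old strategy prints ""; the new one prints e.g. "137/" (the exact
        // value depends on the CRC32 of the object id).
        println!(
            "{}: old={:?} new={:?}",
            obj_id,
            get_object_prefix(obj_id, false),
            get_object_prefix(obj_id, true)
        );
    }
}
```

Hashing the id, rather than using something like `obj_id % 256`, spreads consecutively allocated object ids across all prefixes, which helps avoid per-prefix hot spots on object stores that shard by key prefix.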
diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs
index 6c72ced36563..f1f569cb7d36 100644
--- a/src/object_store/src/object/s3.rs
+++ b/src/object_store/src/object/s3.rs
@@ -402,8 +402,9 @@ pub struct S3ObjectStore {
 impl ObjectStore for S3ObjectStore {
     type StreamingUploader = S3StreamingUploader;
 
-    fn get_object_prefix(&self, obj_id: u64) -> String {
+    fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
         // Delegate to static method to avoid creating an `S3ObjectStore` in unit test.
+        // With the AWS S3 SDK as the object store, the object prefix is always applied.
         prefix::s3::get_object_prefix(obj_id)
     }
diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs
index 4c91531020ea..2f06b9183909 100644
--- a/src/object_store/src/object/sim/mod.rs
+++ b/src/object_store/src/object/sim/mod.rs
@@ -118,7 +118,7 @@ pub struct SimObjectStore {
 impl ObjectStore for SimObjectStore {
     type StreamingUploader = SimStreamingUploader;
 
-    fn get_object_prefix(&self, _obj_id: u64) -> String {
+    fn get_object_prefix(&self, _obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
         String::default()
     }
diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs
index 1ccb3a4eccaa..08da1438709c 100644
--- a/src/storage/benches/bench_compactor.rs
+++ b/src/storage/benches/bench_compactor.rs
@@ -80,6 +80,7 @@ pub async fn mock_sstable_store() -> SstableStoreRef {
         max_prefetch_block_number: 16,
         recent_filter: None,
         state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
+        use_new_object_prefix_strategy: true,
         meta_cache,
         block_cache,
diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs
index b67be3467c48..08d744189a1d 100644
--- a/src/storage/benches/bench_multi_builder.rs
+++ b/src/storage/benches/bench_multi_builder.rs
@@ -148,6 +148,7 @@ async fn generate_sstable_store(object_store: Arc<ObjectStoreImpl>) -> Arc<SstableStore> {
         recent_filter: None,
         state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
+        use_new_object_prefix_strategy: true,
         meta_cache,
         block_cache,
diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs
index a6ab2a6162e0..3240a4243a58 100644
--- a/src/storage/src/hummock/sstable_store.rs
+++ b/src/storage/src/hummock/sstable_store.rs
@@ -125,6 +125,7 @@ pub struct SstableStoreConfig {
     pub max_prefetch_block_number: usize,
     pub recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
     pub state_store_metrics: Arc<HummockStateStoreMetrics>,
+    pub use_new_object_prefix_strategy: bool,
 
     pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
     pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
@@ -144,6 +145,15 @@ pub struct SstableStore {
     prefetch_buffer_usage: Arc<AtomicUsize>,
     prefetch_buffer_capacity: usize,
     max_prefetch_block_number: usize,
+    /// Whether objects are split into different prefixes depends on two factors:
+    /// 1. The specific object store type.
+    /// 2. Whether the cluster is a new cluster.
+    ///
+    /// `use_new_object_prefix_strategy` is taken from the system parameter of the same name:
+    /// it is set to `true` for a new cluster and to `false` for an old one.
+    /// The final decision on whether to split prefixes combines this field with the specific
+    /// object store type, which keeps old clusters backward compatible.
+    use_new_object_prefix_strategy: bool,
 }
 
 impl SstableStore {
@@ -162,6 +172,7 @@ impl SstableStore {
             prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
             prefetch_buffer_capacity: config.prefetch_buffer_capacity,
             max_prefetch_block_number: config.max_prefetch_block_number,
+            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
         }
     }
@@ -173,6 +184,7 @@ impl SstableStore {
         path: String,
         block_cache_capacity: usize,
         meta_cache_capacity: usize,
+        use_new_object_prefix_strategy: bool,
     ) -> HummockResult<Self> {
         let meta_cache = HybridCacheBuilder::new()
             .memory(meta_cache_capacity)
@@ -205,6 +217,7 @@ impl SstableStore {
             prefetch_buffer_capacity: block_cache_capacity,
             max_prefetch_block_number: 16, /* compactor won't use this parameter, so just assign a default value. */
             recent_filter: None,
+            use_new_object_prefix_strategy,
             meta_cache,
             block_cache,
@@ -508,7 +521,9 @@ impl SstableStore {
     }
 
     pub fn get_sst_data_path(&self, object_id: HummockSstableObjectId) -> String {
-        let obj_prefix = self.store.get_object_prefix(object_id);
+        let obj_prefix = self
+            .store
+            .get_object_prefix(object_id, self.use_new_object_prefix_strategy);
         format!(
             "{}/{}{}.{}",
             self.path, obj_prefix, object_id, OBJECT_SUFFIX
diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs
index fdecccc85893..2512f680b536 100644
--- a/src/storage/src/store_impl.rs
+++ b/src/storage/src/store_impl.rs
@@ -617,6 +617,7 @@ impl StateStoreImpl {
         storage_metrics: Arc<MonitoredStorageMetrics>,
         compactor_metrics: Arc<CompactorMetrics>,
         await_tree_config: Option<await_tree::Config>,
+        use_new_object_prefix_strategy: bool,
     ) -> StorageResult<Self> {
         const MB: usize = 1 << 20;
@@ -752,6 +753,7 @@ impl StateStoreImpl {
                     max_prefetch_block_number: opts.max_prefetch_block_number,
                     recent_filter,
                     state_store_metrics: state_store_metrics.clone(),
+                    use_new_object_prefix_strategy,
                     meta_cache,
                     block_cache,
diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs
index e492b6497907..f294d8fb6fe5 100644
--- a/src/tests/compaction_test/src/compaction_test_runner.rs
+++ b/src/tests/compaction_test/src/compaction_test_runner.rs
@@ -711,6 +711,7 @@ pub async fn create_hummock_store_with_metrics(
         metrics.storage_metrics.clone(),
         metrics.compactor_metrics.clone(),
         None,
+        true,
     )
     .await?;
diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs
index 7183ee7e2e85..5d6c6ff7e70d 100644
--- a/src/tests/compaction_test/src/delete_range_runner.rs
+++ b/src/tests/compaction_test/src/delete_range_runner.rs
@@ -227,6 +227,7 @@ async fn compaction_test(
         max_prefetch_block_number: storage_opts.max_prefetch_block_number,
         recent_filter: None,
         state_store_metrics: state_store_metrics.clone(),
+        use_new_object_prefix_strategy: system_params.use_new_object_prefix_strategy(),
         meta_cache,
         block_cache,
     }));
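Reviewer note: end to end, the flag flows from the system parameter into `SstableStoreConfig`, and `SstableStore::get_sst_data_path` combines the (possibly empty) prefix with the data directory and object id. A compact sketch of the resulting key layout follows; the literal `"data"` suffix stands in for Hummock's `OBJECT_SUFFIX` (an assumption on our part, not taken from this diff), and `get_object_prefix` is the sketch from above.

```rust
fn get_object_prefix(obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
    if use_new_object_prefix_strategy {
        format!("{}/", crc32fast::hash(&obj_id.to_be_bytes()) % 256)
    } else {
        String::new()
    }
}

/// Mirrors the path assembly in `SstableStore::get_sst_data_path`:
/// `{path}/{prefix}{object_id}.{suffix}`.
fn get_sst_data_path(path: &str, object_id: u64, use_new_object_prefix_strategy: bool) -> String {
    let obj_prefix = get_object_prefix(object_id, use_new_object_prefix_strategy);
    format!("{}/{}{}.{}", path, obj_prefix, object_id, "data")
}

fn main() {
    // Old cluster: flat layout, exactly as before this change.
    assert_eq!(get_sst_data_path("hummock_001", 42, false), "hummock_001/42.data");
    // New cluster: the same object lands under a hashed prefix "directory".
    println!("{}", get_sst_data_path("hummock_001", 42, true));
}
```

This is also why `use_new_object_prefix_strategy` is registered as an immutable system parameter decided once at first launch: flipping it on an existing cluster would change every computed path and orphan the objects already written under the old layout.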