From 1f837c6bc4b72764f2b9fdf216718c9861538471 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 3 Jun 2024 18:59:17 +0800 Subject: [PATCH] judge is new cluster before Migrator::up and fix all ctl command --- src/ctl/src/cmd_impl/bench.rs | 8 ++- src/ctl/src/cmd_impl/hummock/list_kv.rs | 6 ++- src/ctl/src/cmd_impl/hummock/sst_dump.rs | 16 ++++-- .../src/cmd_impl/hummock/validate_version.rs | 8 ++- src/ctl/src/cmd_impl/table/scan.rs | 24 +++++++-- src/ctl/src/common/hummock_service.rs | 9 ++-- src/ctl/src/lib.rs | 53 ++++++++++++++++--- src/meta/node/src/server.rs | 26 ++++++++- src/meta/src/manager/env.rs | 18 +++---- 9 files changed, 130 insertions(+), 38 deletions(-) diff --git a/src/ctl/src/cmd_impl/bench.rs b/src/ctl/src/cmd_impl/bench.rs index d3c0cde6d20e..dce4a21115d6 100644 --- a/src/ctl/src/cmd_impl/bench.rs +++ b/src/ctl/src/cmd_impl/bench.rs @@ -42,6 +42,8 @@ pub enum BenchCommands { #[clap(long, default_value_t = 1)] threads: usize, data_dir: Option, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, } @@ -86,9 +88,13 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> { mv_name, threads, data_dir, + use_new_object_prefix_strategy, } => { let (hummock, metrics) = context - .hummock_store_with_metrics(HummockServiceOpts::from_env(data_dir)?) + .hummock_store_with_metrics(HummockServiceOpts::from_env( + data_dir, + use_new_object_prefix_strategy, + )?) .await?; let table = get_table_catalog(meta.clone(), mv_name).await?; let mut handlers = vec![]; diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs index 2eb54362b413..f90712a02505 100644 --- a/src/ctl/src/cmd_impl/hummock/list_kv.rs +++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs @@ -27,9 +27,13 @@ pub async fn list_kv( epoch: u64, table_id: u32, data_dir: Option, + use_new_object_prefix_strategy: bool, ) -> anyhow::Result<()> { let hummock = context - .hummock_store(HummockServiceOpts::from_env(data_dir)?) + .hummock_store(HummockServiceOpts::from_env( + data_dir, + use_new_object_prefix_strategy, + )?) .await?; if is_max_epoch(epoch) { tracing::info!("using MAX EPOCH as epoch"); diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index cd4e3b1c85ab..ce69ab87f1b7 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -59,8 +59,8 @@ pub struct SstDumpArgs { print_table: bool, #[clap(short = 'd')] data_dir: Option, - #[clap(short, long = "use_new_object_prefix_strategy")] - use_new_object_prefix_strategy: Option, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, } pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> { @@ -74,7 +74,10 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result if args.print_level { // Level information is retrieved from meta service let hummock = context - .hummock_store(HummockServiceOpts::from_env(args.data_dir.clone())?) + .hummock_store(HummockServiceOpts::from_env( + args.data_dir.clone(), + args.use_new_object_prefix_strategy, + )?) .await?; let version = hummock.inner().get_pinned_version().version().clone(); let sstable_store = hummock.sstable_store(); @@ -110,9 +113,12 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result } } else { // Object information is retrieved from object store. Meta service is not required. - let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?; + let hummock_service_opts = HummockServiceOpts::from_env( + args.data_dir.clone(), + args.use_new_object_prefix_strategy, + )?; let sstable_store = hummock_service_opts - .create_sstable_store(args.use_new_object_prefix_strategy.unwrap_or(true)) + .create_sstable_store(args.use_new_object_prefix_strategy) .await?; if let Some(obj_id) = &args.object_id { let obj_store = sstable_store.store(); diff --git a/src/ctl/src/cmd_impl/hummock/validate_version.rs b/src/ctl/src/cmd_impl/hummock/validate_version.rs index b2ae1c22f66c..e8f61a8c9835 100644 --- a/src/ctl/src/cmd_impl/hummock/validate_version.rs +++ b/src/ctl/src/cmd_impl/hummock/validate_version.rs @@ -65,6 +65,7 @@ pub async fn print_user_key_in_archive( archive_ids: Vec, data_dir: String, user_key: String, + use_new_object_prefix_strategy: bool, ) -> anyhow::Result<()> { let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| { panic!("cannot decode user key {} into raw bytes", user_key); @@ -72,7 +73,8 @@ pub async fn print_user_key_in_archive( let user_key = UserKey::decode(&user_key_bytes); println!("user key: {user_key:?}"); - let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?; + let hummock_opts = + HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?; let hummock = context.hummock_store(hummock_opts).await?; let sstable_store = hummock.sstable_store(); let archive_object_store = sstable_store.store(); @@ -178,8 +180,10 @@ pub async fn print_version_delta_in_archive( archive_ids: Vec, data_dir: String, sst_id: HummockSstableObjectId, + use_new_object_prefix_strategy: bool, ) -> anyhow::Result<()> { - let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?; + let hummock_opts = + HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?; let hummock = context.hummock_store(hummock_opts).await?; let sstable_store = hummock.sstable_store(); let archive_object_store = sstable_store.store(); diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs index 8c21d975009f..0689e315f74c 100644 --- a/src/ctl/src/cmd_impl/table/scan.rs +++ b/src/ctl/src/cmd_impl/table/scan.rs @@ -86,19 +86,35 @@ pub fn make_storage_table( )) } -pub async fn scan(context: &CtlContext, mv_name: String, data_dir: Option) -> Result<()> { +pub async fn scan( + context: &CtlContext, + mv_name: String, + data_dir: Option, + use_new_object_prefix_strategy: bool, +) -> Result<()> { let meta_client = context.meta_client().await?; let hummock = context - .hummock_store(HummockServiceOpts::from_env(data_dir)?) + .hummock_store(HummockServiceOpts::from_env( + data_dir, + use_new_object_prefix_strategy, + )?) .await?; let table = get_table_catalog(meta_client, mv_name).await?; do_scan(table, hummock).await } -pub async fn scan_id(context: &CtlContext, table_id: u32, data_dir: Option) -> Result<()> { +pub async fn scan_id( + context: &CtlContext, + table_id: u32, + data_dir: Option, + use_new_object_prefix_strategy: bool, +) -> Result<()> { let meta_client = context.meta_client().await?; let hummock = context - .hummock_store(HummockServiceOpts::from_env(data_dir)?) + .hummock_store(HummockServiceOpts::from_env( + data_dir, + use_new_object_prefix_strategy, + )?) .await?; let table = get_table_catalog_by_id(meta_client, table_id).await?; do_scan(table, hummock).await diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 6724be46a347..0d42b77c1361 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -57,7 +57,10 @@ impl HummockServiceOpts { /// Currently, we will read these variables for meta: /// /// * `RW_HUMMOCK_URL`: hummock store address - pub fn from_env(data_dir: Option) -> Result { + pub fn from_env( + data_dir: Option, + use_new_object_prefix_strategy: bool, + ) -> Result { let hummock_url = match env::var("RW_HUMMOCK_URL") { Ok(url) => { if !url.starts_with("hummock+") { @@ -82,10 +85,6 @@ impl HummockServiceOpts { bail!(MESSAGE); } }; - let use_new_object_prefix_strategy = match env::var("RW_USE_NEW_OBJECT_PREFIX_STRATEGY") { - Ok(use_new_object_prefix_strategy) => use_new_object_prefix_strategy == "true", - _ => false, - }; Ok(Self { hummock_url, diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index a1aaa8f48c5f..635912ede3dc 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -192,6 +192,9 @@ enum HummockCommands { // data directory for hummock state store. None: use default data_dir: Option, + + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, SstDump(SstDumpArgs), /// trigger a targeted compaction through `compaction_group_id` @@ -295,6 +298,8 @@ enum HummockCommands { /// KVs that are matched with the user key are printed. #[clap(long)] user_key: String, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, PrintVersionDeltaInArchive { /// The ident of the archive file in object store. It's also the first Hummock version id of this archive. @@ -306,6 +311,8 @@ enum HummockCommands { /// Version deltas that are related to the SST id are printed. #[clap(long)] sst_id: u64, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, } @@ -317,6 +324,9 @@ enum TableCommands { mv_name: String, // data directory for hummock state store. None: use default data_dir: Option, + + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, /// scan a state table using Id ScanById { @@ -324,6 +334,8 @@ enum TableCommands { table_id: u32, // data directory for hummock state store. None: use default data_dir: Option, + #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")] + use_new_object_prefix_strategy: bool, }, /// list all state tables List, @@ -633,8 +645,16 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { epoch, table_id, data_dir, + use_new_object_prefix_strategy, }) => { - cmd_impl::hummock::list_kv(context, epoch, table_id, data_dir).await?; + cmd_impl::hummock::list_kv( + context, + epoch, + table_id, + data_dir, + use_new_object_prefix_strategy, + ) + .await?; } Commands::Hummock(HummockCommands::SstDump(args)) => { cmd_impl::hummock::sst_dump(context, args).await.unwrap() @@ -744,12 +764,14 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { archive_ids, data_dir, sst_id, + use_new_object_prefix_strategy, }) => { cmd_impl::hummock::print_version_delta_in_archive( context, archive_ids, data_dir, sst_id, + use_new_object_prefix_strategy, ) .await?; } @@ -757,15 +779,32 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { archive_ids, data_dir, user_key, + use_new_object_prefix_strategy, }) => { - cmd_impl::hummock::print_user_key_in_archive(context, archive_ids, data_dir, user_key) - .await?; + cmd_impl::hummock::print_user_key_in_archive( + context, + archive_ids, + data_dir, + user_key, + use_new_object_prefix_strategy, + ) + .await?; } - Commands::Table(TableCommands::Scan { mv_name, data_dir }) => { - cmd_impl::table::scan(context, mv_name, data_dir).await? + Commands::Table(TableCommands::Scan { + mv_name, + data_dir, + use_new_object_prefix_strategy, + }) => { + cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy) + .await? } - Commands::Table(TableCommands::ScanById { table_id, data_dir }) => { - cmd_impl::table::scan_id(context, table_id, data_dir).await? + Commands::Table(TableCommands::ScanById { + table_id, + data_dir, + use_new_object_prefix_strategy, + }) => { + cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy) + .await? } Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?, Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?, diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index bf5bb72ed731..bfa3a5a52ba8 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -36,7 +36,7 @@ use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; use risingwave_meta::rpc::ElectionClientRef; use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; -use risingwave_meta_model_migration::{Migrator, MigratorTrait}; +use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_service::backup_service::BackupServiceImpl; use risingwave_meta_service::cloud_service::CloudServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; @@ -391,6 +391,26 @@ pub async fn start_service_as_election_follower( server.await; } +/// This function `is_first_launch_for_sql_backend_cluster` is used to check whether the cluster, which uses SQL as the backend, is a new cluster. +/// It determines this by inspecting the applied migrations. If the migration `m20230908_072257_init` has been applied, +/// then it is considered an old cluster. +/// +/// Note: this check should be performed before `Migrator::up()`. +pub async fn is_first_launch_for_sql_backend_cluster( + sql_meta_store: &SqlMetaStore, +) -> MetaResult { + let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?; + let mut cluster_first_launch = true; + for migration in migrations { + if migration.name() == "m20230908_072257_init" + && migration.status() == MigrationStatus::Applied + { + cluster_first_launch = false; + } + } + Ok(cluster_first_launch) +} + /// Starts all services needed for the meta leader node /// Only call this function once, since initializing the services multiple times will result in an /// inconsistent state @@ -408,7 +428,10 @@ pub async fn start_service_as_election_leader( mut svc_shutdown_rx: WatchReceiver<()>, ) -> MetaResult<()> { tracing::info!("Defining leader services"); + let mut is_sql_backend_cluster_first_launch = true; if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl { + is_sql_backend_cluster_first_launch = + is_first_launch_for_sql_backend_cluster(sql_store).await?; // Try to upgrade if any new model changes are added. Migrator::up(&sql_store.conn, None) .await @@ -420,6 +443,7 @@ pub async fn start_service_as_election_leader( init_system_params, init_session_config, meta_store_impl, + is_sql_backend_cluster_first_launch, ) .await?; let system_params_reader = env.system_params_reader().await; diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 6b80aada9f38..5da6d7f8b4d7 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -20,7 +20,6 @@ use risingwave_common::config::{ }; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; -use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::{StreamClientPool, StreamClientPoolRef}; @@ -351,6 +350,7 @@ impl MetaSrvEnv { init_system_params: SystemParams, init_session_config: SessionConfig, meta_store_impl: MetaStoreImpl, + is_sql_backend_cluster_first_launch: bool, ) -> MetaResult { let notification_manager = Arc::new(NotificationManager::new(meta_store_impl.clone()).await); @@ -417,20 +417,12 @@ impl MetaSrvEnv { .await? .map(|c| c.cluster_id.to_string().into()) .unwrap(); - let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?; - let mut cluster_first_launch = true; - // If `m20230908_072257_init` has been applied, it is the old cluster - for migration in migrations { - if migration.name() == "m20230908_072257_init" - && migration.status() == MigrationStatus::Applied - { - cluster_first_launch = false; - } - } + let mut system_params = init_system_params; // For new clusters, the name of the object store needs to be prefixed according to the object id. // For old clusters, the prefix is ​​not divided for the sake of compatibility. - system_params.use_new_object_prefix_strategy = Some(cluster_first_launch); + system_params.use_new_object_prefix_strategy = + Some(is_sql_backend_cluster_first_launch); let system_param_controller = Arc::new( SystemParamsController::new( sql_meta_store.clone(), @@ -548,6 +540,7 @@ impl MetaSrvEnv { risingwave_common::system_param::system_params_for_test(), Default::default(), MetaStoreImpl::Sql(SqlMetaStore::for_test().await), + true, ) .await .unwrap() @@ -561,6 +554,7 @@ impl MetaSrvEnv { risingwave_common::system_param::system_params_for_test(), Default::default(), MetaStoreImpl::Kv(MemStore::default().into_ref()), + true, ) .await .unwrap()