Skip to content

Commit

Permalink
judge is new cluster before Migrator::up and fix all ctl command
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Jun 3, 2024
1 parent 260fac6 commit 1f837c6
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 38 deletions.
8 changes: 7 additions & 1 deletion src/ctl/src/cmd_impl/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub enum BenchCommands {
#[clap(long, default_value_t = 1)]
threads: usize,
data_dir: Option<String>,
#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
},
}

Expand Down Expand Up @@ -86,9 +88,13 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
mv_name,
threads,
data_dir,
use_new_object_prefix_strategy,
} => {
let (hummock, metrics) = context
.hummock_store_with_metrics(HummockServiceOpts::from_env(data_dir)?)
.hummock_store_with_metrics(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
let table = get_table_catalog(meta.clone(), mv_name).await?;
let mut handlers = vec![];
Expand Down
6 changes: 5 additions & 1 deletion src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ pub async fn list_kv(
epoch: u64,
table_id: u32,
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> anyhow::Result<()> {
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.hummock_store(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
if is_max_epoch(epoch) {
tracing::info!("using MAX EPOCH as epoch");
Expand Down
16 changes: 11 additions & 5 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ pub struct SstDumpArgs {
print_table: bool,
#[clap(short = 'd')]
data_dir: Option<String>,
#[clap(short, long = "use_new_object_prefix_strategy")]
use_new_object_prefix_strategy: Option<bool>,
#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
}

pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
Expand All @@ -74,7 +74,10 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
if args.print_level {
// Level information is retrieved from meta service
let hummock = context
.hummock_store(HummockServiceOpts::from_env(args.data_dir.clone())?)
.hummock_store(HummockServiceOpts::from_env(
args.data_dir.clone(),
args.use_new_object_prefix_strategy,
)?)
.await?;
let version = hummock.inner().get_pinned_version().version().clone();
let sstable_store = hummock.sstable_store();
Expand Down Expand Up @@ -110,9 +113,12 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
}
} else {
// Object information is retrieved from object store. Meta service is not required.
let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?;
let hummock_service_opts = HummockServiceOpts::from_env(
args.data_dir.clone(),
args.use_new_object_prefix_strategy,
)?;
let sstable_store = hummock_service_opts
.create_sstable_store(args.use_new_object_prefix_strategy.unwrap_or(true))
.create_sstable_store(args.use_new_object_prefix_strategy)
.await?;
if let Some(obj_id) = &args.object_id {
let obj_store = sstable_store.store();
Expand Down
8 changes: 6 additions & 2 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ pub async fn print_user_key_in_archive(
archive_ids: Vec<HummockVersionId>,
data_dir: String,
user_key: String,
use_new_object_prefix_strategy: bool,
) -> anyhow::Result<()> {
let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| {
panic!("cannot decode user key {} into raw bytes", user_key);
});
let user_key = UserKey::decode(&user_key_bytes);
println!("user key: {user_key:?}");

let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
let hummock_opts =
HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
let hummock = context.hummock_store(hummock_opts).await?;
let sstable_store = hummock.sstable_store();
let archive_object_store = sstable_store.store();
Expand Down Expand Up @@ -178,8 +180,10 @@ pub async fn print_version_delta_in_archive(
archive_ids: Vec<HummockVersionId>,
data_dir: String,
sst_id: HummockSstableObjectId,
use_new_object_prefix_strategy: bool,
) -> anyhow::Result<()> {
let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
let hummock_opts =
HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
let hummock = context.hummock_store(hummock_opts).await?;
let sstable_store = hummock.sstable_store();
let archive_object_store = sstable_store.store();
Expand Down
24 changes: 20 additions & 4 deletions src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,35 @@ pub fn make_storage_table<S: StateStore>(
))
}

pub async fn scan(context: &CtlContext, mv_name: String, data_dir: Option<String>) -> Result<()> {
pub async fn scan(
context: &CtlContext,
mv_name: String,
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> Result<()> {
let meta_client = context.meta_client().await?;
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.hummock_store(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
let table = get_table_catalog(meta_client, mv_name).await?;
do_scan(table, hummock).await
}

pub async fn scan_id(context: &CtlContext, table_id: u32, data_dir: Option<String>) -> Result<()> {
pub async fn scan_id(
context: &CtlContext,
table_id: u32,
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> Result<()> {
let meta_client = context.meta_client().await?;
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.hummock_store(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
let table = get_table_catalog_by_id(meta_client, table_id).await?;
do_scan(table, hummock).await
Expand Down
9 changes: 4 additions & 5 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ impl HummockServiceOpts {
/// Currently, we will read these variables for meta:
///
/// * `RW_HUMMOCK_URL`: hummock store address
pub fn from_env(data_dir: Option<String>) -> Result<Self> {
pub fn from_env(
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> Result<Self> {
let hummock_url = match env::var("RW_HUMMOCK_URL") {
Ok(url) => {
if !url.starts_with("hummock+") {
Expand All @@ -82,10 +85,6 @@ impl HummockServiceOpts {
bail!(MESSAGE);
}
};
let use_new_object_prefix_strategy = match env::var("RW_USE_NEW_OBJECT_PREFIX_STRATEGY") {
Ok(use_new_object_prefix_strategy) => use_new_object_prefix_strategy == "true",
_ => false,
};

Ok(Self {
hummock_url,
Expand Down
53 changes: 46 additions & 7 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ enum HummockCommands {

// data directory for hummock state store. None: use default
data_dir: Option<String>,

#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
},
SstDump(SstDumpArgs),
/// trigger a targeted compaction through `compaction_group_id`
Expand Down Expand Up @@ -295,6 +298,8 @@ enum HummockCommands {
/// KVs that are matched with the user key are printed.
#[clap(long)]
user_key: String,
#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
},
PrintVersionDeltaInArchive {
/// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
Expand All @@ -306,6 +311,8 @@ enum HummockCommands {
/// Version deltas that are related to the SST id are printed.
#[clap(long)]
sst_id: u64,
#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
},
}

Expand All @@ -317,13 +324,18 @@ enum TableCommands {
mv_name: String,
// data directory for hummock state store. None: use default
data_dir: Option<String>,

#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
},
/// scan a state table using Id
ScanById {
/// id of the state table to operate on
table_id: u32,
// data directory for hummock state store. None: use default
data_dir: Option<String>,
#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
},
/// list all state tables
List,
Expand Down Expand Up @@ -633,8 +645,16 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
epoch,
table_id,
data_dir,
use_new_object_prefix_strategy,
}) => {
cmd_impl::hummock::list_kv(context, epoch, table_id, data_dir).await?;
cmd_impl::hummock::list_kv(
context,
epoch,
table_id,
data_dir,
use_new_object_prefix_strategy,
)
.await?;
}
Commands::Hummock(HummockCommands::SstDump(args)) => {
cmd_impl::hummock::sst_dump(context, args).await.unwrap()
Expand Down Expand Up @@ -744,28 +764,47 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
archive_ids,
data_dir,
sst_id,
use_new_object_prefix_strategy,
}) => {
cmd_impl::hummock::print_version_delta_in_archive(
context,
archive_ids,
data_dir,
sst_id,
use_new_object_prefix_strategy,
)
.await?;
}
Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
archive_ids,
data_dir,
user_key,
use_new_object_prefix_strategy,
}) => {
cmd_impl::hummock::print_user_key_in_archive(context, archive_ids, data_dir, user_key)
.await?;
cmd_impl::hummock::print_user_key_in_archive(
context,
archive_ids,
data_dir,
user_key,
use_new_object_prefix_strategy,
)
.await?;
}
Commands::Table(TableCommands::Scan { mv_name, data_dir }) => {
cmd_impl::table::scan(context, mv_name, data_dir).await?
Commands::Table(TableCommands::Scan {
mv_name,
data_dir,
use_new_object_prefix_strategy,
}) => {
cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
.await?
}
Commands::Table(TableCommands::ScanById { table_id, data_dir }) => {
cmd_impl::table::scan_id(context, table_id, data_dir).await?
Commands::Table(TableCommands::ScanById {
table_id,
data_dir,
use_new_object_prefix_strategy,
}) => {
cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
.await?
}
Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
Expand Down
26 changes: 25 additions & 1 deletion src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::stream::ScaleController;
use risingwave_meta::MetaStoreBackend;
use risingwave_meta_model_migration::{Migrator, MigratorTrait};
use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
use risingwave_meta_service::backup_service::BackupServiceImpl;
use risingwave_meta_service::cloud_service::CloudServiceImpl;
use risingwave_meta_service::cluster_service::ClusterServiceImpl;
Expand Down Expand Up @@ -391,6 +391,26 @@ pub async fn start_service_as_election_follower(
server.await;
}

/// This function `is_first_launch_for_sql_backend_cluster` is used to check whether the cluster, which uses SQL as the backend, is a new cluster.
/// It determines this by inspecting the applied migrations. If the migration `m20230908_072257_init` has been applied,
/// then it is considered an old cluster.
///
/// Note: this check should be performed before `Migrator::up()`.
pub async fn is_first_launch_for_sql_backend_cluster(
sql_meta_store: &SqlMetaStore,
) -> MetaResult<bool> {
let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?;
let mut cluster_first_launch = true;
for migration in migrations {
if migration.name() == "m20230908_072257_init"
&& migration.status() == MigrationStatus::Applied
{
cluster_first_launch = false;
}
}
Ok(cluster_first_launch)
}

/// Starts all services needed for the meta leader node
/// Only call this function once, since initializing the services multiple times will result in an
/// inconsistent state
Expand All @@ -408,7 +428,10 @@ pub async fn start_service_as_election_leader(
mut svc_shutdown_rx: WatchReceiver<()>,
) -> MetaResult<()> {
tracing::info!("Defining leader services");
let mut is_sql_backend_cluster_first_launch = true;
if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl {
is_sql_backend_cluster_first_launch =
is_first_launch_for_sql_backend_cluster(sql_store).await?;
// Try to upgrade if any new model changes are added.
Migrator::up(&sql_store.conn, None)
.await
Expand All @@ -420,6 +443,7 @@ pub async fn start_service_as_election_leader(
init_system_params,
init_session_config,
meta_store_impl,
is_sql_backend_cluster_first_launch,
)
.await?;
let system_params_reader = env.system_params_reader().await;
Expand Down
18 changes: 6 additions & 12 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use risingwave_common::config::{
};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
use risingwave_meta_model_v2::prelude::Cluster;
use risingwave_pb::meta::SystemParams;
use risingwave_rpc_client::{StreamClientPool, StreamClientPoolRef};
Expand Down Expand Up @@ -351,6 +350,7 @@ impl MetaSrvEnv {
init_system_params: SystemParams,
init_session_config: SessionConfig,
meta_store_impl: MetaStoreImpl,
is_sql_backend_cluster_first_launch: bool,
) -> MetaResult<Self> {
let notification_manager =
Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
Expand Down Expand Up @@ -417,20 +417,12 @@ impl MetaSrvEnv {
.await?
.map(|c| c.cluster_id.to_string().into())
.unwrap();
let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?;
let mut cluster_first_launch = true;
// If `m20230908_072257_init` has been applied, it is the old cluster
for migration in migrations {
if migration.name() == "m20230908_072257_init"
&& migration.status() == MigrationStatus::Applied
{
cluster_first_launch = false;
}
}

let mut system_params = init_system_params;
// For new clusters, the name of the object store needs to be prefixed according to the object id.
// For old clusters, the prefix is ​​not divided for the sake of compatibility.
system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
system_params.use_new_object_prefix_strategy =
Some(is_sql_backend_cluster_first_launch);
let system_param_controller = Arc::new(
SystemParamsController::new(
sql_meta_store.clone(),
Expand Down Expand Up @@ -548,6 +540,7 @@ impl MetaSrvEnv {
risingwave_common::system_param::system_params_for_test(),
Default::default(),
MetaStoreImpl::Sql(SqlMetaStore::for_test().await),
true,
)
.await
.unwrap()
Expand All @@ -561,6 +554,7 @@ impl MetaSrvEnv {
risingwave_common::system_param::system_params_for_test(),
Default::default(),
MetaStoreImpl::Kv(MemStore::default().into_ref()),
true,
)
.await
.unwrap()
Expand Down

0 comments on commit 1f837c6

Please sign in to comment.