Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(object store): introduce object prefix for opendal object store #16542

Merged
merged 25 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ postmaster checkpoint_frequency
postmaster enable_tracing
postmaster max_concurrent_creating_streaming_jobs
postmaster pause_on_next_bootstrap
postmaster use_new_object_prefix_strategy
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
user application_name
user background_ddl
user batch_enable_distributed_dml
Expand Down
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ message SystemParams {
optional bool pause_on_next_bootstrap = 13;
optional string wasm_storage_url = 14 [deprecated = true];
optional bool enable_tracing = 15;
optional bool use_new_object_prefix_strategy = 16;
}

message GetSystemParamsRequest {}
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ macro_rules! for_all_params {
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
{ pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
{ enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
{ use_new_object_prefix_strategy, bool, Some(false), true, "Whether to split object prefix.", },
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}
};
}
Expand Down Expand Up @@ -441,6 +442,7 @@ mod tests {
(MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
(PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
(ENABLE_TRACING_KEY, "true"),
(USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "true"),
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
("a_deprecated_param", "foo"),
];

Expand Down
4 changes: 4 additions & 0 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ where
self.inner().data_directory.as_ref().unwrap()
}

fn use_new_object_prefix_strategy(&self) -> bool {
self.inner().use_new_object_prefix_strategy.unwrap_or(true)
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}

fn backup_storage_url(&self) -> &str {
self.inner().backup_storage_url.as_ref().unwrap()
}
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ pub async fn compute_node_serve(
) -> (Vec<JoinHandle<()>>, Sender<()>) {
// Load the configuration.
let config = load_config(&opts.config_path, &opts);

info!("Starting compute node",);
info!("> config: {:?}", config);
info!(
Expand Down Expand Up @@ -208,6 +207,7 @@ pub async fn compute_node_serve(
storage_metrics.clone(),
compactor_metrics.clone(),
await_tree_config.clone(),
system_params.use_new_object_prefix_strategy(),
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,4 @@ This page is automatically generated by `./risedev generate-example-config`
| pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false |
| sstable_size_mb | Target size of the Sstable. | 256 |
| state_store | URL for the state store | |
| use_new_object_prefix_strategy | Whether to split object prefix. | false |
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,4 @@ bloom_false_positive = 0.001
max_concurrent_creating_streaming_jobs = 1
pause_on_next_bootstrap = false
enable_tracing = false
use_new_object_prefix_strategy = false
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 5 additions & 1 deletion src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct SstDumpArgs {
print_table: bool,
#[clap(short = 'd')]
data_dir: Option<String>,
#[clap(short, long = "use_new_object_prefix_strategy")]
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
use_new_object_prefix_strategy: Option<bool>,
}

pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
Expand Down Expand Up @@ -109,7 +111,9 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
} else {
// Object information is retrieved from object store. Meta service is not required.
let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?;
let sstable_store = hummock_service_opts.create_sstable_store().await?;
let sstable_store = hummock_service_opts
.create_sstable_store(args.use_new_object_prefix_strategy.unwrap_or(true))
.await?;
if let Some(obj_id) = &args.object_id {
let obj_store = sstable_store.store();
let obj_path = sstable_store.get_sst_data_path(*obj_id);
Expand Down
15 changes: 14 additions & 1 deletion src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct HummockServiceOpts {
pub hummock_url: String,
pub data_dir: Option<String>,

use_new_object_prefix_strategy: bool,

heartbeat_handle: Option<JoinHandle<()>>,
heartbeat_shutdown_sender: Option<Sender<()>>,
}
Expand Down Expand Up @@ -80,11 +82,17 @@ impl HummockServiceOpts {
bail!(MESSAGE);
}
};
let use_new_object_prefix_strategy = match env::var("RW_USE_NEW_OBJECT_PREFIX_STRATEGY") {
Ok(use_new_object_prefix_strategy) => use_new_object_prefix_strategy == "true",
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
_ => false,
};

Ok(Self {
hummock_url,
data_dir,
heartbeat_handle: None,
heartbeat_shutdown_sender: None,
use_new_object_prefix_strategy,
})
}

Expand Down Expand Up @@ -142,6 +150,7 @@ impl HummockServiceOpts {
metrics.storage_metrics.clone(),
metrics.compactor_metrics.clone(),
None,
self.use_new_object_prefix_strategy,
)
.await?;

Expand All @@ -157,7 +166,10 @@ impl HummockServiceOpts {
}
}

pub async fn create_sstable_store(&self) -> Result<Arc<SstableStore>> {
pub async fn create_sstable_store(
&self,
use_new_object_prefix_strategy: bool,
) -> Result<Arc<SstableStore>> {
let object_store = build_remote_object_store(
self.hummock_url.strip_prefix("hummock+").unwrap(),
Arc::new(ObjectStoreMetrics::unused()),
Expand Down Expand Up @@ -190,6 +202,7 @@ impl HummockServiceOpts {
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
use_new_object_prefix_strategy,
meta_cache_v2,
block_cache_v2,
})))
Expand Down
1 change: 1 addition & 0 deletions src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl HummockJavaBindingIterator {
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
use_new_object_prefix_strategy: false,
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
meta_cache_v2,
block_cache_v2,
}));
Expand Down
43 changes: 37 additions & 6 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::DeleteObjectTracker;
use crate::hummock::CompactorManagerRef;
use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager};
use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager, SystemParamsManagerImpl};
use crate::model::{ClusterId, MetadataModel, MetadataModelError};
use crate::rpc::metrics::MetaMetrics;

Expand Down Expand Up @@ -224,13 +224,39 @@ impl HummockManager {
// Skip this check in e2e compaction test, which needs to start a secondary cluster with
// same bucket
if !deterministic_mode {
write_exclusive_cluster_id(
let use_new_object_prefix_strategy = write_exclusive_cluster_id(
state_store_dir,
env.cluster_id().clone(),
object_store.clone(),
)
.await?;

if use_new_object_prefix_strategy {
match env.system_params_manager_impl_ref() {
SystemParamsManagerImpl::Kv(mgr) => {
mgr.set_param("use_new_object_prefix_strategy", Some("true".to_owned()))
.await
.unwrap();
}
SystemParamsManagerImpl::Sql(mgr) => {
mgr.set_param("use_new_object_prefix_strategy", Some("true".to_owned()))
.await
.unwrap();
}
};
} else {
match env.system_params_manager_impl_ref() {
SystemParamsManagerImpl::Kv(mgr) => {
mgr.set_param("use_new_object_prefix_strategy", Some("false".to_owned()))
.await
.unwrap();
}
SystemParamsManagerImpl::Sql(mgr) => {
mgr.set_param("use_new_object_prefix_strategy", Some("false".to_owned()))
.await
.unwrap();
}
};
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we move the logic to MetaSrvEnv::new instead here becuase:

  • cluster_first_launch can be obtained directly there without interacting with object store and modifying write_exclusive_cluster_id
  • We can avoid making use_new_object_prefix_strategy mutable and instead pass the init value for use_new_object_prefix_strategy to init system params manager.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cluster_first_launch can be obtained directly there

For KV meta store, it is, but for SQL meta store, can we know whether it is the first launch here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @yezizp2012 Can we also judge cluster_first_launch for SQL backend, just like KV backend?
Otherwise we need to modify this system variable later, but "Making the use_new_object_prefix_strategy mutable will be dangerous because user can techinically change it via ALTER in SQL and the after a restart, the whole cluster will crash."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some offline discussion, for SQL backend I use MigratorTrait::get_applied_migrations() to check whether it's a new cluster:

  • if m20230908_072257_init is applied, it's an old cluster
  • if m20230908_072257_init is not applied, it's a new cluster.

Then we can make use_new_object_prefix_strategy immutable.

// config bucket lifecycle for new cluster.
if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
&& !env.opts.do_not_config_object_storage_lifecycle
Expand Down Expand Up @@ -561,11 +587,16 @@ impl HummockManager {
}
}

/// This function, `write_exclusive_cluster_id`, is used to check if it is a new cluster during meta startup:
/// For new clusters, the name of the object store needs to be prefixed according to the object id.
/// For old clusters, the prefix is ​​not divided for the sake of compatibility.
///
/// The return value of this function represents whether to adopt the new object prefix strategy.
async fn write_exclusive_cluster_id(
state_store_dir: &str,
cluster_id: ClusterId,
object_store: ObjectStoreRef,
) -> Result<()> {
) -> Result<bool> {
const CLUSTER_ID_DIR: &str = "cluster_id";
const CLUSTER_ID_NAME: &str = "0";
let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
Expand All @@ -574,7 +605,7 @@ async fn write_exclusive_cluster_id(
Ok(stored_cluster_id) => {
let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
if cluster_id.deref() == stored_cluster_id {
return Ok(());
return Ok(false);
}

Err(ObjectError::internal(format!(
Expand All @@ -588,7 +619,7 @@ async fn write_exclusive_cluster_id(
object_store
.upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
.await?;
return Ok(());
return Ok(true);
}
Err(e.into())
}
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub struct InMemObjectStore {
impl ObjectStore for InMemObjectStore {
type StreamingUploader = InMemStreamingUploader;

fn get_object_prefix(&self, _obj_id: u64) -> String {
fn get_object_prefix(&self, _obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
String::default()
}

Expand Down
25 changes: 21 additions & 4 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ pub trait StreamingUploader: Send {
#[async_trait::async_trait]
pub trait ObjectStore: Send + Sync {
type StreamingUploader: StreamingUploader;
/// Get the key prefix for object
fn get_object_prefix(&self, obj_id: u64) -> String;
/// Get the key prefix for object, the prefix is determined by the type of object store and `devise_object_prefix`.
fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String;

/// Uploads the object to `ObjectStore`.
async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>;
Expand Down Expand Up @@ -325,8 +325,25 @@ impl ObjectStoreImpl {
object_store_impl_method_body!(self, list(prefix).await)
}

pub fn get_object_prefix(&self, obj_id: u64) -> String {
dispatch_object_store_enum!(self, |store| store.inner.get_object_prefix(obj_id))
pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
// FIXME: ObjectStoreImpl lacks flexibility for adding new interface to ObjectStore
// trait. Macro object_store_impl_method_body routes to local or remote only depending on
// the path
match self {
ObjectStoreImpl::InMem(store) => store
.inner
.get_object_prefix(obj_id, use_new_object_prefix_strategy),
ObjectStoreImpl::Opendal(store) => store
.inner
.get_object_prefix(obj_id, use_new_object_prefix_strategy),
ObjectStoreImpl::S3(store) => store
.inner
.get_object_prefix(obj_id, use_new_object_prefix_strategy),
#[cfg(madsim)]
ObjectStoreImpl::Sim(store) => store
.inner
.get_object_prefix(obj_id, use_new_object_prefix_strategy),
}
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn support_streaming_upload(&self) -> bool {
Expand Down
18 changes: 10 additions & 8 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,20 @@ impl OpendalObjectStore {
impl ObjectStore for OpendalObjectStore {
type StreamingUploader = OpendalStreamingUploader;

fn get_object_prefix(&self, obj_id: u64) -> String {
fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
match self.engine_type {
EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
EngineType::Minio => prefix::s3::get_object_prefix(obj_id),
EngineType::Memory => String::default(),
EngineType::Hdfs => String::default(),
EngineType::Gcs => String::default(),
EngineType::Obs => String::default(),
EngineType::Oss => String::default(),
EngineType::Webhdfs => String::default(),
EngineType::Azblob => String::default(),
EngineType::Fs => String::default(),
EngineType::Hdfs
| EngineType::Gcs
| EngineType::Obs
| EngineType::Oss
| EngineType::Webhdfs
| EngineType::Azblob
| EngineType::Fs => {
prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
Comment on lines +80 to +87
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we verified that Fs and Hdfs worked after this change? I am not sure they will auto create the prefix dir for us.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have verified that fs backend work.

}
}
}

Expand Down
20 changes: 20 additions & 0 deletions src/object_store/src/object/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,23 @@ pub(crate) mod s3 {
obj_prefix
}
}

pub(crate) mod opendal_engine {
/// The number of Azblob bucket prefixes
pub(crate) const NUM_BUCKET_PREFIXES_AZBLOB: u32 = 256;

pub(crate) fn get_object_prefix(obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
// For OpenDAL object storage, whether objects are divided by prefixes depends on whether it is a new cluster:
// If it is a new cluster, objects will be divided into `NUM_BUCKET_PREFIXES_AZBLOB` prefixes.
// If it is an old cluster, prefixes are not used due to the need to read and write old data.
match use_new_object_prefix_strategy {
true => {
let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES_AZBLOB;
let mut obj_prefix = prefix.to_string();
obj_prefix.push('/');
obj_prefix
}
false => String::default(),
}
}
}
3 changes: 2 additions & 1 deletion src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,9 @@ pub struct S3ObjectStore {
impl ObjectStore for S3ObjectStore {
type StreamingUploader = S3StreamingUploader;

fn get_object_prefix(&self, obj_id: u64) -> String {
fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
// Delegate to static method to avoid creating an `S3ObjectStore` in unit test.
// Using aws s3 sdk as object storage, the object prefix will be devised by default.
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
prefix::s3::get_object_prefix(obj_id)
}

Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/sim/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub struct SimObjectStore {
impl ObjectStore for SimObjectStore {
type StreamingUploader = SimStreamingUploader;

fn get_object_prefix(&self, _obj_id: u64) -> String {
fn get_object_prefix(&self, _obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
String::default()
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub async fn mock_sstable_store() -> SstableStoreRef {
max_prefetch_block_number: 16,
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
use_new_object_prefix_strategy: false,
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved

meta_cache_v2,
block_cache_v2,
Expand Down
1 change: 1 addition & 0 deletions src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ async fn generate_sstable_store(object_store: Arc<ObjectStoreImpl>) -> Arc<Sstab
max_prefetch_block_number: 16,
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
use_new_object_prefix_strategy: false,
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
meta_cache_v2,
block_cache_v2,
}))
Expand Down
1 change: 1 addition & 0 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub async fn prepare_start_parameters(
storage_opts.data_directory.to_string(),
0,
meta_cache_capacity_bytes,
system_params_reader.use_new_object_prefix_strategy(),
)
.await
// FIXME(MrCroxx): Handle this error.
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_test/src/bin/replay/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
max_prefetch_block_number: storage_opts.max_prefetch_block_number,
recent_filter: None,
state_store_metrics: state_store_metrics.clone(),
use_new_object_prefix_strategy: false,
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
meta_cache_v2,
block_cache_v2,
}));
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/iterator/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub async fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> Ssta

recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
use_new_object_prefix_strategy: false,
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved

meta_cache_v2,
block_cache_v2,
Expand Down
Loading
Loading