feat(object store): introduce object prefix for opendal object store #16542

Merged
merged 25 commits on Jun 17, 2024
Changes from 14 commits
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
@@ -7,6 +7,7 @@ internal data_directory
internal parallel_compact_size_mb
internal sstable_size_mb
internal state_store
internal use_new_object_prefix_strategy
postmaster backup_storage_directory
postmaster backup_storage_url
postmaster barrier_interval_ms
1 change: 1 addition & 0 deletions proto/java_binding.proto
@@ -34,4 +34,5 @@ message ReadPlan {
catalog.Table table_catalog = 7;

repeated uint32 vnode_ids = 8;
bool use_new_object_prefix_strategy = 9;
}
1 change: 1 addition & 0 deletions proto/meta.proto
@@ -600,6 +600,7 @@ message SystemParams {
optional bool pause_on_next_bootstrap = 13;
optional string wasm_storage_url = 14 [deprecated = true];
optional bool enable_tracing = 15;
optional bool use_new_object_prefix_strategy = 16;
}

message GetSystemParamsRequest {}
2 changes: 2 additions & 0 deletions src/common/src/system_param/mod.rs
@@ -87,6 +87,7 @@ macro_rules! for_all_params {
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
{ pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
{ enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
{ use_new_object_prefix_strategy, bool, Some(false), false, "Whether to split object prefix.", },
}
};
}
@@ -441,6 +442,7 @@ mod tests {
(MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
(PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
(ENABLE_TRACING_KEY, "true"),
(USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
("a_deprecated_param", "foo"),
];

4 changes: 4 additions & 0 deletions src/common/src/system_param/reader.rs
@@ -137,6 +137,10 @@ where
self.inner().data_directory.as_ref().unwrap()
}

fn use_new_object_prefix_strategy(&self) -> bool {
self.inner().use_new_object_prefix_strategy.unwrap_or(true)
}

fn backup_storage_url(&self) -> &str {
self.inner().backup_storage_url.as_ref().unwrap()
}
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
@@ -89,7 +89,6 @@ pub async fn compute_node_serve(
) -> (Vec<JoinHandle<()>>, Sender<()>) {
// Load the configuration.
let config = load_config(&opts.config_path, &opts);

info!("Starting compute node",);
info!("> config: {:?}", config);
info!(
@@ -208,6 +207,7 @@ pub async fn compute_node_serve(
storage_metrics.clone(),
compactor_metrics.clone(),
await_tree_config.clone(),
system_params.use_new_object_prefix_strategy(),
)
.await
.unwrap();
1 change: 1 addition & 0 deletions src/config/docs.md
@@ -163,3 +163,4 @@ This page is automatically generated by `./risedev generate-example-config`
| pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false |
| sstable_size_mb | Target size of the Sstable. | 256 |
| state_store | URL for the state store | |
| use_new_object_prefix_strategy | Whether to split object prefix. | false |
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -239,3 +239,4 @@ bloom_false_positive = 0.001
max_concurrent_creating_streaming_jobs = 1
pause_on_next_bootstrap = false
enable_tracing = false
use_new_object_prefix_strategy = false
6 changes: 5 additions & 1 deletion src/ctl/src/cmd_impl/hummock/sst_dump.rs
@@ -59,6 +59,8 @@ pub struct SstDumpArgs {
print_table: bool,
#[clap(short = 'd')]
data_dir: Option<String>,
#[clap(short, long = "use_new_object_prefix_strategy")]
use_new_object_prefix_strategy: Option<bool>,
}

pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
@@ -109,7 +111,9 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
} else {
// Object information is retrieved from object store. Meta service is not required.
let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?;
let sstable_store = hummock_service_opts.create_sstable_store().await?;
let sstable_store = hummock_service_opts
.create_sstable_store(args.use_new_object_prefix_strategy.unwrap_or(true))
.await?;
if let Some(obj_id) = &args.object_id {
let obj_store = sstable_store.store();
let obj_path = sstable_store.get_sst_data_path(*obj_id);
15 changes: 14 additions & 1 deletion src/ctl/src/common/hummock_service.rs
@@ -36,6 +36,8 @@ pub struct HummockServiceOpts {
pub hummock_url: String,
pub data_dir: Option<String>,

use_new_object_prefix_strategy: bool,

heartbeat_handle: Option<JoinHandle<()>>,
heartbeat_shutdown_sender: Option<Sender<()>>,
}
@@ -80,11 +82,17 @@ impl HummockServiceOpts {
bail!(MESSAGE);
}
};
let use_new_object_prefix_strategy = match env::var("RW_USE_NEW_OBJECT_PREFIX_STRATEGY") {
Ok(use_new_object_prefix_strategy) => use_new_object_prefix_strategy == "true",
_ => false,
};

Ok(Self {
hummock_url,
data_dir,
heartbeat_handle: None,
heartbeat_shutdown_sender: None,
use_new_object_prefix_strategy,
})
}

@@ -142,6 +150,7 @@ impl HummockServiceOpts {
metrics.storage_metrics.clone(),
metrics.compactor_metrics.clone(),
None,
self.use_new_object_prefix_strategy,
)
.await?;

@@ -157,7 +166,10 @@
}
}

pub async fn create_sstable_store(&self) -> Result<Arc<SstableStore>> {
pub async fn create_sstable_store(
&self,
use_new_object_prefix_strategy: bool,
) -> Result<Arc<SstableStore>> {
let object_store = build_remote_object_store(
self.hummock_url.strip_prefix("hummock+").unwrap(),
Arc::new(ObjectStoreMetrics::unused()),
@@ -190,6 +202,7 @@ impl HummockServiceOpts {
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
use_new_object_prefix_strategy,
meta_cache_v2,
block_cache_v2,
})))
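For reference, the ctl path above resolves the flag from the `RW_USE_NEW_OBJECT_PREFIX_STRATEGY` environment variable (or, for `sst_dump`, from the CLI argument). A minimal sketch of that resolution as a standalone helper; the function name is an assumption, not part of this diff:

```rust
use std::env;

/// Mirrors the parsing in `HummockServiceOpts::from_env`: only the literal
/// string "true" enables the new prefix strategy; anything else, including an
/// unset variable, falls back to the old (non-prefixed) layout.
fn resolve_use_new_object_prefix_strategy() -> bool {
    env::var("RW_USE_NEW_OBJECT_PREFIX_STRATEGY")
        .map(|v| v == "true")
        .unwrap_or(false)
}
```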
1 change: 1 addition & 0 deletions src/jni_core/src/hummock_iterator.rs
@@ -105,6 +105,7 @@ impl HummockJavaBindingIterator {
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
use_new_object_prefix_strategy: read_plan.use_new_object_prefix_strategy,
meta_cache_v2,
block_cache_v2,
}));
2 changes: 1 addition & 1 deletion src/meta/model_v2/migration/src/lib.rs
@@ -1,7 +1,7 @@
#![allow(clippy::enum_variant_names)]

pub use sea_orm_migration::prelude::*;

pub use sea_orm_migration::MigrationStatus;
mod m20230908_072257_init;
mod m20231008_020431_hummock;
mod m20240304_074901_subscription;
17 changes: 16 additions & 1 deletion src/meta/src/manager/env.rs
@@ -20,6 +20,7 @@ use risingwave_common::config::{
};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
use risingwave_meta_model_v2::prelude::Cluster;
use risingwave_pb::meta::SystemParams;
use risingwave_rpc_client::{StreamClientPool, StreamClientPoolRef};
@@ -416,11 +417,25 @@ impl MetaSrvEnv {
.await?
.map(|c| c.cluster_id.to_string().into())
.unwrap();
let migrations = Migrator::get_applied_migrations(&sql_meta_store.conn).await?;
let mut cluster_first_launch = true;
Contributor Author:
@yezizp2012 PTAL for this part, thanks!

Member:
We should check the status before calling Migrator::up. At this point all migrations have already been applied. Please add a helper function to get whether the cluster is first launched, together with some necessary comments.

Collaborator:
+1. If we check get_applied_migrations here, it will contain "m20230908_072257_init" for sure.

Contributor Author:
Fixed, and since we explicitly specify that use_new_prefix_strategy should be passed in, there is no need to make any further judgment for ctl, right?

// If `m20230908_072257_init` has been applied, it is the old cluster
for migration in migrations {
if migration.name() == "m20230908_072257_init"
&& migration.status() == MigrationStatus::Applied
{
cluster_first_launch = false;
}
}
let mut system_params = init_system_params;
// For new clusters, object names are prefixed according to the object id.
// For old clusters, no prefix is added, for backward compatibility.
system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
let system_param_controller = Arc::new(
SystemParamsController::new(
sql_meta_store.clone(),
notification_manager.clone(),
init_system_params,
system_params,
)
.await?,
);
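The review thread above asks for a dedicated helper for the first-launch check. A sketch of what such a helper might look like, built only from the APIs already used in this hunk (`Migrator::get_applied_migrations`, `MigrationStatus`); the helper name and error type are assumptions, not code from this PR:

```rust
use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
use sea_orm::DatabaseConnection;

/// Returns true iff the cluster is launching for the first time, i.e. the very
/// first migration `m20230908_072257_init` has not been applied yet. Note the
/// reviewer's point: this must be checked before `Migrator::up` runs, otherwise
/// the init migration is always reported as applied.
async fn is_cluster_first_launch(conn: &DatabaseConnection) -> anyhow::Result<bool> {
    let applied = Migrator::get_applied_migrations(conn).await?;
    Ok(!applied.iter().any(|m| {
        m.name() == "m20230908_072257_init" && m.status() == MigrationStatus::Applied
    }))
}
```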
6 changes: 5 additions & 1 deletion src/meta/src/manager/system_param/mod.rs
@@ -56,7 +56,7 @@ impl SystemParamsManager {
init_params: SystemParams,
cluster_first_launch: bool,
) -> MetaResult<Self> {
let params = if cluster_first_launch {
let mut params = if cluster_first_launch {
init_params
} else if let Some(persisted) = SystemParams::get(&meta_store).await? {
merge_params(persisted, init_params)
@@ -66,6 +66,10 @@
));
};

// For new clusters, object names are prefixed according to the object id.
// For old clusters, no prefix is added, for backward compatibility.
params.use_new_object_prefix_strategy = Some(cluster_first_launch);

info!("system parameters: {:?}", params);
check_missing_params(&params).map_err(|e| anyhow!(e))?;

2 changes: 1 addition & 1 deletion src/object_store/src/object/mem.rs
@@ -101,7 +101,7 @@ pub struct InMemObjectStore
impl ObjectStore for InMemObjectStore {
type StreamingUploader = InMemStreamingUploader;

fn get_object_prefix(&self, _obj_id: u64) -> String {
fn get_object_prefix(&self, _obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
String::default()
}

10 changes: 6 additions & 4 deletions src/object_store/src/object/mod.rs
@@ -73,8 +73,8 @@ pub trait StreamingUploader: Send {
#[async_trait::async_trait]
pub trait ObjectStore: Send + Sync {
type StreamingUploader: StreamingUploader;
/// Get the key prefix for object
fn get_object_prefix(&self, obj_id: u64) -> String;
/// Get the key prefix for the object. The prefix is determined by the type of object store and `devise_object_prefix`.
fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String;

/// Uploads the object to `ObjectStore`.
async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>;
@@ -325,8 +325,10 @@ impl ObjectStoreImpl {
object_store_impl_method_body!(self, list(prefix).await)
}

pub fn get_object_prefix(&self, obj_id: u64) -> String {
dispatch_object_store_enum!(self, |store| store.inner.get_object_prefix(obj_id))
pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
dispatch_object_store_enum!(self, |store| store
.inner
.get_object_prefix(obj_id, use_new_object_prefix_strategy))
}

pub fn support_streaming_upload(&self) -> bool {
18 changes: 10 additions & 8 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -72,18 +72,20 @@ impl OpendalObjectStore {
impl ObjectStore for OpendalObjectStore {
type StreamingUploader = OpendalStreamingUploader;

fn get_object_prefix(&self, obj_id: u64) -> String {
fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
match self.engine_type {
EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
EngineType::Minio => prefix::s3::get_object_prefix(obj_id),
EngineType::Memory => String::default(),
EngineType::Hdfs => String::default(),
EngineType::Gcs => String::default(),
EngineType::Obs => String::default(),
EngineType::Oss => String::default(),
EngineType::Webhdfs => String::default(),
EngineType::Azblob => String::default(),
EngineType::Fs => String::default(),
EngineType::Hdfs
| EngineType::Gcs
| EngineType::Obs
| EngineType::Oss
| EngineType::Webhdfs
| EngineType::Azblob
| EngineType::Fs => {
prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
Comment on lines +80 to +87

Collaborator:
Have we verified that Fs and Hdfs work after this change? I am not sure they will auto create the prefix dir for us.

Contributor Author:
I have verified that the fs backend works.

}
}
}

20 changes: 20 additions & 0 deletions src/object_store/src/object/prefix.rs
@@ -23,3 +23,23 @@ pub(crate) mod s3 {
obj_prefix
}
}

pub(crate) mod opendal_engine {
/// The number of Azblob bucket prefixes
pub(crate) const NUM_BUCKET_PREFIXES_AZBLOB: u32 = 256;

pub(crate) fn get_object_prefix(obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
// For OpenDAL object storage, whether objects are divided by prefixes depends on whether it is a new cluster:
// If it is a new cluster, objects will be divided into `NUM_BUCKET_PREFIXES_AZBLOB` prefixes.
// If it is an old cluster, prefixes are not used due to the need to read and write old data.
match use_new_object_prefix_strategy {
true => {
let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES_AZBLOB;
let mut obj_prefix = prefix.to_string();
obj_prefix.push('/');
obj_prefix
}
false => String::default(),
}
}
}
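To make the new strategy concrete, here is a hypothetical standalone snippet (not part of the diff) reproducing the computation in `prefix::opendal_engine::get_object_prefix`: the prefix is the CRC32 of the object id's big-endian bytes, taken modulo 256, followed by a slash.

```rust
fn main() {
    let obj_id: u64 = 42;

    // New strategy (fresh clusters): bucket the object into one of 256 prefixes.
    let bucket = crc32fast::hash(&obj_id.to_be_bytes()) % 256;
    let new_prefix = format!("{}/", bucket);

    // Old strategy (existing clusters): no prefix at all, so old data stays readable.
    let old_prefix = String::new();

    // The prefix is prepended to the object path when the key is built; the
    // concrete bucket number depends on the CRC32 value of the object id.
    println!("new: {new_prefix:?}, old: {old_prefix:?}");
}
```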
3 changes: 2 additions & 1 deletion src/object_store/src/object/s3.rs
@@ -402,8 +402,9 @@ pub struct S3ObjectStore {
impl ObjectStore for S3ObjectStore {
type StreamingUploader = S3StreamingUploader;

fn get_object_prefix(&self, obj_id: u64) -> String {
fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
// Delegate to static method to avoid creating an `S3ObjectStore` in unit test.
// When the AWS S3 SDK is used as the object store, the object prefix is always applied, regardless of the strategy flag.
prefix::s3::get_object_prefix(obj_id)
}

2 changes: 1 addition & 1 deletion src/object_store/src/object/sim/mod.rs
@@ -118,7 +118,7 @@ pub struct SimObjectStore {
impl ObjectStore for SimObjectStore {
type StreamingUploader = SimStreamingUploader;

fn get_object_prefix(&self, _obj_id: u64) -> String {
fn get_object_prefix(&self, _obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
String::default()
}

1 change: 1 addition & 0 deletions src/storage/benches/bench_compactor.rs
@@ -80,6 +80,7 @@ pub async fn mock_sstable_store() -> SstableStoreRef {
max_prefetch_block_number: 16,
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
use_new_object_prefix_strategy: true,

meta_cache_v2,
block_cache_v2,
1 change: 1 addition & 0 deletions src/storage/benches/bench_multi_builder.rs
@@ -153,6 +153,7 @@ async fn generate_sstable_store(object_store: Arc<ObjectStoreImpl>) -> Arc<Sstab
max_prefetch_block_number: 16,
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
use_new_object_prefix_strategy: true,
meta_cache_v2,
block_cache_v2,
}))
1 change: 1 addition & 0 deletions src/storage/compactor/src/server.rs
@@ -132,6 +132,7 @@ pub async fn prepare_start_parameters(
storage_opts.data_directory.to_string(),
0,
meta_cache_capacity_bytes,
system_params_reader.use_new_object_prefix_strategy(),
)
.await
// FIXME(MrCroxx): Handle this error.
4 changes: 4 additions & 0 deletions src/storage/hummock_test/src/bin/replay/main.rs
@@ -62,6 +62,9 @@ struct Args {

#[arg(short, long)]
object_storage: String,

#[arg(short, long)]
use_new_object_prefix_strategy: bool,
}

#[tokio::main(flavor = "multi_thread")]
@@ -132,6 +135,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
max_prefetch_block_number: storage_opts.max_prefetch_block_number,
recent_filter: None,
state_store_metrics: state_store_metrics.clone(),
use_new_object_prefix_strategy: args.use_new_object_prefix_strategy,
meta_cache_v2,
block_cache_v2,
}));
1 change: 1 addition & 0 deletions src/storage/src/hummock/iterator/test_utils.rs
@@ -90,6 +90,7 @@ pub async fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> Ssta

recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
use_new_object_prefix_strategy: true,

meta_cache_v2,
block_cache_v2,