Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(object store): introduce object prefix for opendal object store #16542

Merged
merged 25 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ internal data_directory
internal parallel_compact_size_mb
internal sstable_size_mb
internal state_store
internal use_new_object_prefix_strategy
postmaster backup_storage_directory
postmaster backup_storage_url
postmaster barrier_interval_ms
Expand Down
1 change: 1 addition & 0 deletions proto/java_binding.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ message ReadPlan {
catalog.Table table_catalog = 7;

repeated uint32 vnode_ids = 8;
bool use_new_object_prefix_strategy = 9;
}
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ message SystemParams {
optional bool pause_on_next_bootstrap = 13;
optional string wasm_storage_url = 14 [deprecated = true];
optional bool enable_tracing = 15;
optional bool use_new_object_prefix_strategy = 16;
}

message GetSystemParamsRequest {}
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ macro_rules! for_all_params {
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
{ pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
{ enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
{ use_new_object_prefix_strategy, bool, None, false, "Whether to split object prefix.", },
}
};
}
Expand Down Expand Up @@ -376,6 +377,7 @@ macro_rules! impl_system_params_for_test {
ret.state_store = Some("hummock+memory".to_string());
ret.backup_storage_url = Some("memory".into());
ret.backup_storage_directory = Some("backup".into());
ret.use_new_object_prefix_strategy = Some(false);
ret
}
};
Expand Down Expand Up @@ -441,6 +443,7 @@ mod tests {
(MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
(PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
(ENABLE_TRACING_KEY, "true"),
(USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
("a_deprecated_param", "foo"),
];

Expand Down
8 changes: 8 additions & 0 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ where
self.inner().data_directory.as_ref().unwrap()
}

fn use_new_object_prefix_strategy(&self) -> bool {
*self
.inner()
.use_new_object_prefix_strategy
.as_ref()
.unwrap()
}

fn backup_storage_url(&self) -> &str {
self.inner().backup_storage_url.as_ref().unwrap()
}
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ pub async fn compute_node_serve(
) -> (Vec<JoinHandle<()>>, Sender<()>) {
// Load the configuration.
let config = load_config(&opts.config_path, &opts);

info!("Starting compute node",);
info!("> config: {:?}", config);
info!(
Expand Down Expand Up @@ -211,6 +210,7 @@ pub async fn compute_node_serve(
storage_metrics.clone(),
compactor_metrics.clone(),
await_tree_config.clone(),
system_params.use_new_object_prefix_strategy(),
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,4 @@ This page is automatically generated by `./risedev generate-example-config`
| pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false |
| sstable_size_mb | Target size of the Sstable. | 256 |
| state_store | URL for the state store | |
| use_new_object_prefix_strategy | Whether to split object prefix. | |
8 changes: 7 additions & 1 deletion src/ctl/src/cmd_impl/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub enum BenchCommands {
#[clap(long, default_value_t = 1)]
threads: usize,
data_dir: Option<String>,
#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
},
}

Expand Down Expand Up @@ -86,9 +88,13 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
mv_name,
threads,
data_dir,
use_new_object_prefix_strategy,
} => {
let (hummock, metrics) = context
.hummock_store_with_metrics(HummockServiceOpts::from_env(data_dir)?)
.hummock_store_with_metrics(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
let table = get_table_catalog(meta.clone(), mv_name).await?;
let mut handlers = vec![];
Expand Down
6 changes: 5 additions & 1 deletion src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ pub async fn list_kv(
epoch: u64,
table_id: u32,
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> anyhow::Result<()> {
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.hummock_store(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
if is_max_epoch(epoch) {
tracing::info!("using MAX EPOCH as epoch");
Expand Down
16 changes: 13 additions & 3 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct SstDumpArgs {
print_table: bool,
#[clap(short = 'd')]
data_dir: Option<String>,
#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
}

pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
Expand All @@ -72,7 +74,10 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
if args.print_level {
// Level information is retrieved from meta service
let hummock = context
.hummock_store(HummockServiceOpts::from_env(args.data_dir.clone())?)
.hummock_store(HummockServiceOpts::from_env(
args.data_dir.clone(),
args.use_new_object_prefix_strategy,
)?)
.await?;
let version = hummock.inner().get_pinned_version().version().clone();
let sstable_store = hummock.sstable_store();
Expand Down Expand Up @@ -108,8 +113,13 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
}
} else {
// Object information is retrieved from object store. Meta service is not required.
let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?;
let sstable_store = hummock_service_opts.create_sstable_store().await?;
let hummock_service_opts = HummockServiceOpts::from_env(
args.data_dir.clone(),
args.use_new_object_prefix_strategy,
)?;
let sstable_store = hummock_service_opts
.create_sstable_store(args.use_new_object_prefix_strategy)
.await?;
if let Some(obj_id) = &args.object_id {
let obj_store = sstable_store.store();
let obj_path = sstable_store.get_sst_data_path(*obj_id);
Expand Down
8 changes: 6 additions & 2 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ pub async fn print_user_key_in_archive(
archive_ids: Vec<HummockVersionId>,
data_dir: String,
user_key: String,
use_new_object_prefix_strategy: bool,
) -> anyhow::Result<()> {
let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| {
panic!("cannot decode user key {} into raw bytes", user_key);
});
let user_key = UserKey::decode(&user_key_bytes);
println!("user key: {user_key:?}");

let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
let hummock_opts =
HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
let hummock = context.hummock_store(hummock_opts).await?;
let sstable_store = hummock.sstable_store();
let archive_object_store = sstable_store.store();
Expand Down Expand Up @@ -178,8 +180,10 @@ pub async fn print_version_delta_in_archive(
archive_ids: Vec<HummockVersionId>,
data_dir: String,
sst_id: HummockSstableObjectId,
use_new_object_prefix_strategy: bool,
) -> anyhow::Result<()> {
let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
let hummock_opts =
HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
let hummock = context.hummock_store(hummock_opts).await?;
let sstable_store = hummock.sstable_store();
let archive_object_store = sstable_store.store();
Expand Down
24 changes: 20 additions & 4 deletions src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,35 @@ pub fn make_storage_table<S: StateStore>(
))
}

pub async fn scan(context: &CtlContext, mv_name: String, data_dir: Option<String>) -> Result<()> {
pub async fn scan(
context: &CtlContext,
mv_name: String,
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> Result<()> {
let meta_client = context.meta_client().await?;
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.hummock_store(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
let table = get_table_catalog(meta_client, mv_name).await?;
do_scan(table, hummock).await
}

pub async fn scan_id(context: &CtlContext, table_id: u32, data_dir: Option<String>) -> Result<()> {
pub async fn scan_id(
context: &CtlContext,
table_id: u32,
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> Result<()> {
let meta_client = context.meta_client().await?;
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.hummock_store(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
let table = get_table_catalog_by_id(meta_client, table_id).await?;
do_scan(table, hummock).await
Expand Down
16 changes: 14 additions & 2 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct HummockServiceOpts {
pub hummock_url: String,
pub data_dir: Option<String>,

use_new_object_prefix_strategy: bool,

heartbeat_handle: Option<JoinHandle<()>>,
heartbeat_shutdown_sender: Option<Sender<()>>,
}
Expand All @@ -55,7 +57,10 @@ impl HummockServiceOpts {
/// Currently, we will read these variables for meta:
///
/// * `RW_HUMMOCK_URL`: hummock store address
pub fn from_env(data_dir: Option<String>) -> Result<Self> {
pub fn from_env(
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> Result<Self> {
let hummock_url = match env::var("RW_HUMMOCK_URL") {
Ok(url) => {
if !url.starts_with("hummock+") {
Expand All @@ -80,11 +85,13 @@ impl HummockServiceOpts {
bail!(MESSAGE);
}
};

Ok(Self {
hummock_url,
data_dir,
heartbeat_handle: None,
heartbeat_shutdown_sender: None,
use_new_object_prefix_strategy,
})
}

Expand Down Expand Up @@ -142,6 +149,7 @@ impl HummockServiceOpts {
metrics.storage_metrics.clone(),
metrics.compactor_metrics.clone(),
None,
self.use_new_object_prefix_strategy,
)
.await?;

Expand All @@ -157,7 +165,10 @@ impl HummockServiceOpts {
}
}

pub async fn create_sstable_store(&self) -> Result<Arc<SstableStore>> {
pub async fn create_sstable_store(
&self,
use_new_object_prefix_strategy: bool,
) -> Result<Arc<SstableStore>> {
let object_store = build_remote_object_store(
self.hummock_url.strip_prefix("hummock+").unwrap(),
Arc::new(ObjectStoreMetrics::unused()),
Expand Down Expand Up @@ -190,6 +201,7 @@ impl HummockServiceOpts {
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
use_new_object_prefix_strategy,
meta_cache,
block_cache,
})))
Expand Down
Loading
Loading