Skip to content

Commit

Permalink
fix(object_store): apply config in meta node (#16199)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Apr 8, 2024
1 parent faef9dd commit a14ff37
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.developer
.enable_check_task_level_overlap,
enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
object_store_config: config.storage.object_store,
},
config.system.into_init_system_params(),
)
Expand Down
19 changes: 14 additions & 5 deletions src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ impl BackupManager {
store_dir: &str,
) -> MetaResult<Arc<Self>> {
let store_config = (store_url.to_string(), store_dir.to_string());
let store =
create_snapshot_store(&store_config, metrics.object_store_metric.clone()).await?;
let store = create_snapshot_store(
&store_config,
metrics.object_store_metric.clone(),
&env.opts.object_store_config,
)
.await?;
tracing::info!(
"backup manager initialized: url={}, dir={}",
store_config.0,
Expand Down Expand Up @@ -161,8 +165,12 @@ impl BackupManager {
}

pub async fn set_store(&self, config: StoreConfig) -> MetaResult<()> {
let new_store =
create_snapshot_store(&config, self.meta_metrics.object_store_metric.clone()).await?;
let new_store = create_snapshot_store(
&config,
self.meta_metrics.object_store_metric.clone(),
&self.env.opts.object_store_config,
)
.await?;
tracing::info!(
"new backup config is applied: url={}, dir={}",
config.0,
Expand Down Expand Up @@ -368,13 +376,14 @@ impl BackupWorker {
async fn create_snapshot_store(
config: &StoreConfig,
metric: Arc<ObjectStoreMetrics>,
object_store_config: &ObjectStoreConfig,
) -> MetaResult<ObjectStoreMetaSnapshotStorage> {
let object_store = Arc::new(
build_remote_object_store(
&config.0,
metric,
"Meta Backup",
ObjectStoreConfig::default(),
object_store_config.clone(),
)
.await,
);
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use rand::prelude::SliceRandom;
use rand::thread_rng;
use risingwave_common::catalog::TableId;
use risingwave_common::config::default::compaction_config;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::monitor::rwlock::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
Expand Down Expand Up @@ -365,7 +364,7 @@ impl HummockManager {
let state_store_url = sys_params.state_store();
let state_store_dir: &str = sys_params.data_directory();
let deterministic_mode = env.opts.compaction_deterministic_test;
let mut object_store_config = ObjectStoreConfig::default();
let mut object_store_config = env.opts.object_store_config.clone();
// For fs and hdfs object store, operations are not always atomic.
// We should manually enable atomicity guarantee by setting the atomic_write_dir config when building services.
object_store_config.set_atomic_write_dir();
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::ops::Deref;
use std::sync::Arc;

use risingwave_common::config::{CompactionConfig, DefaultParallelism};
use risingwave_common::config::{CompactionConfig, DefaultParallelism, ObjectStoreConfig};
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_meta_model_v2::prelude::Cluster;
use risingwave_pb::meta::SystemParams;
Expand Down Expand Up @@ -212,6 +212,7 @@ pub struct MetaOpts {
/// l0 multi level picker whether to check the overlap accuracy between sub levels
pub enable_check_task_level_overlap: bool,
pub enable_dropped_column_reclaim: bool,
pub object_store_config: ObjectStoreConfig,
}

impl MetaOpts {
Expand Down Expand Up @@ -267,6 +268,7 @@ impl MetaOpts {
enable_trivial_move: true,
enable_check_task_level_overlap: true,
enable_dropped_column_reclaim: false,
object_store_config: ObjectStoreConfig::default(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,7 @@ pub async fn build_remote_object_store(
ident: &str,
config: ObjectStoreConfig,
) -> ObjectStoreImpl {
tracing::debug!(config=?config, "object store {ident}");
match url {
s3 if s3.starts_with("s3://") => {
if config.s3.developer.use_opendal {
Expand Down
36 changes: 26 additions & 10 deletions src/storage/src/hummock/backup_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ type VersionHolder = (

async fn create_snapshot_store(
config: &StoreConfig,
object_store_config: &ObjectStoreConfig,
) -> StorageResult<ObjectStoreMetaSnapshotStorage> {
let backup_object_store = Arc::new(
build_remote_object_store(
&config.0,
Arc::new(ObjectStoreMetrics::unused()),
"Meta Backup",
ObjectStoreConfig::default(),
object_store_config.clone(),
)
.await,
);
Expand All @@ -69,41 +70,56 @@ pub struct BackupReader {
inflight_request: parking_lot::Mutex<HashMap<MetaSnapshotId, InflightRequest>>,
store: ArcSwap<(ObjectStoreMetaSnapshotStorage, StoreConfig)>,
refresh_tx: tokio::sync::mpsc::UnboundedSender<u64>,
object_store_config: ObjectStoreConfig,
}

impl BackupReader {
pub async fn new(storage_url: &str, storage_directory: &str) -> StorageResult<BackupReaderRef> {
pub async fn new(
storage_url: &str,
storage_directory: &str,
object_store_config: &ObjectStoreConfig,
) -> StorageResult<BackupReaderRef> {
let config = (storage_url.to_string(), storage_directory.to_string());
let store = create_snapshot_store(&config).await?;
let store = create_snapshot_store(&config, object_store_config).await?;
tracing::info!(
"backup reader is initialized: url={}, dir={}",
config.0,
config.1
);
Ok(Self::with_store((store, config)))
Ok(Self::with_store(
(store, config),
object_store_config.clone(),
))
}

fn with_store(store: (ObjectStoreMetaSnapshotStorage, StoreConfig)) -> BackupReaderRef {
fn with_store(
store: (ObjectStoreMetaSnapshotStorage, StoreConfig),
object_store_config: ObjectStoreConfig,
) -> BackupReaderRef {
let (refresh_tx, refresh_rx) = tokio::sync::mpsc::unbounded_channel();
let instance = Arc::new(Self {
store: ArcSwap::from_pointee(store),
versions: Default::default(),
inflight_request: Default::default(),
object_store_config,
refresh_tx,
});
tokio::spawn(Self::start_manifest_refresher(instance.clone(), refresh_rx));
instance
}

pub async fn unused() -> BackupReaderRef {
Self::with_store((
risingwave_backup::storage::unused().await,
StoreConfig::default(),
))
Self::with_store(
(
risingwave_backup::storage::unused().await,
StoreConfig::default(),
),
ObjectStoreConfig::default(),
)
}

async fn set_store(&self, config: StoreConfig) -> StorageResult<()> {
let new_store = create_snapshot_store(&config).await?;
let new_store = create_snapshot_store(&config, &self.object_store_config).await?;
tracing::info!(
"backup reader is updated: url={}, dir={}",
config.0,
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl HummockStorage {
let backup_reader = BackupReader::new(
&options.backup_storage_url,
&options.backup_storage_directory,
&options.object_store_config,
)
.await
.map_err(HummockError::read_backup_error)?;
Expand Down

0 comments on commit a14ff37

Please sign in to comment.