diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 01165a030a104..084e2358cee69 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -353,13 +353,6 @@ pub struct StorageConfig { #[serde(default = "default::storage::disable_remote_compactor")] pub disable_remote_compactor: bool, - #[serde(default = "default::storage::enable_local_spill")] - pub enable_local_spill: bool, - - /// Local object store root. We should call `get_local_object_store` to get the object store. - #[serde(default = "default::storage::local_object_store")] - pub local_object_store: String, - /// Number of tasks shared buffer can upload in parallel. #[serde(default = "default::storage::share_buffer_upload_concurrency")] pub share_buffer_upload_concurrency: usize, @@ -675,14 +668,6 @@ mod default { false } - pub fn enable_local_spill() -> bool { - true - } - - pub fn local_object_store() -> String { - "tempdisk".to_string() - } - pub fn share_buffer_upload_concurrency() -> usize { 8 } diff --git a/src/config/example.toml b/src/config/example.toml index b9bc5c77582fe..0b97db80d02cc 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -53,8 +53,6 @@ share_buffer_compaction_worker_threads_number = 4 imm_merge_threshold = 4 write_conflict_detection_enabled = true disable_remote_compactor = false -enable_local_spill = true -local_object_store = "tempdisk" share_buffer_upload_concurrency = 8 sstable_id_remote_fetch_number = 10 min_sst_size_for_streaming_upload = 33554432 diff --git a/src/object_store/src/object/disk.rs b/src/object_store/src/object/disk.rs deleted file mode 100644 index 3ed98da84e03c..0000000000000 --- a/src/object_store/src/object/disk.rs +++ /dev/null @@ -1,703 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::hash_map::DefaultHasher; -use std::fs::File; -use std::hash::{Hash, Hasher}; -use std::io::{Error, ErrorKind}; -use std::os::unix::fs::FileExt; -use std::path::PathBuf; -use std::sync::Arc; - -use bytes::Bytes; -use futures::future::try_join_all; -use risingwave_common::cache::{CachePriority, CacheableEntry, LruCache}; -use tokio::io::{AsyncRead, AsyncWriteExt}; - -use super::{ - BlockLocation, BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectResult, ObjectStore, -}; - -pub(super) mod utils { - use std::fs::Metadata; - use std::path::Path; - use std::time::{Duration, SystemTime}; - - use tokio::fs::{create_dir_all, OpenOptions}; - - use super::OpenReadFileHolder; - use crate::object::{ObjectError, ObjectResult}; - - pub async fn ensure_file_dir_exists(path: &Path) -> ObjectResult<()> { - if let Some(dir) = path.parent() { - // `create_dir_all` will not error even if the directory already exists. - create_dir_all(dir).await.map_err(|e| { - ObjectError::disk(format!("unable to create dir: {:?}.", dir.to_str(),), e) - })?; - } - Ok(()) - } - - pub async fn open_file( - path: &Path, - enable_read: bool, - enable_write: bool, - create_new: bool, - ) -> ObjectResult { - ensure_file_dir_exists(path).await?; - OpenOptions::new() - .read(enable_read) - .write(enable_write) - .create_new(create_new) - .open(path) - .await - .map_err(|err| { - ObjectError::disk(format!("Failed to open file {:?}", path.to_str(),), err) - }) - } - - pub async fn asyncify(f: F) -> ObjectResult - where - T: Send + 'static, - F: FnOnce() -> ObjectResult + Send + 'static, - { - #[cfg_attr(madsim, expect(deprecated))] - tokio::task::spawn_blocking(f).await.map_err(|e| { - ObjectError::internal(format!("Fail to join a blocking-spawned task: {}", e)) - })? - } - - pub async fn get_metadata(file: OpenReadFileHolder) -> ObjectResult { - asyncify(move || { - file.value() - .metadata() - .map_err(|err| ObjectError::disk("Failed to get metadata.".to_string(), err)) - }) - .await - } - - pub fn get_last_modified_timestamp_since_unix_epoch( - metadata: &Metadata, - ) -> ObjectResult { - metadata - .modified() - .map_err(|err| { - ObjectError::disk("Last modification metadata not available".to_string(), err) - })? - .duration_since(SystemTime::UNIX_EPOCH) - .map_err(ObjectError::internal) - } - - pub fn get_path_str(path: &Path) -> ObjectResult { - path.to_str() - .map(|s| s.to_owned()) - .ok_or_else(|| ObjectError::internal("Don't support non-UTF-8 in path")) - } -} - -pub type OpenReadFileHolder = Arc>; - -pub struct DiskObjectStore { - path_prefix: String, - opened_read_file_cache: Arc>, -} - -const OPENED_FILE_CACHE_DEFAULT_NUM_SHARD_BITS: usize = 2; -const OPENED_FILE_CACHE_DEFAULT_CAPACITY: usize = 1024; - -impl DiskObjectStore { - pub fn new(path_prefix: &str) -> DiskObjectStore { - DiskObjectStore { - path_prefix: path_prefix.to_string(), - opened_read_file_cache: Arc::new(LruCache::new( - OPENED_FILE_CACHE_DEFAULT_NUM_SHARD_BITS, - OPENED_FILE_CACHE_DEFAULT_CAPACITY, - 0, - )), - } - } - - pub fn new_file_path(&self, path: &str) -> ObjectResult { - if path.starts_with('/') { - return Err(ObjectError::disk( - "".to_string(), - std::io::Error::new(std::io::ErrorKind::Other, "path should not start with /"), - )); - }; - let mut ret = PathBuf::from(&self.path_prefix); - ret.push(path); - Ok(ret) - } - - pub async fn get_read_file(&self, path: &str) -> ObjectResult { - let path = self.new_file_path(path)?; - let hash = { - let mut hasher = DefaultHasher::default(); - path.hash(&mut hasher); - hasher.finish() - }; - let entry = self - .opened_read_file_cache - .lookup_with_request_dedup::<_, ObjectError, _>( - hash, - path.clone(), - CachePriority::High, - move || async move { - let file = utils::open_file(&path, true, false, false) - .await? - .into_std() - .await; - Ok((file, 1)) - }, - ) - .await?; - Ok(Arc::new(entry)) - } -} - -#[async_trait::async_trait] -impl ObjectStore for DiskObjectStore { - fn get_object_prefix(&self, _obj_id: u64) -> String { - String::default() - } - - async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> { - if obj.is_empty() { - Err(ObjectError::internal("upload empty object")) - } else { - let mut file = - utils::open_file(self.new_file_path(path)?.as_path(), false, true, true).await?; - file.write_all(&obj) - .await - .map_err(|e| ObjectError::disk(format!("failed to write {}", path), e))?; - file.flush() - .await - .map_err(|e| ObjectError::disk(format!("failed to flush {}", path), e))?; - Ok(()) - } - } - - async fn streaming_upload(&self, _path: &str) -> ObjectResult { - unimplemented!("streaming upload is not implemented for disk object store"); - } - - async fn read(&self, path: &str, block_loc: Option) -> ObjectResult { - match block_loc { - Some(block_loc) => Ok(self.readv(path, &[block_loc]).await?.pop().unwrap()), - None => { - let file_holder = self.get_read_file(path).await?; - let metadata = utils::get_metadata(file_holder.clone()).await?; - let path_owned = path.to_owned(); - utils::asyncify(move || { - let mut buf = vec![0; metadata.len() as usize]; - file_holder - .value() - .read_exact_at(&mut buf, 0) - .map_err(|e| { - ObjectError::disk( - format!("failed to read the whole file {}", path_owned), - e, - ) - })?; - Ok(Bytes::from(buf)) - }) - .await - } - } - } - - async fn readv(&self, path: &str, block_locs: &[BlockLocation]) -> ObjectResult> { - let file_holder = self.get_read_file(path).await?; - let metadata = utils::get_metadata(file_holder.clone()).await?; - for block_loc in block_locs { - if block_loc.offset + block_loc.size > metadata.len() as usize { - return Err(ObjectError::disk( - "".to_string(), - Error::new( - ErrorKind::Other, - format!( - "block location {:?} is out of bounds for file of len {}", - block_loc, - metadata.len() - ), - ), - )); - } - } - let mut ret = Vec::with_capacity(block_locs.len()); - for block_loc_ref in block_locs { - let file_holder = file_holder.clone(); - let path_owned = path.to_owned(); - let block_loc = *block_loc_ref; - let future = utils::asyncify(move || { - let mut buf = vec![0; block_loc.size]; - file_holder - .value() - .read_exact_at(&mut buf, block_loc.offset as u64) - .map_err(|e| { - ObjectError::disk( - format!( - "failed to read file {} at offset {} for size {}", - path_owned, block_loc.offset, block_loc.size - ), - e, - ) - })?; - Ok(Bytes::from(buf)) - }); - ret.push(future) - } - - try_join_all(ret).await - } - - /// **Currently not implemented!** - /// - /// Returns a stream reading the object specified in `path`. If given, the stream starts at the - /// byte with index `start_pos` (0-based). As far as possible, the stream only loads the amount - /// of data into memory that is read from the stream. - async fn streaming_read( - &self, - _path: &str, - _start_pos: Option, - ) -> ObjectResult> { - unimplemented!() - } - - async fn metadata(&self, path: &str) -> ObjectResult { - let file_holder = self.get_read_file(path).await?; - let metadata = utils::get_metadata(file_holder).await?; - Ok(ObjectMetadata { - key: path.to_owned(), - last_modified: utils::get_last_modified_timestamp_since_unix_epoch(&metadata)? - .as_secs_f64(), - total_size: metadata.len() as usize, - }) - } - - async fn delete(&self, path: &str) -> ObjectResult<()> { - let result = tokio::fs::remove_file(self.new_file_path(path)?.as_path()).await; - - // Note that S3 storage considers deleting successful if the `path` does not refer to an - // existing object. We therefore ignore if a file does not exist to ensures that - // `DiskObjectStore::delete()` behaves the same way as `S3ObjectStore::delete()`. - if let Err(e) = &result && e.kind() == ErrorKind::NotFound { - Ok(()) - } else { - result.map_err(|e| ObjectError::disk(format!("failed to delete {}", path), e)) - } - } - - /// Deletes the objects with the given paths permanently from the storage. If an object - /// specified in the request is not found, it will be considered as successfully deleted. - /// - /// Calling this function is equivalent to calling `delete` individually for each given path. - async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> { - for path in paths { - self.delete(path).await? - } - Ok(()) - } - - async fn list(&self, prefix: &str) -> ObjectResult> { - let mut list_result = vec![]; - let mut path_to_walk = vec![]; - let common_prefix = { - let mut common_prefix = PathBuf::from(&self.path_prefix); - common_prefix.push(prefix.trim_start_matches(std::path::MAIN_SEPARATOR)); - utils::get_path_str(common_prefix.as_path())?.to_owned() - }; - path_to_walk.push(PathBuf::from(&self.path_prefix)); - while let Some(path) = path_to_walk.pop() { - let mut entries = tokio::fs::read_dir(path.as_path()).await.map_err(|err| { - ObjectError::disk( - format!("Failed to read dir {}", path.to_str().unwrap_or("")), - err, - ) - })?; - while let Some(entry) = entries - .next_entry() - .await - .map_err(|err| ObjectError::disk(format!("Failed to list {}", prefix), err))? - { - let metadata = entry - .metadata() - .await - .map_err(|err| ObjectError::disk("Failed to get metadata.".to_string(), err))?; - let entry_path = utils::get_path_str(entry.path().as_path())?; - if metadata.is_symlink() { - tracing::warn!("Skip symlink {}", entry_path); - continue; - } - if metadata.is_dir() { - if entry_path.starts_with(&common_prefix) - || common_prefix.starts_with(&entry_path) - { - path_to_walk.push(entry.path()); - } - continue; - } - if !entry_path.starts_with(&common_prefix) { - continue; - } - list_result.push(ObjectMetadata { - key: entry_path - .trim_start_matches(&self.path_prefix) - .trim_start_matches(std::path::MAIN_SEPARATOR) - .to_owned(), - last_modified: utils::get_last_modified_timestamp_since_unix_epoch(&metadata)? - .as_secs_f64(), - total_size: metadata.len() as usize, - }); - } - } - list_result.sort_by(|a, b| Ord::cmp(&a.key, &b.key)); - Ok(list_result) - } - - fn store_media_type(&self) -> &'static str { - "disk" - } -} - -#[cfg(test)] -mod tests { - use std::fs::OpenOptions; - use std::io::Read; - use std::path::PathBuf; - - use bytes::Bytes; - use itertools::{enumerate, Itertools}; - use tempfile::TempDir; - - use crate::object::disk::DiskObjectStore; - use crate::object::{BlockLocation, ObjectStore}; - - fn gen_test_payload() -> Vec { - let mut ret = Vec::new(); - for i in 0..100000 { - ret.extend(format!("{:05}", i).as_bytes()); - } - ret - } - - fn check_payload(payload: &[u8], path: &str) { - let mut file = OpenOptions::new().read(true).open(path).unwrap(); - assert_eq!(payload.len(), file.metadata().unwrap().len() as usize); - let mut buf = Vec::new(); - file.read_to_end(&mut buf).unwrap(); - assert_eq!(payload, &buf[..]); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_simple_upload() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - store - .upload("test.obj", Bytes::from(payload.clone())) - .await - .unwrap(); - assert!(store - .metadata("not_exist.obj") - .await - .unwrap_err() - .is_object_not_found_error()); - let metadata = store.metadata("test.obj").await.unwrap(); - assert_eq!(payload.len(), metadata.total_size); - - let mut path = PathBuf::from(test_root_path); - path.push("test.obj"); - check_payload(&payload, path.to_str().unwrap()); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_multi_level_dir_upload() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - store - .upload("1/2/test.obj", Bytes::from(payload.clone())) - .await - .unwrap(); - let metadata = store.metadata("1/2/test.obj").await.unwrap(); - assert_eq!(payload.len(), metadata.total_size); - - let mut path = PathBuf::from(test_root_path); - path.push("1/2/test.obj"); - check_payload(&payload, path.to_str().unwrap()); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_read_all() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - store - .upload("test.obj", Bytes::from(payload.clone())) - .await - .unwrap(); - let metadata = store.metadata("test.obj").await.unwrap(); - assert_eq!(payload.len(), metadata.total_size); - let read_data = store.read("test.obj", None).await.unwrap(); - assert_eq!(payload, &read_data[..]); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_read_partial() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - store - .upload("test.obj", Bytes::from(payload.clone())) - .await - .unwrap(); - let metadata = store.metadata("test.obj").await.unwrap(); - assert_eq!(payload.len(), metadata.total_size); - let read_data = store - .read( - "test.obj", - Some(BlockLocation { - offset: 10000, - size: 1000, - }), - ) - .await - .unwrap(); - assert_eq!(&payload[10000..11000], &read_data[..]); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_read_multi_block() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - store - .upload("test.obj", Bytes::from(payload.clone())) - .await - .unwrap(); - let metadata = store.metadata("test.obj").await.unwrap(); - assert_eq!(payload.len(), metadata.total_size); - let test_loc = vec![(0, 1000), (10000, 1000), (20000, 1000)]; - let read_data = store - .readv( - "test.obj", - &test_loc - .iter() - .map(|(offset, size)| BlockLocation { - offset: *offset, - size: *size, - }) - .collect_vec(), - ) - .await - .unwrap(); - assert_eq!(test_loc.len(), read_data.len()); - for (i, (offset, size)) in test_loc.iter().enumerate() { - assert_eq!(&payload[*offset..(*offset + *size)], &read_data[i][..]); - } - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_delete() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - store - .upload("test.obj", Bytes::from(payload.clone())) - .await - .unwrap(); - let mut path = PathBuf::from(test_root_path); - path.push("test.obj"); - assert!(path.exists()); - store.delete("test.obj").await.unwrap(); - assert!(!path.exists()); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_delete_objects() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - - // The number of files that will be created and uploaded to storage. - const REAL_COUNT: usize = 2; - - // The number of files that we do not create but still try to delete. - const FAKE_COUNT: usize = 2; - - let mut name_list = vec![]; - let mut path_list = vec![]; - - for i in 0..(REAL_COUNT + FAKE_COUNT) { - let file_name = format!("test{}.obj", i); - name_list.push(file_name.clone()); - - let mut path = PathBuf::from(test_root_path); - path.push(file_name); - path_list.push(path); - } - - // Upload data. - for file_name in name_list.iter().take(REAL_COUNT) { - store - .upload(file_name.as_str(), Bytes::from(payload.clone())) - .await - .unwrap(); - } - - // Verify that files do or do not exist. - for (i, path) in path_list.iter().enumerate() { - assert!((i < REAL_COUNT) == path.exists()); - } - - store.delete_objects(&name_list).await.unwrap(); - - for path in path_list { - assert!(!path.exists()); - } - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_read_not_exists() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let err = store.read("non-exist.obj", None).await.unwrap_err(); - assert!(err.is_object_not_found_error()); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_read_out_of_range() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - store - .upload("test.obj", Bytes::from(payload.clone())) - .await - .unwrap(); - assert_eq!(payload.len(), 500000); - assert!(store - .read( - "test.obj", - Some(BlockLocation { - offset: 499999, - size: 1, - }) - ) - .await - .is_ok()); - assert!(store - .read( - "test.obj", - Some(BlockLocation { - offset: 499999, - size: 2, - }) - ) - .await - .is_err()); - assert!(store - .readv( - "test.obj", - &[ - BlockLocation { - offset: 499999, - size: 2, - }, - BlockLocation { - offset: 10000, - size: 2, - } - ] - ) - .await - .is_err()); - } - - #[tokio::test] - async fn test_invalid_path() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - // path is not allowed to be started with '/' - assert!(store - .upload("/test.obj", Bytes::from(payload)) - .await - .is_err()); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // TODO: remove this when madsim supports fs - async fn test_list() { - let test_dir = TempDir::new().unwrap(); - let test_root_path = test_dir.path().to_str().unwrap(); - let store = DiskObjectStore::new(test_root_path); - let payload = gen_test_payload(); - assert!(store.list("").await.unwrap().is_empty()); - assert!(store.list("/").await.unwrap().is_empty()); - - let paths = vec!["001/002/test.obj", "001/003/test.obj"]; - for (i, path) in enumerate(paths.clone()) { - assert_eq!(store.list("").await.unwrap().len(), i); - store - .upload(path, Bytes::from(payload.clone())) - .await - .unwrap(); - assert_eq!(store.list("").await.unwrap().len(), i + 1); - } - - let list_path = store - .list("") - .await - .unwrap() - .iter() - .map(|p| p.key.clone()) - .collect_vec(); - assert_eq!(list_path, paths); - - for i in 0..=5 { - assert_eq!(store.list(&paths[0][0..=i]).await.unwrap().len(), 2); - } - for i in 6..=paths[0].len() - 1 { - assert_eq!(store.list(&paths[0][0..=i]).await.unwrap().len(), 1); - } - assert!(store.list("003").await.unwrap().is_empty()); - assert_eq!(store.list("/").await.unwrap().len(), 2); - - for (i, path) in enumerate(paths.clone()) { - assert_eq!(store.list("").await.unwrap().len(), paths.len() - i); - store.delete(path).await.unwrap(); - assert_eq!(store.list("").await.unwrap().len(), paths.len() - i - 1); - } - } -} diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 89ae6e3816cc9..a8e9d796f39c1 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -28,75 +28,29 @@ pub mod s3; use await_tree::InstrumentAwait; pub use s3::*; -mod disk; pub mod error; pub mod object_metrics; pub use error::*; use object_metrics::ObjectStoreMetrics; -use crate::object::disk::DiskObjectStore; - -pub const LOCAL_OBJECT_STORE_PATH_PREFIX: &str = "@local:"; - pub type ObjectStoreRef = Arc; pub type ObjectStreamingUploader = MonitoredStreamingUploader; type BoxedStreamingUploader = Box; -#[derive(Debug)] -pub enum ObjectStorePath<'a> { - Local(&'a str), - Remote(&'a str), -} - -impl ObjectStorePath<'_> { - pub fn is_local(&self) -> bool { - match self { - ObjectStorePath::Local(_) => true, - ObjectStorePath::Remote(_) => false, - } - } - - pub fn is_remote(&self) -> bool { - !self.is_local() - } - - pub fn as_str(&self) -> &str { - match self { - ObjectStorePath::Local(path) => path, - ObjectStorePath::Remote(path) => path, - } - } -} - -pub fn get_local_path(path: &str) -> String { - LOCAL_OBJECT_STORE_PATH_PREFIX.to_string() + path -} - -pub fn parse_object_store_path(path: &str) -> ObjectStorePath<'_> { - match path.strip_prefix(LOCAL_OBJECT_STORE_PATH_PREFIX) { - Some(path) => ObjectStorePath::Local(path), - None => ObjectStorePath::Remote(path), - } -} - /// Partitions a set of given paths into two vectors. The first vector contains all local paths, and /// the second contains all remote paths. -pub fn partition_object_store_paths(paths: &[String]) -> (Vec, Vec) { +pub fn partition_object_store_paths(paths: &[String]) -> Vec { // ToDo: Currently the result is a copy of the input. Would it be worth it to use an in-place // partition instead? - let mut vec_loc = vec![]; let mut vec_rem = vec![]; for path in paths { - match path.strip_prefix(LOCAL_OBJECT_STORE_PATH_PREFIX) { - Some(path) => vec_loc.push(path.to_string()), - None => vec_rem.push(path.to_string()), - }; + vec_rem.push(path.to_string()); } - (vec_loc, vec_rem) + vec_rem } #[derive(Debug, Copy, Clone)] @@ -186,23 +140,9 @@ pub trait ObjectStore: Send + Sync { pub enum ObjectStoreImpl { InMem(MonitoredObjectStore), - Disk(MonitoredObjectStore), Opendal(MonitoredObjectStore), S3(MonitoredObjectStore), S3Compatible(MonitoredObjectStore), - Hybrid { - local: Box, - remote: Box, - }, -} - -impl ObjectStoreImpl { - pub fn hybrid(local: ObjectStoreImpl, remote: ObjectStoreImpl) -> Self { - ObjectStoreImpl::Hybrid { - local: Box::new(local), - remote: Box::new(remote), - } - } } macro_rules! dispatch_async { @@ -214,57 +154,24 @@ macro_rules! dispatch_async { /// This macro routes the object store operation to the real implementation by the `ObjectStoreImpl` /// enum type and the `path`. /// -/// For `path`, if the `path` starts with `LOCAL_OBJECT_STORE_PATH_PREFIX`, it indicates that the -/// operation should be performed on the local object store, and otherwise the operation should be -/// performed on remote object store. +/// Except for `InMem`,the operation should be performed on remote object store. macro_rules! object_store_impl_method_body { ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $path:expr $(, $args:expr)*) => { { - let path = parse_object_store_path($path); + let path = $path; match $object_store { ObjectStoreImpl::InMem(in_mem) => { - assert!(path.is_remote(), "get local path in pure in-mem object store: {:?}", $path); - $dispatch_macro!(in_mem, $method_name, path.as_str() $(, $args)*) - }, - ObjectStoreImpl::Disk(disk) => { - assert!(path.is_remote(), "get local path in pure disk object store: {:?}", $path); - $dispatch_macro!(disk, $method_name, path.as_str() $(, $args)*) + $dispatch_macro!(in_mem, $method_name, path $(, $args)*) }, ObjectStoreImpl::Opendal(opendal) => { - assert!(path.is_remote(), "get local path in pure opendal object store engine: {:?}", $path); - $dispatch_macro!(opendal, $method_name, path.as_str() $(, $args)*) + $dispatch_macro!(opendal, $method_name, path $(, $args)*) }, ObjectStoreImpl::S3(s3) => { - assert!(path.is_remote(), "get local path in pure s3 object store: {:?}", $path); - $dispatch_macro!(s3, $method_name, path.as_str() $(, $args)*) + $dispatch_macro!(s3, $method_name, path $(, $args)*) }, ObjectStoreImpl::S3Compatible(s3) => { - assert!(path.is_remote(), "get local path in pure s3 compatible object store: {:?}", $path); - $dispatch_macro!(s3, $method_name, path.as_str() $(, $args)*) + $dispatch_macro!(s3, $method_name, path $(, $args)*) }, - ObjectStoreImpl::Hybrid { - local: local, - remote: remote, - } => { - match path { - ObjectStorePath::Local(_) => match local.as_ref() { - ObjectStoreImpl::InMem(in_mem) => $dispatch_macro!(in_mem, $method_name, path.as_str() $(, $args)*), - ObjectStoreImpl::Disk(disk) => $dispatch_macro!(disk, $method_name, path.as_str() $(, $args)*), - ObjectStoreImpl::Opendal(_) => unreachable!("Opendal object store cannot be used as local object store"), - ObjectStoreImpl::S3(_) => unreachable!("S3 cannot be used as local object store"), - ObjectStoreImpl::S3Compatible(_) => unreachable!("S3 compatible cannot be used as local object store"), - ObjectStoreImpl::Hybrid {..} => unreachable!("local object store of hybrid object store cannot be hybrid") - }, - ObjectStorePath::Remote(_) => match remote.as_ref() { - ObjectStoreImpl::InMem(in_mem) => $dispatch_macro!(in_mem, $method_name, path.as_str() $(, $args)*), - ObjectStoreImpl::Disk(disk) => $dispatch_macro!(disk, $method_name, path.as_str() $(, $args)*), - ObjectStoreImpl::Opendal(opendal) => $dispatch_macro!(opendal, $method_name, path.as_str() $(, $args)*), - ObjectStoreImpl::S3(s3) => $dispatch_macro!(s3, $method_name, path.as_str() $(, $args)*), - ObjectStoreImpl::S3Compatible(s3_compatible) => $dispatch_macro!(s3_compatible, $method_name, path.as_str() $(, $args)*), - ObjectStoreImpl::Hybrid {..} => unreachable!("remote object store of hybrid object store cannot be hybrid") - }, - } - } } } }; @@ -274,57 +181,23 @@ macro_rules! object_store_impl_method_body { /// enum type and the `paths`. It is a modification of the macro above to work with a slice of /// strings instead of just a single one. /// -/// If an entry in `paths` starts with `LOCAL_OBJECT_STORE_PATH_PREFIX`, it indicates that the -/// operation should be performed on the local object store, and otherwise the operation should be -/// performed on remote object store. +/// Except for `InMem`, the operation should be performed on remote object store. macro_rules! object_store_impl_method_body_slice { ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $paths:expr $(, $args:expr)*) => { { - let (paths_loc, paths_rem) = partition_object_store_paths($paths); + let paths_rem = partition_object_store_paths($paths); match $object_store { ObjectStoreImpl::InMem(in_mem) => { - assert!(paths_loc.is_empty(), "get local path in pure in-mem object store: {:?}", $paths); $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*) }, - ObjectStoreImpl::Disk(disk) => { - assert!(paths_loc.is_empty(), "get local path in pure disk object store: {:?}", $paths); - $dispatch_macro!(disk, $method_name, &paths_rem $(, $args)*) - }, ObjectStoreImpl::Opendal(opendal) => { - assert!(paths_loc.is_empty(), "get local path in pure opendal object store: {:?}", $paths); $dispatch_macro!(opendal, $method_name, &paths_rem $(, $args)*) }, ObjectStoreImpl::S3(s3) => { - assert!(paths_loc.is_empty(), "get local path in pure s3 object store: {:?}", $paths); $dispatch_macro!(s3, $method_name, &paths_rem $(, $args)*) }, ObjectStoreImpl::S3Compatible(s3) => { - assert!(paths_loc.is_empty(), "get local path in pure s3 compatible object store: {:?}", $paths); $dispatch_macro!(s3, $method_name, &paths_rem $(, $args)*) - }, - ObjectStoreImpl::Hybrid { - local: local, - remote: remote, - } => { - // Process local paths. - match local.as_ref() { - ObjectStoreImpl::InMem(in_mem) => $dispatch_macro!(in_mem, $method_name, &paths_loc $(, $args)*), - ObjectStoreImpl::Disk(disk) => $dispatch_macro!(disk, $method_name, &paths_loc $(, $args)*), - ObjectStoreImpl::Opendal(_) => unreachable!("Opendal object store cannot be used as local object store"), - ObjectStoreImpl::S3(_) => unreachable!("S3 cannot be used as local object store"), - ObjectStoreImpl::S3Compatible(_) => unreachable!("S3 cannot be used as local object store"), - ObjectStoreImpl::Hybrid {..} => unreachable!("local object store of hybrid object store cannot be hybrid") - }?; - - // Process remote paths. - match remote.as_ref() { - ObjectStoreImpl::InMem(in_mem) => $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*), - ObjectStoreImpl::Disk(disk) => $dispatch_macro!(disk, $method_name, &paths_rem $(, $args)*), - ObjectStoreImpl::Opendal(opendal) => $dispatch_macro!(opendal, $method_name, &paths_rem $(, $args)*), - ObjectStoreImpl::S3(s3) => $dispatch_macro!(s3, $method_name, &paths_rem $(, $args)*), - ObjectStoreImpl::S3Compatible(s3) => $dispatch_macro!(s3, $method_name, &paths_rem $(, $args)*), - ObjectStoreImpl::Hybrid {..} => unreachable!("remote object store of hybrid object store cannot be hybrid") - } } } } @@ -384,23 +257,15 @@ impl ObjectStoreImpl { object_store_impl_method_body!(self, list, dispatch_async, prefix) } - pub fn get_object_prefix(&self, obj_id: u64, is_remote: bool) -> String { + pub fn get_object_prefix(&self, obj_id: u64) -> String { // FIXME: ObjectStoreImpl lacks flexibility for adding new interface to ObjectStore // trait. Macro object_store_impl_method_body routes to local or remote only depending on // the path match self { ObjectStoreImpl::InMem(store) => store.inner.get_object_prefix(obj_id), - ObjectStoreImpl::Disk(store) => store.inner.get_object_prefix(obj_id), ObjectStoreImpl::Opendal(store) => store.inner.get_object_prefix(obj_id), ObjectStoreImpl::S3(store) => store.inner.get_object_prefix(obj_id), ObjectStoreImpl::S3Compatible(store) => store.inner.get_object_prefix(obj_id), - ObjectStoreImpl::Hybrid { local, remote } => { - if is_remote { - remote.get_object_prefix(obj_id, true) - } else { - local.get_object_prefix(obj_id, false) - } - } } } } @@ -887,9 +752,6 @@ pub async fn parse_remote_object_store( .await .monitored(metrics), ), - disk if disk.starts_with("disk://") => ObjectStoreImpl::Disk( - DiskObjectStore::new(disk.strip_prefix("disk://").unwrap()).monitored(metrics), - ), "memory" => { tracing::warn!("You're using in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident); ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics)) @@ -906,49 +768,3 @@ pub async fn parse_remote_object_store( } } } - -pub fn parse_local_object_store(url: &str, metrics: Arc) -> ObjectStoreImpl { - match url { - disk if disk.starts_with("disk://") => ObjectStoreImpl::Disk( - DiskObjectStore::new(disk.strip_prefix("disk://").unwrap()).monitored(metrics), - ), - temp_disk if temp_disk.starts_with("tempdisk") => { - let path = tempfile::TempDir::new() - .expect("should be able to create temp dir") - .into_path() - .to_str() - .expect("should be able to convert to str") - .to_owned(); - ObjectStoreImpl::Disk(DiskObjectStore::new(path.as_str()).monitored(metrics)) - } - "memory" => { - tracing::warn!("You're using Hummock in-memory local object store. This should never be used in benchmarks and production environment."); - ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics)) - } - #[cfg(feature = "hdfs-backend")] - hdfs if hdfs.starts_with("hdfs://") => { - let hdfs = hdfs.strip_prefix("hdfs://").unwrap(); - let (namenode, root) = hdfs.split_once('@').unwrap(); - ObjectStoreImpl::Opendal( - OpendalObjectStore::new_hdfs_engine(namenode.to_string(), root.to_string()) - .unwrap() - .monitored(metrics), - ) - } - gcs if gcs.starts_with("gcs://") => { - let gcs = gcs.strip_prefix("gcs://").unwrap(); - let (bucket, root) = gcs.split_once('@').unwrap(); - ObjectStoreImpl::Opendal( - OpendalObjectStore::new_gcs_engine(bucket.to_string(), root.to_string()) - .unwrap() - .monitored(metrics), - ) - } - other => { - unimplemented!( - "{} Hummock only supports s3, minio, disk, and memory for now.", - other - ) - } - } -} diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index 6d53ad03a1210..0d3116d017954 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -52,14 +52,9 @@ macro_rules! assert_bytes_eq { pub const TEST_KEYS_COUNT: usize = 10; pub fn mock_sstable_store() -> SstableStoreRef { - mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::Hybrid { - local: Box::new(ObjectStoreImpl::InMem( - InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())), - )), - remote: Box::new(ObjectStoreImpl::InMem( - InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())), - )), - })) + mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem( + InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())), + ))) } pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableStoreRef { diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index cb1a75b2e739c..5911650d630c5 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -317,7 +317,7 @@ impl SstableStore { } pub fn get_sst_data_path(&self, object_id: HummockSstableObjectId) -> String { - let obj_prefix = self.store.get_object_prefix(object_id, true); + let obj_prefix = self.store.get_object_prefix(object_id); format!( "{}/{}{}.{}", self.path, obj_prefix, object_id, OBJECT_SUFFIX diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 86a1192aef188..4b69423d64efb 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -56,8 +56,6 @@ pub fn default_opts_for_test() -> StorageOpts { meta_cache_capacity_mb: 64, high_priority_ratio: 0, disable_remote_compactor: false, - enable_local_spill: false, - local_object_store: "memory".to_string(), share_buffer_upload_concurrency: 1, compactor_memory_limit_mb: 64, sstable_id_remote_fetch_number: 1, diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 6963884dfa832..151fd7dc0de95 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -45,9 +45,6 @@ pub struct StorageOpts { /// Percent of the ratio of high priority data in block-cache pub high_priority_ratio: usize, pub disable_remote_compactor: bool, - pub enable_local_spill: bool, - /// Local object store root. We should call `get_local_object_store` to get the object store. - pub local_object_store: String, /// Number of tasks shared buffer can upload in parallel. pub share_buffer_upload_concurrency: usize, /// Capacity of sstable meta cache. @@ -102,8 +99,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt block_cache_capacity_mb: s.block_cache_capacity_mb, meta_cache_capacity_mb: s.meta_cache_capacity_mb, disable_remote_compactor: c.storage.disable_remote_compactor, - enable_local_spill: c.storage.enable_local_spill, - local_object_store: c.storage.local_object_store.to_string(), share_buffer_upload_concurrency: c.storage.share_buffer_upload_concurrency, compactor_memory_limit_mb: s.compactor_memory_limit_mb, sstable_id_remote_fetch_number: c.storage.sstable_id_remote_fetch_number, diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index b91cdda47d3ef..8b443b8b5f12f 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -17,9 +17,7 @@ use std::sync::Arc; use enum_as_inner::EnumAsInner; use risingwave_common_service::observer_manager::RpcNotificationClient; -use risingwave_object_store::object::{ - parse_local_object_store, parse_remote_object_store, ObjectStoreImpl, -}; +use risingwave_object_store::object::parse_remote_object_store; use crate::error::StorageResult; use crate::filter_key_extractor::{ @@ -579,21 +577,12 @@ impl StateStoreImpl { let store = match s { hummock if hummock.starts_with("hummock+") => { - let remote_object_store = parse_remote_object_store( + let object_store = parse_remote_object_store( hummock.strip_prefix("hummock+").unwrap(), object_store_metrics.clone(), "Hummock", ) .await; - let object_store = if opts.enable_local_spill { - let local_object_store = parse_local_object_store( - opts.local_object_store.as_str(), - object_store_metrics.clone(), - ); - ObjectStoreImpl::hybrid(local_object_store, remote_object_store) - } else { - remote_object_store - }; let sstable_store = Arc::new(SstableStore::new( Arc::new(object_store),