From 96643b96b8997573a8ffc39cd53e155732fd1e3c Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Fri, 12 Jan 2024 15:22:55 +0800 Subject: [PATCH 1/9] add object store sim support Signed-off-by: Kevin Axel --- Cargo.lock | 1 + src/object_store/Cargo.toml | 1 + src/object_store/src/object/error.rs | 16 ++ src/object_store/src/object/mod.rs | 26 ++ src/object_store/src/object/sim/client.rs | 48 ++++ src/object_store/src/object/sim/error.rs | 39 +++ src/object_store/src/object/sim/mod.rs | 256 ++++++++++++++++++ src/object_store/src/object/sim/rpc_server.rs | 56 ++++ src/object_store/src/object/sim/service.rs | 140 ++++++++++ src/tests/simulation/src/cluster.rs | 1 + 10 files changed, 584 insertions(+) create mode 100644 src/object_store/src/object/sim/client.rs create mode 100644 src/object_store/src/object/sim/error.rs create mode 100644 src/object_store/src/object/sim/mod.rs create mode 100644 src/object_store/src/object/sim/rpc_server.rs create mode 100644 src/object_store/src/object/sim/service.rs diff --git a/Cargo.lock b/Cargo.lock index 713e70c52449..26bd91b9d17b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9058,6 +9058,7 @@ dependencies = [ "hyper-rustls", "hyper-tls", "itertools 0.12.0", + "madsim", "madsim-aws-sdk-s3", "madsim-tokio", "opendal", diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 95354557dd94..9810e3b4e062 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -36,6 +36,7 @@ thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["fs"] } tokio-retry = "0.3" tracing = "0.1" +madsim = "0.2.22" # This crate is excluded from hakari (see hakari.toml) after hdfs is introduced...## [target.'cfg(not(madsim))'.dependencies] # workspace-hack = { path = "../workspace-hack" } # diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs index c84527e6bdb0..fb531b621d66 100644 --- a/src/object_store/src/object/error.rs +++ b/src/object_store/src/object/error.rs @@ -24,6 +24,8 @@ use risingwave_common::error::BoxedError; use thiserror::Error; use tokio::sync::oneshot::error::RecvError; +use super::sim::SimError; + #[derive(Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)] #[thiserror_ext(newtype(name = ObjectError, backtrace, report_debug))] pub enum ObjectErrorInner { @@ -107,4 +109,18 @@ impl From for ObjectError { } } +#[cfg(madsim)] +impl From for ObjectError { + fn from(e: SimError) -> Self { + ObjectErrorInner::Internal(e.to_string()).into() + } +} + +#[cfg(madsim)] +impl From for ObjectError { + fn from(e: std::io::Error) -> Self { + ObjectErrorInner::Internal(e.to_string()).into() + } +} + pub type ObjectResult = std::result::Result; diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 6684b45b6d66..2457597c60d0 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod sim; use std::ops::{Range, RangeBounds}; use std::sync::Arc; use std::time::Duration; @@ -38,6 +39,8 @@ pub mod object_metrics; pub use error::*; use object_metrics::ObjectStoreMetrics; +use self::sim::SimObjectStore; + pub type ObjectStoreRef = Arc; pub type ObjectStreamingUploader = MonitoredStreamingUploader; @@ -138,6 +141,8 @@ pub enum ObjectStoreImpl { InMem(MonitoredObjectStore), Opendal(MonitoredObjectStore), S3(MonitoredObjectStore), + #[cfg(madsim)] + Sim(MonitoredObjectStore), } macro_rules! dispatch_async { @@ -164,7 +169,12 @@ macro_rules! object_store_impl_method_body { ObjectStoreImpl::S3(s3) => { $dispatch_macro!(s3, $method_name, path $(, $args)*) }, + #[cfg(madsim)] + ObjectStoreImpl::Sim(in_mem) => { + $dispatch_macro!(in_mem, $method_name, path $(, $args)*) + }, } + } }; } @@ -178,6 +188,7 @@ macro_rules! object_store_impl_method_body_slice { ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $paths:expr $(, $args:expr)*) => { { let paths_rem = partition_object_store_paths($paths); + match $object_store { ObjectStoreImpl::InMem(in_mem) => { $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*) @@ -188,6 +199,10 @@ macro_rules! object_store_impl_method_body_slice { ObjectStoreImpl::S3(s3) => { $dispatch_macro!(s3, $method_name, &paths_rem $(, $args)*) }, + #[cfg(madsim)] + ObjectStoreImpl::Sim(in_mem) => { + $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*) + }, } } }; @@ -246,6 +261,8 @@ impl ObjectStoreImpl { ObjectStoreImpl::InMem(store) => store.inner.get_object_prefix(obj_id), ObjectStoreImpl::Opendal(store) => store.inner.get_object_prefix(obj_id), ObjectStoreImpl::S3(store) => store.inner.get_object_prefix(obj_id), + #[cfg(madsim)] + ObjectStoreImpl::Sim(store) => store.inner.get_object_prefix(obj_id), } } @@ -256,6 +273,8 @@ impl ObjectStoreImpl { store.inner.op.info().native_capability().write_can_multi } ObjectStoreImpl::S3(_) => true, + #[cfg(madsim)] + ObjectStoreImpl::Sim(_) => true, } } @@ -264,6 +283,8 @@ impl ObjectStoreImpl { ObjectStoreImpl::InMem(store) => store.recv_buffer_size(), ObjectStoreImpl::Opendal(store) => store.recv_buffer_size(), ObjectStoreImpl::S3(store) => store.recv_buffer_size(), + #[cfg(madsim)] + ObjectStoreImpl::Sim(store) => store.recv_buffer_size(), } } } @@ -890,6 +911,11 @@ pub async fn build_remote_object_store( } ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics)) } + #[cfg(madsim)] + sim if sim.starts_with("sim://") => { + ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics)) + } + other => { unimplemented!( "{} remote object store only supports s3, minio, gcs, oss, cos, azure blob, hdfs, disk, memory, and memory-shared.", diff --git a/src/object_store/src/object/sim/client.rs b/src/object_store/src/object/sim/client.rs new file mode 100644 index 000000000000..4e7aa67834a5 --- /dev/null +++ b/src/object_store/src/object/sim/client.rs @@ -0,0 +1,48 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::net::SocketAddr; + +use madsim::net::{Endpoint, Payload}; + +use super::service::{Request, Response}; +use crate::object::error::ObjectResult as Result; + +#[derive(Debug, Clone)] +pub struct Client { + addr: SocketAddr, +} + +impl Client { + pub fn new(addr: SocketAddr) -> Self { + Self { addr } + } + + pub(crate) async fn send_request(&self, req: Request) -> Result { + let resp = self.send_request_io(req).await?; + let resp = *resp + .downcast::>() + .expect("failed to downcast"); + resp + } + + async fn send_request_io(&self, req: Request) -> std::io::Result { + let addr = self.addr; + let ep = Endpoint::connect(addr).await?; + let (tx, mut rx) = ep.connect1(addr).await?; + tx.send(Box::new(req)).await?; + rx.recv().await + } +} diff --git a/src/object_store/src/object/sim/error.rs b/src/object_store/src/object/sim/error.rs new file mode 100644 index 000000000000..29ca09f6825b --- /dev/null +++ b/src/object_store/src/object/sim/error.rs @@ -0,0 +1,39 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum SimError { + #[error("NotFound error: {0}")] + NotFound(String), + #[error("Other error: {0}")] + Other(String), +} + +impl SimError { + pub fn is_object_not_found_error(&self) -> bool { + matches!(self, SimError::NotFound(_)) + } +} + +impl SimError { + pub(crate) fn not_found(msg: impl ToString) -> Self { + SimError::NotFound(msg.to_string()) + } + + pub(crate) fn other(msg: impl ToString) -> Self { + SimError::Other(msg.to_string()) + } +} diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs new file mode 100644 index 000000000000..6b0bc73dae94 --- /dev/null +++ b/src/object_store/src/object/sim/mod.rs @@ -0,0 +1,256 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod client; +mod error; +use bytes::{BufMut, BytesMut}; +pub use error::SimError; +use futures::Stream; +mod rpc_server; +mod service; + +use std::net::SocketAddr; +use std::ops::Range; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use risingwave_common::range::RangeBoundsExt; + +use self::client::Client; +use self::service::Response; +use super::{ + BoxedStreamingUploader, Bytes, ObjectDataStream, ObjectError, ObjectMetadata, + ObjectMetadataIter, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader, +}; + +type PartId = i32; + +const MIN_PART_ID: PartId = 1; + +const STREAM_PART_SIZE: usize = 16 * 1024 * 1024; + +pub struct SimStreamingUploader { + client: Client, + part_size: usize, + key: String, + buf: BytesMut, + not_uploaded_len: usize, +} + +impl SimStreamingUploader { + fn new(client: Client, key: String, part_size: usize) -> Self { + Self { + client, + key, + part_size, + buf: Default::default(), + not_uploaded_len: 0, + } + } +} + +#[async_trait::async_trait] +impl StreamingUploader for SimStreamingUploader { + async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> { + let data_len = data.len(); + self.buf.put(data); + + Ok(()) + } + + async fn finish(mut self: Box) -> ObjectResult<()> { + if self.buf.is_empty() { + debug_assert_eq!(self.not_uploaded_len, 0); + Err(ObjectError::internal("upload empty object")) + } else { + let resp = self + .client + .send_request(service::Request::Upload { + path: self.key, + obj: self.buf.freeze(), + }) + .await?; + let Response::Upload = resp else { + return Err(SimError::other("expect Response::Upload").into()); + }; + Ok(()) + } + } + + fn get_memory_usage(&self) -> u64 { + self.buf.len() as u64 + } +} + +pub struct SimDataIterator { + data: Bytes, + offset: usize, +} + +impl SimDataIterator { + pub fn new(data: Bytes) -> Self { + Self { data, offset: 0 } + } +} + +impl Stream for SimDataIterator { + type Item = ObjectResult; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + const MAX_PACKET_SIZE: usize = 128 * 1024; + if self.offset >= self.data.len() { + return Poll::Ready(None); + } + let read_len = std::cmp::min(self.data.len() - self.offset, MAX_PACKET_SIZE); + let data = self.data.slice(self.offset..(self.offset + read_len)); + self.offset += read_len; + Poll::Ready(Some(Ok(data))) + } +} + +pub struct SimObjectStore { + client: Client, + part_size: usize, +} + +#[async_trait::async_trait] +impl ObjectStore for SimObjectStore { + fn get_object_prefix(&self, _obj_id: u64) -> String { + String::default() + } + + async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> { + if obj.is_empty() { + Err(ObjectError::internal("upload empty object")) + } else { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Upload { path, obj }) + .await?; + if let Response::Upload = resp { + Ok(()) + } else { + Err(SimError::other("expect Response::Upload").into()) + } + } + } + + async fn streaming_upload(&self, path: &str) -> ObjectResult { + Ok(Box::new(SimStreamingUploader::new( + self.client.clone(), + path.to_string(), + self.part_size, + ))) + } + + async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Read { path }) + .await?; + if let Response::Read(obj) = resp { + if let Some(end) = range.end() + && end > obj.len() + { + return Err(SimError::other("bad block offset and size").into()); + } + Ok(obj.slice(range)) + } else { + Err(SimError::other("expect Response::Read").into()) + } + } + + async fn metadata(&self, path: &str) -> ObjectResult { + let path = path.to_string(); + if let Response::Metadata(m) = self + .client + .send_request(service::Request::Metadata { path }) + .await? + { + Ok(m) + } else { + Err(SimError::other("expect Response::Metadata").into()) + } + } + + async fn streaming_read( + &self, + path: &str, + range: Range, + ) -> ObjectResult { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Read { path }) + .await?; + let Response::Read(body) = resp else { + return Err(SimError::other("expect Response::Read").into()); + }; + + Ok(Box::pin(SimDataIterator::new(body))) + } + + async fn delete(&self, path: &str) -> ObjectResult<()> { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Delete { path }) + .await?; + if let Response::Delete = resp { + Ok(()) + } else { + Err(SimError::other("expect Response::Delete").into()) + } + } + + async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> { + let mut error = None; + for path in paths { + error = error.or(self.delete(path).await.err()); + } + if let Some(e) = error { + Err(e) + } else { + Ok(()) + } + } + + async fn list(&self, path: &str) -> ObjectResult { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::List { path }) + .await?; + if let Response::List(o) = resp { + Ok(Box::pin(o)) + } else { + Err(SimError::other("expect Response::List").into()) + } + } + + fn store_media_type(&self) -> &'static str { + "sim" + } +} + +impl SimObjectStore { + pub fn new(addr: &str) -> Self { + Self { + client: Client::new(addr.parse::().expect("parse SockAddr failed")), + part_size: STREAM_PART_SIZE, + } + } +} diff --git a/src/object_store/src/object/sim/rpc_server.rs b/src/object_store/src/object/sim/rpc_server.rs new file mode 100644 index 000000000000..cc976ffaced8 --- /dev/null +++ b/src/object_store/src/object/sim/rpc_server.rs @@ -0,0 +1,56 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Result; +use std::net::SocketAddr; +use std::sync::Arc; + +use madsim::net::{Endpoint, Payload}; + +use super::service::{Request, SimService}; + +/// A simulated ObjectStore server. +#[derive(Default, Clone)] +pub struct SimServer {} + +impl SimServer { + pub fn builder() -> Self { + SimServer::default() + } + + pub async fn serve(self, addr: SocketAddr) -> Result<()> { + let ep = Endpoint::bind(addr).await?; + let service = SimService::new(); + let service = Arc::new(service); + + loop { + let (tx, mut rx, _) = ep.accept1().await?; + let service = service.clone(); + madsim::task::spawn(async move { + let request = *rx.recv().await?.downcast::().unwrap(); + use super::service::Request::*; + + let response: Payload = match request { + Upload { path, obj } => Box::new(service.upload(path, obj).await), + Read { path } => Box::new(service.read(path).await), + Delete { path } => Box::new(service.delete(path).await), + List { path } => Box::new(service.list(path).await), + Metadata { path } => Box::new(service.metadata(path).await), + }; + tx.send(response).await?; + Ok(()) as Result<()> + }); + } + } +} diff --git a/src/object_store/src/object/sim/service.rs b/src/object_store/src/object/sim/service.rs new file mode 100644 index 000000000000..3db8b4f9bd40 --- /dev/null +++ b/src/object_store/src/object/sim/service.rs @@ -0,0 +1,140 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, VecDeque}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use bytes::Bytes; +use futures::Stream; +use itertools::Itertools; +use spin::Mutex; + +use crate::object::error::ObjectResult as Result; +use crate::object::sim::SimError; +use crate::object::{ObjectError, ObjectMetadata}; + +#[derive(Debug)] +pub(crate) enum Request { + Upload { path: String, obj: Bytes }, + Read { path: String }, + Delete { path: String }, + List { path: String }, + Metadata { path: String }, +} + +#[derive(Debug)] +pub(crate) enum Response { + Upload, + Read(Bytes), + Delete, + List(SimObjectIter), + Metadata(ObjectMetadata), +} + +#[derive(Debug)] +pub(crate) struct SimService { + storage: Mutex>, +} + +fn get_obj_meta(path: &str, obj: &Bytes) -> Result { + Ok(ObjectMetadata { + key: path.to_string(), + last_modified: SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(ObjectError::internal)? + .as_secs_f64(), + total_size: obj.len(), + }) +} + +impl SimService { + pub fn new() -> Self { + SimService { + storage: Mutex::new(BTreeMap::new()), + } + } + + pub async fn upload(&self, path: String, obj: Bytes) -> Result { + let metadata = get_obj_meta(&path, &obj)?; + self.storage.lock().insert(path.into(), (metadata, obj)); + Ok(Response::Upload) + } + + pub async fn read(&self, path: String) -> Result { + let storage = self.storage.lock(); + let obj = storage + .get(&path) + .map(|(_, o)| o) + .ok_or_else(|| SimError::NotFound(format!("no object at path '{}'", path)))?; + Ok(Response::Read(obj.clone())) + } + + pub async fn delete(&self, path: String) -> Result { + self.storage.lock().remove(&path); + Ok(Response::Delete) + } + + pub async fn list(&self, prefix: String) -> Result { + let list_result = self + .storage + .lock() + .iter() + .filter_map(|(path, (metadata, _))| { + if path.starts_with(&prefix) { + return Some(metadata.clone()); + } + None + }) + .sorted_by(|a, b| Ord::cmp(&a.key, &b.key)) + .collect_vec(); + Ok(Response::List(SimObjectIter::new(list_result))) + } + + pub async fn metadata(&self, path: String) -> Result { + self.storage + .lock() + .get(&path) + .map(|(metadata, _)| metadata) + .cloned() + .ok_or_else(|| SimError::not_found(format!("no object at path '{}'", path)).into()) + .map(|meta| Response::Metadata(meta)) + } +} + +#[derive(Debug)] + +pub(crate) struct SimObjectIter { + list_result: VecDeque, +} + +impl SimObjectIter { + fn new(list_result: Vec) -> Self { + Self { + list_result: list_result.into(), + } + } +} + +impl Stream for SimObjectIter { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Some(i) = self.list_result.pop_front() { + return Poll::Ready(Some(Ok(i))); + } + Poll::Ready(None) + } +} diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 3bed45d06dd0..0e95d4e610d1 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -301,6 +301,7 @@ impl Cluster { .build(); // s3 + // TODO(wff): switch to ObjectStore trait sim handle .create_node() .name("s3") From 26e7e0b5d61c937cc277cf4d0df266f57ad22aa0 Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Fri, 12 Jan 2024 22:34:18 +0800 Subject: [PATCH 2/9] substitute s3 simulator with object store sim Signed-off-by: Kevin Axel --- Cargo.lock | 1 + src/object_store/src/object/error.rs | 16 +++++++--------- src/object_store/src/object/mod.rs | 2 +- src/object_store/src/object/sim/mod.rs | 24 ++++++++---------------- src/tests/simulation/Cargo.toml | 1 + src/tests/simulation/src/cluster.rs | 14 +++++++------- 6 files changed, 25 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 154e8d7b8c38..9d60e508d3b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9712,6 +9712,7 @@ dependencies = [ "risingwave_frontend", "risingwave_hummock_sdk", "risingwave_meta_node", + "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", "risingwave_sqlparser", diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs index fb531b621d66..b782b4a447c8 100644 --- a/src/object_store/src/object/error.rs +++ b/src/object_store/src/object/error.rs @@ -24,8 +24,6 @@ use risingwave_common::error::BoxedError; use thiserror::Error; use tokio::sync::oneshot::error::RecvError; -use super::sim::SimError; - #[derive(Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)] #[thiserror_ext(newtype(name = ObjectError, backtrace, report_debug))] pub enum ObjectErrorInner { @@ -44,6 +42,9 @@ pub enum ObjectErrorInner { #[error("Internal error: {0}")] #[construct(skip)] Internal(String), + #[cfg(madsim)] + #[error(transparent)] + Sim(#[from] crate::object::sim::SimError), } impl ObjectError { @@ -81,6 +82,10 @@ impl ObjectError { ObjectErrorInner::Mem(e) => { return e.is_object_not_found_error(); } + #[cfg(madsim)] + ObjectErrorInner::Sim(e) => { + return e.is_object_not_found_error(); + } _ => {} }; false @@ -109,13 +114,6 @@ impl From for ObjectError { } } -#[cfg(madsim)] -impl From for ObjectError { - fn from(e: SimError) -> Self { - ObjectErrorInner::Internal(e.to_string()).into() - } -} - #[cfg(madsim)] impl From for ObjectError { fn from(e: std::io::Error) -> Self { diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 270514cc155d..5216abec3a91 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod sim; +pub mod sim; use std::ops::{Range, RangeBounds}; use std::sync::Arc; use std::time::Duration; diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs index 6b0bc73dae94..a747c84026dc 100644 --- a/src/object_store/src/object/sim/mod.rs +++ b/src/object_store/src/object/sim/mod.rs @@ -18,6 +18,7 @@ use bytes::{BufMut, BytesMut}; pub use error::SimError; use futures::Stream; mod rpc_server; +pub use rpc_server::SimServer; mod service; use std::net::SocketAddr; @@ -34,28 +35,18 @@ use super::{ ObjectMetadataIter, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader, }; -type PartId = i32; - -const MIN_PART_ID: PartId = 1; - -const STREAM_PART_SIZE: usize = 16 * 1024 * 1024; - pub struct SimStreamingUploader { client: Client, - part_size: usize, key: String, buf: BytesMut, - not_uploaded_len: usize, } impl SimStreamingUploader { - fn new(client: Client, key: String, part_size: usize) -> Self { + fn new(client: Client, key: String) -> Self { Self { client, key, - part_size, buf: Default::default(), - not_uploaded_len: 0, } } } @@ -71,7 +62,6 @@ impl StreamingUploader for SimStreamingUploader { async fn finish(mut self: Box) -> ObjectResult<()> { if self.buf.is_empty() { - debug_assert_eq!(self.not_uploaded_len, 0); Err(ObjectError::internal("upload empty object")) } else { let resp = self @@ -121,7 +111,6 @@ impl Stream for SimDataIterator { pub struct SimObjectStore { client: Client, - part_size: usize, } #[async_trait::async_trait] @@ -151,7 +140,6 @@ impl ObjectStore for SimObjectStore { Ok(Box::new(SimStreamingUploader::new( self.client.clone(), path.to_string(), - self.part_size, ))) } @@ -248,9 +236,13 @@ impl ObjectStore for SimObjectStore { impl SimObjectStore { pub fn new(addr: &str) -> Self { + let addr = addr.strip_prefix("sim://").unwrap(); + let (addr, _bucket) = addr.split_once('/').unwrap(); Self { - client: Client::new(addr.parse::().expect("parse SockAddr failed")), - part_size: STREAM_PART_SIZE, + client: Client::new( + addr.parse::() + .expect(&format!("parse SockAddr failed: {}", addr)), + ), } } } diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 7783cfbcf0ab..5bac95f6f00d 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -32,6 +32,7 @@ prometheus = { version = "0.13" } rand = "0.8" rand_chacha = { version = "0.3.1" } rdkafka = { workspace = true } +risingwave_object_store = { workspace = true } risingwave_common = { workspace = true } risingwave_compactor = { workspace = true } risingwave_compute = { workspace = true } diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 08b348116b95..933b8550df53 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -32,6 +32,7 @@ use itertools::Itertools; use madsim::runtime::{Handle, NodeHandle}; use rand::seq::IteratorRandom; use rand::Rng; +use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer; use risingwave_pb::common::WorkerNode; use sqllogictest::AsyncDB; #[cfg(not(madsim))] @@ -262,7 +263,7 @@ metrics_level = "Disabled" /// | etcd | 192.168.10.1 | /// | kafka-broker | 192.168.11.1 | /// | kafka-producer | 192.168.11.2 | -/// | s3 | 192.168.12.1 | +/// | sim | 192.168.12.1 | /// | client | 192.168.100.1 | /// | ctl | 192.168.101.1 | pub struct Cluster { @@ -341,15 +342,13 @@ impl Cluster { }) .build(); - // s3 - // TODO(wff): switch to ObjectStore trait sim + // sim handle .create_node() - .name("s3") + .name("object_store_sim") .ip("192.168.12.1".parse().unwrap()) .init(move || async move { - aws_sdk_s3::server::SimServer::default() - .with_bucket("hummock001") + ObjectStoreSimServer::builder() .serve("0.0.0.0:9301".parse().unwrap()) .await }) @@ -379,7 +378,8 @@ impl Cluster { "--etcd-endpoints", "etcd:2388", "--state-store", - "hummock+minio://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", + // "hummock+minio://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", + "hummock+sim://192.168.12.1:9301/hummock001", "--data-directory", "hummock_001", ]); From 1bcf9cd28696b4971809b371be7501a92d6b3d44 Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Fri, 12 Jan 2024 22:35:28 +0800 Subject: [PATCH 3/9] fix typo Signed-off-by: Kevin Axel --- src/tests/simulation/src/cluster.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 933b8550df53..02af3312589d 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -254,18 +254,18 @@ metrics_level = "Disabled" /// /// # Nodes /// -/// | Name | IP | -/// | -------------- | ------------- | -/// | meta-x | 192.168.1.x | -/// | frontend-x | 192.168.2.x | -/// | compute-x | 192.168.3.x | -/// | compactor-x | 192.168.4.x | -/// | etcd | 192.168.10.1 | -/// | kafka-broker | 192.168.11.1 | -/// | kafka-producer | 192.168.11.2 | -/// | sim | 192.168.12.1 | -/// | client | 192.168.100.1 | -/// | ctl | 192.168.101.1 | +/// | Name | IP | +/// | ---------------- | ------------- | +/// | meta-x | 192.168.1.x | +/// | frontend-x | 192.168.2.x | +/// | compute-x | 192.168.3.x | +/// | compactor-x | 192.168.4.x | +/// | etcd | 192.168.10.1 | +/// | kafka-broker | 192.168.11.1 | +/// | kafka-producer | 192.168.11.2 | +/// | object_store_sim | 192.168.12.1 | +/// | client | 192.168.100.1 | +/// | ctl | 192.168.101.1 | pub struct Cluster { config: Configuration, handle: Handle, @@ -342,7 +342,7 @@ impl Cluster { }) .build(); - // sim + // object_store_sim handle .create_node() .name("object_store_sim") From 257202b9a6699aa85548f3a5e98b3d1ca5dbda7e Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Fri, 12 Jan 2024 22:53:53 +0800 Subject: [PATCH 4/9] fix check Signed-off-by: Kevin Axel --- src/object_store/Cargo.toml | 2 +- src/object_store/src/object/mod.rs | 1 + src/tests/simulation/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 9810e3b4e062..61f526511c3a 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -26,6 +26,7 @@ hyper = { version = "0.14", features = ["tcp", "client"] } hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] } hyper-tls = "0.5.0" itertools = "0.12" +madsim = "0.2.22" opendal = "0.44" prometheus = { version = "0.13", features = ["process"] } risingwave_common = { workspace = true } @@ -36,7 +37,6 @@ thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["fs"] } tokio-retry = "0.3" tracing = "0.1" -madsim = "0.2.22" # This crate is excluded from hakari (see hakari.toml) after hdfs is introduced...## [target.'cfg(not(madsim))'.dependencies] # workspace-hack = { path = "../workspace-hack" } # diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 5216abec3a91..db490dfd4728 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(madsim)] pub mod sim; use std::ops::{Range, RangeBounds}; use std::sync::Arc; diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 5bac95f6f00d..a08fc68e8c33 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -32,7 +32,6 @@ prometheus = { version = "0.13" } rand = "0.8" rand_chacha = { version = "0.3.1" } rdkafka = { workspace = true } -risingwave_object_store = { workspace = true } risingwave_common = { workspace = true } risingwave_compactor = { workspace = true } risingwave_compute = { workspace = true } @@ -43,6 +42,7 @@ risingwave_expr_impl = { workspace = true } risingwave_frontend = { workspace = true } risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } risingwave_meta_node = { workspace = true } +risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_sqlparser = { workspace = true } From 642a684d179e0f3db4d2e2333599c6e8be7f265a Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Fri, 12 Jan 2024 22:57:48 +0800 Subject: [PATCH 5/9] fix check Signed-off-by: Kevin Axel --- src/object_store/src/object/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index db490dfd4728..e43d349efef1 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -40,6 +40,7 @@ pub mod object_metrics; pub use error::*; use object_metrics::ObjectStoreMetrics; +#[cfg(madsim)] use self::sim::SimObjectStore; pub type ObjectStoreRef = Arc; From 7809206aa23fe11c1952edbfb5cbcf385e46b4a7 Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Fri, 12 Jan 2024 23:30:01 +0800 Subject: [PATCH 6/9] fix check Signed-off-by: Kevin Axel --- src/tests/simulation/src/cluster.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 02af3312589d..ffd350d111ad 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -32,6 +32,7 @@ use itertools::Itertools; use madsim::runtime::{Handle, NodeHandle}; use rand::seq::IteratorRandom; use rand::Rng; +#[cfg(madsim)] use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer; use risingwave_pb::common::WorkerNode; use sqllogictest::AsyncDB; @@ -343,6 +344,7 @@ impl Cluster { .build(); // object_store_sim + #[cfg(madsim)] handle .create_node() .name("object_store_sim") From 54daa9007ef0867d4a7debf73d35411ec7527cfa Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Thu, 18 Jan 2024 18:08:03 +0800 Subject: [PATCH 7/9] fix stream read Signed-off-by: Kevin Axel --- src/object_store/src/object/mod.rs | 1 - src/object_store/src/object/sim/mod.rs | 16 +++++++++------- src/tests/simulation/src/cluster.rs | 4 +--- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index e43d349efef1..b59ed5067bc3 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -922,7 +922,6 @@ pub async fn build_remote_object_store( sim if sim.starts_with("sim://") => { ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics)) } - other => { unimplemented!( "{} remote object store only supports s3, minio, gcs, oss, cos, azure blob, hdfs, disk, memory, and memory-shared.", diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs index a747c84026dc..d49e87cc6641 100644 --- a/src/object_store/src/object/sim/mod.rs +++ b/src/object_store/src/object/sim/mod.rs @@ -22,7 +22,7 @@ pub use rpc_server::SimServer; mod service; use std::net::SocketAddr; -use std::ops::Range; +use std::ops::{Range, RangeBounds}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -54,9 +54,7 @@ impl SimStreamingUploader { #[async_trait::async_trait] impl StreamingUploader for SimStreamingUploader { async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> { - let data_len = data.len(); self.buf.put(data); - Ok(()) } @@ -188,7 +186,7 @@ impl ObjectStore for SimObjectStore { return Err(SimError::other("expect Response::Read").into()); }; - Ok(Box::pin(SimDataIterator::new(body))) + Ok(Box::pin(SimDataIterator::new(body.slice(range)))) } async fn delete(&self, path: &str) -> ObjectResult<()> { @@ -237,11 +235,15 @@ impl ObjectStore for SimObjectStore { impl SimObjectStore { pub fn new(addr: &str) -> Self { let addr = addr.strip_prefix("sim://").unwrap(); - let (addr, _bucket) = addr.split_once('/').unwrap(); + let (_access_key_id, rest) = addr.split_once(':').unwrap(); + let (_secret_access_key, rest) = rest.split_once('@').unwrap(); + let (address, _bucket) = rest.split_once('/').unwrap(); + Self { client: Client::new( - addr.parse::() - .expect(&format!("parse SockAddr failed: {}", addr)), + address + .parse::() + .expect(&format!("parse SockAddr failed: {}", address)), ), } } diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index ca9a7cce2fd4..a5c8fdc08987 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -386,7 +386,6 @@ impl Cluster { .build(); // object_store_sim - #[cfg(madsim)] handle .create_node() .name("object_store_sim") @@ -422,8 +421,7 @@ impl Cluster { "--etcd-endpoints", "etcd:2388", "--state-store", - // "hummock+minio://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", - "hummock+sim://192.168.12.1:9301/hummock001", + "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", "--data-directory", "hummock_001", ]); From b144e3d85c5e6ebd1c07158bcd0d97c7bb1388c6 Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Tue, 30 Jan 2024 17:43:58 +0800 Subject: [PATCH 8/9] update for pr comment Signed-off-by: Kevin Axel --- src/object_store/src/object/sim/mod.rs | 30 ++++++++++--------- src/object_store/src/object/sim/rpc_server.rs | 1 + src/object_store/src/object/sim/service.rs | 29 ++++++++++++------ 3 files changed, 37 insertions(+), 23 deletions(-) diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs index d49e87cc6641..e31e244a5a28 100644 --- a/src/object_store/src/object/sim/mod.rs +++ b/src/object_store/src/object/sim/mod.rs @@ -26,6 +26,8 @@ use std::ops::{Range, RangeBounds}; use std::pin::Pin; use std::task::{Context, Poll}; +use madsim::rand::{thread_rng, RngCore}; +use madsim::time::{sleep, Duration}; use risingwave_common::range::RangeBoundsExt; use self::client::Client; @@ -70,7 +72,7 @@ impl StreamingUploader for SimStreamingUploader { }) .await?; let Response::Upload = resp else { - return Err(SimError::other("expect Response::Upload").into()); + panic!("expect Response::Upload"); }; Ok(()) } @@ -129,7 +131,7 @@ impl ObjectStore for SimObjectStore { if let Response::Upload = resp { Ok(()) } else { - Err(SimError::other("expect Response::Upload").into()) + panic!("expect Response::Upload"); } } } @@ -155,7 +157,7 @@ impl ObjectStore for SimObjectStore { } Ok(obj.slice(range)) } else { - Err(SimError::other("expect Response::Read").into()) + panic!("expect Response::Read"); } } @@ -168,7 +170,7 @@ impl ObjectStore for SimObjectStore { { Ok(m) } else { - Err(SimError::other("expect Response::Metadata").into()) + panic!("expect Response::Metadata"); } } @@ -183,7 +185,7 @@ impl ObjectStore for SimObjectStore { .send_request(service::Request::Read { path }) .await?; let Response::Read(body) = resp else { - return Err(SimError::other("expect Response::Read").into()); + panic!("expect Response::Read"); }; Ok(Box::pin(SimDataIterator::new(body.slice(range)))) @@ -198,19 +200,19 @@ impl ObjectStore for SimObjectStore { if let Response::Delete = resp { Ok(()) } else { - Err(SimError::other("expect Response::Delete").into()) + panic!("expect Response::Delete"); } } async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> { - let mut error = None; - for path in paths { - error = error.or(self.delete(path).await.err()); - } - if let Some(e) = error { - Err(e) - } else { + let resp = self + .client + .send_request(service::Request::DeleteObjects { paths: paths.to_vec() }) + .await?; + if let Response::DeleteObjects = resp { Ok(()) + } else { + panic!("expect Response::DeleteObjects"); } } @@ -223,7 +225,7 @@ impl ObjectStore for SimObjectStore { if let Response::List(o) = resp { Ok(Box::pin(o)) } else { - Err(SimError::other("expect Response::List").into()) + panic!("expect Response::List"); } } diff --git a/src/object_store/src/object/sim/rpc_server.rs b/src/object_store/src/object/sim/rpc_server.rs index cc976ffaced8..068dbed5a27a 100644 --- a/src/object_store/src/object/sim/rpc_server.rs +++ b/src/object_store/src/object/sim/rpc_server.rs @@ -45,6 +45,7 @@ impl SimServer { Upload { path, obj } => Box::new(service.upload(path, obj).await), Read { path } => Box::new(service.read(path).await), Delete { path } => Box::new(service.delete(path).await), + DeleteObjects { paths } => Box::new(service.delete_objects(paths).await), List { path } => Box::new(service.list(path).await), Metadata { path } => Box::new(service.metadata(path).await), }; diff --git a/src/object_store/src/object/sim/service.rs b/src/object_store/src/object/sim/service.rs index 3db8b4f9bd40..e789d4d798b4 100644 --- a/src/object_store/src/object/sim/service.rs +++ b/src/object_store/src/object/sim/service.rs @@ -31,6 +31,7 @@ pub(crate) enum Request { Upload { path: String, obj: Bytes }, Read { path: String }, Delete { path: String }, + DeleteObjects { paths: Vec }, List { path: String }, Metadata { path: String }, } @@ -40,6 +41,7 @@ pub(crate) enum Response { Upload, Read(Bytes), Delete, + DeleteObjects, List(SimObjectIter), Metadata(ObjectMetadata), } @@ -86,19 +88,29 @@ impl SimService { self.storage.lock().remove(&path); Ok(Response::Delete) } + + pub async fn delete_objects(&self, paths: Vec) -> Result { + for path in paths { + self.storage.lock().remove(&path); + } + Ok(Response::DeleteObjects) + } pub async fn list(&self, prefix: String) -> Result { + if prefix.is_empty() { + return Ok(Response::List(SimObjectIter::new(vec![]))); + } + + let mut scan_end = prefix.chars().collect_vec(); + let next = *scan_end.last().unwrap() as u8 + 1; + *scan_end.last_mut().unwrap() = next as char; + let scan_end = scan_end.into_iter().collect::(); + let list_result = self .storage .lock() - .iter() - .filter_map(|(path, (metadata, _))| { - if path.starts_with(&prefix) { - return Some(metadata.clone()); - } - None - }) - .sorted_by(|a, b| Ord::cmp(&a.key, &b.key)) + .range(prefix..scan_end) + .map(|(_, (o, _))| o.clone()) .collect_vec(); Ok(Response::List(SimObjectIter::new(list_result))) } @@ -115,7 +127,6 @@ impl SimService { } #[derive(Debug)] - pub(crate) struct SimObjectIter { list_result: VecDeque, } From 0a8a15e4d3672d6f29284093e51ef934b23961d8 Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Tue, 30 Jan 2024 18:04:09 +0800 Subject: [PATCH 9/9] check Signed-off-by: Kevin Axel --- src/object_store/src/object/sim/mod.rs | 4 +++- src/object_store/src/object/sim/service.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs index e31e244a5a28..31f34cbcd326 100644 --- a/src/object_store/src/object/sim/mod.rs +++ b/src/object_store/src/object/sim/mod.rs @@ -207,7 +207,9 @@ impl ObjectStore for SimObjectStore { async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> { let resp = self .client - .send_request(service::Request::DeleteObjects { paths: paths.to_vec() }) + .send_request(service::Request::DeleteObjects { + paths: paths.to_vec(), + }) .await?; if let Response::DeleteObjects = resp { Ok(()) diff --git a/src/object_store/src/object/sim/service.rs b/src/object_store/src/object/sim/service.rs index e789d4d798b4..fb03a1b13c17 100644 --- a/src/object_store/src/object/sim/service.rs +++ b/src/object_store/src/object/sim/service.rs @@ -88,7 +88,7 @@ impl SimService { self.storage.lock().remove(&path); Ok(Response::Delete) } - + pub async fn delete_objects(&self, paths: Vec) -> Result { for path in paths { self.storage.lock().remove(&path);