diff --git a/Cargo.lock b/Cargo.lock index fffd5b98e480..5b73909cc979 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9610,6 +9610,7 @@ dependencies = [ "hyper-rustls", "hyper-tls", "itertools 0.12.0", + "madsim", "madsim-aws-sdk-s3", "madsim-tokio", "opendal", @@ -9775,6 +9776,7 @@ dependencies = [ "risingwave_frontend", "risingwave_hummock_sdk", "risingwave_meta_node", + "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", "risingwave_sqlparser", diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 95354557dd94..61f526511c3a 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -26,6 +26,7 @@ hyper = { version = "0.14", features = ["tcp", "client"] } hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] } hyper-tls = "0.5.0" itertools = "0.12" +madsim = "0.2.22" opendal = "0.44" prometheus = { version = "0.13", features = ["process"] } risingwave_common = { workspace = true } diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs index f1c18bb39118..d4927a1595fc 100644 --- a/src/object_store/src/object/error.rs +++ b/src/object_store/src/object/error.rs @@ -43,6 +43,9 @@ pub enum ObjectErrorInner { #[error("Internal error: {0}")] #[construct(skip)] Internal(String), + #[cfg(madsim)] + #[error(transparent)] + Sim(#[from] crate::object::sim::SimError), } impl ObjectError { @@ -80,6 +83,10 @@ impl ObjectError { ObjectErrorInner::Mem(e) => { return e.is_object_not_found_error(); } + #[cfg(madsim)] + ObjectErrorInner::Sim(e) => { + return e.is_object_not_found_error(); + } _ => {} }; false @@ -108,4 +115,11 @@ impl From for ObjectError { } } +#[cfg(madsim)] +impl From for ObjectError { + fn from(e: std::io::Error) -> Self { + ObjectErrorInner::Internal(e.to_string()).into() + } +} + pub type ObjectResult = std::result::Result; diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index ab5abe0d0747..5399b6d253b2 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(madsim)] +pub mod sim; use std::ops::{Range, RangeBounds}; use std::sync::Arc; use std::time::Duration; @@ -39,6 +41,9 @@ pub use error::*; use object_metrics::ObjectStoreMetrics; use thiserror_ext::AsReport; +#[cfg(madsim)] +use self::sim::SimObjectStore; + pub type ObjectStoreRef = Arc; pub type ObjectStreamingUploader = MonitoredStreamingUploader; @@ -139,6 +144,8 @@ pub enum ObjectStoreImpl { InMem(MonitoredObjectStore), Opendal(MonitoredObjectStore), S3(MonitoredObjectStore), + #[cfg(madsim)] + Sim(MonitoredObjectStore), } macro_rules! dispatch_async { @@ -165,7 +172,12 @@ macro_rules! object_store_impl_method_body { ObjectStoreImpl::S3(s3) => { $dispatch_macro!(s3, $method_name, path $(, $args)*) }, + #[cfg(madsim)] + ObjectStoreImpl::Sim(in_mem) => { + $dispatch_macro!(in_mem, $method_name, path $(, $args)*) + }, } + } }; } @@ -179,6 +191,7 @@ macro_rules! object_store_impl_method_body_slice { ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $paths:expr $(, $args:expr)*) => { { let paths_rem = partition_object_store_paths($paths); + match $object_store { ObjectStoreImpl::InMem(in_mem) => { $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*) @@ -189,6 +202,10 @@ macro_rules! object_store_impl_method_body_slice { ObjectStoreImpl::S3(s3) => { $dispatch_macro!(s3, $method_name, &paths_rem $(, $args)*) }, + #[cfg(madsim)] + ObjectStoreImpl::Sim(in_mem) => { + $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*) + }, } } }; @@ -247,6 +264,8 @@ impl ObjectStoreImpl { ObjectStoreImpl::InMem(store) => store.inner.get_object_prefix(obj_id), ObjectStoreImpl::Opendal(store) => store.inner.get_object_prefix(obj_id), ObjectStoreImpl::S3(store) => store.inner.get_object_prefix(obj_id), + #[cfg(madsim)] + ObjectStoreImpl::Sim(store) => store.inner.get_object_prefix(obj_id), } } @@ -257,6 +276,8 @@ impl ObjectStoreImpl { store.inner.op.info().native_capability().write_can_multi } ObjectStoreImpl::S3(_) => true, + #[cfg(madsim)] + ObjectStoreImpl::Sim(_) => true, } } @@ -265,6 +286,8 @@ impl ObjectStoreImpl { ObjectStoreImpl::InMem(store) => store.recv_buffer_size(), ObjectStoreImpl::Opendal(store) => store.recv_buffer_size(), ObjectStoreImpl::S3(store) => store.recv_buffer_size(), + #[cfg(madsim)] + ObjectStoreImpl::Sim(store) => store.recv_buffer_size(), } } } @@ -896,6 +919,10 @@ pub async fn build_remote_object_store( } ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics)) } + #[cfg(madsim)] + sim if sim.starts_with("sim://") => { + ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics)) + } other => { unimplemented!( "{} remote object store only supports s3, minio, gcs, oss, cos, azure blob, hdfs, disk, memory, and memory-shared.", diff --git a/src/object_store/src/object/sim/client.rs b/src/object_store/src/object/sim/client.rs new file mode 100644 index 000000000000..4e7aa67834a5 --- /dev/null +++ b/src/object_store/src/object/sim/client.rs @@ -0,0 +1,48 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::net::SocketAddr; + +use madsim::net::{Endpoint, Payload}; + +use super::service::{Request, Response}; +use crate::object::error::ObjectResult as Result; + +#[derive(Debug, Clone)] +pub struct Client { + addr: SocketAddr, +} + +impl Client { + pub fn new(addr: SocketAddr) -> Self { + Self { addr } + } + + pub(crate) async fn send_request(&self, req: Request) -> Result { + let resp = self.send_request_io(req).await?; + let resp = *resp + .downcast::>() + .expect("failed to downcast"); + resp + } + + async fn send_request_io(&self, req: Request) -> std::io::Result { + let addr = self.addr; + let ep = Endpoint::connect(addr).await?; + let (tx, mut rx) = ep.connect1(addr).await?; + tx.send(Box::new(req)).await?; + rx.recv().await + } +} diff --git a/src/object_store/src/object/sim/error.rs b/src/object_store/src/object/sim/error.rs new file mode 100644 index 000000000000..29ca09f6825b --- /dev/null +++ b/src/object_store/src/object/sim/error.rs @@ -0,0 +1,39 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum SimError { + #[error("NotFound error: {0}")] + NotFound(String), + #[error("Other error: {0}")] + Other(String), +} + +impl SimError { + pub fn is_object_not_found_error(&self) -> bool { + matches!(self, SimError::NotFound(_)) + } +} + +impl SimError { + pub(crate) fn not_found(msg: impl ToString) -> Self { + SimError::NotFound(msg.to_string()) + } + + pub(crate) fn other(msg: impl ToString) -> Self { + SimError::Other(msg.to_string()) + } +} diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs new file mode 100644 index 000000000000..31f34cbcd326 --- /dev/null +++ b/src/object_store/src/object/sim/mod.rs @@ -0,0 +1,254 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod client; +mod error; +use bytes::{BufMut, BytesMut}; +pub use error::SimError; +use futures::Stream; +mod rpc_server; +pub use rpc_server::SimServer; +mod service; + +use std::net::SocketAddr; +use std::ops::{Range, RangeBounds}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use madsim::rand::{thread_rng, RngCore}; +use madsim::time::{sleep, Duration}; +use risingwave_common::range::RangeBoundsExt; + +use self::client::Client; +use self::service::Response; +use super::{ + BoxedStreamingUploader, Bytes, ObjectDataStream, ObjectError, ObjectMetadata, + ObjectMetadataIter, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader, +}; + +pub struct SimStreamingUploader { + client: Client, + key: String, + buf: BytesMut, +} + +impl SimStreamingUploader { + fn new(client: Client, key: String) -> Self { + Self { + client, + key, + buf: Default::default(), + } + } +} + +#[async_trait::async_trait] +impl StreamingUploader for SimStreamingUploader { + async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> { + self.buf.put(data); + Ok(()) + } + + async fn finish(mut self: Box) -> ObjectResult<()> { + if self.buf.is_empty() { + Err(ObjectError::internal("upload empty object")) + } else { + let resp = self + .client + .send_request(service::Request::Upload { + path: self.key, + obj: self.buf.freeze(), + }) + .await?; + let Response::Upload = resp else { + panic!("expect Response::Upload"); + }; + Ok(()) + } + } + + fn get_memory_usage(&self) -> u64 { + self.buf.len() as u64 + } +} + +pub struct SimDataIterator { + data: Bytes, + offset: usize, +} + +impl SimDataIterator { + pub fn new(data: Bytes) -> Self { + Self { data, offset: 0 } + } +} + +impl Stream for SimDataIterator { + type Item = ObjectResult; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + const MAX_PACKET_SIZE: usize = 128 * 1024; + if self.offset >= self.data.len() { + return Poll::Ready(None); + } + let read_len = std::cmp::min(self.data.len() - self.offset, MAX_PACKET_SIZE); + let data = self.data.slice(self.offset..(self.offset + read_len)); + self.offset += read_len; + Poll::Ready(Some(Ok(data))) + } +} + +pub struct SimObjectStore { + client: Client, +} + +#[async_trait::async_trait] +impl ObjectStore for SimObjectStore { + fn get_object_prefix(&self, _obj_id: u64) -> String { + String::default() + } + + async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> { + if obj.is_empty() { + Err(ObjectError::internal("upload empty object")) + } else { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Upload { path, obj }) + .await?; + if let Response::Upload = resp { + Ok(()) + } else { + panic!("expect Response::Upload"); + } + } + } + + async fn streaming_upload(&self, path: &str) -> ObjectResult { + Ok(Box::new(SimStreamingUploader::new( + self.client.clone(), + path.to_string(), + ))) + } + + async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Read { path }) + .await?; + if let Response::Read(obj) = resp { + if let Some(end) = range.end() + && end > obj.len() + { + return Err(SimError::other("bad block offset and size").into()); + } + Ok(obj.slice(range)) + } else { + panic!("expect Response::Read"); + } + } + + async fn metadata(&self, path: &str) -> ObjectResult { + let path = path.to_string(); + if let Response::Metadata(m) = self + .client + .send_request(service::Request::Metadata { path }) + .await? + { + Ok(m) + } else { + panic!("expect Response::Metadata"); + } + } + + async fn streaming_read( + &self, + path: &str, + range: Range, + ) -> ObjectResult { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Read { path }) + .await?; + let Response::Read(body) = resp else { + panic!("expect Response::Read"); + }; + + Ok(Box::pin(SimDataIterator::new(body.slice(range)))) + } + + async fn delete(&self, path: &str) -> ObjectResult<()> { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Delete { path }) + .await?; + if let Response::Delete = resp { + Ok(()) + } else { + panic!("expect Response::Delete"); + } + } + + async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> { + let resp = self + .client + .send_request(service::Request::DeleteObjects { + paths: paths.to_vec(), + }) + .await?; + if let Response::DeleteObjects = resp { + Ok(()) + } else { + panic!("expect Response::DeleteObjects"); + } + } + + async fn list(&self, path: &str) -> ObjectResult { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::List { path }) + .await?; + if let Response::List(o) = resp { + Ok(Box::pin(o)) + } else { + panic!("expect Response::List"); + } + } + + fn store_media_type(&self) -> &'static str { + "sim" + } +} + +impl SimObjectStore { + pub fn new(addr: &str) -> Self { + let addr = addr.strip_prefix("sim://").unwrap(); + let (_access_key_id, rest) = addr.split_once(':').unwrap(); + let (_secret_access_key, rest) = rest.split_once('@').unwrap(); + let (address, _bucket) = rest.split_once('/').unwrap(); + + Self { + client: Client::new( + address + .parse::() + .expect(&format!("parse SockAddr failed: {}", address)), + ), + } + } +} diff --git a/src/object_store/src/object/sim/rpc_server.rs b/src/object_store/src/object/sim/rpc_server.rs new file mode 100644 index 000000000000..068dbed5a27a --- /dev/null +++ b/src/object_store/src/object/sim/rpc_server.rs @@ -0,0 +1,57 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Result; +use std::net::SocketAddr; +use std::sync::Arc; + +use madsim::net::{Endpoint, Payload}; + +use super::service::{Request, SimService}; + +/// A simulated ObjectStore server. +#[derive(Default, Clone)] +pub struct SimServer {} + +impl SimServer { + pub fn builder() -> Self { + SimServer::default() + } + + pub async fn serve(self, addr: SocketAddr) -> Result<()> { + let ep = Endpoint::bind(addr).await?; + let service = SimService::new(); + let service = Arc::new(service); + + loop { + let (tx, mut rx, _) = ep.accept1().await?; + let service = service.clone(); + madsim::task::spawn(async move { + let request = *rx.recv().await?.downcast::().unwrap(); + use super::service::Request::*; + + let response: Payload = match request { + Upload { path, obj } => Box::new(service.upload(path, obj).await), + Read { path } => Box::new(service.read(path).await), + Delete { path } => Box::new(service.delete(path).await), + DeleteObjects { paths } => Box::new(service.delete_objects(paths).await), + List { path } => Box::new(service.list(path).await), + Metadata { path } => Box::new(service.metadata(path).await), + }; + tx.send(response).await?; + Ok(()) as Result<()> + }); + } + } +} diff --git a/src/object_store/src/object/sim/service.rs b/src/object_store/src/object/sim/service.rs new file mode 100644 index 000000000000..fb03a1b13c17 --- /dev/null +++ b/src/object_store/src/object/sim/service.rs @@ -0,0 +1,151 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, VecDeque}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use bytes::Bytes; +use futures::Stream; +use itertools::Itertools; +use spin::Mutex; + +use crate::object::error::ObjectResult as Result; +use crate::object::sim::SimError; +use crate::object::{ObjectError, ObjectMetadata}; + +#[derive(Debug)] +pub(crate) enum Request { + Upload { path: String, obj: Bytes }, + Read { path: String }, + Delete { path: String }, + DeleteObjects { paths: Vec }, + List { path: String }, + Metadata { path: String }, +} + +#[derive(Debug)] +pub(crate) enum Response { + Upload, + Read(Bytes), + Delete, + DeleteObjects, + List(SimObjectIter), + Metadata(ObjectMetadata), +} + +#[derive(Debug)] +pub(crate) struct SimService { + storage: Mutex>, +} + +fn get_obj_meta(path: &str, obj: &Bytes) -> Result { + Ok(ObjectMetadata { + key: path.to_string(), + last_modified: SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(ObjectError::internal)? + .as_secs_f64(), + total_size: obj.len(), + }) +} + +impl SimService { + pub fn new() -> Self { + SimService { + storage: Mutex::new(BTreeMap::new()), + } + } + + pub async fn upload(&self, path: String, obj: Bytes) -> Result { + let metadata = get_obj_meta(&path, &obj)?; + self.storage.lock().insert(path.into(), (metadata, obj)); + Ok(Response::Upload) + } + + pub async fn read(&self, path: String) -> Result { + let storage = self.storage.lock(); + let obj = storage + .get(&path) + .map(|(_, o)| o) + .ok_or_else(|| SimError::NotFound(format!("no object at path '{}'", path)))?; + Ok(Response::Read(obj.clone())) + } + + pub async fn delete(&self, path: String) -> Result { + self.storage.lock().remove(&path); + Ok(Response::Delete) + } + + pub async fn delete_objects(&self, paths: Vec) -> Result { + for path in paths { + self.storage.lock().remove(&path); + } + Ok(Response::DeleteObjects) + } + + pub async fn list(&self, prefix: String) -> Result { + if prefix.is_empty() { + return Ok(Response::List(SimObjectIter::new(vec![]))); + } + + let mut scan_end = prefix.chars().collect_vec(); + let next = *scan_end.last().unwrap() as u8 + 1; + *scan_end.last_mut().unwrap() = next as char; + let scan_end = scan_end.into_iter().collect::(); + + let list_result = self + .storage + .lock() + .range(prefix..scan_end) + .map(|(_, (o, _))| o.clone()) + .collect_vec(); + Ok(Response::List(SimObjectIter::new(list_result))) + } + + pub async fn metadata(&self, path: String) -> Result { + self.storage + .lock() + .get(&path) + .map(|(metadata, _)| metadata) + .cloned() + .ok_or_else(|| SimError::not_found(format!("no object at path '{}'", path)).into()) + .map(|meta| Response::Metadata(meta)) + } +} + +#[derive(Debug)] +pub(crate) struct SimObjectIter { + list_result: VecDeque, +} + +impl SimObjectIter { + fn new(list_result: Vec) -> Self { + Self { + list_result: list_result.into(), + } + } +} + +impl Stream for SimObjectIter { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Some(i) = self.list_result.pop_front() { + return Poll::Ready(Some(Ok(i))); + } + Poll::Ready(None) + } +} diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 6ce0c7ed0523..dbde0d5d4665 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -42,6 +42,7 @@ risingwave_expr_impl = { workspace = true } risingwave_frontend = { workspace = true } risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } risingwave_meta_node = { workspace = true } +risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_sqlparser = { workspace = true } diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index a81a770bf0bd..dd591954d945 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -32,6 +32,8 @@ use itertools::Itertools; use madsim::runtime::{Handle, NodeHandle}; use rand::seq::IteratorRandom; use rand::Rng; +#[cfg(madsim)] +use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer; use risingwave_pb::common::WorkerNode; use sqllogictest::AsyncDB; #[cfg(not(madsim))] @@ -297,18 +299,18 @@ metrics_level = "Disabled" /// /// # Nodes /// -/// | Name | IP | -/// | -------------- | ------------- | -/// | meta-x | 192.168.1.x | -/// | frontend-x | 192.168.2.x | -/// | compute-x | 192.168.3.x | -/// | compactor-x | 192.168.4.x | -/// | etcd | 192.168.10.1 | -/// | kafka-broker | 192.168.11.1 | -/// | kafka-producer | 192.168.11.2 | -/// | s3 | 192.168.12.1 | -/// | client | 192.168.100.1 | -/// | ctl | 192.168.101.1 | +/// | Name | IP | +/// | ---------------- | ------------- | +/// | meta-x | 192.168.1.x | +/// | frontend-x | 192.168.2.x | +/// | compute-x | 192.168.3.x | +/// | compactor-x | 192.168.4.x | +/// | etcd | 192.168.10.1 | +/// | kafka-broker | 192.168.11.1 | +/// | kafka-producer | 192.168.11.2 | +/// | object_store_sim | 192.168.12.1 | +/// | client | 192.168.100.1 | +/// | ctl | 192.168.101.1 | pub struct Cluster { config: Configuration, handle: Handle, @@ -385,14 +387,13 @@ impl Cluster { }) .build(); - // s3 + // object_store_sim handle .create_node() - .name("s3") + .name("object_store_sim") .ip("192.168.12.1".parse().unwrap()) .init(move || async move { - aws_sdk_s3::server::SimServer::default() - .with_bucket("hummock001") + ObjectStoreSimServer::builder() .serve("0.0.0.0:9301".parse().unwrap()) .await }) @@ -422,7 +423,7 @@ impl Cluster { "--etcd-endpoints", "etcd:2388", "--state-store", - "hummock+minio://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", + "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", "--data-directory", "hummock_001", ]);