From 54daa9007ef0867d4a7debf73d35411ec7527cfa Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Thu, 18 Jan 2024 18:08:03 +0800 Subject: [PATCH] fix stream read Signed-off-by: Kevin Axel --- src/object_store/src/object/mod.rs | 1 - src/object_store/src/object/sim/mod.rs | 16 +++++++++------- src/tests/simulation/src/cluster.rs | 4 +--- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index e43d349efef1e..b59ed5067bc34 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -922,7 +922,6 @@ pub async fn build_remote_object_store( sim if sim.starts_with("sim://") => { ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics)) } - other => { unimplemented!( "{} remote object store only supports s3, minio, gcs, oss, cos, azure blob, hdfs, disk, memory, and memory-shared.", diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs index a747c84026dca..d49e87cc6641a 100644 --- a/src/object_store/src/object/sim/mod.rs +++ b/src/object_store/src/object/sim/mod.rs @@ -22,7 +22,7 @@ pub use rpc_server::SimServer; mod service; use std::net::SocketAddr; -use std::ops::Range; +use std::ops::{Range, RangeBounds}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -54,9 +54,7 @@ impl SimStreamingUploader { #[async_trait::async_trait] impl StreamingUploader for SimStreamingUploader { async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> { - let data_len = data.len(); self.buf.put(data); - Ok(()) } @@ -188,7 +186,7 @@ impl ObjectStore for SimObjectStore { return Err(SimError::other("expect Response::Read").into()); }; - Ok(Box::pin(SimDataIterator::new(body))) + Ok(Box::pin(SimDataIterator::new(body.slice(range)))) } async fn delete(&self, path: &str) -> ObjectResult<()> { @@ -237,11 +235,15 @@ impl ObjectStore for SimObjectStore { impl SimObjectStore { pub fn new(addr: &str) -> Self { let addr = addr.strip_prefix("sim://").unwrap(); - let (addr, _bucket) = addr.split_once('/').unwrap(); + let (_access_key_id, rest) = addr.split_once(':').unwrap(); + let (_secret_access_key, rest) = rest.split_once('@').unwrap(); + let (address, _bucket) = rest.split_once('/').unwrap(); + Self { client: Client::new( - addr.parse::() - .expect(&format!("parse SockAddr failed: {}", addr)), + address + .parse::() + .expect(&format!("parse SockAddr failed: {}", address)), ), } } diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index ca9a7cce2fd42..a5c8fdc08987f 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -386,7 +386,6 @@ impl Cluster { .build(); // object_store_sim - #[cfg(madsim)] handle .create_node() .name("object_store_sim") @@ -422,8 +421,7 @@ impl Cluster { "--etcd-endpoints", "etcd:2388", "--state-store", - // "hummock+minio://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", - "hummock+sim://192.168.12.1:9301/hummock001", + "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", "--data-directory", "hummock_001", ]);