Skip to content

Commit

Permalink
fix stream read
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Axel <[email protected]>
  • Loading branch information
KveinAxel committed Jan 18, 2024
1 parent d4fabe6 commit 54daa90
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 11 deletions.
1 change: 0 additions & 1 deletion src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,6 @@ pub async fn build_remote_object_store(
sim if sim.starts_with("sim://") => {
ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics))
}

other => {
unimplemented!(
"{} remote object store only supports s3, minio, gcs, oss, cos, azure blob, hdfs, disk, memory, and memory-shared.",
Expand Down
16 changes: 9 additions & 7 deletions src/object_store/src/object/sim/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use rpc_server::SimServer;
mod service;

use std::net::SocketAddr;
use std::ops::Range;
use std::ops::{Range, RangeBounds};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -54,9 +54,7 @@ impl SimStreamingUploader {
#[async_trait::async_trait]
impl StreamingUploader for SimStreamingUploader {
async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
let data_len = data.len();
self.buf.put(data);

Ok(())
}

Expand Down Expand Up @@ -188,7 +186,7 @@ impl ObjectStore for SimObjectStore {
return Err(SimError::other("expect Response::Read").into());
};

Ok(Box::pin(SimDataIterator::new(body)))
Ok(Box::pin(SimDataIterator::new(body.slice(range))))
}

async fn delete(&self, path: &str) -> ObjectResult<()> {
Expand Down Expand Up @@ -237,11 +235,15 @@ impl ObjectStore for SimObjectStore {
impl SimObjectStore {
pub fn new(addr: &str) -> Self {
let addr = addr.strip_prefix("sim://").unwrap();
let (addr, _bucket) = addr.split_once('/').unwrap();
let (_access_key_id, rest) = addr.split_once(':').unwrap();
let (_secret_access_key, rest) = rest.split_once('@').unwrap();
let (address, _bucket) = rest.split_once('/').unwrap();

Self {
client: Client::new(
addr.parse::<SocketAddr>()
.expect(&format!("parse SockAddr failed: {}", addr)),
address
.parse::<SocketAddr>()
.expect(&format!("parse SockAddr failed: {}", address)),
),
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ impl Cluster {
.build();

// object_store_sim
#[cfg(madsim)]
handle
.create_node()
.name("object_store_sim")
Expand Down Expand Up @@ -422,8 +421,7 @@ impl Cluster {
"--etcd-endpoints",
"etcd:2388",
"--state-store",
// "hummock+minio://hummockadmin:[email protected]:9301/hummock001",
"hummock+sim://192.168.12.1:9301/hummock001",
"hummock+sim://hummockadmin:[email protected]:9301/hummock001",
"--data-directory",
"hummock_001",
]);
Expand Down

0 comments on commit 54daa90

Please sign in to comment.