Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Object store 0.10.2 debug #54

Open
wants to merge 19 commits into
base: object-store-0.10.2-cx1
Choose a base branch
from
34 changes: 29 additions & 5 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
@@ -33,6 +33,8 @@ use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::SystemTime;
use std::{sync::Arc, time::Duration};
use url::Url;

@@ -215,7 +217,7 @@ impl ObjectStore for AmazonS3 {
let upload_id = self.client.create_multipart(location, opts).await?;

Ok(Box::new(S3MultiPartUpload {
part_idx: 0,
part_idx: AtomicUsize::new(0),
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
@@ -319,7 +321,7 @@ impl ObjectStore for AmazonS3 {

// NOTE(review): this listing is a rendered diff without +/- markers, so `part_idx`
// appears twice below — presumably the `usize` line is the pre-change declaration and
// the `AtomicUsize` line is its replacement; confirm against the actual revision.
/// Handle for an in-progress multipart upload: a part counter plus shared upload state.
#[derive(Debug)]
struct S3MultiPartUpload {
// Index given to the next part; `put_part` advances it by one per call.
part_idx: usize,
part_idx: AtomicUsize,
// Shared state; `put_part` clones this `Arc` into each future it returns.
state: Arc<UploadState>,
}

@@ -334,21 +336,38 @@ struct UploadState {
#[async_trait]
impl MultipartUpload for S3MultiPartUpload {
/// Claims the next part index and returns a future that uploads `data` as that part.
///
/// The index is taken and the "uploading part" line is printed immediately, before the
/// returned future is polled; the request itself runs inside the future.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch (the `unwrap` on
/// `duration_since`).
fn put_part(&mut self, data: PutPayload) -> UploadPart {
// NOTE(review): rendered diff without +/- markers — the next two statements look
// like the pre-change counter handling, superseded by the `fetch_add` further down;
// confirm against the actual revision.
let idx = self.part_idx;
self.part_idx += 1;
// Wall-clock microseconds since the Unix epoch; only used in the log line below.
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_micros();
// `fetch_add` returns the value held before the increment, so `idx` is this part's
// index and the counter now holds the index for the next call.
let idx = self.part_idx.fetch_add(1, Ordering::AcqRel);
// Read the length now: `data` is moved into the async block below.
let len = data.content_length();
println!(
"uploading part: {}, location: {:?}, size: {}, upload_id: {}, timestamp: {}",
idx, self.state.location, len, self.state.upload_id, now
);
// The future gets its own handle to the shared state instead of borrowing `self`.
let state = Arc::clone(&self.state);
Box::pin(async move {
// On error `?` returns early: the part is not handed to `state.parts` and the
// "uploaded part" line is not printed.
let part = state
.client
.put_part(&state.location, &state.upload_id, idx, data)
.await?;
// Hand the result to `state.parts` under this part's index.
state.parts.put(idx, part);
println!(
"uploaded part: {}, location: {:?}, upload_id: {}, size: {}",
idx, state.location, state.upload_id, len
);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;
let idx = self.part_idx.load(Ordering::Acquire);
let parts = self.state.parts.finish(idx)?;
println!(
"completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}",
self.state.upload_id, idx, self.state.location, parts
);

self.state
.client
@@ -357,6 +376,11 @@ impl MultipartUpload for S3MultiPartUpload {
}

async fn abort(&mut self) -> Result<()> {
let idx = self.part_idx.load(Ordering::Acquire);
println!(
"aborting multipart upload, upload_id: {}, part_id: {}, location: {:?}",
self.state.upload_id, idx, self.state.location
);
self.state
.client
.request(Method::DELETE, &self.state.location)
3 changes: 3 additions & 0 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
@@ -363,6 +363,7 @@ impl AsyncWrite for BufWriter {
) -> Poll<Result<usize, Error>> {
let cap = self.capacity;
let max_concurrency = self.max_concurrency;
println!("BufWriter::poll_write writing chunk of {} bytes", buf.len());
loop {
return match &mut self.state {
BufWriterState::Write(Some(write)) => {
@@ -390,6 +391,7 @@ impl AsyncWrite for BufWriter {
let upload = store.put_multipart_opts(&path, opts).await?;
let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap);
for chunk in buffer.freeze() {
println!("putting {} bytes", chunk.len());
chunked.put(chunk);
}
Ok(chunked)
@@ -417,6 +419,7 @@ impl AsyncWrite for BufWriter {
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
println!("BufWriter::poll_shutdown");
loop {
match &mut self.state {
BufWriterState::Prepare(f) => {
33 changes: 30 additions & 3 deletions object_store/src/upload.rs
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
// under the License.

use std::task::{Context, Poll};
use std::time::SystemTime;

use crate::{PutPayload, PutPayloadMut, PutResult, Result};
use async_trait::async_trait;
@@ -154,6 +155,10 @@ impl WriteMultipart {
while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
ready!(self.tasks.poll_join_next(cx)).unwrap()??
}
println!(
"WriteMultipart::poll_for_capacity ready for {}",
max_concurrency
);
Poll::Ready(Ok(()))
}

@@ -176,6 +181,7 @@ impl WriteMultipart {
/// Back pressure can optionally be applied to producers by calling
/// [`Self::wait_for_capacity`] prior to calling this method
pub fn write(&mut self, mut buf: &[u8]) {
let len = buf.len();
while !buf.is_empty() {
let remaining = self.chunk_size - self.buffer.content_length();
let to_read = buf.len().min(remaining);
@@ -186,6 +192,7 @@ impl WriteMultipart {
}
buf = &buf[to_read..]
}
println!("WriteMultipart::write write chunk of {} bytes", len);
}

/// Put a chunk of data into this [`WriteMultipart`] without copying
@@ -196,6 +203,7 @@ impl WriteMultipart {
///
/// See [`Self::write`] for information on backpressure
pub fn put(&mut self, mut bytes: Bytes) {
let len = bytes.len();
while !bytes.is_empty() {
let remaining = self.chunk_size - self.buffer.content_length();
if bytes.len() < remaining {
@@ -206,10 +214,21 @@ impl WriteMultipart {
let buffer = std::mem::take(&mut self.buffer);
self.put_part(buffer.into())
}
println!("WriteMultipart::put put chunk of {} bytes", len);
}

/// Starts the upload of `part` by spawning the future from `self.upload.put_part`
/// onto `self.tasks`, then prints the part size and a timestamp.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch (the `unwrap` on
/// `duration_since`).
pub(crate) fn put_part(&mut self, part: PutPayload) {
// NOTE(review): rendered diff without +/- markers — the one-line spawn directly
// below looks like the pre-change form of the `fut` / `spawn` pair further down;
// confirm against the actual revision.
self.tasks.spawn(self.upload.put_part(part));
// Wall-clock microseconds since the Unix epoch; only used in the log line below.
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_micros();
// Read the length first: `part` is moved into `put_part` on the next line.
let len = part.content_length();
let fut = self.upload.put_part(part);
self.tasks.spawn(fut);
println!(
"WriteMultipart::put_part spawned task for part of size: {}, time: {}",
len, now
);
}

/// Abort this upload, attempting to clean up any successfully uploaded parts
@@ -221,8 +240,13 @@ impl WriteMultipart {
/// Flush final chunk, and await completion of all in-flight requests
pub async fn finish(mut self) -> Result<PutResult> {
if !self.buffer.is_empty() {
let len = self.buffer.content_length();
let part = std::mem::take(&mut self.buffer);
self.put_part(part.into())
self.put_part(part.into());
println!(
"WriteMultipart::finish: flushing final chunk of {} bytes",
len
);
}

self.wait_for_capacity(0).await?;
@@ -233,7 +257,10 @@ impl WriteMultipart {
self.upload.abort().await?;
Err(e)
}
Ok(result) => Ok(result),
Ok(result) => {
println!("WriteMultipart::finish: done");
Ok(result)
}
}
}
}