Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(services/aliyun-drive): directly implement oio::Write. #4821

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions core/src/services/aliyun_drive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use super::error::parse_error;
use super::lister::AliyunDriveLister;
use super::lister::AliyunDriveParent;
use super::writer::AliyunDriveWriter;
use super::writer::AliyunDriveWriters;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -253,7 +252,7 @@ pub struct AliyunDriveBackend {

impl Access for AliyunDriveBackend {
type Reader = HttpBody;
type Writer = AliyunDriveWriters;
type Writer = AliyunDriveWriter;
type Lister = oio::PageLister<AliyunDriveLister>;
type BlockingReader = ();
type BlockingWriter = ();
Expand Down Expand Up @@ -478,13 +477,9 @@ impl Access for AliyunDriveBackend {
}
};

let executor = args.executor().cloned();

let writer =
AliyunDriveWriter::new(self.core.clone(), &parent_file_id, get_basename(path), args);

let w = oio::MultipartWriter::new(writer, executor, 1);

Ok((RpWrite::default(), w))
Ok((RpWrite::default(), writer))
}
}
1 change: 0 additions & 1 deletion core/src/services/aliyun_drive/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,6 @@ pub struct UploadUrlResponse {
pub struct CreateResponse {
pub file_id: String,
pub upload_id: Option<String>,
pub part_info_list: Option<Vec<PartInfo>>,
pub exist: Option<bool>,
}

Expand Down
202 changes: 61 additions & 141 deletions core/src/services/aliyun_drive/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ use crate::{
};
use bytes::Buf;
use std::sync::Arc;
use tokio::sync::RwLock;

pub type AliyunDriveWriters = oio::MultipartWriter<AliyunDriveWriter>;

pub struct AliyunDriveWriter {
core: Arc<AliyunDriveCore>,
Expand All @@ -34,7 +31,9 @@ pub struct AliyunDriveWriter {
parent_file_id: String,
name: String,

file_id: Arc<RwLock<Option<String>>>,
file_id: Option<String>,
upload_id: Option<String>,
part_number: usize,
}

impl AliyunDriveWriter {
Expand All @@ -44,162 +43,83 @@ impl AliyunDriveWriter {
_op: op,
parent_file_id: parent_file_id.to_string(),
name: name.to_string(),
file_id: Arc::new(RwLock::new(None)),
file_id: None,
upload_id: None,
part_number: 1, // must start from 1
}
}
}

async fn write_file_id(&self, id: String) {
let mut file_id = self.file_id.write().await;

*file_id = Some(id);
}

async fn read_file_id(&self) -> Result<String> {
let file_id = self.file_id.read().await;
let Some(ref file_id) = *file_id else {
return Err(Error::new(ErrorKind::Unexpected, "cannot find file_id"));
};

Ok(file_id.clone())
}

async fn write(
&self,
body: Option<Buffer>,
upload_url: Option<&str>,
) -> Result<Option<String>> {
if let Some(upload_url) = upload_url {
let Some(body) = body else {
return Err(Error::new(
ErrorKind::Unexpected,
"cannot upload without body",
));
};
if let Err(err) = self.core.upload(upload_url, body).await {
if err.kind() != ErrorKind::AlreadyExists {
return Err(err);
}
};
return Ok(None);
}

let res = self
.core
.create(
Some(&self.parent_file_id),
&self.name,
CreateType::File,
CheckNameMode::Refuse,
)
.await;

let res = match res {
Err(err) if err.kind() == ErrorKind::IsSameFile => {
return Ok(None);
}
Err(err) => {
return Err(err);
}
Ok(res) => res,
};

let output: CreateResponse =
serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
self.write_file_id(output.file_id).await;
if output.exist.is_some_and(|x| x) {
return Err(Error::new(ErrorKind::AlreadyExists, "file exists"));
}

if output.upload_id.is_some() {
if let Some(body) = body {
let Some(part_info_list) = output.part_info_list else {
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
};
if part_info_list.is_empty() {
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
}
if let Err(err) = self.core.upload(&part_info_list[0].upload_url, body).await {
if err.kind() != ErrorKind::AlreadyExists {
return Err(err);
}
impl oio::Write for AliyunDriveWriter {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) {
(Some(upload_id), Some(file_id)) => (upload_id, file_id),
_ => {
let res = self
.core
.create(
Some(&self.parent_file_id),
&self.name,
CreateType::File,
CheckNameMode::Refuse,
)
.await?;
let output: CreateResponse =
serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
if output.exist.is_some_and(|x| x) {
return Err(Error::new(ErrorKind::AlreadyExists, "file exists"));
}
self.upload_id = output.upload_id;
self.file_id = Some(output.file_id);
(
self.upload_id.as_ref().expect("cannot find upload_id"),
self.file_id.as_ref().expect("cannot find file_id"),
)
}
}

Ok(output.upload_id)
}

async fn complete(&self, upload_id: &str) -> Result<Buffer> {
let file_id = self.read_file_id().await?;

self.core.complete(&file_id, upload_id).await
}

async fn delete(&self) -> Result<()> {
let file_id = self.read_file_id().await?;

self.core.delete_path(&file_id).await
}
}

impl oio::MultipartWrite for AliyunDriveWriter {
async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> {
let upload_id = self.initiate_part().await?;
self.write_part(&upload_id, 0, size, body).await?;

self.complete(&upload_id).await?;
Ok(())
}

async fn initiate_part(&self) -> Result<String> {
let Some(upload_id) = self.write(None, None).await? else {
return Err(Error::new(ErrorKind::Unsupported, "cannot find upload_id"));
};

Ok(upload_id)
}

async fn write_part(
&self,
upload_id: &str,
part_number: usize,
_size: u64,
body: crate::Buffer,
) -> Result<oio::MultipartPart> {
let file_id = self.read_file_id().await?;
let res = self
.core
.get_upload_url(&file_id, upload_id, Some(part_number + 1))
.get_upload_url(file_id, upload_id, Some(self.part_number))
.await?;
let output: UploadUrlResponse =
serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;

let Some(part_info_list) = output.part_info_list else {
return Err(Error::new(
ErrorKind::Unexpected,
"cannot find part_info_list",
));
};
if part_info_list.is_empty() {
let Some(upload_url) = output
.part_info_list
.as_ref()
.and_then(|list| list.first())
.map(|part_info| &part_info.upload_url)
else {
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
}
self.write(Some(body), Some(&part_info_list[0].upload_url))
.await?;
};

Ok(oio::MultipartPart {
part_number,
etag: part_info_list[0].etag.clone().unwrap_or("".to_string()),
checksum: None,
})
}
let size = bs.len();

async fn complete_part(&self, upload_id: &str, _parts: &[oio::MultipartPart]) -> Result<()> {
self.complete(upload_id).await?;
if let Err(err) = self.core.upload(upload_url, bs).await {
if err.kind() != ErrorKind::AlreadyExists {
return Err(err);
}
};

self.part_number += 1;

Ok(size)
}

async fn close(&mut self) -> Result<()> {
let (Some(upload_id), Some(file_id)) = (self.upload_id.as_ref(), self.file_id.as_ref())
else {
return Ok(());
};
self.core.complete(file_id, upload_id).await?;
Ok(())
}

async fn abort_part(&self, _upload_id: &str) -> Result<()> {
self.delete().await
async fn abort(&mut self) -> Result<()> {
let Some(file_id) = self.file_id.as_ref() else {
return Ok(());
};
self.core.delete_path(file_id).await
}
}
Loading