refactor(services/aliyun-drive): directly implement oio::Write.
Signed-off-by: Hanchin Hsieh <[email protected]>
yuchanns committed Jun 30, 2024
1 parent a58e802 commit 49db9bd
Showing 3 changed files with 62 additions and 142 deletions.
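
For orientation, the shape of the change: AliyunDriveWriter previously implemented a multipart-style interface and was wrapped in the oio::MultipartWriter adapter; after this commit it implements the sequential oio::Write interface directly and keeps the upload state (file_id, upload_id, part_number) itself. The sketch below paraphrases the method signatures visible in this diff into two simplified, synchronous traits; it is illustrative only, not the real oio trait definitions.

// Simplified sketch; signatures paraphrased from this diff, not the actual `oio` traits.
type Buffer = Vec<u8>;
type Result<T> = std::result::Result<T, String>;

// Before: a multipart-style interface, driven by the `oio::MultipartWriter` adapter.
trait MultipartStyleWrite {
    fn initiate_part(&self) -> Result<String>;
    fn write_part(&self, upload_id: &str, part_number: usize, body: Buffer) -> Result<()>;
    fn complete_part(&self, upload_id: &str) -> Result<()>;
    fn abort_part(&self, upload_id: &str) -> Result<()>;
}

// After: a plain sequential interface, implemented by the service writer itself.
trait SequentialWrite {
    fn write(&mut self, bs: Buffer) -> Result<usize>;
    fn close(&mut self) -> Result<()>;
    fn abort(&mut self) -> Result<()>;
}

fn main() {}
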
core/src/services/aliyun_drive/backend.rs (9 changes: 2 additions & 7 deletions)
@@ -34,7 +34,6 @@ use super::error::parse_error;
 use super::lister::AliyunDriveLister;
 use super::lister::AliyunDriveParent;
 use super::writer::AliyunDriveWriter;
-use super::writer::AliyunDriveWriters;
 use crate::raw::*;
 use crate::*;

@@ -253,7 +252,7 @@ pub struct AliyunDriveBackend {

 impl Access for AliyunDriveBackend {
     type Reader = HttpBody;
-    type Writer = AliyunDriveWriters;
+    type Writer = AliyunDriveWriter;
     type Lister = oio::PageLister<AliyunDriveLister>;
     type BlockingReader = ();
     type BlockingWriter = ();
Expand Down Expand Up @@ -478,13 +477,9 @@ impl Access for AliyunDriveBackend {
}
};

let executor = args.executor().cloned();

let writer =
AliyunDriveWriter::new(self.core.clone(), &parent_file_id, get_basename(path), args);

let w = oio::MultipartWriter::new(writer, executor, 1);

Ok((RpWrite::default(), w))
Ok((RpWrite::default(), writer))
}
}
core/src/services/aliyun_drive/core.rs (1 change: 0 additions & 1 deletion)
@@ -488,7 +488,6 @@ pub struct UploadUrlResponse {
 pub struct CreateResponse {
     pub file_id: String,
     pub upload_id: Option<String>,
-    pub part_info_list: Option<Vec<PartInfo>>,
     pub exist: Option<bool>,
 }

core/src/services/aliyun_drive/writer.rs (194 changes: 60 additions & 134 deletions)
@@ -23,9 +23,6 @@ use crate::{
 };
 use bytes::Buf;
 use std::sync::Arc;
-use tokio::sync::RwLock;
-
-pub type AliyunDriveWriters = oio::MultipartWriter<AliyunDriveWriter>;

 pub struct AliyunDriveWriter {
     core: Arc<AliyunDriveCore>,
@@ -34,7 +31,9 @@ pub struct AliyunDriveWriter {
     parent_file_id: String,
     name: String,

-    file_id: Arc<RwLock<Option<String>>>,
+    file_id: Option<String>,
+    upload_id: Option<String>,
+    part_number: usize,
 }

 impl AliyunDriveWriter {
@@ -44,162 +43,89 @@ impl AliyunDriveWriter {
             _op: op,
             parent_file_id: parent_file_id.to_string(),
             name: name.to_string(),
-            file_id: Arc::new(RwLock::new(None)),
+            file_id: None,
+            upload_id: None,
+            part_number: 1, // must start from 1
         }
     }

-    async fn write_file_id(&self, id: String) {
-        let mut file_id = self.file_id.write().await;
-
-        *file_id = Some(id);
-    }
-
-    async fn read_file_id(&self) -> Result<String> {
-        let file_id = self.file_id.read().await;
-        let Some(ref file_id) = *file_id else {
-            return Err(Error::new(ErrorKind::Unexpected, "cannot find file_id"));
-        };
-
-        Ok(file_id.clone())
-    }
-
-    async fn write(
-        &self,
-        body: Option<Buffer>,
-        upload_url: Option<&str>,
-    ) -> Result<Option<String>> {
-        if let Some(upload_url) = upload_url {
-            let Some(body) = body else {
+    async fn get_upload_info(&mut self, do_not_create: bool) -> Result<(String, String)> {
+        let (upload_id, file_id) = match (self.upload_id.clone(), self.file_id.clone()) {
+            (Some(upload_id), Some(file_id)) => (upload_id, file_id),
+            _ if do_not_create => {
                 return Err(Error::new(
                     ErrorKind::Unexpected,
-                    "cannot upload without body",
+                    "cannot find upload_id and file_id",
                 ));
-            };
-            if let Err(err) = self.core.upload(upload_url, body).await {
-                if err.kind() != ErrorKind::AlreadyExists {
-                    return Err(err);
-                }
-            };
-            return Ok(None);
-        }
-
-        let res = self
-            .core
-            .create(
-                Some(&self.parent_file_id),
-                &self.name,
-                CreateType::File,
-                CheckNameMode::Refuse,
-            )
-            .await;
-
-        let res = match res {
-            Err(err) if err.kind() == ErrorKind::IsSameFile => {
-                return Ok(None);
             }
-            Err(err) => {
-                return Err(err);
-            }
-            Ok(res) => res,
-        };
-
-        let output: CreateResponse =
-            serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
-        self.write_file_id(output.file_id).await;
-        if output.exist.is_some_and(|x| x) {
-            return Err(Error::new(ErrorKind::AlreadyExists, "file exists"));
-        }
-
-        if output.upload_id.is_some() {
-            if let Some(body) = body {
-                let Some(part_info_list) = output.part_info_list else {
-                    return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
-                };
-                if part_info_list.is_empty() {
-                    return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
-                }
-                if let Err(err) = self.core.upload(&part_info_list[0].upload_url, body).await {
-                    if err.kind() != ErrorKind::AlreadyExists {
-                        return Err(err);
-                    }
+            _ => {
+                let res = self
+                    .core
+                    .create(
+                        Some(&self.parent_file_id),
+                        &self.name,
+                        CreateType::File,
+                        CheckNameMode::Refuse,
+                    )
+                    .await?;
+                let output: CreateResponse =
+                    serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
+                if output.exist.is_some_and(|x| x) {
+                    return Err(Error::new(ErrorKind::AlreadyExists, "file exists"));
                 }
+                let upload_id = output.upload_id.expect("cannot find upload_id");
+                let file_id = output.file_id;
+                self.upload_id = Some(upload_id.clone());
+                self.file_id = Some(file_id.clone());
+                (upload_id, file_id)
             }
-        }
-
-        Ok(output.upload_id)
-    }
-
-    async fn complete(&self, upload_id: &str) -> Result<Buffer> {
-        let file_id = self.read_file_id().await?;
-
-        self.core.complete(&file_id, upload_id).await
-    }
-
-    async fn delete(&self) -> Result<()> {
-        let file_id = self.read_file_id().await?;
+        };

-        self.core.delete_path(&file_id).await
+        Ok((upload_id, file_id))
     }
 }

-impl oio::MultipartWrite for AliyunDriveWriter {
-    async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> {
-        let upload_id = self.initiate_part().await?;
-        self.write_part(&upload_id, 0, size, body).await?;
-
-        self.complete(&upload_id).await?;
-        Ok(())
-    }
-
-    async fn initiate_part(&self) -> Result<String> {
-        let Some(upload_id) = self.write(None, None).await? else {
-            return Err(Error::new(ErrorKind::Unsupported, "cannot find upload_id"));
-        };
-
-        Ok(upload_id)
-    }
+impl oio::Write for AliyunDriveWriter {
+    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+        let (upload_id, file_id) = self.get_upload_info(false).await?;

-    async fn write_part(
-        &self,
-        upload_id: &str,
-        part_number: usize,
-        _size: u64,
-        body: crate::Buffer,
-    ) -> Result<oio::MultipartPart> {
-        let file_id = self.read_file_id().await?;
         let res = self
             .core
-            .get_upload_url(&file_id, upload_id, Some(part_number + 1))
+            .get_upload_url(&file_id, &upload_id, Some(self.part_number))
             .await?;
         let output: UploadUrlResponse =
             serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;

-        let Some(part_info_list) = output.part_info_list else {
-            return Err(Error::new(
-                ErrorKind::Unexpected,
-                "cannot find part_info_list",
-            ));
+        let upload_url = match output.part_info_list {
+            Some(part_info_list) if !part_info_list.is_empty() => {
+                part_info_list[0].upload_url.to_owned()
+            }
+            _ => {
+                return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
+            }
         };
-        if part_info_list.is_empty() {
-            return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
-        }
-        self.write(Some(body), Some(&part_info_list[0].upload_url))
-            .await?;

-        Ok(oio::MultipartPart {
-            part_number,
-            etag: part_info_list[0].etag.clone().unwrap_or("".to_string()),
-            checksum: None,
-        })
-    }
+        let size = bs.len();

+        if let Err(err) = self.core.upload(&upload_url, bs).await {
+            if err.kind() != ErrorKind::AlreadyExists {
+                return Err(err);
+            }
+        };
+
-    async fn complete_part(&self, upload_id: &str, _parts: &[oio::MultipartPart]) -> Result<()> {
-        self.complete(upload_id).await?;
+        self.part_number += 1;
+
+        Ok(size)
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        let (upload_id, file_id) = self.get_upload_info(true).await?;
+        self.core.complete(&file_id, &upload_id).await?;
         Ok(())
     }

-    async fn abort_part(&self, _upload_id: &str) -> Result<()> {
-        self.delete().await
+    async fn abort(&mut self) -> Result<()> {
+        let (_, file_id) = self.get_upload_info(true).await?;
+        self.core.delete_path(&file_id).await
     }
 }
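
Taken together, the new writer works like this: the first write lazily creates the file and the upload session (get_upload_info), every write fetches an upload URL for the current part number (starting from 1, per the comment in the diff) and uploads that part, close completes the upload, and abort deletes the partially created file. The snippet below is a self-contained, hypothetical mock of that flow; Core, Writer, and every name in it are illustrative stand-ins rather than OpenDAL or Aliyun Drive APIs, and errors are reduced to assertions.

// Hypothetical, simplified analog of the new writer's state machine. All types
// and names here are illustrative; they are not OpenDAL or Aliyun Drive APIs.
use std::collections::HashMap;

#[derive(Default)]
struct Core {
    parts: HashMap<usize, Vec<u8>>, // part_number -> uploaded bytes
    completed: bool,
    deleted: bool,
}

impl Core {
    // Create the remote file and start an upload session.
    fn create(&mut self) -> (String, String) {
        ("file-1".to_string(), "upload-1".to_string())
    }
    // Upload one numbered part; part numbers start from 1.
    fn upload_part(&mut self, part_number: usize, body: &[u8]) {
        self.parts.insert(part_number, body.to_vec());
    }
    // Finish the upload (the writer calls this from `close`).
    fn complete(&mut self) {
        self.completed = true;
    }
    // Remove the partially written file (the writer calls this from `abort`).
    fn delete(&mut self) {
        self.deleted = true;
    }
}

struct Writer {
    core: Core,
    ids: Option<(String, String)>, // (file_id, upload_id), created lazily on first write
    part_number: usize,            // must start from 1
}

impl Writer {
    fn new(core: Core) -> Self {
        Writer { core, ids: None, part_number: 1 }
    }

    fn write(&mut self, bs: &[u8]) -> usize {
        if self.ids.is_none() {
            // Lazy session creation, mirroring get_upload_info(false).
            self.ids = Some(self.core.create());
        }
        self.core.upload_part(self.part_number, bs);
        self.part_number += 1;
        bs.len()
    }

    fn close(&mut self) {
        // Mirrors get_upload_info(true): closing before any write is an error.
        assert!(self.ids.is_some(), "close called before any write");
        self.core.complete();
    }

    fn abort(&mut self) {
        assert!(self.ids.is_some(), "abort called before any write");
        self.core.delete();
    }
}

fn main() {
    let mut w = Writer::new(Core::default());
    w.write(b"hello ");
    w.write(b"world");
    w.close();
    assert_eq!(w.core.parts.len(), 2);
    assert!(w.core.completed);

    let mut aborted = Writer::new(Core::default());
    aborted.write(b"partial");
    aborted.abort();
    assert!(aborted.core.deleted);

    println!("uploaded {} parts and completed", w.core.parts.len());
}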
