From 229f930823aea32bb4808805306873ac543ebbc3 Mon Sep 17 00:00:00 2001 From: yuchanns Date: Tue, 2 Jul 2024 12:49:42 +0800 Subject: [PATCH] refactor(services/aliyun-drive): directly implement `oio::Write`. Signed-off-by: Hanchin Hsieh --- core/src/services/aliyun_drive/backend.rs | 9 +- core/src/services/aliyun_drive/core.rs | 1 - core/src/services/aliyun_drive/writer.rs | 202 +++++++--------------- 3 files changed, 63 insertions(+), 149 deletions(-) diff --git a/core/src/services/aliyun_drive/backend.rs b/core/src/services/aliyun_drive/backend.rs index 545dec2dca84..b04c50621e3e 100644 --- a/core/src/services/aliyun_drive/backend.rs +++ b/core/src/services/aliyun_drive/backend.rs @@ -34,7 +34,6 @@ use super::error::parse_error; use super::lister::AliyunDriveLister; use super::lister::AliyunDriveParent; use super::writer::AliyunDriveWriter; -use super::writer::AliyunDriveWriters; use crate::raw::*; use crate::*; @@ -253,7 +252,7 @@ pub struct AliyunDriveBackend { impl Access for AliyunDriveBackend { type Reader = HttpBody; - type Writer = AliyunDriveWriters; + type Writer = AliyunDriveWriter; type Lister = oio::PageLister; type BlockingReader = (); type BlockingWriter = (); @@ -478,13 +477,9 @@ impl Access for AliyunDriveBackend { } }; - let executor = args.executor().cloned(); - let writer = AliyunDriveWriter::new(self.core.clone(), &parent_file_id, get_basename(path), args); - let w = oio::MultipartWriter::new(writer, executor, 1); - - Ok((RpWrite::default(), w)) + Ok((RpWrite::default(), writer)) } } diff --git a/core/src/services/aliyun_drive/core.rs b/core/src/services/aliyun_drive/core.rs index f6397edbdda8..79e2d7a4ff90 100644 --- a/core/src/services/aliyun_drive/core.rs +++ b/core/src/services/aliyun_drive/core.rs @@ -488,7 +488,6 @@ pub struct UploadUrlResponse { pub struct CreateResponse { pub file_id: String, pub upload_id: Option, - pub part_info_list: Option>, pub exist: Option, } diff --git a/core/src/services/aliyun_drive/writer.rs b/core/src/services/aliyun_drive/writer.rs index c1a3efa947e0..30ca2ef94e2e 100644 --- a/core/src/services/aliyun_drive/writer.rs +++ b/core/src/services/aliyun_drive/writer.rs @@ -23,9 +23,6 @@ use crate::{ }; use bytes::Buf; use std::sync::Arc; -use tokio::sync::RwLock; - -pub type AliyunDriveWriters = oio::MultipartWriter; pub struct AliyunDriveWriter { core: Arc, @@ -34,7 +31,9 @@ pub struct AliyunDriveWriter { parent_file_id: String, name: String, - file_id: Arc>>, + file_id: Option, + upload_id: Option, + part_number: usize, } impl AliyunDriveWriter { @@ -44,162 +43,83 @@ impl AliyunDriveWriter { _op: op, parent_file_id: parent_file_id.to_string(), name: name.to_string(), - file_id: Arc::new(RwLock::new(None)), + file_id: None, + upload_id: None, + part_number: 1, // must start from 1 } } +} - async fn write_file_id(&self, id: String) { - let mut file_id = self.file_id.write().await; - - *file_id = Some(id); - } - - async fn read_file_id(&self) -> Result { - let file_id = self.file_id.read().await; - let Some(ref file_id) = *file_id else { - return Err(Error::new(ErrorKind::Unexpected, "cannot find file_id")); - }; - - Ok(file_id.clone()) - } - - async fn write( - &self, - body: Option, - upload_url: Option<&str>, - ) -> Result> { - if let Some(upload_url) = upload_url { - let Some(body) = body else { - return Err(Error::new( - ErrorKind::Unexpected, - "cannot upload without body", - )); - }; - if let Err(err) = self.core.upload(upload_url, body).await { - if err.kind() != ErrorKind::AlreadyExists { - return Err(err); - } - }; - return Ok(None); - } - - let res = self - .core - .create( - Some(&self.parent_file_id), - &self.name, - CreateType::File, - CheckNameMode::Refuse, - ) - .await; - - let res = match res { - Err(err) if err.kind() == ErrorKind::IsSameFile => { - return Ok(None); - } - Err(err) => { - return Err(err); - } - Ok(res) => res, - }; - - let output: CreateResponse = - serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?; - self.write_file_id(output.file_id).await; - if output.exist.is_some_and(|x| x) { - return Err(Error::new(ErrorKind::AlreadyExists, "file exists")); - } - - if output.upload_id.is_some() { - if let Some(body) = body { - let Some(part_info_list) = output.part_info_list else { - return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url")); - }; - if part_info_list.is_empty() { - return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url")); - } - if let Err(err) = self.core.upload(&part_info_list[0].upload_url, body).await { - if err.kind() != ErrorKind::AlreadyExists { - return Err(err); - } +impl oio::Write for AliyunDriveWriter { + async fn write(&mut self, bs: Buffer) -> Result { + let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) { + (Some(upload_id), Some(file_id)) => (upload_id, file_id), + _ => { + let res = self + .core + .create( + Some(&self.parent_file_id), + &self.name, + CreateType::File, + CheckNameMode::Refuse, + ) + .await?; + let output: CreateResponse = + serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?; + if output.exist.is_some_and(|x| x) { + return Err(Error::new(ErrorKind::AlreadyExists, "file exists")); } + self.upload_id = output.upload_id; + self.file_id = Some(output.file_id); + ( + self.upload_id.as_ref().expect("cannot find upload_id"), + self.file_id.as_ref().expect("cannot find file_id"), + ) } - } - - Ok(output.upload_id) - } - - async fn complete(&self, upload_id: &str) -> Result { - let file_id = self.read_file_id().await?; - - self.core.complete(&file_id, upload_id).await - } - - async fn delete(&self) -> Result<()> { - let file_id = self.read_file_id().await?; - - self.core.delete_path(&file_id).await - } -} - -impl oio::MultipartWrite for AliyunDriveWriter { - async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> { - let upload_id = self.initiate_part().await?; - self.write_part(&upload_id, 0, size, body).await?; - - self.complete(&upload_id).await?; - Ok(()) - } - - async fn initiate_part(&self) -> Result { - let Some(upload_id) = self.write(None, None).await? else { - return Err(Error::new(ErrorKind::Unsupported, "cannot find upload_id")); }; - Ok(upload_id) - } - - async fn write_part( - &self, - upload_id: &str, - part_number: usize, - _size: u64, - body: crate::Buffer, - ) -> Result { - let file_id = self.read_file_id().await?; let res = self .core - .get_upload_url(&file_id, upload_id, Some(part_number + 1)) + .get_upload_url(file_id, upload_id, Some(self.part_number)) .await?; let output: UploadUrlResponse = serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?; - let Some(part_info_list) = output.part_info_list else { - return Err(Error::new( - ErrorKind::Unexpected, - "cannot find part_info_list", - )); - }; - if part_info_list.is_empty() { + let Some(upload_url) = output + .part_info_list + .as_ref() + .and_then(|list| list.first()) + .map(|part_info| &part_info.upload_url) + else { return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url")); - } - self.write(Some(body), Some(&part_info_list[0].upload_url)) - .await?; + }; - Ok(oio::MultipartPart { - part_number, - etag: part_info_list[0].etag.clone().unwrap_or("".to_string()), - checksum: None, - }) - } + let size = bs.len(); - async fn complete_part(&self, upload_id: &str, _parts: &[oio::MultipartPart]) -> Result<()> { - self.complete(upload_id).await?; + if let Err(err) = self.core.upload(upload_url, bs).await { + if err.kind() != ErrorKind::AlreadyExists { + return Err(err); + } + }; + + self.part_number += 1; + + Ok(size) + } + async fn close(&mut self) -> Result<()> { + let (Some(upload_id), Some(file_id)) = (self.upload_id.as_ref(), self.file_id.as_ref()) + else { + return Ok(()); + }; + self.core.complete(file_id, upload_id).await?; Ok(()) } - async fn abort_part(&self, _upload_id: &str) -> Result<()> { - self.delete().await + async fn abort(&mut self) -> Result<()> { + let Some(file_id) = self.file_id.as_ref() else { + return Ok(()); + }; + self.core.delete_path(file_id).await } }