Skip to content

Commit

Permalink
feat: add list/copy/rename for dropbox (#4424)
Browse files Browse the repository at this point in the history
* feat: add list/copy/rename for dropbox

* fix clippy

* typo comments

* migrate to Buffer
  • Loading branch information
zjregee authored Apr 15, 2024
1 parent 962d810 commit a24026d
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 5 deletions.
56 changes: 55 additions & 1 deletion core/src/services/dropbox/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use http::StatusCode;

use super::core::*;
use super::error::*;
use super::lister::DropboxLister;
use super::reader::DropboxReader;
use super::writer::DropboxWriter;
use crate::raw::*;
Expand All @@ -39,7 +40,7 @@ pub struct DropboxBackend {
impl Accessor for DropboxBackend {
type Reader = DropboxReader;
type Writer = oio::OneShotWriter<DropboxWriter>;
type Lister = ();
type Lister = oio::PageLister<DropboxLister>;
type BlockingReader = ();
type BlockingWriter = ();
type BlockingLister = ();
Expand All @@ -59,6 +60,13 @@ impl Accessor for DropboxBackend {

delete: true,

list: true,
list_with_recursive: true,

copy: true,

rename: true,

batch: true,
batch_delete: true,

Expand Down Expand Up @@ -171,6 +179,52 @@ impl Accessor for DropboxBackend {
}
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
Ok((
RpList::default(),
oio::PageLister::new(DropboxLister::new(
self.core.clone(),
path.to_string(),
args.recursive(),
args.limit(),
)),
))
}

async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
let resp = self.core.dropbox_copy(from, to).await?;

let status = resp.status();

match status {
StatusCode::OK => Ok(RpCopy::default()),
_ => {
let err = parse_error(resp).await?;
match err.kind() {
ErrorKind::NotFound => Ok(RpCopy::default()),
_ => Err(err),
}
}
}
}

async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
let resp = self.core.dropbox_move(from, to).await?;

let status = resp.status();

match status {
StatusCode::OK => Ok(RpRename::default()),
_ => {
let err = parse_error(resp).await?;
match err.kind() {
ErrorKind::NotFound => Ok(RpRename::default()),
_ => Err(err),
}
}
}
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
let ops = args.into_operation();
if ops.len() > 1000 {
Expand Down
119 changes: 119 additions & 0 deletions core/src/services/dropbox/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,93 @@ impl DropboxCore {
}
}

pub async fn dropbox_list(
&self,
path: &str,
recursive: bool,
limit: Option<usize>,
) -> Result<Response<Buffer>> {
let url = "https://api.dropboxapi.com/2/files/list_folder".to_string();

// The default settings here align with the DropboxAPI default settings.
// Refer: https://www.dropbox.com/developers/documentation/http/documentation#files-list_folder
let args = DropboxListArgs {
path: self.build_path(path),
recursive,
limit: limit.unwrap_or(1000),
};

let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);

let mut request = Request::post(&url)
.header(CONTENT_TYPE, "application/json")
.header(CONTENT_LENGTH, bs.len())
.body(Buffer::from(bs))
.map_err(new_request_build_error)?;

self.sign(&mut request).await?;
self.client.send(request).await
}

pub async fn dropbox_list_continue(&self, cursor: &str) -> Result<Response<Buffer>> {
let url = "https://api.dropboxapi.com/2/files/list_folder/continue".to_string();

let args = DropboxListContinueArgs {
cursor: cursor.to_string(),
};

let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);

let mut request = Request::post(&url)
.header(CONTENT_TYPE, "application/json")
.header(CONTENT_LENGTH, bs.len())
.body(Buffer::from(bs))
.map_err(new_request_build_error)?;

self.sign(&mut request).await?;
self.client.send(request).await
}

pub async fn dropbox_copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
let url = "https://api.dropboxapi.com/2/files/copy_v2".to_string();

let args = DropboxCopyArgs {
from_path: self.build_path(from),
to_path: self.build_path(to),
};

let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);

let mut request = Request::post(&url)
.header(CONTENT_TYPE, "application/json")
.header(CONTENT_LENGTH, bs.len())
.body(Buffer::from(bs))
.map_err(new_request_build_error)?;

self.sign(&mut request).await?;
self.client.send(request).await
}

pub async fn dropbox_move(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
let url = "https://api.dropboxapi.com/2/files/move_v2".to_string();

let args = DropboxMoveArgs {
from_path: self.build_path(from),
to_path: self.build_path(to),
};

let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);

let mut request = Request::post(&url)
.header(CONTENT_TYPE, "application/json")
.header(CONTENT_LENGTH, bs.len())
.body(Buffer::from(bs))
.map_err(new_request_build_error)?;

self.sign(&mut request).await?;
self.client.send(request).await
}

pub async fn dropbox_get_metadata(&self, path: &str) -> Result<Response<Buffer>> {
let url = "https://api.dropboxapi.com/2/files/get_metadata".to_string();
let args = DropboxMetadataArgs {
Expand Down Expand Up @@ -440,6 +527,30 @@ struct DropboxCreateFolderArgs {
path: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
struct DropboxListArgs {
path: String,
recursive: bool,
limit: usize,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
struct DropboxListContinueArgs {
cursor: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
struct DropboxCopyArgs {
from_path: String,
to_path: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
struct DropboxMoveArgs {
from_path: String,
to_path: String,
}

#[derive(Default, Clone, Debug, Deserialize, Serialize)]
struct DropboxMetadataArgs {
include_deleted: bool,
Expand Down Expand Up @@ -508,6 +619,14 @@ pub struct DropboxMetadataSharingInfo {
pub no_access: Option<bool>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxListResponse {
pub entries: Vec<DropboxMetadataResponse>,
pub cursor: String,
pub has_more: bool,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxDeleteBatchResponse {
Expand Down
6 changes: 3 additions & 3 deletions core/src/services/dropbox/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ This service can be used to:
- [x] write
- [x] create_dir
- [x] delete
- [ ] copy
- [ ] rename
- [ ] list
- [x] copy
- [x] rename
- [x] list
- [x] batch
- [ ] blocking

Expand Down
5 changes: 4 additions & 1 deletion core/src/services/dropbox/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ pub async fn parse_error(resp: Response<Buffer>) -> Result<Error> {
///
/// See <https://www.dropbox.com/developers/documentation/http/documentation#error-handling>
pub fn parse_dropbox_error_summary(summary: &str) -> Option<(ErrorKind, bool)> {
if summary.starts_with("path/not_found") || summary.starts_with("path_lookup/not_found") {
if summary.starts_with("path/not_found")
|| summary.starts_with("path_lookup/not_found")
|| summary.starts_with("from_lookup/not_found")
{
Some((ErrorKind::NotFound, false))
} else if summary.starts_with("path/conflict") {
Some((ErrorKind::AlreadyExists, false))
Expand Down
117 changes: 117 additions & 0 deletions core/src/services/dropbox/lister.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use bytes::Buf;

use super::core::*;
use super::error::parse_error;
use crate::raw::*;
use crate::*;

pub struct DropboxLister {
core: Arc<DropboxCore>,
path: String,
recursive: bool,
limit: Option<usize>,
}

impl DropboxLister {
pub fn new(
core: Arc<DropboxCore>,
path: String,
recursive: bool,
limit: Option<usize>,
) -> Self {
Self {
core,
path,
recursive,
limit,
}
}
}

impl oio::PageList for DropboxLister {
async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
// The token is set when obtaining entries and returning `has_more` flag.
// When the token exists, we should retrieve more entries using the Dropbox continue API.
// Refer: https://www.dropbox.com/developers/documentation/http/documentation#files-list_folder-continue
let response = if !ctx.token.is_empty() {
self.core.dropbox_list_continue(&ctx.token).await?
} else {
self.core
.dropbox_list(&self.path, self.recursive, self.limit)
.await?
};

let status_code = response.status();

if !status_code.is_success() {
let error = parse_error(response).await?;

let result = match error.kind() {
ErrorKind::NotFound => Ok(()),
_ => Err(error),
};

ctx.done = true;
return result;
}

let bytes = response.into_body();
let decoded_response: DropboxListResponse =
serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;

for entry in decoded_response.entries {
let entry_mode = match entry.tag.as_str() {
"file" => EntryMode::FILE,
"folder" => EntryMode::DIR,
_ => EntryMode::Unknown,
};

let mut name = entry.name;
let mut meta = Metadata::new(entry_mode);

// Dropbox will return folder names that do not end with '/'.
if entry_mode == EntryMode::DIR && !name.ends_with('/') {
name.push('/');
}

// The behavior here aligns with Dropbox's stat function.
if entry_mode == EntryMode::FILE {
let date_utc_last_modified = parse_datetime_from_rfc3339(&entry.client_modified)?;
meta.set_last_modified(date_utc_last_modified);

if let Some(size) = entry.size {
meta.set_content_length(size);
}
}

ctx.entries.push_back(oio::Entry::with(name, meta));
}

if decoded_response.has_more {
ctx.token = decoded_response.cursor;
ctx.done = false;
} else {
ctx.done = true;
}
Ok(())
}
}
1 change: 1 addition & 0 deletions core/src/services/dropbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod backend;
mod builder;
mod core;
mod error;
mod lister;
mod reader;
mod writer;

Expand Down

0 comments on commit a24026d

Please sign in to comment.