From 64f86f48babcb6fffc79b46b278f746a7f1ccde5 Mon Sep 17 00:00:00 2001 From: liugddx Date: Tue, 3 Sep 2024 22:50:50 +0800 Subject: [PATCH 1/6] 1 --- core/src/services/lakefs/backend.rs | 15 +++- core/src/services/lakefs/core.rs | 41 +++++++++++ core/src/services/lakefs/docs.md | 2 +- core/src/services/lakefs/lister.rs | 107 ++++++++++++++++++++++++++++ core/src/services/lakefs/mod.rs | 3 + 5 files changed, 166 insertions(+), 2 deletions(-) create mode 100644 core/src/services/lakefs/lister.rs diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs index 3197ccc08411..0e0705bb5cee 100644 --- a/core/src/services/lakefs/backend.rs +++ b/core/src/services/lakefs/backend.rs @@ -29,6 +29,7 @@ use super::core::LakefsCore; use super::core::LakefsStatus; use super::error::parse_error; use crate::raw::*; +use crate::services::lakefs::lister::LakefsLister; use crate::services::LakefsConfig; use crate::*; @@ -193,7 +194,7 @@ pub struct LakefsBackend { impl Access for LakefsBackend { type Reader = HttpBody; type Writer = (); - type Lister = (); + type Lister = oio::PageLister; type BlockingReader = (); type BlockingWriter = (); type BlockingLister = (); @@ -262,4 +263,16 @@ impl Access for LakefsBackend { } } } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + let l = LakefsLister::new( + self.core.clone(), + path.to_string(), + args.limit(), + args.start_after(), + args.recursive(), + ); + + Ok((RpList::default(), oio::PageLister::new(l))) + } } diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index c275bbc17fad..805a372253df 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -103,6 +103,47 @@ impl LakefsCore { self.client.fetch(req).await } + + pub async fn list_objects( + &self, + path: &str, + delimiter: &str, + amount: &Option, + after: &Option<&str>, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let mut url = format!( + "{}/api/v1/repositories/{}/refs/{}/objects/ls?", + self.endpoint, self.repository, self.branch + ); + + if !p.is_empty() { + write!(url, "&prefix={}", percent_encode_path(&p)) + .expect("write into string must succeed"); + } + + if !delimiter.is_empty() { + write!(url, "&delimiter={delimiter}").expect("write into string must succeed"); + } + + if let Some(amount) = amount { + write!(url, "&amount={amount}").expect("write into string must succeed"); + } + + if let Some(after) = after { + write!(url, "&after={after}").expect("write into string must succeed"); + } + + let mut req = Request::get(&url); + + let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?; + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + + self.client.send(req).await + } } #[derive(Deserialize, Eq, PartialEq, Debug)] diff --git a/core/src/services/lakefs/docs.md b/core/src/services/lakefs/docs.md index 2f71184512d5..a4589bccf1bb 100644 --- a/core/src/services/lakefs/docs.md +++ b/core/src/services/lakefs/docs.md @@ -14,7 +14,7 @@ This service can be used to: - [ ] delete - [ ] copy - [ ] rename -- [ ] list +- [x] list - [ ] ~~presign~~ - [ ] blocking diff --git a/core/src/services/lakefs/lister.rs b/core/src/services/lakefs/lister.rs new file mode 100644 index 000000000000..3cd916ccfb7c --- /dev/null +++ b/core/src/services/lakefs/lister.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use bytes::Buf; +use chrono::{TimeZone, Utc}; + +use crate::raw::*; +use crate::*; + +use super::core::LakefsCore; +use super::core::LakefsStatus; +use super::error::parse_error; + +pub struct LakefsLister { + core: Arc, + path: String, + delimiter: &'static str, + amount: Option, + after: Option<&'static str>, +} + +impl LakefsLister { + pub fn new( + core: Arc, + path: String, + amount: Option, + after: Option<&str>, + recursive: bool, + ) -> Self { + let delimiter = if recursive { "" } else { "/" }; + Self { + core, + path, + delimiter, + amount, + after, + } + } +} + +impl oio::PageList for LakefsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let response = self + .core + .list_objects(&self.path, &self.delimiter, &self.amount, &self.after) + .await?; + + let status_code = response.status(); + if !status_code.is_success() { + let error = parse_error(response).await?; + return Err(error); + } + + let bytes = response.into_body(); + let decoded_response: Vec = + serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?; + + ctx.done = true; + + for status in decoded_response { + let entry_type = match status.path_type.as_str() { + "common_prefix" => EntryMode::DIR, + "object" => EntryMode::FILE, + _ => EntryMode::Unknown, + }; + + let mut meta = Metadata::new(entry_type); + + if status.mtime != 0 { + meta.set_last_modified(Utc.timestamp_opt(status.mtime, 0).unwrap()); + } + + if entry_type == EntryMode::FILE { + meta.set_content_length(status.size_bytes); + } + + let path = if entry_type == EntryMode::DIR { + format!("{}/", &status.path) + } else { + status.path.clone() + }; + + ctx.entries.push_back(oio::Entry::new( + &build_rel_path(&self.core.root, &path), + meta, + )); + } + + Ok(()) + } +} diff --git a/core/src/services/lakefs/mod.rs b/core/src/services/lakefs/mod.rs index 5f5675039925..2436a6ef6e9e 100644 --- a/core/src/services/lakefs/mod.rs +++ b/core/src/services/lakefs/mod.rs @@ -20,6 +20,9 @@ mod core; #[cfg(feature = "services-lakefs")] mod error; +#[cfg(feature = "services-lakefs")] +mod lister; + #[cfg(feature = "services-lakefs")] mod backend; #[cfg(feature = "services-lakefs")] From 450cabdf84b43fa0ef27e7518049f3bd9102d806 Mon Sep 17 00:00:00 2001 From: liugddx Date: Tue, 3 Sep 2024 23:32:00 +0800 Subject: [PATCH 2/6] 1 --- core/src/services/lakefs/core.rs | 11 +++++------ core/src/services/lakefs/lister.rs | 16 +++++++++++++--- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index 805a372253df..5a476aceb142 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -109,7 +109,7 @@ impl LakefsCore { path: &str, delimiter: &str, amount: &Option, - after: &Option<&str>, + after: Option, ) -> Result> { let p = build_abs_path(&self.root, path); @@ -119,20 +119,19 @@ impl LakefsCore { ); if !p.is_empty() { - write!(url, "&prefix={}", percent_encode_path(&p)) - .expect("write into string must succeed"); + url.push_str(&format!("&prefix={}", percent_encode_path(&p))); } if !delimiter.is_empty() { - write!(url, "&delimiter={delimiter}").expect("write into string must succeed"); + url.push_str(&format!("&delimiter={}", delimiter)); } if let Some(amount) = amount { - write!(url, "&amount={amount}").expect("write into string must succeed"); + url.push_str(&format!("&amount={}", amount)); } if let Some(after) = after { - write!(url, "&after={after}").expect("write into string must succeed"); + url.push_str(&format!("&after={}", after)); } let mut req = Request::get(&url); diff --git a/core/src/services/lakefs/lister.rs b/core/src/services/lakefs/lister.rs index 3cd916ccfb7c..65391f16d93f 100644 --- a/core/src/services/lakefs/lister.rs +++ b/core/src/services/lakefs/lister.rs @@ -32,7 +32,7 @@ pub struct LakefsLister { path: String, delimiter: &'static str, amount: Option, - after: Option<&'static str>, + after: Option, } impl LakefsLister { @@ -49,7 +49,7 @@ impl LakefsLister { path, delimiter, amount, - after, + after: after.map(String::from), } } } @@ -58,7 +58,17 @@ impl oio::PageList for LakefsLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let response = self .core - .list_objects(&self.path, &self.delimiter, &self.amount, &self.after) + .list_objects( + &self.path, + &self.delimiter, + &self.amount, + // start after should only be set for the first page. + if ctx.token.is_empty() { + self.after.clone() + } else { + None + }, + ) .await?; let status_code = response.status(); From 90fd601a228534d29789a4aefcdf2702857a5adc Mon Sep 17 00:00:00 2001 From: liugddx Date: Wed, 4 Sep 2024 08:02:36 +0800 Subject: [PATCH 3/6] 1 --- core/src/services/lakefs/lister.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/lakefs/lister.rs b/core/src/services/lakefs/lister.rs index 65391f16d93f..1c2484765b37 100644 --- a/core/src/services/lakefs/lister.rs +++ b/core/src/services/lakefs/lister.rs @@ -60,7 +60,7 @@ impl oio::PageList for LakefsLister { .core .list_objects( &self.path, - &self.delimiter, + self.delimiter, &self.amount, // start after should only be set for the first page. if ctx.token.is_empty() { From fb44334492e4849e6ee9e4a7c30c927b7e0c78b2 Mon Sep 17 00:00:00 2001 From: liugddx Date: Wed, 4 Sep 2024 23:40:32 +0800 Subject: [PATCH 4/6] 1 --- core/src/services/lakefs/backend.rs | 4 ++-- core/src/services/lakefs/core.rs | 20 ++++++++++++++++++-- core/src/services/lakefs/lister.rs | 9 +++++---- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs index 0e0705bb5cee..b4472d25b6b8 100644 --- a/core/src/services/lakefs/backend.rs +++ b/core/src/services/lakefs/backend.rs @@ -204,7 +204,7 @@ impl Access for LakefsBackend { am.set_scheme(Scheme::Lakefs) .set_native_capability(Capability { stat: true, - + list: true, read: true, ..Default::default() @@ -230,7 +230,7 @@ impl Access for LakefsBackend { let decoded_response: LakefsStatus = serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; - meta.set_content_length(decoded_response.size_bytes); + meta.set_content_length(decoded_response.size_bytes.unwrap()); meta.set_mode(EntryMode::FILE); if let Some(v) = parse_content_disposition(resp.headers())? { meta.set_content_disposition(v); diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index 5a476aceb142..2e48a52ab7d1 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -152,7 +152,23 @@ pub(super) struct LakefsStatus { pub path_type: String, pub physical_address: String, pub checksum: String, - pub size_bytes: u64, + pub size_bytes: Option, pub mtime: i64, - pub content_type: String, + pub content_type: Option, +} + +#[derive(Deserialize, Eq, PartialEq, Debug)] +#[allow(dead_code)] +pub(super) struct LakefsListResponse { + pub pagination: Pagination, + pub results: Vec, +} + +#[derive(Deserialize, Eq, PartialEq, Debug)] +#[allow(dead_code)] +struct Pagination { + pub has_more: bool, + pub max_per_page: u64, + pub next_offset: String, + pub results: u64, } diff --git a/core/src/services/lakefs/lister.rs b/core/src/services/lakefs/lister.rs index 1c2484765b37..9478db685965 100644 --- a/core/src/services/lakefs/lister.rs +++ b/core/src/services/lakefs/lister.rs @@ -23,8 +23,8 @@ use chrono::{TimeZone, Utc}; use crate::raw::*; use crate::*; -use super::core::LakefsCore; use super::core::LakefsStatus; +use super::core::{LakefsCore, LakefsListResponse}; use super::error::parse_error; pub struct LakefsLister { @@ -78,12 +78,13 @@ impl oio::PageList for LakefsLister { } let bytes = response.into_body(); - let decoded_response: Vec = + + let decoded_response: LakefsListResponse = serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?; ctx.done = true; - for status in decoded_response { + for status in decoded_response.results { let entry_type = match status.path_type.as_str() { "common_prefix" => EntryMode::DIR, "object" => EntryMode::FILE, @@ -97,7 +98,7 @@ impl oio::PageList for LakefsLister { } if entry_type == EntryMode::FILE { - meta.set_content_length(status.size_bytes); + meta.set_content_length(status.size_bytes.unwrap()); } let path = if entry_type == EntryMode::DIR { From bd6023734f0afb266ceaaa9965f89b0ef215a093 Mon Sep 17 00:00:00 2001 From: liugddx Date: Thu, 5 Sep 2024 07:49:34 +0800 Subject: [PATCH 5/6] 1 --- core/src/services/lakefs/core.rs | 2 +- core/src/services/lakefs/lister.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index 2e48a52ab7d1..c50201c0cf3b 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -166,7 +166,7 @@ pub(super) struct LakefsListResponse { #[derive(Deserialize, Eq, PartialEq, Debug)] #[allow(dead_code)] -struct Pagination { +pub(super) struct Pagination { pub has_more: bool, pub max_per_page: u64, pub next_offset: String, diff --git a/core/src/services/lakefs/lister.rs b/core/src/services/lakefs/lister.rs index 9478db685965..2c515906ad0b 100644 --- a/core/src/services/lakefs/lister.rs +++ b/core/src/services/lakefs/lister.rs @@ -23,7 +23,6 @@ use chrono::{TimeZone, Utc}; use crate::raw::*; use crate::*; -use super::core::LakefsStatus; use super::core::{LakefsCore, LakefsListResponse}; use super::error::parse_error; From eac9cd72563fde644d568307c219cb0ed9d3acaa Mon Sep 17 00:00:00 2001 From: liugddx Date: Thu, 5 Sep 2024 13:36:19 +0800 Subject: [PATCH 6/6] 1 --- core/src/services/lakefs/backend.rs | 7 ++++--- core/src/services/lakefs/core.rs | 3 --- core/src/services/lakefs/lister.rs | 4 +++- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs index b4472d25b6b8..c5c2816e8e3d 100644 --- a/core/src/services/lakefs/backend.rs +++ b/core/src/services/lakefs/backend.rs @@ -28,8 +28,8 @@ use log::debug; use super::core::LakefsCore; use super::core::LakefsStatus; use super::error::parse_error; +use super::lister::LakefsLister; use crate::raw::*; -use crate::services::lakefs::lister::LakefsLister; use crate::services::LakefsConfig; use crate::*; @@ -229,8 +229,9 @@ impl Access for LakefsBackend { let decoded_response: LakefsStatus = serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; - - meta.set_content_length(decoded_response.size_bytes.unwrap()); + if let Some(size_bytes) = decoded_response.size_bytes { + meta.set_content_length(size_bytes); + } meta.set_mode(EntryMode::FILE); if let Some(v) = parse_content_disposition(resp.headers())? { meta.set_content_disposition(v); diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index c50201c0cf3b..84cb1298cfad 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -146,7 +146,6 @@ impl LakefsCore { } #[derive(Deserialize, Eq, PartialEq, Debug)] -#[allow(dead_code)] pub(super) struct LakefsStatus { pub path: String, pub path_type: String, @@ -158,14 +157,12 @@ pub(super) struct LakefsStatus { } #[derive(Deserialize, Eq, PartialEq, Debug)] -#[allow(dead_code)] pub(super) struct LakefsListResponse { pub pagination: Pagination, pub results: Vec, } #[derive(Deserialize, Eq, PartialEq, Debug)] -#[allow(dead_code)] pub(super) struct Pagination { pub has_more: bool, pub max_per_page: u64, diff --git a/core/src/services/lakefs/lister.rs b/core/src/services/lakefs/lister.rs index 2c515906ad0b..fcef5bc28a36 100644 --- a/core/src/services/lakefs/lister.rs +++ b/core/src/services/lakefs/lister.rs @@ -97,7 +97,9 @@ impl oio::PageList for LakefsLister { } if entry_type == EntryMode::FILE { - meta.set_content_length(status.size_bytes.unwrap()); + if let Some(size_bytes) = status.size_bytes { + meta.set_content_length(size_bytes); + } } let path = if entry_type == EntryMode::DIR {