diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs index 3197ccc08411..c5c2816e8e3d 100644 --- a/core/src/services/lakefs/backend.rs +++ b/core/src/services/lakefs/backend.rs @@ -28,6 +28,7 @@ use log::debug; use super::core::LakefsCore; use super::core::LakefsStatus; use super::error::parse_error; +use super::lister::LakefsLister; use crate::raw::*; use crate::services::LakefsConfig; use crate::*; @@ -193,7 +194,7 @@ pub struct LakefsBackend { impl Access for LakefsBackend { type Reader = HttpBody; type Writer = (); - type Lister = (); + type Lister = oio::PageLister; type BlockingReader = (); type BlockingWriter = (); type BlockingLister = (); @@ -203,7 +204,7 @@ impl Access for LakefsBackend { am.set_scheme(Scheme::Lakefs) .set_native_capability(Capability { stat: true, - + list: true, read: true, ..Default::default() @@ -228,8 +229,9 @@ impl Access for LakefsBackend { let decoded_response: LakefsStatus = serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; - - meta.set_content_length(decoded_response.size_bytes); + if let Some(size_bytes) = decoded_response.size_bytes { + meta.set_content_length(size_bytes); + } meta.set_mode(EntryMode::FILE); if let Some(v) = parse_content_disposition(resp.headers())? { meta.set_content_disposition(v); @@ -262,4 +264,16 @@ impl Access for LakefsBackend { } } } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + let l = LakefsLister::new( + self.core.clone(), + path.to_string(), + args.limit(), + args.start_after(), + args.recursive(), + ); + + Ok((RpList::default(), oio::PageLister::new(l))) + } } diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index c275bbc17fad..84cb1298cfad 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -103,16 +103,69 @@ impl LakefsCore { self.client.fetch(req).await } + + pub async fn list_objects( + &self, + path: &str, + delimiter: &str, + amount: &Option, + after: Option, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let mut url = format!( + "{}/api/v1/repositories/{}/refs/{}/objects/ls?", + self.endpoint, self.repository, self.branch + ); + + if !p.is_empty() { + url.push_str(&format!("&prefix={}", percent_encode_path(&p))); + } + + if !delimiter.is_empty() { + url.push_str(&format!("&delimiter={}", delimiter)); + } + + if let Some(amount) = amount { + url.push_str(&format!("&amount={}", amount)); + } + + if let Some(after) = after { + url.push_str(&format!("&after={}", after)); + } + + let mut req = Request::get(&url); + + let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?; + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + + self.client.send(req).await + } } #[derive(Deserialize, Eq, PartialEq, Debug)] -#[allow(dead_code)] pub(super) struct LakefsStatus { pub path: String, pub path_type: String, pub physical_address: String, pub checksum: String, - pub size_bytes: u64, + pub size_bytes: Option, pub mtime: i64, - pub content_type: String, + pub content_type: Option, +} + +#[derive(Deserialize, Eq, PartialEq, Debug)] +pub(super) struct LakefsListResponse { + pub pagination: Pagination, + pub results: Vec, +} + +#[derive(Deserialize, Eq, PartialEq, Debug)] +pub(super) struct Pagination { + pub has_more: bool, + pub max_per_page: u64, + pub next_offset: String, + pub results: u64, } diff --git a/core/src/services/lakefs/docs.md b/core/src/services/lakefs/docs.md index 2f71184512d5..a4589bccf1bb 100644 --- a/core/src/services/lakefs/docs.md +++ b/core/src/services/lakefs/docs.md @@ -14,7 +14,7 @@ This service can be used to: - [ ] delete - [ ] copy - [ ] rename -- [ ] list +- [x] list - [ ] ~~presign~~ - [ ] blocking diff --git a/core/src/services/lakefs/lister.rs b/core/src/services/lakefs/lister.rs new file mode 100644 index 000000000000..fcef5bc28a36 --- /dev/null +++ b/core/src/services/lakefs/lister.rs @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use bytes::Buf; +use chrono::{TimeZone, Utc}; + +use crate::raw::*; +use crate::*; + +use super::core::{LakefsCore, LakefsListResponse}; +use super::error::parse_error; + +pub struct LakefsLister { + core: Arc, + path: String, + delimiter: &'static str, + amount: Option, + after: Option, +} + +impl LakefsLister { + pub fn new( + core: Arc, + path: String, + amount: Option, + after: Option<&str>, + recursive: bool, + ) -> Self { + let delimiter = if recursive { "" } else { "/" }; + Self { + core, + path, + delimiter, + amount, + after: after.map(String::from), + } + } +} + +impl oio::PageList for LakefsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let response = self + .core + .list_objects( + &self.path, + self.delimiter, + &self.amount, + // start after should only be set for the first page. + if ctx.token.is_empty() { + self.after.clone() + } else { + None + }, + ) + .await?; + + let status_code = response.status(); + if !status_code.is_success() { + let error = parse_error(response).await?; + return Err(error); + } + + let bytes = response.into_body(); + + let decoded_response: LakefsListResponse = + serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?; + + ctx.done = true; + + for status in decoded_response.results { + let entry_type = match status.path_type.as_str() { + "common_prefix" => EntryMode::DIR, + "object" => EntryMode::FILE, + _ => EntryMode::Unknown, + }; + + let mut meta = Metadata::new(entry_type); + + if status.mtime != 0 { + meta.set_last_modified(Utc.timestamp_opt(status.mtime, 0).unwrap()); + } + + if entry_type == EntryMode::FILE { + if let Some(size_bytes) = status.size_bytes { + meta.set_content_length(size_bytes); + } + } + + let path = if entry_type == EntryMode::DIR { + format!("{}/", &status.path) + } else { + status.path.clone() + }; + + ctx.entries.push_back(oio::Entry::new( + &build_rel_path(&self.core.root, &path), + meta, + )); + } + + Ok(()) + } +} diff --git a/core/src/services/lakefs/mod.rs b/core/src/services/lakefs/mod.rs index 5f5675039925..2436a6ef6e9e 100644 --- a/core/src/services/lakefs/mod.rs +++ b/core/src/services/lakefs/mod.rs @@ -20,6 +20,9 @@ mod core; #[cfg(feature = "services-lakefs")] mod error; +#[cfg(feature = "services-lakefs")] +mod lister; + #[cfg(feature = "services-lakefs")] mod backend; #[cfg(feature = "services-lakefs")]