From a73469b497e460b91cfe74855671871d39e862b9 Mon Sep 17 00:00:00 2001 From: liugddx Date: Fri, 6 Sep 2024 00:07:48 +0800 Subject: [PATCH 01/12] 1 --- core/src/services/lakefs/backend.rs | 11 +++++- core/src/services/lakefs/core.rs | 48 +++++++++++++++++++++++++- core/src/services/lakefs/mod.rs | 4 +++ core/src/services/lakefs/writer.rs | 53 +++++++++++++++++++++++++++++ 4 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 core/src/services/lakefs/writer.rs diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs index c5c2816e8e3d..ae6c2133e6fe 100644 --- a/core/src/services/lakefs/backend.rs +++ b/core/src/services/lakefs/backend.rs @@ -30,6 +30,7 @@ use super::core::LakefsStatus; use super::error::parse_error; use super::lister::LakefsLister; use crate::raw::*; +use crate::services::lakefs::writer::LakefsWriter; use crate::services::LakefsConfig; use crate::*; @@ -193,7 +194,7 @@ pub struct LakefsBackend { impl Access for LakefsBackend { type Reader = HttpBody; - type Writer = (); + type Writer = oio::OneShotWriter; type Lister = oio::PageLister; type BlockingReader = (); type BlockingWriter = (); @@ -206,6 +207,7 @@ impl Access for LakefsBackend { stat: true, list: true, read: true, + write: true, ..Default::default() }); @@ -276,4 +278,11 @@ impl Access for LakefsBackend { Ok((RpList::default(), oio::PageLister::new(l))) } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + Ok(( + RpWrite::default(), + oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)), + )) + } } diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index 84cb1298cfad..10fa77783282 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -17,9 +17,10 @@ use std::fmt::Debug; -use http::header; +use http::header::{CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE}; use http::Request; use http::Response; +use http::{header, HeaderName}; use serde::Deserialize; use crate::raw::*; @@ -143,6 +144,51 @@ impl LakefsCore { self.client.send(req).await } + + pub async fn upload_object( + &self, + path: &str, + args: &OpWrite, + body: Buffer, + ) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let url = format!( + "{}/api/v1/repositories/{}/refs/{}/objects?path={}", + self.endpoint, + self.repository, + self.branch, + percent_encode_path(&p) + ); + + let mut req = Request::post(&url); + + let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?; + req = req.header(header::AUTHORIZATION, auth_header_content); + + if let Some(mime) = args.content_type() { + req = req.header(CONTENT_TYPE, mime) + } + + if let Some(pos) = args.content_disposition() { + req = req.header(CONTENT_DISPOSITION, pos) + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(CACHE_CONTROL, cache_control) + } + + let req = req.body(body).map_err(new_request_build_error)?; + + Ok(req) + } + + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } } #[derive(Deserialize, Eq, PartialEq, Debug)] diff --git a/core/src/services/lakefs/mod.rs b/core/src/services/lakefs/mod.rs index 2436a6ef6e9e..6762ebbfda40 100644 --- a/core/src/services/lakefs/mod.rs +++ b/core/src/services/lakefs/mod.rs @@ -23,10 +23,14 @@ mod error; #[cfg(feature = "services-lakefs")] mod lister; +#[cfg(feature = "services-lakefs")] +mod writer; + #[cfg(feature = "services-lakefs")] mod backend; #[cfg(feature = "services-lakefs")] pub use backend::LakefsBuilder as Lakefs; mod config; + pub use config::LakefsConfig; diff --git a/core/src/services/lakefs/writer.rs b/core/src/services/lakefs/writer.rs new file mode 100644 index 000000000000..9c08ba5e4f78 --- /dev/null +++ b/core/src/services/lakefs/writer.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use http::StatusCode; + +use crate::raw::*; +use crate::services::lakefs::core::LakefsCore; +use crate::*; + +use super::error::parse_error; + +pub struct LakefsWriter { + core: Arc, + op: OpWrite, + path: String, +} + +impl LakefsWriter { + pub fn new(core: Arc, path: String, op: OpWrite) -> Self { + LakefsWriter { core, path, op } + } +} + +impl oio::OneShotWrite for LakefsWriter { + async fn write_once(&self, bs: Buffer) -> Result<()> { + let req = self.core.upload_object(&self.path, &self.op, bs).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED | StatusCode::OK => Ok(()), + _ => Err(parse_error(resp).await?), + } + } +} From 9a5fc025bda7ff9cdf661b427a7dfba9a8e1b7c5 Mon Sep 17 00:00:00 2001 From: liugddx Date: Fri, 6 Sep 2024 00:09:49 +0800 Subject: [PATCH 02/12] 1 --- core/src/services/lakefs/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs index ae6c2133e6fe..ff80134738e6 100644 --- a/core/src/services/lakefs/backend.rs +++ b/core/src/services/lakefs/backend.rs @@ -30,7 +30,7 @@ use super::core::LakefsStatus; use super::error::parse_error; use super::lister::LakefsLister; use crate::raw::*; -use crate::services::lakefs::writer::LakefsWriter; +use super::writer::LakefsWriter; use crate::services::LakefsConfig; use crate::*; From d2120136c96608a208e73a85333d78310ce7162b Mon Sep 17 00:00:00 2001 From: liugddx Date: Fri, 6 Sep 2024 00:11:23 +0800 Subject: [PATCH 03/12] 1 --- core/src/services/lakefs/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs index ff80134738e6..69e59e395ffa 100644 --- a/core/src/services/lakefs/backend.rs +++ b/core/src/services/lakefs/backend.rs @@ -29,8 +29,8 @@ use super::core::LakefsCore; use super::core::LakefsStatus; use super::error::parse_error; use super::lister::LakefsLister; -use crate::raw::*; use super::writer::LakefsWriter; +use crate::raw::*; use crate::services::LakefsConfig; use crate::*; From fa2acc4c8875f60ebf1651bb6722ec2efafcc6e1 Mon Sep 17 00:00:00 2001 From: liugddx Date: Fri, 6 Sep 2024 23:27:03 +0800 Subject: [PATCH 04/12] 1 --- core/src/services/lakefs/core.rs | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index 10fa77783282..b07525e983b8 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; - +use bytes::Buf; use http::header::{CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE}; use http::Request; use http::Response; use http::{header, HeaderName}; use serde::Deserialize; +use std::fmt::Debug; use crate::raw::*; use crate::*; @@ -156,14 +156,14 @@ impl LakefsCore { .to_string(); let url = format!( - "{}/api/v1/repositories/{}/refs/{}/objects?path={}", + "{}/api/v1/repositories/{}/branches/{}/staging/backing?path={}&presign=true", self.endpoint, self.repository, self.branch, percent_encode_path(&p) ); - let mut req = Request::post(&url); + let mut req = Request::get(&url); let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?; req = req.header(header::AUTHORIZATION, auth_header_content); @@ -179,8 +179,23 @@ impl LakefsCore { if let Some(cache_control) = args.cache_control() { req = req.header(CACHE_CONTROL, cache_control) } + req = req.header(CONTENT_LENGTH, body.len()); + + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; - let req = req.body(body).map_err(new_request_build_error)?; + let res = self.send(req).await?; + println!( + "{:?}", + String::from_utf8(res.clone().into_body().to_vec()).unwrap() + ); + + let res: PhysicalAddressForStagingArea = + serde_json::from_reader(res.clone().into_body().reader()) + .map_err(new_json_deserialize_error)?; + + let req = Request::put(&res.presigned_url) + .body(body) + .map_err(new_request_build_error)?; Ok(req) } @@ -215,3 +230,10 @@ pub(super) struct Pagination { pub next_offset: String, pub results: u64, } + +#[derive(Deserialize, Eq, PartialEq, Debug)] +pub(super) struct PhysicalAddressForStagingArea { + pub physical_address: String, + pub presigned_url: String, + pub presigned_url_expiry: i64, +} From 7be6398963cffaf3542245f8da2f2e38d7b439fd Mon Sep 17 00:00:00 2001 From: liugddx Date: Fri, 6 Sep 2024 23:32:00 +0800 Subject: [PATCH 05/12] 1 --- core/src/services/lakefs/core.rs | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index b07525e983b8..1964fdc282c5 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -168,19 +168,6 @@ impl LakefsCore { let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?; req = req.header(header::AUTHORIZATION, auth_header_content); - if let Some(mime) = args.content_type() { - req = req.header(CONTENT_TYPE, mime) - } - - if let Some(pos) = args.content_disposition() { - req = req.header(CONTENT_DISPOSITION, pos) - } - - if let Some(cache_control) = args.cache_control() { - req = req.header(CACHE_CONTROL, cache_control) - } - req = req.header(CONTENT_LENGTH, body.len()); - let req = req.body(Buffer::new()).map_err(new_request_build_error)?; let res = self.send(req).await?; @@ -193,8 +180,23 @@ impl LakefsCore { serde_json::from_reader(res.clone().into_body().reader()) .map_err(new_json_deserialize_error)?; - let req = Request::put(&res.presigned_url) - .body(body) + let mut req = Request::put(&res.presigned_url); + let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?; + req = req.header(header::AUTHORIZATION, auth_header_content); + req = req.header(CONTENT_LENGTH, body.len()); + if let Some(mime) = args.content_type() { + req = req.header(CONTENT_TYPE, mime) + } + + if let Some(pos) = args.content_disposition() { + req = req.header(CONTENT_DISPOSITION, pos) + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(CACHE_CONTROL, cache_control) + } + let mut req = Request::put(&res.presigned_url) + .body(body.clone()) .map_err(new_request_build_error)?; Ok(req) From 12c7f118d579381e5b58452db9e5d31f7aad8a8f Mon Sep 17 00:00:00 2001 From: liugddx Date: Fri, 6 Sep 2024 23:33:16 +0800 Subject: [PATCH 06/12] 1 --- core/src/services/lakefs/core.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index 1964fdc282c5..250deea35bfe 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -171,10 +171,6 @@ impl LakefsCore { let req = req.body(Buffer::new()).map_err(new_request_build_error)?; let res = self.send(req).await?; - println!( - "{:?}", - String::from_utf8(res.clone().into_body().to_vec()).unwrap() - ); let res: PhysicalAddressForStagingArea = serde_json::from_reader(res.clone().into_body().reader()) From cec7dacfe756bc6ef4a8e388df761bab150f073d Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 8 Sep 2024 14:29:11 +0800 Subject: [PATCH 07/12] 1 --- core/src/services/lakefs/core.rs | 32 +++----------------------------- 1 file changed, 3 insertions(+), 29 deletions(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index 250deea35bfe..bd533ff3f5ff 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -156,45 +156,19 @@ impl LakefsCore { .to_string(); let url = format!( - "{}/api/v1/repositories/{}/branches/{}/staging/backing?path={}&presign=true", + "{}/api/v1/repositories/{}/branches/{}/objects?path={}", self.endpoint, self.repository, self.branch, percent_encode_path(&p) ); - let mut req = Request::get(&url); + let mut req = Request::post(&url); let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?; req = req.header(header::AUTHORIZATION, auth_header_content); - let req = req.body(Buffer::new()).map_err(new_request_build_error)?; - - let res = self.send(req).await?; - - let res: PhysicalAddressForStagingArea = - serde_json::from_reader(res.clone().into_body().reader()) - .map_err(new_json_deserialize_error)?; - - let mut req = Request::put(&res.presigned_url); - let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?; - req = req.header(header::AUTHORIZATION, auth_header_content); - req = req.header(CONTENT_LENGTH, body.len()); - if let Some(mime) = args.content_type() { - req = req.header(CONTENT_TYPE, mime) - } - - if let Some(pos) = args.content_disposition() { - req = req.header(CONTENT_DISPOSITION, pos) - } - - if let Some(cache_control) = args.cache_control() { - req = req.header(CACHE_CONTROL, cache_control) - } - let mut req = Request::put(&res.presigned_url) - .body(body.clone()) - .map_err(new_request_build_error)?; - + let req = req.body(body).map_err(new_request_build_error)?; Ok(req) } From 80bc64f72056f43c65ec6d3051e638be168fccf9 Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 8 Sep 2024 14:36:07 +0800 Subject: [PATCH 08/12] 1 --- core/src/services/lakefs/core.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index bd533ff3f5ff..e8ed75deb377 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; -use http::header::{CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE}; +use std::fmt::Debug; + +use http::header; use http::Request; use http::Response; -use http::{header, HeaderName}; use serde::Deserialize; -use std::fmt::Debug; use crate::raw::*; use crate::*; From 5fdc9264c2315d56260c728359773195051011ba Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 8 Sep 2024 14:36:39 +0800 Subject: [PATCH 09/12] 1 --- core/src/services/lakefs/core.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index e8ed75deb377..5538f4144934 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -147,7 +147,7 @@ impl LakefsCore { pub async fn upload_object( &self, path: &str, - args: &OpWrite, + _args: &OpWrite, body: Buffer, ) -> Result> { let p = build_abs_path(&self.root, path) From 0be28727af19f25c5b002af0fd6efb9bcb31242f Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 8 Sep 2024 14:37:36 +0800 Subject: [PATCH 10/12] 1 --- core/src/services/lakefs/core.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index 5538f4144934..2ae060c27c9f 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -201,10 +201,3 @@ pub(super) struct Pagination { pub next_offset: String, pub results: u64, } - -#[derive(Deserialize, Eq, PartialEq, Debug)] -pub(super) struct PhysicalAddressForStagingArea { - pub physical_address: String, - pub presigned_url: String, - pub presigned_url_expiry: i64, -} From f2930f8d381881b15ac2ff94ffe6b1b575ca1687 Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 8 Sep 2024 14:38:22 +0800 Subject: [PATCH 11/12] 1 --- core/src/services/lakefs/docs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/lakefs/docs.md b/core/src/services/lakefs/docs.md index a4589bccf1bb..892bf66b176d 100644 --- a/core/src/services/lakefs/docs.md +++ b/core/src/services/lakefs/docs.md @@ -9,7 +9,7 @@ This service can be used to: - [x] stat - [x] read -- [ ] write +- [x] write - [ ] create_dir - [ ] delete - [ ] copy From dceb61e70d5d92edd2319a4fe0aa186acc347f66 Mon Sep 17 00:00:00 2001 From: liugddx Date: Mon, 9 Sep 2024 08:12:49 +0800 Subject: [PATCH 12/12] 1 --- core/src/services/lakefs/core.rs | 6 +----- core/src/services/lakefs/writer.rs | 4 +--- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs index 2ae060c27c9f..91aed0e34937 100644 --- a/core/src/services/lakefs/core.rs +++ b/core/src/services/lakefs/core.rs @@ -149,7 +149,7 @@ impl LakefsCore { path: &str, _args: &OpWrite, body: Buffer, - ) -> Result> { + ) -> Result> { let p = build_abs_path(&self.root, path) .trim_end_matches('/') .to_string(); @@ -168,11 +168,7 @@ impl LakefsCore { req = req.header(header::AUTHORIZATION, auth_header_content); let req = req.body(body).map_err(new_request_build_error)?; - Ok(req) - } - #[inline] - pub async fn send(&self, req: Request) -> Result> { self.client.send(req).await } } diff --git a/core/src/services/lakefs/writer.rs b/core/src/services/lakefs/writer.rs index 9c08ba5e4f78..299cf8bd422b 100644 --- a/core/src/services/lakefs/writer.rs +++ b/core/src/services/lakefs/writer.rs @@ -39,9 +39,7 @@ impl LakefsWriter { impl oio::OneShotWrite for LakefsWriter { async fn write_once(&self, bs: Buffer) -> Result<()> { - let req = self.core.upload_object(&self.path, &self.op, bs).await?; - - let resp = self.core.send(req).await?; + let resp = self.core.upload_object(&self.path, &self.op, bs).await?; let status = resp.status();