Skip to content

Commit

Permalink
feat(core): Add object versioning for OSS (#4870)
Browse files Browse the repository at this point in the history
* feat: add object versioning support for oss

Signed-off-by: Lzzzt <[email protected]>

* feat: add object versioning support for oss

Signed-off-by: Lzzzt <[email protected]>

* feat: add object versioning support for oss

Signed-off-by: Lzzzt <[email protected]>

* feat: add object versioning support for oss

Signed-off-by: Lzzzt <[email protected]>

---------

Signed-off-by: Lzzzt <[email protected]>
  • Loading branch information
Lzzzzzt authored Jul 8, 2024
1 parent dd62200 commit e6ccc8f
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 42 deletions.
32 changes: 16 additions & 16 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,21 +440,27 @@ impl Access for OssBackend {
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let resp = self
.core
.oss_head_object(path, args.if_match(), args.if_none_match())
.await?;
let resp = self.core.oss_head_object(path, &args).await?;

let status = resp.status();

match status {
StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new),
StatusCode::OK => {
let headers = resp.headers();
let mut meta = parse_into_metadata(path, headers)?;

if let Some(v) = parse_header_to_str(headers, "x-oss-version-id")? {
meta.set_version(v);
}

Ok(RpStat::new(meta))
}
_ => Err(parse_error(resp).await?),
}
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.core.oss_get_object(path, args.range(), &args).await?;
let resp = self.core.oss_get_object(path, &args).await?;

let status = resp.status();

Expand Down Expand Up @@ -486,8 +492,8 @@ impl Access for OssBackend {
Ok((RpWrite::default(), w))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
let resp = self.core.oss_delete_object(path).await?;
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let resp = self.core.oss_delete_object(path, &args).await?;
let status = resp.status();
match status {
StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(RpDelete::default()),
Expand Down Expand Up @@ -519,14 +525,8 @@ impl Access for OssBackend {
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
// We will not send this request out, just for signing.
let mut req = match args.operation() {
PresignOperation::Stat(v) => {
self.core
.oss_head_object_request(path, true, v.if_match(), v.if_none_match())?
}
PresignOperation::Read(v) => {
self.core
.oss_get_object_request(path, BytesRange::default(), true, v)?
}
PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v)?,
PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v)?,
PresignOperation::Write(v) => {
self.core
.oss_put_object_request(path, None, v, Buffer::new(), true)?
Expand Down
75 changes: 51 additions & 24 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ mod constants {
pub const X_OSS_SERVER_SIDE_ENCRYPTION_KEY_ID: &str = "x-oss-server-side-encryption-key-id";

pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition";

pub const OSS_QUERY_VERSION_ID: &str = "versionId";
}

pub struct OssCore {
Expand Down Expand Up @@ -236,12 +238,12 @@ impl OssCore {
pub fn oss_get_object_request(
&self,
path: &str,
range: BytesRange,
is_presign: bool,
args: &OpRead,
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);
let range = args.range();
let mut url = format!("{}/{}", endpoint, percent_encode_path(&p));

// Add query arguments to the URL based on response overrides
Expand All @@ -253,6 +255,13 @@ impl OssCore {
percent_encode_path(override_content_disposition)
))
}
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::OSS_QUERY_VERSION_ID,
percent_encode_path(version)
))
}

if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
Expand Down Expand Up @@ -280,10 +289,25 @@ impl OssCore {
Ok(req)
}

fn oss_delete_object_request(&self, path: &str) -> Result<Request<Buffer>> {
fn oss_delete_object_request(&self, path: &str, args: &OpDelete) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(false);
let url = format!("{}/{}", endpoint, percent_encode_path(&p));
let mut url = format!("{}/{}", endpoint, percent_encode_path(&p));

let mut query_args = Vec::new();

if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::OSS_QUERY_VERSION_ID,
percent_encode_path(version)
))
}

if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let req = Request::delete(&url);

let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
Expand All @@ -295,18 +319,31 @@ impl OssCore {
&self,
path: &str,
is_presign: bool,
if_match: Option<&str>,
if_none_match: Option<&str>,
args: &OpStat,
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);
let url = format!("{}/{}", endpoint, percent_encode_path(&p));
let mut url = format!("{}/{}", endpoint, percent_encode_path(&p));

let mut query_args = Vec::new();

if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::OSS_QUERY_VERSION_ID,
percent_encode_path(version)
))
}

if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let mut req = Request::head(&url);
if let Some(if_match) = if_match {
if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match)
}
if let Some(if_none_match) = if_none_match {
if let Some(if_none_match) = args.if_none_match() {
req = req.header(IF_NONE_MATCH, if_none_match);
}
let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
Expand Down Expand Up @@ -358,24 +395,14 @@ impl OssCore {
Ok(req)
}

pub async fn oss_get_object(
&self,
path: &str,
range: BytesRange,
args: &OpRead,
) -> Result<Response<HttpBody>> {
let mut req = self.oss_get_object_request(path, range, false, args)?;
pub async fn oss_get_object(&self, path: &str, args: &OpRead) -> Result<Response<HttpBody>> {
let mut req = self.oss_get_object_request(path, false, args)?;
self.sign(&mut req).await?;
self.client.fetch(req).await
}

pub async fn oss_head_object(
&self,
path: &str,
if_match: Option<&str>,
if_none_match: Option<&str>,
) -> Result<Response<Buffer>> {
let mut req = self.oss_head_object_request(path, false, if_match, if_none_match)?;
pub async fn oss_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
let mut req = self.oss_head_object_request(path, false, args)?;

self.sign(&mut req).await?;
self.send(req).await
Expand Down Expand Up @@ -431,8 +458,8 @@ impl OssCore {
self.send(req).await
}

pub async fn oss_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
let mut req = self.oss_delete_object_request(path)?;
pub async fn oss_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
let mut req = self.oss_delete_object_request(path, args)?;
self.sign(&mut req).await?;
self.send(req).await
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/services/oss/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,10 @@ impl oio::MultipartWrite for OssWriter {

impl oio::AppendWrite for OssWriter {
async fn offset(&self) -> Result<u64> {
let resp = self.core.oss_head_object(&self.path, None, None).await?;
let resp = self
.core
.oss_head_object(&self.path, &OpStat::new())
.await?;

let status = resp.status();
match status {
Expand Down
8 changes: 7 additions & 1 deletion core/src/types/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,16 @@ impl Reader {
Bound::Unbounded => match self.size.load() {
Some(v) => v,
None => {
let mut op_stat = OpStat::new();

if let Some(v) = self.ctx.args().version() {
op_stat = op_stat.with_version(v);
}

let size = self
.ctx
.accessor()
.stat(self.ctx.path(), OpStat::new())
.stat(self.ctx.path(), op_stat)
.await?
.into_metadata()
.content_length();
Expand Down

0 comments on commit e6ccc8f

Please sign in to comment.