Skip to content

Commit

Permalink
Merge pull request ceph#54878 from yuvalif/wip-yuval-split-rgw-tools
Browse files Browse the repository at this point in the history
rgw: split RGWDataAccess from rgw_tools.cc

Reviewed-by: Daniel Gryniewicz <[email protected]>
  • Loading branch information
cbodley authored Dec 15, 2023
2 parents 5252822 + f3cfd02 commit 7911eea
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 325 deletions.
1 change: 1 addition & 0 deletions src/rgw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ set(librgw_common_srcs
rgw_bucket_encryption.cc
rgw_tracer.cc
rgw_lua_background.cc
rgw_data_access.cc
driver/rados/cls_fifo_legacy.cc
driver/rados/rgw_bucket.cc
driver/rados/rgw_bucket_sync.cc
Expand Down
170 changes: 0 additions & 170 deletions src/rgw/driver/rados/rgw_tools.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,176 +280,6 @@ void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const strin
}
}

RGWDataAccess::RGWDataAccess(rgw::sal::Driver* _driver) : driver(_driver)
{
}


int RGWDataAccess::Bucket::finish_init()
{
auto iter = attrs.find(RGW_ATTR_ACL);
if (iter == attrs.end()) {
return 0;
}

bufferlist::const_iterator bliter = iter->second.begin();
try {
policy.decode(bliter);
} catch (buffer::error& err) {
return -EIO;
}

return 0;
}

int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
{
std::unique_ptr<rgw::sal::Bucket> bucket;
int ret = sd->driver->load_bucket(dpp, rgw_bucket(tenant, name), &bucket, y);
if (ret < 0) {
return ret;
}

bucket_info = bucket->get_info();
mtime = bucket->get_modification_time();
attrs = bucket->get_attrs();

return finish_init();
}

int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
const map<string, bufferlist>& _attrs)
{
bucket_info = _bucket_info;
attrs = _attrs;

return finish_init();
}

int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
ObjectRef *obj) {
obj->reset(new Object(sd, shared_from_this(), key));
return 0;
}

int RGWDataAccess::Object::put(bufferlist& data,
map<string, bufferlist>& attrs,
const DoutPrefixProvider *dpp,
optional_yield y)
{
rgw::sal::Driver* driver = sd->driver;
CephContext *cct = driver->ctx();

string tag;
append_rand_alpha(cct, tag, tag, 32);

RGWBucketInfo& bucket_info = bucket->bucket_info;

rgw::BlockingAioThrottle aio(driver->ctx()->_conf->rgw_put_obj_min_window_size);

std::unique_ptr<rgw::sal::Bucket> b = driver->get_bucket(bucket_info);
std::unique_ptr<rgw::sal::Object> obj = b->get_object(key);

auto& owner = bucket->policy.get_owner();

string req_id = driver->zone_unique_id(driver->get_new_req_id());

std::unique_ptr<rgw::sal::Writer> processor;
processor = driver->get_atomic_writer(dpp, y, obj.get(), owner.id,
nullptr, olh_epoch, req_id);

int ret = processor->prepare(y);
if (ret < 0)
return ret;

rgw::sal::DataProcessor *filter = processor.get();

CompressorRef plugin;
boost::optional<RGWPutObj_Compress> compressor;

const auto& compression_type = driver->get_compression_type(bucket_info.placement_rule);
if (compression_type != "none") {
plugin = Compressor::create(driver->ctx(), compression_type);
if (!plugin) {
ldpp_dout(dpp, 1) << "Cannot load plugin for compression type "
<< compression_type << dendl;
} else {
compressor.emplace(driver->ctx(), plugin, filter);
filter = &*compressor;
}
}

off_t ofs = 0;
auto obj_size = data.length();

RGWMD5Etag etag_calc;

do {
size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);

bufferlist bl;

data.splice(0, read_len, &bl);
etag_calc.update(bl);

ret = filter->process(std::move(bl), ofs);
if (ret < 0)
return ret;

ofs += read_len;
} while (data.length() > 0);

ret = filter->process({}, ofs);
if (ret < 0) {
return ret;
}
bool has_etag_attr = false;
auto iter = attrs.find(RGW_ATTR_ETAG);
if (iter != attrs.end()) {
bufferlist& bl = iter->second;
etag = bl.to_str();
has_etag_attr = true;
}

if (!aclbl) {
RGWAccessControlPolicy policy;

const auto& owner = bucket->policy.get_owner();
policy.create_default(owner.id, owner.display_name); // default private policy

policy.encode(aclbl.emplace());
}

if (etag.empty()) {
etag_calc.finish(&etag);
}

if (!has_etag_attr) {
bufferlist etagbl;
etagbl.append(etag);
attrs[RGW_ATTR_ETAG] = etagbl;
}
attrs[RGW_ATTR_ACL] = *aclbl;

string *puser_data = nullptr;
if (user_data) {
puser_data = &(*user_data);
}

const req_context rctx{dpp, y, nullptr};
return processor->complete(obj_size, etag,
&mtime, mtime,
attrs, delete_at,
nullptr, nullptr,
puser_data,
nullptr, nullptr, rctx);
}

void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
{
policy.encode(aclbl.emplace());
}

void rgw_complete_aio_completion(librados::AioCompletion* c, int r) {
auto pc = c->pc;
librados::CB_AioCompleteAndSafe cb(pc);
Expand Down
155 changes: 0 additions & 155 deletions src/rgw/driver/rados/rgw_tools.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,161 +165,6 @@ int rgw_get_rados_ref(const DoutPrefixProvider* dpp, librados::Rados* rados,
int rgw_tools_init(const DoutPrefixProvider *dpp, CephContext *cct);
void rgw_tools_cleanup();

template<class H, size_t S>
class RGWEtag
{
H hash;

public:
RGWEtag() {
if constexpr (std::is_same_v<H, MD5>) {
// Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
}
}

void update(const char *buf, size_t len) {
hash.Update((const unsigned char *)buf, len);
}

void update(bufferlist& bl) {
if (bl.length() > 0) {
update(bl.c_str(), bl.length());
}
}

void update(const std::string& s) {
if (!s.empty()) {
update(s.c_str(), s.size());
}
}
void finish(std::string *etag) {
char etag_buf[S];
char etag_buf_str[S * 2 + 16];

hash.Final((unsigned char *)etag_buf);
buf_to_hex((const unsigned char *)etag_buf, S,
etag_buf_str);

*etag = etag_buf_str;
}
};

using RGWMD5Etag = RGWEtag<MD5, CEPH_CRYPTO_MD5_DIGESTSIZE>;

class RGWDataAccess
{
rgw::sal::Driver* driver;

public:
RGWDataAccess(rgw::sal::Driver* _driver);

class Object;
class Bucket;

using BucketRef = std::shared_ptr<Bucket>;
using ObjectRef = std::shared_ptr<Object>;

class Bucket : public std::enable_shared_from_this<Bucket> {
friend class RGWDataAccess;
friend class Object;

RGWDataAccess *sd{nullptr};
RGWBucketInfo bucket_info;
std::string tenant;
std::string name;
std::string bucket_id;
ceph::real_time mtime;
std::map<std::string, bufferlist> attrs;

RGWAccessControlPolicy policy;
int finish_init();

Bucket(RGWDataAccess *_sd,
const std::string& _tenant,
const std::string& _name,
const std::string& _bucket_id) : sd(_sd),
tenant(_tenant),
name(_name),
bucket_id(_bucket_id) {}
Bucket(RGWDataAccess *_sd) : sd(_sd) {}
int init(const DoutPrefixProvider *dpp, optional_yield y);
int init(const RGWBucketInfo& _bucket_info, const std::map<std::string, bufferlist>& _attrs);
public:
int get_object(const rgw_obj_key& key,
ObjectRef *obj);

};


class Object {
RGWDataAccess *sd{nullptr};
BucketRef bucket;
rgw_obj_key key;

ceph::real_time mtime;
std::string etag;
uint64_t olh_epoch{0};
ceph::real_time delete_at;
std::optional<std::string> user_data;

std::optional<bufferlist> aclbl;

Object(RGWDataAccess *_sd,
BucketRef&& _bucket,
const rgw_obj_key& _key) : sd(_sd),
bucket(_bucket),
key(_key) {}
public:
int put(bufferlist& data, std::map<std::string, bufferlist>& attrs, const DoutPrefixProvider *dpp, optional_yield y); /* might modify attrs */

void set_mtime(const ceph::real_time& _mtime) {
mtime = _mtime;
}

void set_etag(const std::string& _etag) {
etag = _etag;
}

void set_olh_epoch(uint64_t epoch) {
olh_epoch = epoch;
}

void set_delete_at(ceph::real_time _delete_at) {
delete_at = _delete_at;
}

void set_user_data(const std::string& _user_data) {
user_data = _user_data;
}

void set_policy(const RGWAccessControlPolicy& policy);

friend class Bucket;
};

int get_bucket(const DoutPrefixProvider *dpp,
const std::string& tenant,
const std::string name,
const std::string bucket_id,
BucketRef *bucket,
optional_yield y) {
bucket->reset(new Bucket(this, tenant, name, bucket_id));
return (*bucket)->init(dpp, y);
}

int get_bucket(const RGWBucketInfo& bucket_info,
const std::map<std::string, bufferlist>& attrs,
BucketRef *bucket) {
bucket->reset(new Bucket(this));
return (*bucket)->init(bucket_info, attrs);
}
friend class Bucket;
friend class Object;
};

using RGWDataAccessRef = std::shared_ptr<RGWDataAccess>;

/// Complete an AioCompletion. To return error values or otherwise
/// satisfy the caller. Useful for making complicated asynchronous
/// calls and error handling.
Expand Down
1 change: 1 addition & 0 deletions src/rgw/rgw_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ extern "C" {
#include "rgw_lua.h"
#include "rgw_sal.h"
#include "rgw_sal_config.h"
#include "rgw_data_access.h"

#include "services/svc_sync_modules.h"
#include "services/svc_cls.h"
Expand Down
Loading

0 comments on commit 7911eea

Please sign in to comment.