Skip to content

Commit

Permalink
python atomic_update binding
Browse files Browse the repository at this point in the history
* mc::client_t::atomic_update interface has changed
  • Loading branch information
Ondrej Holecek committed Oct 3, 2014
1 parent 1582668 commit f7f32c2
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 22 deletions.
26 changes: 19 additions & 7 deletions include/mcache/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,26 +302,38 @@ template <
* @param key key for data
* @param fn transformation functor
* @param iters number of iterations limit
* @return the value before update
* @return the value after update
*/
template<typename fn_t>
std::string atomic_update(const std::string &key, fn_t fn,
const opts_t &opts = opts_t(), uint32_t iters = 64)
std::pair<std::string, uint32_t> atomic_update(const std::string &key,
fn_t fn, const opts_t &opts = opts_t())
{
typedef std::pair<std::string, uint32_t> cbres_t;

uint64_t iters = opts.iters;
if (iters == 0) iters = 64;

for (;iters;--iters) {
mc::result_t res = gets(key);
if (!res) {
if (!add(key, boost::unwrap_ref(fn)(std::string()), opts))
mc::proto::opts_t oadd = opts;
cbres_t cbres = boost::unwrap_ref(fn)(std::string(), 0);
oadd.flags = cbres.second;
if (!add(key, cbres.first, oadd)) {
continue;
return std::string();
}
return cbres;
}

mc::proto::opts_t ocas = opts;
ocas.cas = res.cas;

try {
if (cas(key, boost::unwrap_ref(fn)(res.data), ocas))
return res.data;
cbres_t cbres = boost::unwrap_ref(fn)(res.data, res.flags);
ocas.flags = cbres.second;

if (cas(key, cbres.first, ocas))
return cbres;
} catch(const mc::proto::error_t &e) {
if (e.code() != mc::proto::resp::exists) throw;
}
Expand Down
1 change: 1 addition & 0 deletions include/mcache/proto/opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class opts_t {
union {
uint64_t cas; //!< unique identifier retrieved from gets
uint64_t initial; //!< the initial value for incr/decr
uint64_t iters; //!< iterations for atomic_update
};
};

Expand Down
100 changes: 85 additions & 15 deletions python/mcache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,48 @@ static void set_from(type_t &res,
if (dict.contains(name)) res = boost::python::extract<type_t>(dict[name]);
}


template <typename client_t>
class atomic_update_fn_t {
public:
atomic_update_fn_t(client_t *client,
boost::python::object fn) :
client(client), fn(fn)
{}

std::pair<std::string, uint32_t>
operator()(const std::string &data, uint32_t flags) {
boost::python::object newdata;

if (data.empty() && flags == 0) {
newdata = fn(boost::python::object());
} else {
boost::python::object pydata =
client->data_from_string(data, flags);
newdata = fn(pydata);
}

std::pair<std::string, uint32_t> rv;
mc::opts_t o;
rv.first = client->to_string(newdata, o);
rv.second = o.flags;
return rv;
}

client_t *client;
boost::python::object fn;
};


} // namespace

/** Python wrapper around memcache client.
*/
template <typename impl_t>
class client_t {
public:


/** Flags that are used as type mark.
*/
enum {
Expand Down Expand Up @@ -190,37 +225,52 @@ class client_t {
return std::string(boost::python::extract<std::string>(dumps(data)));
}


boost::python::object
data_from_string(const std::pair<std::string, uint32_t> &result) {
return data_from_string(result.first, result.second);
}

/** Converts string fetched from memcache server to python object.
*/
boost::python::object
from_string(mc::result_t result,
boost::python::object def = boost::python::object())
{
// call and solve not found as none
if (!result) return not_found(def);

// extract data
boost::python::object data;
switch (result.flags & MASK) {
data_from_string(const std::string &data, uint32_t flags) {
boost::python::object rv;
switch (flags & MASK) {
case PICKLED:
data = loads(as_python_bytes(result.data));
rv = loads(as_python_bytes(data));
break;
case DOUBLE:
data = boost::python::object(aux::cnv<double>::as(result.data));
rv = boost::python::object(aux::cnv<double>::as(data));
break;
#if PY_VERSION_HEX < 0x03000000
case INTEGER:
data = boost::python::object(aux::cnv<int32_t>::as(result.data));
rv = boost::python::object(aux::cnv<int32_t>::as(data));
break;
#endif /* PY_VERSION_HEX */
case LONG:
data = boost::python::object(aux::cnv<int64_t>::as(result.data));
rv = boost::python::object(aux::cnv<int64_t>::as(data));
break;
case STRING:
default:
data = boost::python::object(result.data);
rv = boost::python::object(data);
break;
}
return rv;
}

/** Converts string fetched from memcache server to python result object.
*/
boost::python::object
from_string(mc::result_t result,
boost::python::object def = boost::python::object())
{
// call and solve not found as none
if (!result) return not_found(def);

// extract data
boost::python::object data =
data_from_string(result.data, result.flags);

// return result
boost::python::dict dict;
Expand Down Expand Up @@ -355,6 +405,23 @@ class client_t {
}

bool del(const std::string &key) { return client->del(key);}

boost::python::object atomic_updateo(const std::string &key,
boost::python::object fn, const opts_t &opts) {

atomic_update_fn_t<client_t<impl_t> > pyfn(this, fn);

std::pair<std::string, uint32_t> rv =
client-> template atomic_update(key, pyfn, opts);

return data_from_string(rv);
}

boost::python::object atomic_update(const std::string &key,
boost::python::object fn) {
return atomic_updateo(key, fn, opts_t());
}

// @}

/** Register client object methods.
Expand Down Expand Up @@ -389,7 +456,9 @@ class client_t {
.def("decr", &client_t::decr)
.def("decr", &client_t::decri)
.def("touch", &client_t::touch)
.def("delete", &client_t::del);
.def("delete", &client_t::del)
.def("atomic_update", &client_t::atomic_update)
.def("atomic_update", &client_t::atomic_updateo);
}

private:
Expand Down Expand Up @@ -473,6 +542,7 @@ struct opts_from_python_dict {
set_from(expiration, dict, "expiration");
set_from(flags, dict, "flags");
if (dict.contains("cas")) set_from(cas, dict, "cas");
else if (dict.contains("iters")) set_from(cas, dict, "iters");
else set_from(cas, dict, "initial");

// grab pointer to memory into which to construct the new mc::opts_t
Expand Down

0 comments on commit f7f32c2

Please sign in to comment.