Skip to content

Commit

Permalink
Merge pull request #23 from burlog/master
Browse files Browse the repository at this point in the history
zlib compression
  • Loading branch information
burlog committed Mar 4, 2016
2 parents 5db277c + 394c55d commit df33d83
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 106 deletions.
8 changes: 7 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ AC_LANG_C
AC_CHECK_HEADER([readline/readline.h], [AC_DEFINE([HAVE_READLINE_H], [1])], [])
AC_CHECK_LIB(readline, main, [READLINE_LIB="-lreadline"], [])

# check fore zlib
AC_LANG_C
AC_CHECK_HEADER([zlib.h], [], AC_MSG_WARN([zlib headers missing]))
AC_CHECK_LIB(z, main, [ZLIB_LIBS=-lz], [AC_MSG_WARN([zlib library missing])])

## Check for LIBTBB
AC_LANG_CPLUSPLUS
PKG_CHECK_MODULES([LIBTBB], [tbb >= 2.0], [has_tbb=yes AC_DEFINE([HAVE_LIBTBB])], [has_tbb=no])
Expand All @@ -191,7 +196,7 @@ AS_IF([test "x$has_tbb" = "xno"], [
], [])
], [])

LIBMCACHE_EXTRA_LIBS="$BOOST_SYSTEM $BOOST_THREAD"
LIBMCACHE_EXTRA_LIBS="$BOOST_SYSTEM $BOOST_THREAD $ZLIB_LIBS"

GIT_REVISION="`git describe --all --dirty --long --abbrev=40`"

Expand All @@ -201,6 +206,7 @@ AC_SUBST(BOOST_SYSTEM)
AC_SUBST(BOOST_THREAD)
AC_SUBST(READLINE_LIB)
AC_SUBST(LIBMCACHE_EXTRA_LIBS)
AC_SUBST(ZLIB_LIBS)

# for tests
AC_SEARCH_LIBS(dlopen, dl)
Expand Down
68 changes: 34 additions & 34 deletions include/mcache/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace mc {
// push options into mc namespace
using proto::opts_t;

/** Returns true if library is initialized.
*/
bool is_initialized();

/** Result of all get commands.
Expand Down Expand Up @@ -129,7 +131,10 @@ template <
const client_config_t ccfg = client_config_t())
: pool(addresses), proxies(addresses),
max_continues(ccfg.max_continues), h404_duration(ccfg.h404_duration)
{}
{
if (!is_initialized())
throw error_t(err::internal_error, "mc::init() hasn't been called");
}

/** C'tor.
*/
Expand All @@ -139,7 +144,10 @@ template <
const client_config_t ccfg = client_config_t())
: pool(addresses), proxies(addresses, scfg),
max_continues(ccfg.max_continues), h404_duration(ccfg.h404_duration)
{}
{
if (!is_initialized())
throw error_t(err::internal_error, "mc::init() hasn't been called");
}

/** C'tor.
*/
Expand All @@ -150,7 +158,10 @@ template <
const client_config_t ccfg = client_config_t())
: pool(addresses, pcfg), proxies(addresses, scfg),
max_continues(ccfg.max_continues), h404_duration(ccfg.h404_duration)
{}
{
if (!is_initialized())
throw error_t(err::internal_error, "mc::init() hasn't been called");
}

///////////////////// STANDARD MEMCACHE CLIENT API ////////////////////////

Expand Down Expand Up @@ -303,42 +314,31 @@ template <

/** Apply functor to variable in memcache
* @param key key for data
* @param fn transformation functor
* @param callback transformation functor
* @param iters number of iterations limit
* @return the value after update
*/
template<typename fn_t>
std::pair<std::string, uint32_t> atomic_update(const std::string &key,
fn_t fn, const opts_t &opts = opts_t())
template <typename callback_t>
std::pair<std::string, uint32_t>
atomic_update(const std::string &key,
callback_t callback,
const opts_t &opts = opts_t())
{
typedef std::pair<std::string, uint32_t> cbres_t;

uint64_t iters = opts.iters;
if (iters == 0) iters = 64;

for (;iters;--iters) {
mc::result_t res = gets(key);
if (!res) {
mc::proto::opts_t oadd = opts;
cbres_t cbres = boost::unwrap_ref(fn)(std::string(), 0);
oadd.flags = cbres.second;
if (!add(key, cbres.first, oadd)) {
continue;
for (uint64_t iters = opts.iters? opts.iters: 64; iters; --iters) {
if (auto res = gets(key)) {
// try compare and swap command
auto new_data = callback(res.data, res.flags);
mc::opts_t cas_opts(opts.expiration, new_data.second, res.cas);
try {
if (cas(key, new_data.first, cas_opts)) return new_data;
} catch(const mc::proto::error_t &e) {
if (e.code() != mc::proto::resp::exists) throw;
}
return cbres;
}

mc::proto::opts_t ocas = opts;
ocas.cas = res.cas;

try {
cbres_t cbres = boost::unwrap_ref(fn)(res.data, res.flags);
ocas.flags = cbres.second;

if (cas(key, cbres.first, ocas))
return cbres;
} catch(const mc::proto::error_t &e) {
if (e.code() != mc::proto::resp::exists) throw;
} else {
// cas failed we try add new key
auto new_data = callback(std::string(), 0);
mc::opts_t add_opts(opts.expiration, new_data.second);
if (add(key, new_data.first, add_opts)) return new_data;
}
}
throw error_t(err::unable_cas, "max iterations reached");
Expand Down
2 changes: 1 addition & 1 deletion include/mcache/proto/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@

includedir = @includedir@/mcache/proto

include_HEADERS = binary.h error.h opts.h parser.h response.h txt.h
include_HEADERS = binary.h error.h opts.h parser.h response.h txt.h zlib.h

17 changes: 10 additions & 7 deletions include/mcache/proto/binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <mcache/proto/opts.h>
#include <mcache/proto/response.h>
#include <mcache/proto/zlib.h>

namespace mc {
namespace proto {
Expand Down Expand Up @@ -96,14 +97,16 @@ class storage_command_t: public command_t {
/** C'tor.
*/
storage_command_t(const std::string &key,
const std::string &data,
const std::string &in_data,
const opts_t &opts = opts_t())
: command_t(static_cast <uint16_t>(key.size()),
static_cast <uint32_t>(key.size() + data.size()
+ (has_extras? extras_length: 0)),
static_cast <uint8_t>(has_extras? extras_length: 0)),
key(key), data(data), opts(opts)
{}
: command_t(static_cast<uint16_t>(key.size()), 0,
static_cast<uint8_t>(has_extras? extras_length: 0)),
key(key),
data(opts.flags & opts.compress? zlib::compress(in_data): in_data),
opts(opts)
{
body_len = static_cast<uint32_t>(key_len + data.size() + extras_len);
}

/** Deserialize responses for set, add, .. storage commands.
*/
Expand Down
4 changes: 4 additions & 0 deletions include/mcache/proto/opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class opts_t {
: expiration(expiration), flags(flags), cas(cas)
{}

// builtin flags - uses lower bits of upper uint16_t since python uses
// upper bits of upper uint16_t
enum { compress = 0x00010000, builtin_mask = compress};

time_t expiration; //!< expiration time (secs from now at server)
uint32_t flags; //!< flags for held value on server
union {
Expand Down
5 changes: 4 additions & 1 deletion include/mcache/proto/txt.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <mcache/error.h>
#include <mcache/proto/opts.h>
#include <mcache/proto/response.h>
#include <mcache/proto/zlib.h>

namespace mc {
namespace proto {
Expand Down Expand Up @@ -87,7 +88,9 @@ class storage_command_t: public command_t {
storage_command_t(const std::string &key,
const std::string &data,
const opts_t &opts = opts_t())
: key(key), data(data), opts(opts)
: key(key),
data(opts.flags & opts.compress? zlib::compress(data): data),
opts(opts)
{}

/** Deserialize responses for set, add, .. storage commands.
Expand Down
43 changes: 43 additions & 0 deletions include/mcache/proto/zlib.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* FILE $Id: $
*
* DESCRIPTION Command serialization/deserialization: zlib stuff.
*
* PROJECT Seznam memcache client.
*
* LICENSE See COPYING
*
* AUTHOR Michal Bukovsky <[email protected]>
*
* Copyright (C) Seznam.cz a.s. 2012
* All Rights Reserved
*
* HISTORY
* 2016-03-03 (bukovsky)
* First draft.
*/

#ifndef MCACHE_PROTO_ZLIB_H
#define MCACHE_PROTO_ZLIB_H

namespace mc {
namespace proto {
namespace zlib {

/** Returns compressed data.
*/
std::string compress(const std::string &data);

/** Returns uncompressed data.
*/
std::string
uncompress(const std::string &data,
std::string::size_type index = 0,
std::string::size_type count = std::string::npos);

} // namespace zlib
} // namespace proto
} // namespace mc

#endif /* MCACHE_PROTO_ZLIB_H */

86 changes: 35 additions & 51 deletions python/mcache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,39 +92,6 @@ static void set_from(type_t &res,
if (dict.contains(name)) res = boost::python::extract<type_t>(dict[name]);
}


template <typename client_t>
class atomic_update_fn_t {
public:
atomic_update_fn_t(client_t *client,
boost::python::object fn) :
client(client), fn(fn)
{}

std::pair<std::string, uint32_t>
operator()(const std::string &data, uint32_t flags) {
boost::python::object newdata;

if (data.empty() && flags == 0) {
newdata = fn(boost::python::object());
} else {
boost::python::object pydata =
client->data_from_string(data, flags);
newdata = fn(pydata);
}

std::pair<std::string, uint32_t> rv;
mc::opts_t o;
rv.first = client->to_string(newdata, o);
rv.second = o.flags;
return rv;
}

client_t *client;
boost::python::object fn;
};


} // namespace

/** Python wrapper around memcache client.
Expand Down Expand Up @@ -224,7 +191,8 @@ class client_t {
return std::string(boost::python::extract<std::string>(dumps(data)));
}


/** Just a shortcut.
*/
boost::python::object
data_from_string(const std::pair<std::string, uint32_t> &result) {
return data_from_string(result.first, result.second);
Expand Down Expand Up @@ -268,8 +236,7 @@ class client_t {
if (!result) return not_found(def);

// extract data
boost::python::object
data = data_from_string(result.data, result.flags);
auto data = data_from_string(result.data, result.flags);

// return result
boost::python::dict dict;
Expand Down Expand Up @@ -355,14 +322,14 @@ class client_t {
getd(const std::string &key, boost::python::object def) {
try {
return from_string(client->get(key), def);
} catch (const mc::out_of_servers_t &) { return not_found(def);}
} catch (const mc::out_of_servers_t &) { return not_found(def);}
}

boost::python::object
getsd(const std::string &key, boost::python::object def) {
try {
return from_string(client->gets(key), def);
} catch (const mc::out_of_servers_t &) { return not_found(def);}
} catch (const mc::out_of_servers_t &) { return not_found(def);}
}

boost::python::object incr(const std::string &key) {
Expand Down Expand Up @@ -407,22 +374,27 @@ class client_t {

bool del(const std::string &key) { return client->del(key);}

boost::python::object atomic_updateo(const std::string &key,
boost::python::object fn, const opts_t &opts) {

atomic_update_fn_t<client_t<impl_t> > pyfn(this, fn);

std::pair<std::string, uint32_t> rv =
client-> template atomic_update(key, pyfn, opts);

return data_from_string(rv);
}

boost::python::object atomic_update(const std::string &key,
boost::python::object fn) {
boost::python::object
atomic_update(const std::string &key, boost::python::object fn) {
return atomic_updateo(key, fn, opts_t());
}

boost::python::object
atomic_updateo(const std::string &key,
boost::python::object callback,
const opts_t &opts)
{
boost::python::object res;
client->atomic_update(key,
[&] (const std::string &data, uint32_t flags) {
mc::opts_t res_opts;
res = (data.empty() && !flags)?
callback(boost::python::object()):
callback(data_from_string(data, flags));
return std::make_pair(to_string(res, res_opts), res_opts.flags);
}, opts);
return res;
}
// @}

/** Register client object methods.
Expand Down Expand Up @@ -546,6 +518,18 @@ struct opts_from_python_dict {
else if (dict.contains("iters")) set_from(cas, dict, "iters");
else set_from(cas, dict, "initial");

// throw if flags has set bits used flags by the library
if (flags & mc::opts_t::builtin_mask) {
throw mc::error_t(mc::err::bad_argument,
"some lower bits of upper uint16_t of the "
"flags are used by mcache python wrapper");
}

// auto compress feature
if (dict.contains("compress"))
if (boost::python::extract<bool>(dict["compress"]))
flags |= mc::opts_t::compress;

// grab pointer to memory into which to construct the new mc::opts_t
void *storage = ((boost::python::converter
::rvalue_from_python_storage<mc::opts_t> *)data)
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
libraries=[boost_python,
'boost_system',
'boost_thread',
'z',
'mcache'],
extra_compile_args=['-W',
'-Wall',
Expand Down
Loading

0 comments on commit df33d83

Please sign in to comment.