Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zlib compression #23

Merged
merged 3 commits into from
Mar 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ AC_LANG_C
AC_CHECK_HEADER([readline/readline.h], [AC_DEFINE([HAVE_READLINE_H], [1])], [])
AC_CHECK_LIB(readline, main, [READLINE_LIB="-lreadline"], [])

# check fore zlib
AC_LANG_C
AC_CHECK_HEADER([zlib.h], [], AC_MSG_WARN([zlib headers missing]))
AC_CHECK_LIB(z, main, [ZLIB_LIBS=-lz], [AC_MSG_WARN([zlib library missing])])

## Check for LIBTBB
AC_LANG_CPLUSPLUS
PKG_CHECK_MODULES([LIBTBB], [tbb >= 2.0], [has_tbb=yes AC_DEFINE([HAVE_LIBTBB])], [has_tbb=no])
Expand All @@ -191,7 +196,7 @@ AS_IF([test "x$has_tbb" = "xno"], [
], [])
], [])

LIBMCACHE_EXTRA_LIBS="$BOOST_SYSTEM $BOOST_THREAD"
LIBMCACHE_EXTRA_LIBS="$BOOST_SYSTEM $BOOST_THREAD $ZLIB_LIBS"

GIT_REVISION="`git describe --all --dirty --long --abbrev=40`"

Expand All @@ -201,6 +206,7 @@ AC_SUBST(BOOST_SYSTEM)
AC_SUBST(BOOST_THREAD)
AC_SUBST(READLINE_LIB)
AC_SUBST(LIBMCACHE_EXTRA_LIBS)
AC_SUBST(ZLIB_LIBS)

# for tests
AC_SEARCH_LIBS(dlopen, dl)
Expand Down
68 changes: 34 additions & 34 deletions include/mcache/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace mc {
// push options into mc namespace
using proto::opts_t;

/** Returns true if library is initialized.
*/
bool is_initialized();

/** Result of all get commands.
Expand Down Expand Up @@ -129,7 +131,10 @@ template <
const client_config_t ccfg = client_config_t())
: pool(addresses), proxies(addresses),
max_continues(ccfg.max_continues), h404_duration(ccfg.h404_duration)
{}
{
if (!is_initialized())
throw error_t(err::internal_error, "mc::init() hasn't been called");
}

/** C'tor.
*/
Expand All @@ -139,7 +144,10 @@ template <
const client_config_t ccfg = client_config_t())
: pool(addresses), proxies(addresses, scfg),
max_continues(ccfg.max_continues), h404_duration(ccfg.h404_duration)
{}
{
if (!is_initialized())
throw error_t(err::internal_error, "mc::init() hasn't been called");
}

/** C'tor.
*/
Expand All @@ -150,7 +158,10 @@ template <
const client_config_t ccfg = client_config_t())
: pool(addresses, pcfg), proxies(addresses, scfg),
max_continues(ccfg.max_continues), h404_duration(ccfg.h404_duration)
{}
{
if (!is_initialized())
throw error_t(err::internal_error, "mc::init() hasn't been called");
}

///////////////////// STANDARD MEMCACHE CLIENT API ////////////////////////

Expand Down Expand Up @@ -303,42 +314,31 @@ template <

/** Apply functor to variable in memcache
* @param key key for data
* @param fn transformation functor
* @param callback transformation functor
* @param iters number of iterations limit
* @return the value after update
*/
template<typename fn_t>
std::pair<std::string, uint32_t> atomic_update(const std::string &key,
fn_t fn, const opts_t &opts = opts_t())
template <typename callback_t>
std::pair<std::string, uint32_t>
atomic_update(const std::string &key,
callback_t callback,
const opts_t &opts = opts_t())
{
typedef std::pair<std::string, uint32_t> cbres_t;

uint64_t iters = opts.iters;
if (iters == 0) iters = 64;

for (;iters;--iters) {
mc::result_t res = gets(key);
if (!res) {
mc::proto::opts_t oadd = opts;
cbres_t cbres = boost::unwrap_ref(fn)(std::string(), 0);
oadd.flags = cbres.second;
if (!add(key, cbres.first, oadd)) {
continue;
for (uint64_t iters = opts.iters? opts.iters: 64; iters; --iters) {
if (auto res = gets(key)) {
// try compare and swap command
auto new_data = callback(res.data, res.flags);
mc::opts_t cas_opts(opts.expiration, new_data.second, res.cas);
try {
if (cas(key, new_data.first, cas_opts)) return new_data;
} catch(const mc::proto::error_t &e) {
if (e.code() != mc::proto::resp::exists) throw;
}
return cbres;
}

mc::proto::opts_t ocas = opts;
ocas.cas = res.cas;

try {
cbres_t cbres = boost::unwrap_ref(fn)(res.data, res.flags);
ocas.flags = cbres.second;

if (cas(key, cbres.first, ocas))
return cbres;
} catch(const mc::proto::error_t &e) {
if (e.code() != mc::proto::resp::exists) throw;
} else {
// cas failed we try add new key
auto new_data = callback(std::string(), 0);
mc::opts_t add_opts(opts.expiration, new_data.second);
if (add(key, new_data.first, add_opts)) return new_data;
}
}
throw error_t(err::unable_cas, "max iterations reached");
Expand Down
2 changes: 1 addition & 1 deletion include/mcache/proto/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@

includedir = @includedir@/mcache/proto

include_HEADERS = binary.h error.h opts.h parser.h response.h txt.h
include_HEADERS = binary.h error.h opts.h parser.h response.h txt.h zlib.h

17 changes: 10 additions & 7 deletions include/mcache/proto/binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <mcache/proto/opts.h>
#include <mcache/proto/response.h>
#include <mcache/proto/zlib.h>

namespace mc {
namespace proto {
Expand Down Expand Up @@ -96,14 +97,16 @@ class storage_command_t: public command_t {
/** C'tor.
*/
storage_command_t(const std::string &key,
const std::string &data,
const std::string &in_data,
const opts_t &opts = opts_t())
: command_t(static_cast <uint16_t>(key.size()),
static_cast <uint32_t>(key.size() + data.size()
+ (has_extras? extras_length: 0)),
static_cast <uint8_t>(has_extras? extras_length: 0)),
key(key), data(data), opts(opts)
{}
: command_t(static_cast<uint16_t>(key.size()), 0,
static_cast<uint8_t>(has_extras? extras_length: 0)),
key(key),
data(opts.flags & opts.compress? zlib::compress(in_data): in_data),
opts(opts)
{
body_len = static_cast<uint32_t>(key_len + data.size() + extras_len);
}

/** Deserialize responses for set, add, .. storage commands.
*/
Expand Down
4 changes: 4 additions & 0 deletions include/mcache/proto/opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class opts_t {
: expiration(expiration), flags(flags), cas(cas)
{}

// builtin flags - uses lower bits of upper uint16_t since python uses
// upper bits of upper uint16_t
enum { compress = 0x00010000, builtin_mask = compress};

time_t expiration; //!< expiration time (secs from now at server)
uint32_t flags; //!< flags for held value on server
union {
Expand Down
5 changes: 4 additions & 1 deletion include/mcache/proto/txt.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <mcache/error.h>
#include <mcache/proto/opts.h>
#include <mcache/proto/response.h>
#include <mcache/proto/zlib.h>

namespace mc {
namespace proto {
Expand Down Expand Up @@ -87,7 +88,9 @@ class storage_command_t: public command_t {
storage_command_t(const std::string &key,
const std::string &data,
const opts_t &opts = opts_t())
: key(key), data(data), opts(opts)
: key(key),
data(opts.flags & opts.compress? zlib::compress(data): data),
opts(opts)
{}

/** Deserialize responses for set, add, .. storage commands.
Expand Down
43 changes: 43 additions & 0 deletions include/mcache/proto/zlib.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* FILE $Id: $
*
* DESCRIPTION Command serialization/deserialization: zlib stuff.
*
* PROJECT Seznam memcache client.
*
* LICENSE See COPYING
*
* AUTHOR Michal Bukovsky <[email protected]>
*
* Copyright (C) Seznam.cz a.s. 2012
* All Rights Reserved
*
* HISTORY
* 2016-03-03 (bukovsky)
* First draft.
*/

#ifndef MCACHE_PROTO_ZLIB_H
#define MCACHE_PROTO_ZLIB_H

namespace mc {
namespace proto {
namespace zlib {

/** Returns compressed data.
*/
std::string compress(const std::string &data);

/** Returns uncompressed data.
*/
std::string
uncompress(const std::string &data,
std::string::size_type index = 0,
std::string::size_type count = std::string::npos);

} // namespace zlib
} // namespace proto
} // namespace mc

#endif /* MCACHE_PROTO_ZLIB_H */

86 changes: 35 additions & 51 deletions python/mcache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,39 +92,6 @@ static void set_from(type_t &res,
if (dict.contains(name)) res = boost::python::extract<type_t>(dict[name]);
}


template <typename client_t>
class atomic_update_fn_t {
public:
atomic_update_fn_t(client_t *client,
boost::python::object fn) :
client(client), fn(fn)
{}

std::pair<std::string, uint32_t>
operator()(const std::string &data, uint32_t flags) {
boost::python::object newdata;

if (data.empty() && flags == 0) {
newdata = fn(boost::python::object());
} else {
boost::python::object pydata =
client->data_from_string(data, flags);
newdata = fn(pydata);
}

std::pair<std::string, uint32_t> rv;
mc::opts_t o;
rv.first = client->to_string(newdata, o);
rv.second = o.flags;
return rv;
}

client_t *client;
boost::python::object fn;
};


} // namespace

/** Python wrapper around memcache client.
Expand Down Expand Up @@ -224,7 +191,8 @@ class client_t {
return std::string(boost::python::extract<std::string>(dumps(data)));
}


/** Just a shortcut.
*/
boost::python::object
data_from_string(const std::pair<std::string, uint32_t> &result) {
return data_from_string(result.first, result.second);
Expand Down Expand Up @@ -268,8 +236,7 @@ class client_t {
if (!result) return not_found(def);

// extract data
boost::python::object
data = data_from_string(result.data, result.flags);
auto data = data_from_string(result.data, result.flags);

// return result
boost::python::dict dict;
Expand Down Expand Up @@ -355,14 +322,14 @@ class client_t {
getd(const std::string &key, boost::python::object def) {
try {
return from_string(client->get(key), def);
} catch (const mc::out_of_servers_t &) { return not_found(def);}
} catch (const mc::out_of_servers_t &) { return not_found(def);}
}

boost::python::object
getsd(const std::string &key, boost::python::object def) {
try {
return from_string(client->gets(key), def);
} catch (const mc::out_of_servers_t &) { return not_found(def);}
} catch (const mc::out_of_servers_t &) { return not_found(def);}
}

boost::python::object incr(const std::string &key) {
Expand Down Expand Up @@ -407,22 +374,27 @@ class client_t {

bool del(const std::string &key) { return client->del(key);}

boost::python::object atomic_updateo(const std::string &key,
boost::python::object fn, const opts_t &opts) {

atomic_update_fn_t<client_t<impl_t> > pyfn(this, fn);

std::pair<std::string, uint32_t> rv =
client-> template atomic_update(key, pyfn, opts);

return data_from_string(rv);
}

boost::python::object atomic_update(const std::string &key,
boost::python::object fn) {
boost::python::object
atomic_update(const std::string &key, boost::python::object fn) {
return atomic_updateo(key, fn, opts_t());
}

boost::python::object
atomic_updateo(const std::string &key,
boost::python::object callback,
const opts_t &opts)
{
boost::python::object res;
client->atomic_update(key,
[&] (const std::string &data, uint32_t flags) {
mc::opts_t res_opts;
res = (data.empty() && !flags)?
callback(boost::python::object()):
callback(data_from_string(data, flags));
return std::make_pair(to_string(res, res_opts), res_opts.flags);
}, opts);
return res;
}
// @}

/** Register client object methods.
Expand Down Expand Up @@ -546,6 +518,18 @@ struct opts_from_python_dict {
else if (dict.contains("iters")) set_from(cas, dict, "iters");
else set_from(cas, dict, "initial");

// throw if flags has set bits used flags by the library
if (flags & mc::opts_t::builtin_mask) {
throw mc::error_t(mc::err::bad_argument,
"some lower bits of upper uint16_t of the "
"flags are used by mcache python wrapper");
}

// auto compress feature
if (dict.contains("compress"))
if (boost::python::extract<bool>(dict["compress"]))
flags |= mc::opts_t::compress;

// grab pointer to memory into which to construct the new mc::opts_t
void *storage = ((boost::python::converter
::rvalue_from_python_storage<mc::opts_t> *)data)
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
libraries=[boost_python,
'boost_system',
'boost_thread',
'z',
'mcache'],
extra_compile_args=['-W',
'-Wall',
Expand Down
Loading