diff --git a/configure.ac b/configure.ac index b5a46a5..3774321 100644 --- a/configure.ac +++ b/configure.ac @@ -180,6 +180,11 @@ AC_LANG_C AC_CHECK_HEADER([readline/readline.h], [AC_DEFINE([HAVE_READLINE_H], [1])], []) AC_CHECK_LIB(readline, main, [READLINE_LIB="-lreadline"], []) +# check fore zlib +AC_LANG_C +AC_CHECK_HEADER([zlib.h], [], AC_MSG_WARN([zlib headers missing])) +AC_CHECK_LIB(z, main, [ZLIB_LIBS=-lz], [AC_MSG_WARN([zlib library missing])]) + ## Check for LIBTBB AC_LANG_CPLUSPLUS PKG_CHECK_MODULES([LIBTBB], [tbb >= 2.0], [has_tbb=yes AC_DEFINE([HAVE_LIBTBB])], [has_tbb=no]) @@ -191,7 +196,7 @@ AS_IF([test "x$has_tbb" = "xno"], [ ], []) ], []) -LIBMCACHE_EXTRA_LIBS="$BOOST_SYSTEM $BOOST_THREAD" +LIBMCACHE_EXTRA_LIBS="$BOOST_SYSTEM $BOOST_THREAD $ZLIB_LIBS" GIT_REVISION="`git describe --all --dirty --long --abbrev=40`" @@ -201,6 +206,7 @@ AC_SUBST(BOOST_SYSTEM) AC_SUBST(BOOST_THREAD) AC_SUBST(READLINE_LIB) AC_SUBST(LIBMCACHE_EXTRA_LIBS) +AC_SUBST(ZLIB_LIBS) # for tests AC_SEARCH_LIBS(dlopen, dl) diff --git a/include/mcache/client.h b/include/mcache/client.h index 63e9df1..6dcfb24 100644 --- a/include/mcache/client.h +++ b/include/mcache/client.h @@ -42,6 +42,8 @@ namespace mc { // push options into mc namespace using proto::opts_t; +/** Returns true if library is initialized. + */ bool is_initialized(); /** Result of all get commands. @@ -129,7 +131,10 @@ template < const client_config_t ccfg = client_config_t()) : pool(addresses), proxies(addresses), max_continues(ccfg.max_continues), h404_duration(ccfg.h404_duration) - {} + { + if (!is_initialized()) + throw error_t(err::internal_error, "mc::init() hasn't been called"); + } /** C'tor. */ @@ -139,7 +144,10 @@ template < const client_config_t ccfg = client_config_t()) : pool(addresses), proxies(addresses, scfg), max_continues(ccfg.max_continues), h404_duration(ccfg.h404_duration) - {} + { + if (!is_initialized()) + throw error_t(err::internal_error, "mc::init() hasn't been called"); + } /** C'tor. */ @@ -150,7 +158,10 @@ template < const client_config_t ccfg = client_config_t()) : pool(addresses, pcfg), proxies(addresses, scfg), max_continues(ccfg.max_continues), h404_duration(ccfg.h404_duration) - {} + { + if (!is_initialized()) + throw error_t(err::internal_error, "mc::init() hasn't been called"); + } ///////////////////// STANDARD MEMCACHE CLIENT API //////////////////////// @@ -303,42 +314,31 @@ template < /** Apply functor to variable in memcache * @param key key for data - * @param fn transformation functor + * @param callback transformation functor * @param iters number of iterations limit * @return the value after update */ - template<typename fn_t> - std::pair<std::string, uint32_t> atomic_update(const std::string &key, - fn_t fn, const opts_t &opts = opts_t()) + template <typename callback_t> + std::pair<std::string, uint32_t> + atomic_update(const std::string &key, + callback_t callback, + const opts_t &opts = opts_t()) { - typedef std::pair<std::string, uint32_t> cbres_t; - - uint64_t iters = opts.iters; - if (iters == 0) iters = 64; - - for (;iters;--iters) { - mc::result_t res = gets(key); - if (!res) { - mc::proto::opts_t oadd = opts; - cbres_t cbres = boost::unwrap_ref(fn)(std::string(), 0); - oadd.flags = cbres.second; - if (!add(key, cbres.first, oadd)) { - continue; + for (uint64_t iters = opts.iters? opts.iters: 64; iters; --iters) { + if (auto res = gets(key)) { + // try compare and swap command + auto new_data = callback(res.data, res.flags); + mc::opts_t cas_opts(opts.expiration, new_data.second, res.cas); + try { + if (cas(key, new_data.first, cas_opts)) return new_data; + } catch(const mc::proto::error_t &e) { + if (e.code() != mc::proto::resp::exists) throw; } - return cbres; - } - - mc::proto::opts_t ocas = opts; - ocas.cas = res.cas; - - try { - cbres_t cbres = boost::unwrap_ref(fn)(res.data, res.flags); - ocas.flags = cbres.second; - - if (cas(key, cbres.first, ocas)) - return cbres; - } catch(const mc::proto::error_t &e) { - if (e.code() != mc::proto::resp::exists) throw; + } else { + // cas failed we try add new key + auto new_data = callback(std::string(), 0); + mc::opts_t add_opts(opts.expiration, new_data.second); + if (add(key, new_data.first, add_opts)) return new_data; } } throw error_t(err::unable_cas, "max iterations reached"); diff --git a/include/mcache/proto/Makefile.am b/include/mcache/proto/Makefile.am index e116206..bc7ccd2 100644 --- a/include/mcache/proto/Makefile.am +++ b/include/mcache/proto/Makefile.am @@ -17,5 +17,5 @@ includedir = @includedir@/mcache/proto -include_HEADERS = binary.h error.h opts.h parser.h response.h txt.h +include_HEADERS = binary.h error.h opts.h parser.h response.h txt.h zlib.h diff --git a/include/mcache/proto/binary.h b/include/mcache/proto/binary.h index dc26cc3..a67b693 100644 --- a/include/mcache/proto/binary.h +++ b/include/mcache/proto/binary.h @@ -26,6 +26,7 @@ #include <mcache/proto/opts.h> #include <mcache/proto/response.h> +#include <mcache/proto/zlib.h> namespace mc { namespace proto { @@ -96,14 +97,16 @@ class storage_command_t: public command_t { /** C'tor. */ storage_command_t(const std::string &key, - const std::string &data, + const std::string &in_data, const opts_t &opts = opts_t()) - : command_t(static_cast <uint16_t>(key.size()), - static_cast <uint32_t>(key.size() + data.size() - + (has_extras? extras_length: 0)), - static_cast <uint8_t>(has_extras? extras_length: 0)), - key(key), data(data), opts(opts) - {} + : command_t(static_cast<uint16_t>(key.size()), 0, + static_cast<uint8_t>(has_extras? extras_length: 0)), + key(key), + data(opts.flags & opts.compress? zlib::compress(in_data): in_data), + opts(opts) + { + body_len = static_cast<uint32_t>(key_len + data.size() + extras_len); + } /** Deserialize responses for set, add, .. storage commands. */ diff --git a/include/mcache/proto/opts.h b/include/mcache/proto/opts.h index e73bdd7..49aa335 100644 --- a/include/mcache/proto/opts.h +++ b/include/mcache/proto/opts.h @@ -62,6 +62,10 @@ class opts_t { : expiration(expiration), flags(flags), cas(cas) {} + // builtin flags - uses lower bits of upper uint16_t since python uses + // upper bits of upper uint16_t + enum { compress = 0x00010000, builtin_mask = compress}; + time_t expiration; //!< expiration time (secs from now at server) uint32_t flags; //!< flags for held value on server union { diff --git a/include/mcache/proto/txt.h b/include/mcache/proto/txt.h index 416fd97..d7a78af 100644 --- a/include/mcache/proto/txt.h +++ b/include/mcache/proto/txt.h @@ -26,6 +26,7 @@ #include <mcache/error.h> #include <mcache/proto/opts.h> #include <mcache/proto/response.h> +#include <mcache/proto/zlib.h> namespace mc { namespace proto { @@ -87,7 +88,9 @@ class storage_command_t: public command_t { storage_command_t(const std::string &key, const std::string &data, const opts_t &opts = opts_t()) - : key(key), data(data), opts(opts) + : key(key), + data(opts.flags & opts.compress? zlib::compress(data): data), + opts(opts) {} /** Deserialize responses for set, add, .. storage commands. diff --git a/include/mcache/proto/zlib.h b/include/mcache/proto/zlib.h new file mode 100644 index 0000000..d8bbc1d --- /dev/null +++ b/include/mcache/proto/zlib.h @@ -0,0 +1,43 @@ +/* + * FILE $Id: $ + * + * DESCRIPTION Command serialization/deserialization: zlib stuff. + * + * PROJECT Seznam memcache client. + * + * LICENSE See COPYING + * + * AUTHOR Michal Bukovsky <michal.bukovsky@firma.seznam.cz> + * + * Copyright (C) Seznam.cz a.s. 2012 + * All Rights Reserved + * + * HISTORY + * 2016-03-03 (bukovsky) + * First draft. + */ + +#ifndef MCACHE_PROTO_ZLIB_H +#define MCACHE_PROTO_ZLIB_H + +namespace mc { +namespace proto { +namespace zlib { + +/** Returns compressed data. + */ +std::string compress(const std::string &data); + +/** Returns uncompressed data. + */ +std::string +uncompress(const std::string &data, + std::string::size_type index = 0, + std::string::size_type count = std::string::npos); + +} // namespace zlib +} // namespace proto +} // namespace mc + +#endif /* MCACHE_PROTO_ZLIB_H */ + diff --git a/python/mcache.cc b/python/mcache.cc index 9d14740..614590a 100644 --- a/python/mcache.cc +++ b/python/mcache.cc @@ -92,39 +92,6 @@ static void set_from(type_t &res, if (dict.contains(name)) res = boost::python::extract<type_t>(dict[name]); } - -template <typename client_t> -class atomic_update_fn_t { -public: - atomic_update_fn_t(client_t *client, - boost::python::object fn) : - client(client), fn(fn) - {} - - std::pair<std::string, uint32_t> - operator()(const std::string &data, uint32_t flags) { - boost::python::object newdata; - - if (data.empty() && flags == 0) { - newdata = fn(boost::python::object()); - } else { - boost::python::object pydata = - client->data_from_string(data, flags); - newdata = fn(pydata); - } - - std::pair<std::string, uint32_t> rv; - mc::opts_t o; - rv.first = client->to_string(newdata, o); - rv.second = o.flags; - return rv; - } - - client_t *client; - boost::python::object fn; -}; - - } // namespace /** Python wrapper around memcache client. @@ -224,7 +191,8 @@ class client_t { return std::string(boost::python::extract<std::string>(dumps(data))); } - + /** Just a shortcut. + */ boost::python::object data_from_string(const std::pair<std::string, uint32_t> &result) { return data_from_string(result.first, result.second); @@ -268,8 +236,7 @@ class client_t { if (!result) return not_found(def); // extract data - boost::python::object - data = data_from_string(result.data, result.flags); + auto data = data_from_string(result.data, result.flags); // return result boost::python::dict dict; @@ -355,14 +322,14 @@ class client_t { getd(const std::string &key, boost::python::object def) { try { return from_string(client->get(key), def); - } catch (const mc::out_of_servers_t &) { return not_found(def);} + } catch (const mc::out_of_servers_t &) { return not_found(def);} } boost::python::object getsd(const std::string &key, boost::python::object def) { try { return from_string(client->gets(key), def); - } catch (const mc::out_of_servers_t &) { return not_found(def);} + } catch (const mc::out_of_servers_t &) { return not_found(def);} } boost::python::object incr(const std::string &key) { @@ -407,22 +374,27 @@ class client_t { bool del(const std::string &key) { return client->del(key);} - boost::python::object atomic_updateo(const std::string &key, - boost::python::object fn, const opts_t &opts) { - - atomic_update_fn_t<client_t<impl_t> > pyfn(this, fn); - - std::pair<std::string, uint32_t> rv = - client-> template atomic_update(key, pyfn, opts); - - return data_from_string(rv); - } - - boost::python::object atomic_update(const std::string &key, - boost::python::object fn) { + boost::python::object + atomic_update(const std::string &key, boost::python::object fn) { return atomic_updateo(key, fn, opts_t()); } + boost::python::object + atomic_updateo(const std::string &key, + boost::python::object callback, + const opts_t &opts) + { + boost::python::object res; + client->atomic_update(key, + [&] (const std::string &data, uint32_t flags) { + mc::opts_t res_opts; + res = (data.empty() && !flags)? + callback(boost::python::object()): + callback(data_from_string(data, flags)); + return std::make_pair(to_string(res, res_opts), res_opts.flags); + }, opts); + return res; + } // @} /** Register client object methods. @@ -546,6 +518,18 @@ struct opts_from_python_dict { else if (dict.contains("iters")) set_from(cas, dict, "iters"); else set_from(cas, dict, "initial"); + // throw if flags has set bits used flags by the library + if (flags & mc::opts_t::builtin_mask) { + throw mc::error_t(mc::err::bad_argument, + "some lower bits of upper uint16_t of the " + "flags are used by mcache python wrapper"); + } + + // auto compress feature + if (dict.contains("compress")) + if (boost::python::extract<bool>(dict["compress"])) + flags |= mc::opts_t::compress; + // grab pointer to memory into which to construct the new mc::opts_t void *storage = ((boost::python::converter ::rvalue_from_python_storage<mc::opts_t> *)data) diff --git a/python/setup.py b/python/setup.py index dc4151b..34ebd68 100644 --- a/python/setup.py +++ b/python/setup.py @@ -53,6 +53,7 @@ libraries=[boost_python, 'boost_system', 'boost_thread', + 'z', 'mcache'], extra_compile_args=['-W', '-Wall', diff --git a/src/Makefile.am b/src/Makefile.am index 4a68a4e..a8f7939 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -35,11 +35,11 @@ SRC = mcache.cc logger.cc error.cc init.cc server-proxy.cc \ \ io/connection.cc \ \ - proto/txt.cc proto/binary.cc \ + proto/txt.cc proto/binary.cc proto/zlib.cc \ \ hash/city.cc hash/jenkins.cc hash/murmur3.cc hash/spooky.cc -libmcache_la_LIBADD = @LIBTBB_LIBS@ +libmcache_la_LIBADD = @LIBTBB_LIBS@ @ZLIB_LIBS@ libmcache_la_LDFLAGS = @VERSION_INFO@ libmcache_la_SOURCES = $(SRC) diff --git a/src/proto/binary.cc b/src/proto/binary.cc index cc0eef4..152b217 100644 --- a/src/proto/binary.cc +++ b/src/proto/binary.cc @@ -187,10 +187,12 @@ void retrieve_command_t::set_body(uint32_t &flags, const std::string &data, uint16_t key_len) { - std::copy(data.begin(), data.begin() + sizeof(flags), - reinterpret_cast<char *>(&flags)); - flags = ntohl(flags); - body = data.substr(sizeof(flags) + key_len); + flags = ntohl(*reinterpret_cast<const uint32_t *>(data.data())); + if (flags & opts_t::compress) { + body = zlib::uncompress(data, sizeof(flags) + key_len); + } else { + body = data.substr(sizeof(flags) + key_len); + } } template <bool has_extras> diff --git a/src/proto/txt.cc b/src/proto/txt.cc index 6a43b26..b453f6b 100644 --- a/src/proto/txt.cc +++ b/src/proto/txt.cc @@ -123,14 +123,19 @@ std::string retrieve_command_t::serialize(const char *name) const { return result; } -void retrieve_command_t::set_body(uint32_t &, +void retrieve_command_t::set_body(uint32_t &flags, std::string &body, - const std::string &data) { - body = data; - body.resize(body.size() - footer_size); + const std::string &data) +{ + if (flags & opts_t::compress) { + body = zlib::uncompress(data, 0, data.size() - footer_size); + + } else { + body = data; + body.resize(body.size() - footer_size); + } } - storage_command_t::response_t storage_command_t::deserialize_header(const std::string &header) const { // reject empty response diff --git a/src/proto/zlib.cc b/src/proto/zlib.cc new file mode 100644 index 0000000..fd3e3a9 --- /dev/null +++ b/src/proto/zlib.cc @@ -0,0 +1,89 @@ +/* + * FILE $Id: $ + * + * DESCRIPTION Command serialization/deserialization: zlib stuff. + * + * PROJECT Seznam memcache client. + * + * LICENSE See COPYING + * + * AUTHOR Michal Bukovsky <michal.bukovsky@firma.seznam.cz> + * + * Copyright (C) Seznam.cz a.s. 2012 + * All Rights Reserved + * + * HISTORY + * 2016-03-03 (bukovsky) + * First draft. + */ + +#include <vector> +#include <limits> +#include <zlib.h> + +#include "mcache/error.h" +#include "mcache/proto/zlib.h" + +namespace mc { +namespace proto { +namespace zlib { +namespace { + +template <typename Int_t> +Bytef *b(Int_t *p) { + return reinterpret_cast<Bytef *>(p); +} + +template <typename Int_t> +const Bytef *b(const Int_t *p) { + return reinterpret_cast<const Bytef *>(p); +} + +} // namespace + +std::string compress(const std::string &data) { + // estimate size of compress data and allocate buffer + auto dst_size = ::compressBound(data.size()); + std::vector<uint8_t> dst(dst_size); + if (dst_size > std::numeric_limits<uint32_t>::max()) + throw error_t(mc::err::bad_request, "too large data"); + + // compress + if (::compress(b(&dst[0]), &dst_size, b(data.data()), data.size()) != Z_OK) + throw error_t(mc::err::bad_request, "zlib compress error"); + + // make the result string + uint32_t src_size = static_cast<uint32_t>(data.size()); + auto ipos = reinterpret_cast<char *>(&src_size); + std::string res(ipos, ipos + sizeof(uint32_t)); + res.append(dst.begin(), dst.begin() + dst_size); + return res; +} + +std::string +uncompress(const std::string &data, + std::string::size_type index, + std::string::size_type count) +{ + // fix data boundaries + auto isrc = data.data() + index; + auto src_size = std::min(data.size() - index, count); + + // read size of the uncompressed data and prepare result buffer + uint64_t dst_size = *reinterpret_cast<const uint32_t *>(isrc); + if (dst_size > std::numeric_limits<uint32_t>::max()) + throw error_t(mc::err::bad_request, "too large data"); + isrc += sizeof(uint32_t); + src_size -= sizeof(uint32_t); + std::vector<uint8_t> dst(dst_size); + + // uncompress + if (::uncompress(b(&dst[0]), &dst_size, b(isrc), src_size) != Z_OK) + throw error_t(mc::err::bad_request, "zlib uncompress error"); + return std::string(dst.begin(), dst.begin() + dst_size); +} + +} // namespace zlib +} // namespace proto +} // namespace mc +