Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC, WIP] Global memorypool implementation #3644

Closed
wants to merge 14 commits into from
Closed
2 changes: 2 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ include_HEADERS = \
src_libzmq_la_SOURCES = \
src/address.cpp \
src/address.hpp \
src/allocator.cpp \
src/allocator.hpp \
src/array.hpp \
src/atomic_counter.hpp \
src/atomic_ptr.hpp \
Expand Down
14 changes: 14 additions & 0 deletions include/zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ typedef void(zmq_free_fn) (void *data_, void *hint_);

ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg_);
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_);
ZMQ_EXPORT int
zmq_msg_init_allocator (zmq_msg_t *msg_, size_t size_, void *allocator_);
ZMQ_EXPORT int zmq_msg_init_data (
zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_);
ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_);
Expand Down Expand Up @@ -669,6 +671,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);

/* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10
//#define ZMQ_MSG_ALLOCATOR 11

/* DRAFT Context methods. */
ZMQ_EXPORT int zmq_ctx_set_ext (void *context_,
Expand All @@ -680,6 +683,17 @@ ZMQ_EXPORT int zmq_ctx_get_ext (void *context_,
void *optval_,
size_t *optvallen_);

/* ZMQ-provided message-pool implementations. */
// default allocator using malloc/free
#define ZMQ_MSG_ALLOCATOR_DEFAULT 0
// using internally a SPSC queue (cannot be used with inproc maybe?) or perhaps an MPMC queue anyway
#define ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should drop this for now and stay with the global pool

// using internally a MPMC queue
#define ZMQ_MSG_ALLOCATOR_GLOBAL_POOL 2

ZMQ_EXPORT void *zmq_msg_allocator_new (int type_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can drop the msg from the class name?

ZMQ_EXPORT int zmq_msg_allocator_destroy (void **allocator_);

/* DRAFT Socket methods. */
ZMQ_EXPORT int zmq_join (void *s, const char *group);
ZMQ_EXPORT int zmq_leave (void *s, const char *group);
Expand Down
16 changes: 16 additions & 0 deletions perf/remote_thr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "../src/platform.hpp"
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
Expand Down Expand Up @@ -67,6 +68,11 @@ int main (int argc, char *argv[])
return -1;
}

#ifdef ZMQ_BUILD_DRAFT_API
// EXPERIMENTAL ALLOCATOR FOR MSG_T
void *allocator = zmq_msg_allocator_new (ZMQ_MSG_ALLOCATOR_GLOBAL_POOL);
#endif

s = zmq_socket (ctx, ZMQ_PUSH);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
Expand Down Expand Up @@ -105,7 +111,11 @@ int main (int argc, char *argv[])
}

for (i = 0; i != message_count; i++) {
#ifdef ZMQ_BUILD_DRAFT_API
rc = zmq_msg_init_allocator (&msg, message_size, allocator);
#else
rc = zmq_msg_init_size (&msg, message_size);
#endif
if (rc != 0) {
printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
return -1;
Expand Down Expand Up @@ -134,5 +144,11 @@ int main (int argc, char *argv[])
return -1;
}

#ifdef ZMQ_BUILD_DRAFT_API
// IMPORTANT: destroy the allocator only after zmq_ctx_term() since otherwise
// some zmq_msg_t may still be "in fly"
zmq_msg_allocator_destroy (&allocator);
#endif

return 0;
}
97 changes: 97 additions & 0 deletions src/allocator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

This file is part of libzmq, the ZeroMQ core engine in C++.

libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.

As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.

libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include "allocator.hpp"


zmq::allocator_t::allocator_t ()
{
_type = ZMQ_MSG_ALLOCATOR_DEFAULT;
_tag = 0xCAFEEBEB;
}

size_t zmq::allocator_t::size () const
{
switch (_type) {
case ZMQ_MSG_ALLOCATOR_DEFAULT:
return 0;

// using internally a SPSC queue (cannot be used with inproc maybe?) or perhaps an MPMC queue anyway
case ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL:
return 0;

// using internally a MPMC queue
case ZMQ_MSG_ALLOCATOR_GLOBAL_POOL:
return _global_pool.size ();

default:
return 0;
}
}


void *zmq::allocator_t::allocate (size_t len)
{
switch (_type) {
case ZMQ_MSG_ALLOCATOR_DEFAULT:
return malloc (len);

// using internally a SPSC queue (cannot be used with inproc maybe?) or perhaps an MPMC queue anyway
case ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL:
// FIXME
return NULL;

// using internally a MPMC queue
case ZMQ_MSG_ALLOCATOR_GLOBAL_POOL:
return _global_pool.allocate_msg (len);
}
return NULL;
}

void zmq::allocator_t::deallocate_msg (void *data_, void *hint_)
{
allocator_t *alloc = reinterpret_cast<allocator_t *> (hint_);
switch (alloc->_type) {
case ZMQ_MSG_ALLOCATOR_DEFAULT:
free (data_);
return;

// using internally a SPSC queue (cannot be used with inproc maybe?) or perhaps an MPMC queue anyway
case ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL:
// FIXME
return;

// using internally a MPMC queue
case ZMQ_MSG_ALLOCATOR_GLOBAL_POOL:
zmq::msg_t::content_t *msg_content =
(zmq::msg_t::content_t *) data_;
alloc->_global_pool.deallocate_msg (msg_content, msg_content->size);
}
}
197 changes: 197 additions & 0 deletions src/allocator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

This file is part of libzmq, the ZeroMQ core engine in C++.

libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.

As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.

libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef __ZMQ_MEMORYPOOL_HPP_INCLUDED__
#define __ZMQ_MEMORYPOOL_HPP_INCLUDED__

#include <vector>
#include "msg.hpp"
#include "concurrentqueue.h"

// FIXME: we need to grow dynamically the mempool
#define MAX_ACTIVE_MESSAGES (16384)

namespace zmq
{
class global_memory_pool_t
{
typedef struct
{
size_t num_msgs;
// actual user data
uint8_t *raw_data;
} msg_block_t;

typedef enum
{
MsgBlock_SizeClass_256 = 0, // for messages up to 256B long
MsgBlock_SizeClass_512,
MsgBlock_SizeClass_1024,
MsgBlock_SizeClass_2048,
MsgBlock_SizeClass_4096,
MsgBlock_SizeClass_8192,

MsgBlock_NumSizeClasses
} MsgBlock_e;

inline size_t MsgBlockToBytes (MsgBlock_e block_class)
{
switch (block_class) {
case MsgBlock_SizeClass_256:
return 256;
case MsgBlock_SizeClass_512:
return 512;
case MsgBlock_SizeClass_1024:
return 1024;
case MsgBlock_SizeClass_2048:
return 2048;
case MsgBlock_SizeClass_4096:
return 4096;
case MsgBlock_SizeClass_8192:
return 8192;
default:
return 0;
}
}
inline MsgBlock_e BytesToMsgBlock (size_t n)
{
if (n < 256)
return MsgBlock_SizeClass_256;
else if (n < 512)
return MsgBlock_SizeClass_512;
else if (n < 1024)
return MsgBlock_SizeClass_1024;
else if (n < 2048)
return MsgBlock_SizeClass_2048;
else if (n < 4096)
return MsgBlock_SizeClass_4096;
else if (n < 8192)
return MsgBlock_SizeClass_8192;

// size too big
return MsgBlock_NumSizeClasses;
}

public:
global_memory_pool_t ()
{
// enqueue all available blocks in the free list:
for (int i = 0; i < MsgBlock_NumSizeClasses; i++) {
size_t msg_size = MsgBlockToBytes ((MsgBlock_e) i);

m_storage[i].num_msgs = MAX_ACTIVE_MESSAGES;
m_storage[i].raw_data =
(uint8_t *) malloc (MAX_ACTIVE_MESSAGES * msg_size);

uint8_t *msg_memory = m_storage[i].raw_data;
for (int j = 0; j < MAX_ACTIVE_MESSAGES; j++) {
m_free_list[i].enqueue (msg_memory);
msg_memory += msg_size;
}
}
}
~global_memory_pool_t ()
{
// deallocate all message classes
for (int i = 0; i < MsgBlock_NumSizeClasses; i++) {
free (m_storage[i].raw_data);
m_storage[i].raw_data = NULL;
}
}

void *allocate_msg (size_t len) // consumer thread: user app thread
{
MsgBlock_e bl = BytesToMsgBlock (len);
assert (bl != MsgBlock_NumSizeClasses);

// consume 1 block from the list of free msg
uint8_t *next_avail = nullptr;
if (!m_free_list[bl].try_dequeue (next_avail)) {
assert (0); // I want to find out if this ever happens
return NULL;
}

assert (next_avail);
return next_avail;
}

void
deallocate_msg (void *data_,
size_t len) // producer thread: ZMQ background IO thread
{
MsgBlock_e bl = BytesToMsgBlock (len);
assert (bl != MsgBlock_NumSizeClasses);

// produce a new free msg:
m_free_list[bl].enqueue ((uint8_t *) data_);
}

size_t size () const
{
size_t acc = 0;
for (int i = 0; i < MsgBlock_NumSizeClasses; i++)
acc += m_free_list[i].size_approx ();
return acc;
}

private:
msg_block_t m_storage[MsgBlock_NumSizeClasses];
moodycamel::ConcurrentQueue<uint8_t *> m_free_list[MsgBlock_NumSizeClasses];
};

class allocator_t
{
public:
allocator_t ();
~allocator_t ()
{
// Mark this instance as dead
_tag = 0xdeadbeef;
}

void init (int type_) { _type = type_; }

// allocate() gets called by the consumer thread: the user app thread
void *allocate (size_t len);

// deallocate_msg() gets called by the producer thread: the ZMQ background IO thread
static void deallocate_msg (void *data_, void *hint_);

size_t size () const;
bool check_tag () const { return _tag == 0xCAFEEBEB; }


private:
int _type;
uint32_t _tag;
global_memory_pool_t _global_pool;
};
}

#endif
Loading