-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RFC, WIP] Global memorypool implementation #3644
Closed
Closed
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
4f84758
Implement a very simple zero-lock message pool to test performance gains
f18m ea0dc06
Merge branch 'master' of https://github.com/f18m/libzmq.git
f18m a24f2af
Allow to choose message sizes as well
f18m 1bd2ae1
Allow using env variables to do some basic overriding
f18m 252e8d4
fix typo
f18m 4a30795
add TCP kernel socket buffer setting
f18m 577232e
Merge remote-tracking branch 'upstream/master'
f18m ff8d79f
Merge remote-tracking branch 'upstream/master'
f18m 00e514e
First implementation of global memory pool for ZMQ
f18m 18c52c4
Remove changes related to graph generation
f18m a720a31
allow testing up to 8k msg sizes
f18m b9e1f01
correctly deallocate memory pool blocks
f18m 1649701
fix build with no draft API
f18m 0baafa4
never use allocator for VSM
f18m File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -268,6 +268,8 @@ typedef void(zmq_free_fn) (void *data_, void *hint_); | |
|
||
ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg_); | ||
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_); | ||
ZMQ_EXPORT int | ||
zmq_msg_init_allocator (zmq_msg_t *msg_, size_t size_, void *allocator_); | ||
ZMQ_EXPORT int zmq_msg_init_data ( | ||
zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_); | ||
ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_); | ||
|
@@ -669,6 +671,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); | |
|
||
/* DRAFT Context options */ | ||
#define ZMQ_ZERO_COPY_RECV 10 | ||
//#define ZMQ_MSG_ALLOCATOR 11 | ||
|
||
/* DRAFT Context methods. */ | ||
ZMQ_EXPORT int zmq_ctx_set_ext (void *context_, | ||
|
@@ -680,6 +683,17 @@ ZMQ_EXPORT int zmq_ctx_get_ext (void *context_, | |
void *optval_, | ||
size_t *optvallen_); | ||
|
||
/* ZMQ-provided message-pool implementations. */ | ||
// default allocator using malloc/free | ||
#define ZMQ_MSG_ALLOCATOR_DEFAULT 0 | ||
// using internally a SPSC queue (cannot be used with inproc maybe?) or perhaps an MPMC queue anyway | ||
#define ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL 1 | ||
// using internally a MPMC queue | ||
#define ZMQ_MSG_ALLOCATOR_GLOBAL_POOL 2 | ||
|
||
ZMQ_EXPORT void *zmq_msg_allocator_new (int type_); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we can drop the msg from the class name? |
||
ZMQ_EXPORT int zmq_msg_allocator_destroy (void **allocator_); | ||
|
||
/* DRAFT Socket methods. */ | ||
ZMQ_EXPORT int zmq_join (void *s, const char *group); | ||
ZMQ_EXPORT int zmq_leave (void *s, const char *group); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
/* | ||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file | ||
|
||
This file is part of libzmq, the ZeroMQ core engine in C++. | ||
|
||
libzmq is free software; you can redistribute it and/or modify it under | ||
the terms of the GNU Lesser General Public License (LGPL) as published | ||
by the Free Software Foundation; either version 3 of the License, or | ||
(at your option) any later version. | ||
|
||
As a special exception, the Contributors give you permission to link | ||
this library with independent modules to produce an executable, | ||
regardless of the license terms of these independent modules, and to | ||
copy and distribute the resulting executable under terms of your choice, | ||
provided that you also meet, for each linked independent module, the | ||
terms and conditions of the license of that module. An independent | ||
module is a module which is not derived from or based on this library. | ||
If you modify this library, you must extend this exception to your | ||
version of the library. | ||
|
||
libzmq is distributed in the hope that it will be useful, but WITHOUT | ||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
License for more details. | ||
|
||
You should have received a copy of the GNU Lesser General Public License | ||
along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
#include "precompiled.hpp" | ||
#include "allocator.hpp" | ||
|
||
|
||
zmq::allocator_t::allocator_t () | ||
{ | ||
_type = ZMQ_MSG_ALLOCATOR_DEFAULT; | ||
_tag = 0xCAFEEBEB; | ||
} | ||
|
||
size_t zmq::allocator_t::size () const | ||
{ | ||
switch (_type) { | ||
case ZMQ_MSG_ALLOCATOR_DEFAULT: | ||
return 0; | ||
|
||
// using internally a SPSC queue (cannot be used with inproc maybe?) or perhaps an MPMC queue anyway | ||
case ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL: | ||
return 0; | ||
|
||
// using internally a MPMC queue | ||
case ZMQ_MSG_ALLOCATOR_GLOBAL_POOL: | ||
return _global_pool.size (); | ||
|
||
default: | ||
return 0; | ||
} | ||
} | ||
|
||
|
||
void *zmq::allocator_t::allocate (size_t len) | ||
{ | ||
switch (_type) { | ||
case ZMQ_MSG_ALLOCATOR_DEFAULT: | ||
return malloc (len); | ||
|
||
// using internally a SPSC queue (cannot be used with inproc maybe?) or perhaps an MPMC queue anyway | ||
case ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL: | ||
// FIXME | ||
return NULL; | ||
|
||
// using internally a MPMC queue | ||
case ZMQ_MSG_ALLOCATOR_GLOBAL_POOL: | ||
return _global_pool.allocate_msg (len); | ||
} | ||
return NULL; | ||
} | ||
|
||
void zmq::allocator_t::deallocate_msg (void *data_, void *hint_) | ||
{ | ||
allocator_t *alloc = reinterpret_cast<allocator_t *> (hint_); | ||
switch (alloc->_type) { | ||
case ZMQ_MSG_ALLOCATOR_DEFAULT: | ||
free (data_); | ||
return; | ||
|
||
// using internally a SPSC queue (cannot be used with inproc maybe?) or perhaps an MPMC queue anyway | ||
case ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL: | ||
// FIXME | ||
return; | ||
|
||
// using internally a MPMC queue | ||
case ZMQ_MSG_ALLOCATOR_GLOBAL_POOL: | ||
zmq::msg_t::content_t *msg_content = | ||
(zmq::msg_t::content_t *) data_; | ||
alloc->_global_pool.deallocate_msg (msg_content, msg_content->size); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,197 @@ | ||
/* | ||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file | ||
|
||
This file is part of libzmq, the ZeroMQ core engine in C++. | ||
|
||
libzmq is free software; you can redistribute it and/or modify it under | ||
the terms of the GNU Lesser General Public License (LGPL) as published | ||
by the Free Software Foundation; either version 3 of the License, or | ||
(at your option) any later version. | ||
|
||
As a special exception, the Contributors give you permission to link | ||
this library with independent modules to produce an executable, | ||
regardless of the license terms of these independent modules, and to | ||
copy and distribute the resulting executable under terms of your choice, | ||
provided that you also meet, for each linked independent module, the | ||
terms and conditions of the license of that module. An independent | ||
module is a module which is not derived from or based on this library. | ||
If you modify this library, you must extend this exception to your | ||
version of the library. | ||
|
||
libzmq is distributed in the hope that it will be useful, but WITHOUT | ||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | ||
License for more details. | ||
|
||
You should have received a copy of the GNU Lesser General Public License | ||
along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
#ifndef __ZMQ_MEMORYPOOL_HPP_INCLUDED__ | ||
#define __ZMQ_MEMORYPOOL_HPP_INCLUDED__ | ||
|
||
#include <vector> | ||
#include "msg.hpp" | ||
#include "concurrentqueue.h" | ||
|
||
// FIXME: we need to grow dynamically the mempool | ||
#define MAX_ACTIVE_MESSAGES (16384) | ||
|
||
namespace zmq | ||
{ | ||
class global_memory_pool_t | ||
{ | ||
typedef struct | ||
{ | ||
size_t num_msgs; | ||
// actual user data | ||
uint8_t *raw_data; | ||
} msg_block_t; | ||
|
||
typedef enum | ||
{ | ||
MsgBlock_SizeClass_256 = 0, // for messages up to 256B long | ||
MsgBlock_SizeClass_512, | ||
MsgBlock_SizeClass_1024, | ||
MsgBlock_SizeClass_2048, | ||
MsgBlock_SizeClass_4096, | ||
MsgBlock_SizeClass_8192, | ||
|
||
MsgBlock_NumSizeClasses | ||
} MsgBlock_e; | ||
|
||
inline size_t MsgBlockToBytes (MsgBlock_e block_class) | ||
{ | ||
switch (block_class) { | ||
case MsgBlock_SizeClass_256: | ||
return 256; | ||
case MsgBlock_SizeClass_512: | ||
return 512; | ||
case MsgBlock_SizeClass_1024: | ||
return 1024; | ||
case MsgBlock_SizeClass_2048: | ||
return 2048; | ||
case MsgBlock_SizeClass_4096: | ||
return 4096; | ||
case MsgBlock_SizeClass_8192: | ||
return 8192; | ||
default: | ||
return 0; | ||
} | ||
} | ||
inline MsgBlock_e BytesToMsgBlock (size_t n) | ||
{ | ||
if (n < 256) | ||
return MsgBlock_SizeClass_256; | ||
else if (n < 512) | ||
return MsgBlock_SizeClass_512; | ||
else if (n < 1024) | ||
return MsgBlock_SizeClass_1024; | ||
else if (n < 2048) | ||
return MsgBlock_SizeClass_2048; | ||
else if (n < 4096) | ||
return MsgBlock_SizeClass_4096; | ||
else if (n < 8192) | ||
return MsgBlock_SizeClass_8192; | ||
|
||
// size too big | ||
return MsgBlock_NumSizeClasses; | ||
} | ||
|
||
public: | ||
global_memory_pool_t () | ||
{ | ||
// enqueue all available blocks in the free list: | ||
for (int i = 0; i < MsgBlock_NumSizeClasses; i++) { | ||
size_t msg_size = MsgBlockToBytes ((MsgBlock_e) i); | ||
|
||
m_storage[i].num_msgs = MAX_ACTIVE_MESSAGES; | ||
m_storage[i].raw_data = | ||
(uint8_t *) malloc (MAX_ACTIVE_MESSAGES * msg_size); | ||
|
||
uint8_t *msg_memory = m_storage[i].raw_data; | ||
for (int j = 0; j < MAX_ACTIVE_MESSAGES; j++) { | ||
m_free_list[i].enqueue (msg_memory); | ||
msg_memory += msg_size; | ||
} | ||
} | ||
} | ||
~global_memory_pool_t () | ||
{ | ||
// deallocate all message classes | ||
for (int i = 0; i < MsgBlock_NumSizeClasses; i++) { | ||
free (m_storage[i].raw_data); | ||
m_storage[i].raw_data = NULL; | ||
} | ||
} | ||
|
||
void *allocate_msg (size_t len) // consumer thread: user app thread | ||
{ | ||
MsgBlock_e bl = BytesToMsgBlock (len); | ||
assert (bl != MsgBlock_NumSizeClasses); | ||
|
||
// consume 1 block from the list of free msg | ||
uint8_t *next_avail = nullptr; | ||
if (!m_free_list[bl].try_dequeue (next_avail)) { | ||
assert (0); // I want to find out if this ever happens | ||
return NULL; | ||
} | ||
|
||
assert (next_avail); | ||
return next_avail; | ||
} | ||
|
||
void | ||
deallocate_msg (void *data_, | ||
size_t len) // producer thread: ZMQ background IO thread | ||
{ | ||
MsgBlock_e bl = BytesToMsgBlock (len); | ||
assert (bl != MsgBlock_NumSizeClasses); | ||
|
||
// produce a new free msg: | ||
m_free_list[bl].enqueue ((uint8_t *) data_); | ||
} | ||
|
||
size_t size () const | ||
{ | ||
size_t acc = 0; | ||
for (int i = 0; i < MsgBlock_NumSizeClasses; i++) | ||
acc += m_free_list[i].size_approx (); | ||
return acc; | ||
} | ||
|
||
private: | ||
msg_block_t m_storage[MsgBlock_NumSizeClasses]; | ||
moodycamel::ConcurrentQueue<uint8_t *> m_free_list[MsgBlock_NumSizeClasses]; | ||
}; | ||
|
||
class allocator_t | ||
{ | ||
public: | ||
allocator_t (); | ||
~allocator_t () | ||
{ | ||
// Mark this instance as dead | ||
_tag = 0xdeadbeef; | ||
} | ||
|
||
void init (int type_) { _type = type_; } | ||
|
||
// allocate() typically gets called by the consumer thread: the user app thread(s) | ||
void *allocate (size_t len); | ||
|
||
// deallocate_msg() typically gets called by the producer thread: the ZMQ background IO thread(s) | ||
static void deallocate_msg (void *data_, void *hint_); | ||
|
||
size_t size () const; | ||
bool check_tag () const { return _tag == 0xCAFEEBEB; } | ||
|
||
|
||
private: | ||
int _type; | ||
uint32_t _tag; | ||
global_memory_pool_t _global_pool; | ||
}; | ||
} | ||
|
||
#endif |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should drop this for now and stay with the global pool