From 43e8868875e1d5287979e5b9060a9b16be45cc79 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 22 Feb 2011 16:23:36 +0100 Subject: [PATCH] Added explicit error message in case of memory exhaustion Signed-off-by: Martin Sustrik --- src/connect_session.cpp | 6 +++--- src/ctx.cpp | 6 +++--- src/decoder.hpp | 2 +- src/encoder.hpp | 2 +- src/epoll.cpp | 2 +- src/err.hpp | 20 +++++++++++++------- src/io_thread.cpp | 2 +- src/ip.cpp | 2 +- src/kqueue.cpp | 2 +- src/object.cpp | 4 ++-- src/pgm_receiver.cpp | 1 + src/pgm_sender.cpp | 2 +- src/pgm_socket.cpp | 3 ++- src/pipe.cpp | 8 ++++---- src/reaper.cpp | 2 +- src/session.cpp | 2 -- src/socket_base.cpp | 6 +++--- src/swap.cpp | 13 ++++++++----- src/trie.cpp | 6 +++--- src/yqueue.hpp | 4 ++-- src/zmq.cpp | 8 ++++---- src/zmq_connecter.cpp | 2 +- src/zmq_init.cpp | 6 +++--- src/zmq_listener.cpp | 2 +- 24 files changed, 61 insertions(+), 52 deletions(-) diff --git a/src/connect_session.cpp b/src/connect_session.cpp index 62799b3b..fda39d61 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -56,7 +56,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( io_thread, this, options, protocol.c_str (), address.c_str (), wait_); - zmq_assert (connecter); + alloc_assert (connecter); launch_child (connecter); return; } @@ -77,7 +77,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) // PGM sender. pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( io_thread, options); - zmq_assert (pgm_sender); + alloc_assert (pgm_sender); int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); zmq_assert (rc == 0); @@ -89,7 +89,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) // PGM receiver. pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( io_thread, options); - zmq_assert (pgm_receiver); + alloc_assert (pgm_receiver); int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); zmq_assert (rc == 0); diff --git a/src/ctx.cpp b/src/ctx.cpp index 84f9b037..953516af 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -43,21 +43,21 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : // internal log socket and the zmq_term thread the reaper thread. slot_count = max_sockets + io_threads_ + 3; slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); - zmq_assert (slots); + alloc_assert (slots); // Initialise the infrastructure for zmq_term thread. slots [term_tid] = &term_mailbox; // Create the reaper thread. reaper = new (std::nothrow) reaper_t (this, reaper_tid); - zmq_assert (reaper); + alloc_assert (reaper); slots [reaper_tid] = reaper->get_mailbox (); reaper->start (); // Create I/O thread objects and launch them. for (uint32_t i = 2; i != io_threads_ + 2; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); - zmq_assert (io_thread); + alloc_assert (io_thread); io_threads.push_back (io_thread); slots [i] = io_thread->get_mailbox (); io_thread->start (); diff --git a/src/decoder.hpp b/src/decoder.hpp index 80987a81..6d288018 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -54,7 +54,7 @@ namespace zmq bufsize (bufsize_) { buf = (unsigned char*) malloc (bufsize_); - zmq_assert (buf); + alloc_assert (buf); } // The destructor doesn't have to be virtual. It is mad virtual diff --git a/src/encoder.hpp b/src/encoder.hpp index 90517fec..695c9f03 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -44,7 +44,7 @@ namespace zmq bufsize (bufsize_) { buf = (unsigned char*) malloc (bufsize_); - zmq_assert (buf); + alloc_assert (buf); } // The destructor doesn't have to be virtual. It is made virtual diff --git a/src/epoll.cpp b/src/epoll.cpp index ae394071..dcc0f1c5 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -53,7 +53,7 @@ zmq::epoll_t::~epoll_t () zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) { poll_entry_t *pe = new (std::nothrow) poll_entry_t; - zmq_assert (pe != NULL); + alloc_assert (pe); // The memset is not actually needed. It's here to prevent debugging // tools to complain about using uninitialised memory. diff --git a/src/err.hpp b/src/err.hpp index 9d8e153a..fc8b7c1f 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -98,7 +98,7 @@ namespace zmq }\ } while (false) -// Provides convenient way to check for POSIX errors. +// Provides convenient way to check for POSIX errors. #define posix_assert(x) \ do {\ if (unlikely (x)) {\ @@ -107,7 +107,7 @@ namespace zmq }\ } while (false) -// Provides convenient way to check for errors from getaddrinfo. +// Provides convenient way to check for errors from getaddrinfo. #define gai_assert(x) \ do {\ if (unlikely (x)) {\ @@ -117,10 +117,16 @@ namespace zmq }\ } while (false) -#endif - -#define zmq_not_implemented() \ +// Provides convenient way to check whether memory allocation have succeeded. +#define alloc_assert(x) \ do {\ - fprintf (stderr, "Hic sunt leones (%s:%d)\n", __FILE__, __LINE__);\ - abort ();\ + if (unlikely (!x)) {\ + fprintf (stderr, "FATAL ERROR: OUT OF MEMORY (%s:%d)\n",\ + __FILE__, __LINE__);\ + abort ();\ + }\ } while (false) + +#endif + + diff --git a/src/io_thread.cpp b/src/io_thread.cpp index ca768f5c..f819009d 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -30,7 +30,7 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : object_t (ctx_, tid_) { poller = new (std::nothrow) poller_t; - zmq_assert (poller); + alloc_assert (poller); mailbox_handle = poller->add_fd (mailbox.get_fd (), this); poller->set_pollin (mailbox_handle); diff --git a/src/ip.cpp b/src/ip.cpp index eb05aec2..cab048ad 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -50,7 +50,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) // Allocate memory to get interface names. size_t ifr_size = sizeof (struct lifreq) * ifn.lifn_count; char *ifr = (char*) malloc (ifr_size); - errno_assert (ifr); + alloc_assert (ifr); // Retrieve interface names. lifconf ifc; diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 63c1c420..b442d905 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -79,7 +79,7 @@ zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) { poll_entry_t *pe = new (std::nothrow) poll_entry_t; - zmq_assert (pe != NULL); + alloc_assert (pe); pe->fd = fd_; pe->flag_pollin = 0; diff --git a/src/object.cpp b/src/object.cpp index 1376699a..44da6471 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -228,7 +228,7 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, (unsigned char) peer_identity_.size (); cmd.args.attach.peer_identity = (unsigned char*) malloc (peer_identity_.size ()); - zmq_assert (cmd.args.attach.peer_identity_size); + alloc_assert (cmd.args.attach.peer_identity_size); memcpy (cmd.args.attach.peer_identity, peer_identity_.data (), peer_identity_.size ()); } @@ -259,7 +259,7 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, (unsigned char) peer_identity_.size (); cmd.args.bind.peer_identity = (unsigned char*) malloc (peer_identity_.size ()); - zmq_assert (cmd.args.bind.peer_identity_size); + alloc_assert (cmd.args.bind.peer_identity_size); memcpy (cmd.args.bind.peer_identity, peer_identity_.data (), peer_identity_.size ()); } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index f0b2601a..1b961989 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -212,6 +212,7 @@ void zmq::pgm_receiver_t::in_event () // Create and connect decoder for the peer. it->second.decoder = new (std::nothrow) decoder_t (0); + alloc_assert (it->second.decoder); it->second.decoder->set_inout (inout); } diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 3fb24c1c..3c49a545 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -55,7 +55,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) out_buffer_size = pgm_socket.get_max_tsdu_size (); out_buffer = (unsigned char*) malloc (out_buffer_size); - zmq_assert (out_buffer); + alloc_assert (out_buffer); return rc; } diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 9f96f6f1..4b6c8b9d 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -358,6 +358,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) zmq_assert (pgm_msgv_len); pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); + alloc_assert (pgm_msgv); } return 0; @@ -602,7 +603,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Data loss. if (status == PGM_IO_STATUS_RESET) { - struct pgm_sk_buff_t* skb = pgm_msgv[0].msgv_skb[0]; + struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0]; // Save lost data TSI. *tsi_ = &skb->tsi; diff --git a/src/pipe.cpp b/src/pipe.cpp index 6335690b..9888a7bd 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -182,7 +182,7 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, // Open the swap file, if required. if (swap_size_ > 0) { swap = new (std::nothrow) swap_t (swap_size_); - zmq_assert (swap); + alloc_assert (swap); int rc = swap->init (); zmq_assert (rc == 0); } @@ -399,10 +399,10 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, // writer. The pipe will be handled by reader and writer, its never passed // to the user. Reader and writer are returned to the user. pipe_t *pipe = new (std::nothrow) pipe_t (); - zmq_assert (pipe); + alloc_assert (pipe); *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm); - zmq_assert (*reader_); + alloc_assert (*reader_); *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_, hwm_, swap_size_); - zmq_assert (*writer_); + alloc_assert (*writer_); } diff --git a/src/reaper.cpp b/src/reaper.cpp index b1e2796e..4710a91c 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -27,7 +27,7 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : terminating (false) { poller = new (std::nothrow) poller_t; - zmq_assert (poller); + alloc_assert (poller); mailbox_handle = poller->add_fd (mailbox.get_fd (), this); poller->set_pollin (mailbox_handle); diff --git a/src/session.cpp b/src/session.cpp index 645ebf06..be1f39cd 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -17,8 +17,6 @@ along with this program. If not, see . */ -#include - #include "session.hpp" #include "socket_base.hpp" #include "i_engine.hpp" diff --git a/src/socket_base.cpp b/src/socket_base.cpp index f15ab9d8..2d366e84 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -103,7 +103,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, errno = EINVAL; return NULL; } - zmq_assert (s); + alloc_assert (s); return s; } @@ -318,7 +318,7 @@ int zmq::socket_base_t::bind (const char *addr_) // Create and run the listener. zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( io_thread, this, options); - zmq_assert (listener); + alloc_assert (listener); int rc = listener->set_address (protocol.c_str(), address.c_str ()); if (rc != 0) { delete listener; @@ -420,7 +420,7 @@ int zmq::socket_base_t::connect (const char *addr_) // Create session. connect_session_t *session = new (std::nothrow) connect_session_t ( io_thread, this, options, protocol.c_str (), address.c_str ()); - zmq_assert (session); + alloc_assert (session); // If 'immediate connect' feature is required, we'll create the pipes // to the session straight away. Otherwise, they'll be created by the diff --git a/src/swap.cpp b/src/swap.cpp index b1add37b..1ec38272 100644 --- a/src/swap.cpp +++ b/src/swap.cpp @@ -53,10 +53,10 @@ zmq::swap_t::swap_t (int64_t filesize_) : zmq_assert (block_size > 0); buf1 = new (std::nothrow) char [block_size]; - zmq_assert (buf1); + alloc_assert (buf1); buf2 = new (std::nothrow) char [block_size]; - zmq_assert (buf2); + alloc_assert (buf2); read_buf = write_buf = buf1; } @@ -278,7 +278,8 @@ void zmq::swap_t::fill_buf (char *buf, int64_t pos) #ifdef ZMQ_HAVE_WINDOWS int rc = _read (fd, &buf [octets_stored], octets_total - octets_stored); #else - ssize_t rc = read (fd, &buf [octets_stored], octets_total - octets_stored); + ssize_t rc = read (fd, &buf [octets_stored], + octets_total - octets_stored); #endif errno_assert (rc > 0); octets_stored += rc; @@ -302,9 +303,11 @@ void zmq::swap_t::save_write_buf () while (octets_stored < octets_total) { #ifdef ZMQ_HAVE_WINDOWS - int rc = _write (fd, &write_buf [octets_stored], octets_total - octets_stored); + int rc = _write (fd, &write_buf [octets_stored], + octets_total - octets_stored); #else - ssize_t rc = write (fd, &write_buf [octets_stored], octets_total - octets_stored); + ssize_t rc = write (fd, &write_buf [octets_stored], + octets_total - octets_stored); #endif errno_assert (rc > 0); octets_stored += rc; diff --git a/src/trie.cpp b/src/trie.cpp index b1b4d95c..4d0db162 100644 --- a/src/trie.cpp +++ b/src/trie.cpp @@ -73,7 +73,7 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_) count = (min < c ? c - min : min - c) + 1; next.table = (trie_t**) malloc (sizeof (trie_t*) * count); - zmq_assert (next.table); + alloc_assert (next.table); for (unsigned short i = 0; i != count; ++i) next.table [i] = 0; min = std::min (min, c); @@ -110,14 +110,14 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_) if (count == 1) { if (!next.node) { next.node = new (std::nothrow) trie_t; - zmq_assert (next.node); + alloc_assert (next.node); } next.node->add (prefix_ + 1, size_ - 1); } else { if (!next.table [c - min]) { next.table [c - min] = new (std::nothrow) trie_t; - zmq_assert (next.table [c - min]); + alloc_assert (next.table [c - min]); } next.table [c - min]->add (prefix_ + 1, size_ - 1); } diff --git a/src/yqueue.hpp b/src/yqueue.hpp index 10889064..d756f4eb 100644 --- a/src/yqueue.hpp +++ b/src/yqueue.hpp @@ -50,7 +50,7 @@ namespace zmq inline yqueue_t () { begin_chunk = (chunk_t*) malloc (sizeof (chunk_t)); - zmq_assert (begin_chunk); + alloc_assert (begin_chunk); begin_pos = 0; back_chunk = NULL; back_pos = 0; @@ -105,7 +105,7 @@ namespace zmq sc->prev = end_chunk; } else { end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t)); - zmq_assert (end_chunk->next); + alloc_assert (end_chunk->next); end_chunk->next->prev = end_chunk; } end_chunk = end_chunk->next; diff --git a/src/zmq.cpp b/src/zmq.cpp index 037f6c20..01f55543 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -115,7 +115,7 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_) { msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); - zmq_assert (msg_->content); + alloc_assert (msg_->content); msg_->flags = 0; zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; content->data = data_; @@ -255,7 +255,7 @@ void *zmq_init (int io_threads_) // Create 0MQ context. zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_); - zmq_assert (ctx); + alloc_assert (ctx); return (void*) ctx; } @@ -403,7 +403,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) uint64_t end = 0; pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); - zmq_assert (pollfds); + alloc_assert (pollfds); // Build pollset for poll () system call. for (int i = 0; i != nitems_; i++) { @@ -761,7 +761,7 @@ void zmq_sleep (int seconds_) void *zmq_stopwatch_start () { uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t)); - assert (watch); + alloc_assert (watch); *watch = zmq::clock_t::now_us (); return (void*) watch; } diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 57a6c3b5..6545149d 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -93,7 +93,7 @@ void zmq::zmq_connecter_t::out_event () // Create an init object. zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL, session, fd, options); - zmq_assert (init); + alloc_assert (init); launch_sibling (init); // Shut the connecter down. diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index afb40119..747e1b41 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -43,7 +43,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, { // Create the engine object for this connection. engine = new (std::nothrow) zmq_engine_t (fd_, options); - zmq_assert (engine); + alloc_assert (engine); } zmq::zmq_init_t::~zmq_init_t () @@ -180,7 +180,7 @@ void zmq::zmq_init_t::dispatch_engine () if (peer_identity [0] == 0) { session = new (std::nothrow) transient_session_t (io_thread, socket, options); - zmq_assert (session); + alloc_assert (session); session->inc_seqnum (); launch_sibling (session); send_attach (session, ephemeral_engine, peer_identity, false); @@ -205,7 +205,7 @@ void zmq::zmq_init_t::dispatch_engine () // being attached. session = new (std::nothrow) named_session_t (io_thread, socket, options, peer_identity); - zmq_assert (session); + alloc_assert (session); session->inc_seqnum (); launch_sibling (session); send_attach (session, ephemeral_engine, peer_identity, false); diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 2a7f1eb4..06f76bf0 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -71,7 +71,7 @@ void zmq::zmq_listener_t::in_event () // Create and launch an init object. zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket, NULL, fd, options); - zmq_assert (init); + alloc_assert (init); launch_child (init); }