diff --git a/bin_interface.c b/bin_interface.c index c0291602bbc..b6e2e50781d 100644 --- a/bin_interface.c +++ b/bin_interface.c @@ -37,7 +37,7 @@ void set_len(bin_packet_t *packet) { } /** - * bin_init - begins the construction of a new binary packet (header part): + * _bin_init - begins the construction of a new binary packet (header part): * * +-----------------------------+-------------------------------------------------------+ * | 12-byte HEADER | BODY max 65535 bytes | @@ -48,8 +48,8 @@ void set_len(bin_packet_t *packet) { * @param: { LEN, CAP } + CMD + VERSION * @length: initial packet size. specify 0 to use the default size (BIN_MAX_BUF_LEN) */ -int bin_init(bin_packet_t *packet, str *capability, int packet_type, - short version, int length) +int _bin_init(bin_packet_t *packet, str *capability, int packet_type, + short version, int length, int use_sysmalloc) { if (length != 0 && length < MIN_BIN_PACKET_SIZE + capability->len) { LM_ERR("Length parameter has to be greater than: %zu\n", @@ -60,14 +60,20 @@ int bin_init(bin_packet_t *packet, str *capability, int packet_type, if (!length) length = BIN_MAX_BUF_LEN; - packet->type = packet_type; - - packet->buffer.s = pkg_malloc(length); + if (use_sysmalloc) { + packet->buffer.s = malloc(length); + packet->flags = BINFL_SYSMEM; + } else { + packet->buffer.s = pkg_malloc(length); + packet->flags = 0; + } if (!packet->buffer.s) { LM_ERR("No more pkg memory!\n"); return -1; } + packet->buffer.len = 0; + packet->type = packet_type; packet->size = length; /* binary packet header: marker + pkg_len */ @@ -112,6 +118,7 @@ void bin_init_buffer(bin_packet_t *packet, char *buffer, int length) packet->buffer.len = length; packet->buffer.s = buffer; packet->size = length; + packet->flags = 0; bin_get_capability(packet, &capability); @@ -441,6 +448,7 @@ void call_callbacks(char* buffer, struct receive_info *rcv) packet.buffer.len = pkg_len; packet.size = pkg_len + 50; + packet.flags = 0; memcpy(packet.buffer.s, buffer, pkg_len); bin_get_capability(&packet, &capability); @@ -481,7 +489,9 @@ static int bin_extend(bin_packet_t *packet, int size) else packet->size = 2 * required; - packet->buffer.s = pkg_realloc(packet->buffer.s, packet->size); + packet->buffer.s = (packet->flags & BINFL_SYSMEM) ? + realloc(packet->buffer.s, packet->size) : + pkg_realloc(packet->buffer.s, packet->size); if (!packet->buffer.s) { LM_ERR("pkg realloc failed\n"); return -1; @@ -493,7 +503,10 @@ static int bin_extend(bin_packet_t *packet, int size) void bin_free_packet(bin_packet_t *packet) { if (packet->buffer.s) { - pkg_free(packet->buffer.s); + if (packet->flags & BINFL_SYSMEM) + free(packet->buffer.s); + else + pkg_free(packet->buffer.s); packet->buffer.s = NULL; } else { LM_INFO("atempting to free uninitialized binary packet\n"); diff --git a/bin_interface.h b/bin_interface.h index 61b54b8b24d..1db49240e91 100644 --- a/bin_interface.h +++ b/bin_interface.h @@ -59,11 +59,16 @@ } while (0) #define ensure_bin_version(pkt, needed) _ensure_bin_version(pkt, needed, "") +typedef unsigned bin_packet_flags_t; +#define BINFL_SYSMEM (1U<<0) + typedef struct bin_packet { str buffer; char *front_pointer; + struct bin_packet *next; int size; int type; + bin_packet_flags_t flags; /* not populated by bin_interface */ int src_id; } bin_packet_t; @@ -123,8 +128,10 @@ int bin_register_cb(str *cap, void (*cb)(bin_packet_t *, int, * * @return: 0 on success */ -int bin_init(bin_packet_t *packet, str *capability, int packet_type, short version, - int length); +int _bin_init(bin_packet_t *packet, str *capability, int packet_type, short version, + int length, int use_sysmalloc); +#define bin_init(_pk, _cap, _pt, _ver, _len) \ + _bin_init(_pk, _cap, _pt, _ver, _len, 0) /** * function called to build a binary packet with a known buffer diff --git a/modules/clusterer/sync.c b/modules/clusterer/sync.c index 43d6fd26717..0e37046d28c 100644 --- a/modules/clusterer/sync.c +++ b/modules/clusterer/sync.c @@ -30,9 +30,11 @@ int sync_packet_size = DEFAULT_SYNC_PACKET_SIZE; int _sync_from_id = 0; -static bin_packet_t *sync_packet_snd; +static bin_packet_t *sync_packet_last; static int sync_prev_buf_len; static int *sync_last_chunk_sz; +static bin_packet_t *sync_packets; +static unsigned sync_packets_cnt; int send_sync_req(str *capability, int cluster_id, int source_id) { @@ -204,8 +206,8 @@ bin_packet_t *cl_sync_chunk_start(str *capability, int cluster_id, int dst_id, int aloc_new_pkt = 0; bin_packet_t *new_packet = NULL; - if (sync_packet_snd) { - bin_get_buffer(sync_packet_snd, &bin_buffer); + if (sync_packet_last) { + bin_get_buffer(sync_packet_last, &bin_buffer); prev_chunk_size = bin_buffer.len - sync_prev_buf_len; /* assume this chunk will have aprox the same size as the previous one * and check if there is enough space in the packet */ @@ -215,62 +217,59 @@ bin_packet_t *cl_sync_chunk_start(str *capability, int cluster_id, int dst_id, aloc_new_pkt = 1; if (aloc_new_pkt) { /* next chunk will be in a new packet */ - if (sync_packet_snd) { + if (sync_packet_last) { *sync_last_chunk_sz = prev_chunk_size; - /* send and free the previous packet */ - msg_add_trailer(sync_packet_snd, cluster_id, dst_id); - - if (clusterer_send_msg(sync_packet_snd, cluster_id, dst_id, 0, - 1 /* we should be in a SYNC_REQ_RCV callback here so - * already locked*/) < 0) - LM_ERR("Failed to send sync packet\n"); - - bin_free_packet(sync_packet_snd); - pkg_free(sync_packet_snd); - sync_packet_snd = NULL; + /* properly end the previous packet (to be sent later) */ + msg_add_trailer(sync_packet_last, cluster_id, dst_id); sync_last_chunk_sz = NULL; } - new_packet = pkg_malloc(sizeof *new_packet); + new_packet = malloc(sizeof *new_packet); if (!new_packet) { LM_ERR("No more pkg memory\n"); return NULL; } + new_packet->next = NULL; - if (bin_init(new_packet,&cl_extra_cap,CLUSTERER_SYNC,BIN_SYNC_VERSION,0)<0) { + if (_bin_init(new_packet,&cl_extra_cap,CLUSTERER_SYNC,BIN_SYNC_VERSION,0,1)<0) { LM_ERR("Failed to init bin packet\n"); - pkg_free(new_packet); + free(new_packet); return NULL; } bin_push_str(new_packet, capability); bin_push_int(new_packet, data_version); - sync_packet_snd = new_packet; + if (sync_packet_last) + sync_packet_last->next = new_packet; + else + sync_packets = new_packet; + sync_packet_last = new_packet; + sync_packets_cnt++; } if (sync_last_chunk_sz) *sync_last_chunk_sz = prev_chunk_size; /* reserve and remember a holder for the upcoming data chunk size */ - bin_get_buffer(sync_packet_snd, &bin_buffer); - bin_push_int(sync_packet_snd, 0); + bin_get_buffer(sync_packet_last, &bin_buffer); + bin_push_int(sync_packet_last, 0); sync_last_chunk_sz = (int *)(bin_buffer.s + bin_buffer.len); - bin_push_int(sync_packet_snd, SYNC_CHUNK_START_MARKER); + bin_push_int(sync_packet_last, SYNC_CHUNK_START_MARKER); - bin_get_buffer(sync_packet_snd, &bin_buffer); + bin_get_buffer(sync_packet_last, &bin_buffer); sync_prev_buf_len = bin_buffer.len; no_sync_chunks_sent++; - return sync_packet_snd; + return sync_packet_last; } int no_sync_chunks_iter; -/* this mechanism allows modules to ignore all or part of a sync chunk - * without disrupting the sequencing / consuming of the remaining data */ +/* this mechanism allows modules to ignore all or part of a sync chunk on the + * receiver node, without affecting their consuming of remaining sync chunks */ char *next_data_chunk; int cl_sync_chunk_iter(bin_packet_t *packet) @@ -324,21 +323,18 @@ int cl_sync_chunk_iter(bin_packet_t *packet) void send_sync_repl(int sender, void *param) { - bin_packet_t sync_end_pkt; + bin_packet_t sync_end_pkt, *pkt, *next_pkt; str bin_buffer; struct local_cap *cap; - int rc, cluster_id; + int rc, cluster_id, pkt_no; struct reply_rpc_params *p = (struct reply_rpc_params *)param; - lock_start_read(cl_list_lock); - for (cap = p->cluster->capabilities; cap; cap = cap->next) if (!str_strcmp(&p->cap_name, &cap->reg.name)) break; if (!cap) { LM_ERR("Sync request for unknown capability: %.*s\n", p->cap_name.len, p->cap_name.s); - lock_stop_read(cl_list_lock); return; } @@ -346,20 +342,30 @@ void send_sync_repl(int sender, void *param) cap->reg.event_cb(SYNC_REQ_RCV, p->node_id); - if (sync_packet_snd) { - bin_get_buffer(sync_packet_snd, &bin_buffer); + lock_start_read(cl_list_lock); + + if (sync_packets) { + bin_get_buffer(sync_packet_last, &bin_buffer); *sync_last_chunk_sz = bin_buffer.len - sync_prev_buf_len; /* send and free the lastly built packet */ - msg_add_trailer(sync_packet_snd, p->cluster->cluster_id, p->node_id); + msg_add_trailer(sync_packet_last, p->cluster->cluster_id, p->node_id); + + for (pkt = sync_packets; pkt; pkt = next_pkt) { + next_pkt = pkt->next; - if ((rc = clusterer_send_msg(sync_packet_snd, p->cluster->cluster_id, - p->node_id, 0, 1))<0) - LM_ERR("Failed to send sync packet, rc=%d\n", rc); + if ((rc = clusterer_send_msg(pkt, p->cluster->cluster_id, + p->node_id, 0, 1))<0) + LM_ERR("Failed to send sync packet, rc=%d\n", rc); + + bin_free_packet(pkt); + free(pkt); + } - bin_free_packet(sync_packet_snd); - pkg_free(sync_packet_snd); - sync_packet_snd = NULL; + sync_packets = NULL; + pkt_no = sync_packets_cnt; + sync_packets_cnt = 0; + sync_packet_last = NULL; sync_last_chunk_sz = NULL; } @@ -386,8 +392,8 @@ void send_sync_repl(int sender, void *param) bin_free_packet(&sync_end_pkt); - LM_INFO("Sent all sync packets for capability '%.*s' to node %d, cluster " - "%d\n", p->cap_name.len, p->cap_name.s, p->node_id, cluster_id); + LM_INFO("Sent all sync packets (%d) for capability '%.*s' to node %d, cluster " + "%d\n", pkt_no, p->cap_name.len, p->cap_name.s, p->node_id, cluster_id); shm_free(param); }