From 75487600c5246b2d36c70e022ad82b1afb3a4f35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Sun, 22 Oct 2023 19:09:47 +0200 Subject: [PATCH] queue create and bind split --- labview/labview_rabbitmq.c | 24 +++++++++++------------- labview/labview_rabbitmq.h | 4 +++- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 2ee4f47e..a575cc74 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -313,7 +313,7 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange LABVIEW_PUBLIC_FUNCTION -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *bindingkey, LStrHandle error_description) { +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description) { amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; int status; amqp_bytes_t queuename; @@ -325,17 +325,16 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange, amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_empty_bytes, PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table); status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", error_description); - if (status != 1) { - return status; - } - - queuename = amqp_bytes_malloc_dup(r->queue); - if (queuename.bytes == NULL) { - copyStringToLStrHandle("Out of memory while copying queue name", error_description); - return _OUT_OF_MEMORY; - } + copyBufferToLStrHandle(r->queue.bytes, r->queue.len, queue_name_out); + return status; +} + +LABVIEW_PUBLIC_FUNCTION +int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description) { + amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; + int status; - amqp_queue_bind(conn, channel, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); + amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Binding queue", error_description); if (status != 1) { return status; @@ -344,11 +343,10 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange, amqp_boolean_t NO_LOCAL = 0; amqp_boolean_t NO_ACK = 1; amqp_boolean_t EXCLUSIVE2 = 0; - amqp_basic_consume(conn, channel, queuename, amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table); + amqp_basic_consume(conn, channel, amqp_cstring_bytes(queuename), amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table); /* amqp_basic_consume is used to register a consumer on the queue, so that the broker will start delivering messages to it.*/ status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Basic consume", error_description); - amqp_bytes_free(queuename); return status; } diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index 021a4157..59d632a3 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -46,7 +46,9 @@ int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, ch int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description); -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* exchange, char* bindingkey, LStrHandle error_description); +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description); + +int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description); int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders_key, LStrHandle cheaders_value, LStrHandle error_description);