diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index b5173461..d5ad645c 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-12-12 +## 0.0.1-dev - 2024-05-06 ### Features diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 8923d917..360f8a1b 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -150,6 +150,22 @@ int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, ch amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Binding queue", errorDescription); + return status; +} + +LABVIEW_PUBLIC_FUNCTION +int lv_amqp_basic_consume(int64_t conn_intptr, uint16_t channel, char *queuename, LStrHandle errorDescription) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; + int status; + + // First check if the queue exists + amqp_boolean_t PASSIVE = 1; + amqp_boolean_t DURABLE = 0; + amqp_boolean_t EXCLUSIVE = 0; + amqp_boolean_t AUTO_DELETE = 1; + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_cstring_bytes(queuename), PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table); + status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Checking queue", errorDescription); if (status != 1) { return status; @@ -177,7 +193,7 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out amqp_rpc_reply_t res; amqp_envelope_t envelope; - //amqp_maybe_release_buffers(conn); + amqp_maybe_release_buffers(conn); status = lv_report_amqp_error(amqp_consume_message(conn, &envelope, &tval, 0), "Consuming message", errorDescription); if (status != 1) diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index d6c1a407..8c9187ab 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -48,6 +48,8 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name_in, LStrHandle queue_name_out, uint8_t passive, LStrHandle errorDescription); +int lv_amqp_basic_consume(int64_t conn_intptr, uint16_t channel, char *queuename, LStrHandle errorDescription); + int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description); int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders, LStrHandle error_description);