Skip to content

Commit

Permalink
queue create and bind split
Browse files Browse the repository at this point in the history
  • Loading branch information
kwitekrac committed Oct 22, 2023
1 parent 78da22f commit 7548760
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
24 changes: 11 additions & 13 deletions labview/labview_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange


LABVIEW_PUBLIC_FUNCTION
int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *bindingkey, LStrHandle error_description) {
int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description) {
amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr;
int status;
amqp_bytes_t queuename;
Expand All @@ -325,17 +325,16 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange,
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_empty_bytes, PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table);

status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", error_description);
if (status != 1) {
return status;
}

queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
copyStringToLStrHandle("Out of memory while copying queue name", error_description);
return _OUT_OF_MEMORY;
}
copyBufferToLStrHandle(r->queue.bytes, r->queue.len, queue_name_out);
return status;
}

LABVIEW_PUBLIC_FUNCTION
int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description) {
amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr;
int status;

amqp_queue_bind(conn, channel, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table);
amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table);
status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Binding queue", error_description);
if (status != 1) {
return status;
Expand All @@ -344,11 +343,10 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange,
amqp_boolean_t NO_LOCAL = 0;
amqp_boolean_t NO_ACK = 1;
amqp_boolean_t EXCLUSIVE2 = 0;
amqp_basic_consume(conn, channel, queuename, amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table);
amqp_basic_consume(conn, channel, amqp_cstring_bytes(queuename), amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table);
/* amqp_basic_consume is used to register a consumer on the queue,
so that the broker will start delivering messages to it.*/
status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Basic consume", error_description);
amqp_bytes_free(queuename);
return status;
}

Expand Down
4 changes: 3 additions & 1 deletion labview/labview_rabbitmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, ch

int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description);

int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* exchange, char* bindingkey, LStrHandle error_description);
int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description);

int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description);

int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders_key, LStrHandle cheaders_value, LStrHandle error_description);

Expand Down

0 comments on commit 7548760

Please sign in to comment.