From 28ce1c879b00940c36c1a52fd34815610edb6aa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Tue, 12 Dec 2023 08:42:02 +0100 Subject: [PATCH] msg body transferred by LStrHandle (#12) Msg body transferred from LabVIEW to C library using LStrHandle type. Message header values were limited to 8 chars, now it's defined in MAX_HEADER_VALUE_LENGTH, and it's equal to 64. ## Changes Made ## Related Issues ## Checklist - [x] I have used a PR title that is descriptive enough for a release note. - [x] I have tested these changes locally. - [ ] I have added appropriate tests or updated existing tests. - [ ] I have tested these changes on a dedicated VM or a customer VM [name of the VM] - [ ] I have added appropriate documentation or updated existing documentation. --------- Co-authored-by: kwitekrac --- RELEASE_NOTES.md | 2 +- labview/labview_rabbitmq.c | 8 ++++++-- labview/labview_rabbitmq.h | 2 +- labview/msg_headers.c | 5 ++++- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d5b78b95..b5173461 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-11-05 +## 0.0.1-dev - 2023-12-12 ### Features diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index df14e354..8923d917 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -202,13 +202,17 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, uint8_t* msgHeaderBuf, uint64_t msgHeaderBufLen, char *messageBody, LStrHandle errorDescription) +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, uint8_t* msgHeaderBuf, uint64_t msgHeaderBufLen, LStrHandle messageBody, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("text/plain"); props.delivery_mode = 2; /*persistent delivery mode */ + + amqp_bytes_t messageBodyBuffer; + messageBodyBuffer.len = (*messageBody)->cnt; + messageBodyBuffer.bytes = (void *)((*messageBody)->str); amqp_table_t *table; @@ -220,7 +224,7 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, stringToHeaders(table, msgHeaderBuf, msgHeaderBufLen); } - int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingKey), 0, 0, &props, amqp_cstring_bytes(messageBody)); + int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingKey), 0, 0, &props, messageBodyBuffer); // Free allocated headers if (msgHeaderBufLen > 0) diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index 70ea9c42..d6c1a407 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -44,7 +44,7 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchan int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description); -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description); +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, LStrHandle messagebody, LStrHandle error_description); int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name_in, LStrHandle queue_name_out, uint8_t passive, LStrHandle errorDescription); diff --git a/labview/msg_headers.c b/labview/msg_headers.c index f2a5dac3..77dd03e0 100644 --- a/labview/msg_headers.c +++ b/labview/msg_headers.c @@ -14,6 +14,9 @@ * The function modifies the `table` structure in place with the parsed * entries. */ + +const int MAX_HEADER_VALUE_LENGTH=64; // limits strings length sends as header value + void stringToHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t headerBufferLen) { amqp_table_entry_t* entries = NULL; @@ -57,7 +60,7 @@ void stringToHeaders(amqp_table_t* table, const uint8_t* headerBuffer, entries[numEntries].key.len = equal_char - ((char*)headerBuffer + index); // Init and parse the value - char valueTempBuffer[8]; + char valueTempBuffer[MAX_HEADER_VALUE_LENGTH]; int valueLength = delimiter - equal_char - 1; memcpy(valueTempBuffer, equal_char + 1, valueLength);