diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 360f8a1b..666b2c7f 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -15,7 +15,7 @@ LABVIEW_PUBLIC_FUNCTION char *lv_rabbitmq_version(void) { - char *VERSION = "0.0.1"; + char *VERSION = "0.0.3"; return VERSION; } @@ -218,7 +218,7 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, uint8_t* msgHeaderBuf, uint64_t msgHeaderBufLen, LStrHandle messageBody, LStrHandle errorDescription) +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, KeyValuePairArrHdl headers, LStrHandle messageBody, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_basic_properties_t props; @@ -230,22 +230,21 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, messageBodyBuffer.len = (*messageBody)->cnt; messageBodyBuffer.bytes = (void *)((*messageBody)->str); - - amqp_table_t *table; - if (msgHeaderBufLen > 0) + if ((*headers)->dimSize > 0) { // Update flags to use custom headers - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; - table = &props.headers; - stringToHeaders(table, msgHeaderBuf, msgHeaderBufLen); + props._flags |= AMQP_BASIC_HEADERS_FLAG; + + amqp_table_t *table = &props.headers; + buildHeaders(table, headers); } int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingKey), 0, 0, &props, messageBodyBuffer); - // Free allocated headers - if (msgHeaderBufLen > 0) + //Free allocated headers + if ((*headers)->dimSize > 0) { - free(table->entries); + free(props.headers.entries); } return error_code; diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index 8c9187ab..b64423ce 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -1,3 +1,5 @@ +#include "labview_types.h" + #ifndef LABVIEW_RABBITMQ_H #define LABVIEW_RABBITMQ_H @@ -44,7 +46,7 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchan int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description); -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, LStrHandle messagebody, LStrHandle error_description); +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, KeyValuePairArrHdl headers, LStrHandle messageBody, LStrHandle errorDescription); int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name_in, LStrHandle queue_name_out, uint8_t passive, LStrHandle errorDescription); diff --git a/labview/labview_types.h b/labview/labview_types.h index 496df44b..ca5d47d9 100644 --- a/labview/labview_types.h +++ b/labview/labview_types.h @@ -3,6 +3,26 @@ #ifndef LABVIEW_TYPES_H #define LABVIEW_TYPES_H +/** + * Structure that represents LabVIEW Cluster (items: string key, string value, int32 dataType) + */ +typedef struct { + LStrHandle key; + LStrHandle value; + int32_t dataType; +} KeyValuePairRec; +/** + * Structure that represents LabVIEW Cluster Array +*/ +typedef struct { + int32_t dimSize; + KeyValuePairRec elt[1]; +} KeyValuePairArr; +/** + * Handle to LabVIEW Cluster Array, used to pass it between LabVIEW and C +*/ +typedef KeyValuePairArr **KeyValuePairArrHdl; + /** * This function copies the contents of a C string into a LabVIEW LStrHandle, * resizing the handle if necessary. diff --git a/labview/msg_headers.c b/labview/msg_headers.c index 77dd03e0..d2daf82f 100644 --- a/labview/msg_headers.c +++ b/labview/msg_headers.c @@ -1,124 +1,9 @@ #include +#include #include "extcode.h" #include "labview_types.h" -/** - * This function parses a string representation of headers and populates an AMQP - * table with the parsed header entries. The headers in the string are expected - * to be in a specific format where key-value pairs are separated by '=' and - * multiple headers are separated by ';'. - * - * @note The `table` parameter should be a initialized `amqp_table_t` structure. - * The `headerBuffer` should point to a null-terminated C string. - * The function modifies the `table` structure in place with the parsed - * entries. - */ - -const int MAX_HEADER_VALUE_LENGTH=64; // limits strings length sends as header value - -void stringToHeaders(amqp_table_t* table, const uint8_t* headerBuffer, - uint64_t headerBufferLen) { - amqp_table_entry_t* entries = NULL; - int numEntries = 0; - int finished = FALSE; - int index = 0; - - while (finished == 0) { - // Find the delimiter ';' - char* delimiter = findInBuffer(headerBuffer, headerBufferLen, index, 0x3B); - if (delimiter == NULL) { - delimiter = - headerBuffer + - headerBufferLen; // If no delimiter found, use the end of the string - } - if (delimiter == headerBuffer + headerBufferLen) { - finished = TRUE; - } - // Find the equal_char '=' - char* equal_char = findInBuffer(headerBuffer, headerBufferLen, index, 0x3D); - - if (equal_char != NULL && equal_char < delimiter) { - // Add the entry to the entries array - amqp_table_entry_t* newEntries = - realloc(entries, (numEntries + 1) * sizeof(amqp_table_entry_t)); - if (newEntries == NULL) { - // Error handling for memory allocation failure - perror("Memory allocation failed"); - free(entries); - return table; - } - entries = newEntries; - amqp_field_value_t value; - - // Get the kind (first byte) - value.kind = headerBuffer[index]; - index++; // Move to the next character - - // Parse the key - entries[numEntries].key.bytes = headerBuffer + index; - entries[numEntries].key.len = equal_char - ((char*)headerBuffer + index); - - // Init and parse the value - char valueTempBuffer[MAX_HEADER_VALUE_LENGTH]; - int valueLength = delimiter - equal_char - 1; - memcpy(valueTempBuffer, equal_char + 1, valueLength); - - // Parse the value based on the kind - switch (value.kind) { - case AMQP_FIELD_KIND_UTF8: - case AMQP_FIELD_KIND_BYTES: - value.value.bytes.bytes = equal_char + 1; // fist char after "=" - value.value.bytes.len = valueLength; - break; - case AMQP_FIELD_KIND_I8: - value.value.i8 = (int8_t)valueTempBuffer[0]; - break; - case AMQP_FIELD_KIND_U8: - value.value.u8 = (uint8_t)valueTempBuffer[0]; - break; - case AMQP_FIELD_KIND_I16: - value.value.i16 = *((int16_t*)valueTempBuffer); - break; - case AMQP_FIELD_KIND_U16: - value.value.u16 = *((uint16_t*)valueTempBuffer); - break; - case AMQP_FIELD_KIND_I32: - value.value.i32 = *((int32_t*)valueTempBuffer); - break; - case AMQP_FIELD_KIND_U32: - value.value.u32 = *((uint32_t*)valueTempBuffer); - break; - case AMQP_FIELD_KIND_I64: - value.value.i64 = *((int64_t*)valueTempBuffer); - break; - case AMQP_FIELD_KIND_U64: - case AMQP_FIELD_KIND_TIMESTAMP: - value.value.u64 = *((uint64_t*)valueTempBuffer); - break; - case AMQP_FIELD_KIND_F32: - value.value.f32 = *((float*)valueTempBuffer); - break; - case AMQP_FIELD_KIND_F64: - value.value.f64 = *((double*)valueTempBuffer); - break; - default: - // Unsupported kind, ignore this header - break; - } - - entries[numEntries].value = value; - numEntries++; - } - - // Move to the next header (skip the delimiter ';') - index = delimiter - (char*)headerBuffer + 1; - } - - // Assign the entries to the table - table->num_entries = numEntries; - table->entries = entries; -} /** * This function takes an AMQP table containing headers and converts them into @@ -267,4 +152,84 @@ void headersToString(amqp_table_t* table, LStrHandle concatenatedHeaders) { // Free the temporary buffer free(headers); +} + + +amqp_field_value_t createFieldValue(int32_t dataType, LStrHandle value) +{ + amqp_field_value_t fieldValue; + fieldValue.kind = dataType; + + switch (fieldValue.kind) { + case AMQP_FIELD_KIND_UTF8: + case AMQP_FIELD_KIND_BYTES: + fieldValue.value.bytes.bytes = (void *)(*value)->str; + fieldValue.value.bytes.len = (*value)->cnt; + break; + case AMQP_FIELD_KIND_I8: + fieldValue.value.i8 = *((int8_t*)(*value)->str); + break; + case AMQP_FIELD_KIND_U8: + fieldValue.value.u8 = *((uint8_t*)(*value)->str); + break; + case AMQP_FIELD_KIND_I16: + fieldValue.value.i16 = *((int16_t*)(*value)->str); + break; + case AMQP_FIELD_KIND_U16: + fieldValue.value.u16 = *((uint16_t*)(*value)->str); + break; + case AMQP_FIELD_KIND_I32: + fieldValue.value.i32 = *((int32_t*)(*value)->str); + break; + case AMQP_FIELD_KIND_U32: + fieldValue.value.u32 = *((uint32_t*)(*value)->str); + break; + case AMQP_FIELD_KIND_I64: + fieldValue.value.i64 = *((int64_t*)(*value)->str); + break; + case AMQP_FIELD_KIND_U64: + case AMQP_FIELD_KIND_TIMESTAMP: + fieldValue.value.u64 = *((uint64_t*)(*value)->str); + break; + case AMQP_FIELD_KIND_F32: + fieldValue.value.f32 = *((float*)(*value)->str); + break; + case AMQP_FIELD_KIND_F64: + fieldValue.value.f64 = *((double*)(*value)->str); + break; + default: + // Unsupported kind, ignore this header + break; + } + return fieldValue; +} + +/** + * This function parses a C representation of array of clusters and populates an AMQP + * table with the parsed header entries. + * + * @note The `table` parameter should be a initialized `amqp_table_t` structure. + * The `KeyValuePairArrHdl` should be a handler to C structure generated by LabVIEW. + * The function modifies the `table` structure in place with the parsed entries. + */ +void buildHeaders(amqp_table_t* table, KeyValuePairArrHdl headers) +{ + amqp_table_entry_t* entries = malloc(sizeof(amqp_table_entry_t) * (*headers)->dimSize); + + int32_t i = 0; + KeyValuePairRec *p = (*headers)->elt; + for (; i < (*headers)->dimSize; i++, p++) + { + amqp_field_value_t value; + LStrPtr keyPtr = *p->key; + + entries[i].key.bytes = (void *)keyPtr->str; + entries[i].key.len = keyPtr->cnt; + entries[i].value = createFieldValue(p->dataType, p->value); + } + + // Set the entries to the headers table + table -> entries = entries; + table -> num_entries = (*headers)->dimSize; + } \ No newline at end of file diff --git a/labview/msg_headers.h b/labview/msg_headers.h index d72594d6..226f1079 100644 --- a/labview/msg_headers.h +++ b/labview/msg_headers.h @@ -1,5 +1,6 @@ #include #include "extcode.h" +#include "labview_types.h" #ifndef MSG_HEADERS_H #define MSG_HEADERS_H @@ -10,7 +11,7 @@ * Key-value pairs are separated by '=' and multiple headers are separated by * ';'. */ -void parseHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t headerBufferLen); +void headersToString(amqp_table_t *table, LStrHandle concatenatedHeaders); /** * This function parses a string representation of headers and populates an AMQP @@ -23,6 +24,6 @@ void parseHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t hea * The function modifies the `table` structure in place with the parsed * entries. */ -void headersToString(amqp_table_t *table, LStrHandle concatenatedHeaders); +void buildHeaders(amqp_table_t* table, KeyValuePairArrHdl headers); #endif