Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish msg: headers passed by structure #15

Merged
merged 2 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions labview/labview_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out
}

LABVIEW_PUBLIC_FUNCTION
int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, uint8_t* msgHeaderBuf, uint64_t msgHeaderBufLen, LStrHandle messageBody, LStrHandle errorDescription)
int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, KeyValuePairArrHdl headers, LStrHandle messageBody, LStrHandle errorDescription)
kwitekrac marked this conversation as resolved.
Show resolved Hide resolved
{
amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr;
amqp_basic_properties_t props;
Expand All @@ -230,22 +230,21 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange,
messageBodyBuffer.len = (*messageBody)->cnt;
messageBodyBuffer.bytes = (void *)((*messageBody)->str);


amqp_table_t *table;
if (msgHeaderBufLen > 0)
if ((*headers)->dimSize > 0)
{
// Update flags to use custom headers
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG;
table = &props.headers;
stringToHeaders(table, msgHeaderBuf, msgHeaderBufLen);
props._flags |= AMQP_BASIC_HEADERS_FLAG;

amqp_table_t *table = &props.headers;
buildHeaders(table, headers);
}

int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingKey), 0, 0, &props, messageBodyBuffer);

// Free allocated headers
if (msgHeaderBufLen > 0)
//Free allocated headers
if ((*headers)->dimSize > 0)
{
free(table->entries);
free(props.headers.entries);
}

return error_code;
Expand Down
4 changes: 3 additions & 1 deletion labview/labview_rabbitmq.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "labview_types.h"

#ifndef LABVIEW_RABBITMQ_H
#define LABVIEW_RABBITMQ_H

Expand Down Expand Up @@ -44,7 +46,7 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchan

int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description);

int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, LStrHandle messagebody, LStrHandle error_description);
int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, KeyValuePairArrHdl headers, LStrHandle messageBody, LStrHandle errorDescription);

int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name_in, LStrHandle queue_name_out, uint8_t passive, LStrHandle errorDescription);

Expand Down
20 changes: 20 additions & 0 deletions labview/labview_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,26 @@
#ifndef LABVIEW_TYPES_H
#define LABVIEW_TYPES_H

/**
* Structure that represents LabVIEW Cluster (items: string key, string value, int32 dataType)
*/
typedef struct {
LStrHandle key;
LStrHandle value;
int32_t dataType;
} KeyValuePairRec;
kwitekrac marked this conversation as resolved.
Show resolved Hide resolved
/**
* Structure that represents LabVIEW Cluster Array
*/
typedef struct {
int32_t dimSize;
KeyValuePairRec elt[1];
} KeyValuePairArr;
/**
* Handle to LabVIEW Cluster Array, used to pass it between LabVIEW and C
*/
typedef KeyValuePairArr **KeyValuePairArrHdl;
kwitekrac marked this conversation as resolved.
Show resolved Hide resolved

/**
* This function copies the contents of a C string into a LabVIEW LStrHandle,
* resizing the handle if necessary.
Expand Down
197 changes: 81 additions & 116 deletions labview/msg_headers.c
Original file line number Diff line number Diff line change
@@ -1,124 +1,9 @@
#include <rabbitmq-c/amqp.h>
#include <stdlib.h>

#include "extcode.h"
#include "labview_types.h"

/**
* This function parses a string representation of headers and populates an AMQP
* table with the parsed header entries. The headers in the string are expected
* to be in a specific format where key-value pairs are separated by '=' and
* multiple headers are separated by ';'.
*
* @note The `table` parameter should be a initialized `amqp_table_t` structure.
* The `headerBuffer` should point to a null-terminated C string.
* The function modifies the `table` structure in place with the parsed
* entries.
*/

const int MAX_HEADER_VALUE_LENGTH=64; // limits strings length sends as header value

void stringToHeaders(amqp_table_t* table, const uint8_t* headerBuffer,
uint64_t headerBufferLen) {
amqp_table_entry_t* entries = NULL;
int numEntries = 0;
int finished = FALSE;
int index = 0;

while (finished == 0) {
// Find the delimiter ';'
char* delimiter = findInBuffer(headerBuffer, headerBufferLen, index, 0x3B);
if (delimiter == NULL) {
delimiter =
headerBuffer +
headerBufferLen; // If no delimiter found, use the end of the string
}
if (delimiter == headerBuffer + headerBufferLen) {
finished = TRUE;
}
// Find the equal_char '='
char* equal_char = findInBuffer(headerBuffer, headerBufferLen, index, 0x3D);

if (equal_char != NULL && equal_char < delimiter) {
// Add the entry to the entries array
amqp_table_entry_t* newEntries =
realloc(entries, (numEntries + 1) * sizeof(amqp_table_entry_t));
if (newEntries == NULL) {
// Error handling for memory allocation failure
perror("Memory allocation failed");
free(entries);
return table;
}
entries = newEntries;
amqp_field_value_t value;

// Get the kind (first byte)
value.kind = headerBuffer[index];
index++; // Move to the next character

// Parse the key
entries[numEntries].key.bytes = headerBuffer + index;
entries[numEntries].key.len = equal_char - ((char*)headerBuffer + index);

// Init and parse the value
char valueTempBuffer[MAX_HEADER_VALUE_LENGTH];
int valueLength = delimiter - equal_char - 1;
memcpy(valueTempBuffer, equal_char + 1, valueLength);

// Parse the value based on the kind
switch (value.kind) {
case AMQP_FIELD_KIND_UTF8:
case AMQP_FIELD_KIND_BYTES:
value.value.bytes.bytes = equal_char + 1; // fist char after "="
value.value.bytes.len = valueLength;
break;
case AMQP_FIELD_KIND_I8:
value.value.i8 = (int8_t)valueTempBuffer[0];
break;
case AMQP_FIELD_KIND_U8:
value.value.u8 = (uint8_t)valueTempBuffer[0];
break;
case AMQP_FIELD_KIND_I16:
value.value.i16 = *((int16_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_U16:
value.value.u16 = *((uint16_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_I32:
value.value.i32 = *((int32_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_U32:
value.value.u32 = *((uint32_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_I64:
value.value.i64 = *((int64_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_U64:
case AMQP_FIELD_KIND_TIMESTAMP:
value.value.u64 = *((uint64_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_F32:
value.value.f32 = *((float*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_F64:
value.value.f64 = *((double*)valueTempBuffer);
break;
default:
// Unsupported kind, ignore this header
break;
}

entries[numEntries].value = value;
numEntries++;
}

// Move to the next header (skip the delimiter ';')
index = delimiter - (char*)headerBuffer + 1;
}

// Assign the entries to the table
table->num_entries = numEntries;
table->entries = entries;
}

/**
* This function takes an AMQP table containing headers and converts them into
Expand Down Expand Up @@ -267,4 +152,84 @@ void headersToString(amqp_table_t* table, LStrHandle concatenatedHeaders) {

// Free the temporary buffer
free(headers);
}


amqp_field_value_t createFieldValue(int32_t dataType, LStrHandle value)
{
amqp_field_value_t fieldValue;
fieldValue.kind = dataType;

switch (fieldValue.kind) {
case AMQP_FIELD_KIND_UTF8:
case AMQP_FIELD_KIND_BYTES:
fieldValue.value.bytes.bytes = (void *)(*value)->str;
fieldValue.value.bytes.len = (*value)->cnt;
break;
case AMQP_FIELD_KIND_I8:
fieldValue.value.i8 = *((int8_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_U8:
fieldValue.value.u8 = *((uint8_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_I16:
fieldValue.value.i16 = *((int16_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_U16:
fieldValue.value.u16 = *((uint16_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_I32:
fieldValue.value.i32 = *((int32_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_U32:
fieldValue.value.u32 = *((uint32_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_I64:
fieldValue.value.i64 = *((int64_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_U64:
case AMQP_FIELD_KIND_TIMESTAMP:
fieldValue.value.u64 = *((uint64_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_F32:
fieldValue.value.f32 = *((float*)(*value)->str);
break;
case AMQP_FIELD_KIND_F64:
fieldValue.value.f64 = *((double*)(*value)->str);
break;
default:
// Unsupported kind, ignore this header
break;
}
return fieldValue;
}

/**
* This function parses a C representation of array of clusters and populates an AMQP
* table with the parsed header entries.
*
* @note The `table` parameter should be a initialized `amqp_table_t` structure.
* The `KeyValuePairArrHdl` should be a handler to C structure generated by LabVIEW.
* The function modifies the `table` structure in place with the parsed entries.
*/
void buildHeaders(amqp_table_t* table, KeyValuePairArrHdl headers)
{
amqp_table_entry_t* entries = malloc(sizeof(amqp_table_entry_t) * (*headers)->dimSize);

int32_t i = 0;
KeyValuePairRec *p = (*headers)->elt;
for (; i < (*headers)->dimSize; i++, p++)
{
amqp_field_value_t value;
LStrPtr keyPtr = *p->key;

entries[i].key.bytes = (void *)keyPtr->str;
entries[i].key.len = keyPtr->cnt;
entries[i].value = createFieldValue(p->dataType, p->value);
}

// Set the entries to the headers table
table -> entries = entries;
table -> num_entries = (*headers)->dimSize;

}
5 changes: 3 additions & 2 deletions labview/msg_headers.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <rabbitmq-c/amqp.h>
#include "extcode.h"
#include "labview_types.h"

#ifndef MSG_HEADERS_H
#define MSG_HEADERS_H
Expand All @@ -10,7 +11,7 @@
* Key-value pairs are separated by '=' and multiple headers are separated by
* ';'.
*/
void parseHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t headerBufferLen);
void headersToString(amqp_table_t *table, LStrHandle concatenatedHeaders);

/**
* This function parses a string representation of headers and populates an AMQP
Expand All @@ -23,6 +24,6 @@ void parseHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t hea
* The function modifies the `table` structure in place with the parsed
* entries.
*/
void headersToString(amqp_table_t *table, LStrHandle concatenatedHeaders);
void buildHeaders(amqp_table_t* table, KeyValuePairArrHdl headers);

#endif
Loading