Skip to content

Commit

Permalink
publish message with headers works
Browse files Browse the repository at this point in the history
  • Loading branch information
kwitekrac committed Oct 28, 2023
1 parent 1611c57 commit d7e5c26
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 26 deletions.
180 changes: 155 additions & 25 deletions labview/labview_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString)
return err;
}

char* findInBuffer(const uint8_t* headerBuffer, uint32_t headerBufferLen, uint32_t offset, char toFind){
uint32_t i=offset;
for (i; i<headerBufferLen; i++)
{
if (headerBuffer[i]==toFind)
{
return headerBuffer+i;
}
}
return NULL;
}

// Function to split a string into multiple strings based on the ";" separator
char **splitString(char *input, int *count)
{
Expand All @@ -58,6 +70,113 @@ char **splitString(char *input, int *count)
return tokens;
}

// Define a function to parse the header string and create an amqp_table_t
void parseHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t headerBufferLen) {
amqp_table_entry_t* entries = NULL;
int numEntries = 0;
int finished = FALSE;
int index = 0;

while (finished==0) {
// Find the delimiter ';'
char* delimiter = findInBuffer(headerBuffer, headerBufferLen, index, 0x3B);
if (delimiter == NULL) {
delimiter = headerBuffer + headerBufferLen; // If no delimiter found, use the end of the string
}
if (delimiter == headerBuffer + headerBufferLen)
{
finished=TRUE;
}
// Find the equal_char '='
char* equal_char = findInBuffer(headerBuffer, headerBufferLen, index, 0x3D);

if (equal_char != NULL && equal_char < delimiter) {

// Add the entry to the entries array
amqp_table_entry_t* newEntries = realloc(entries, (numEntries + 1) * sizeof(amqp_table_entry_t));
if (newEntries == NULL) {
// Error handling for memory allocation failure
perror("Memory allocation failed");
free(entries);
return table;
}
entries = newEntries;
amqp_field_value_t value;

// Get the kind (first byte)
value.kind = headerBuffer[index];
index++; // Move to the next character

// Parse the key
entries[numEntries].key.bytes=headerBuffer + index;
entries[numEntries].key.len= equal_char - ((char*)headerBuffer + index);


// Init and parse the value
char valueTempBuffer [8];
int valueLength = delimiter - equal_char - 1;
memcpy(valueTempBuffer, equal_char + 1, valueLength);

// Parse the value based on the kind
switch (value.kind) {
case AMQP_FIELD_KIND_UTF8:
case AMQP_FIELD_KIND_BYTES:
value.value.bytes.bytes = equal_char+1; //fist char after "="
value.value.bytes.len = valueLength;
break;
case AMQP_FIELD_KIND_I8:
value.value.i8 = (int8_t)valueTempBuffer[0];
break;
case AMQP_FIELD_KIND_U8:
value.value.u8 = (uint8_t)valueTempBuffer[0];
break;
case AMQP_FIELD_KIND_I16:
value.value.i16 = *((int16_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_U16:
value.value.u16 = *((uint16_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_I32:
value.value.i32 = *((int32_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_U32:
value.value.u32 = *((uint32_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_I64:
value.value.i64 = *((int64_t*)valueTempBuffer);
// for (int i=0; i < 8; ++i) {
// data = (data << 8) | valueStr[i]; // bytes is already UInt8 * so no need to mask it
// }
break;
case AMQP_FIELD_KIND_U64:
case AMQP_FIELD_KIND_TIMESTAMP:
value.value.u64 = *((uint64_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_F32:
value.value.f32 = *((float*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_F64:
value.value.f64 = *((double*)valueTempBuffer);
break;
default:
// Unsupported kind, ignore this header
break;
}

entries[numEntries].value = value;
numEntries++;
}

// Move to the next header (skip the delimiter ';')
index = delimiter - (char*)headerBuffer + 1;
}

// Assign the entries to the table
table->num_entries = numEntries;
table->entries = entries;
}


void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders)
{
// Calculate required buffer size for concatenated headers keys and values separated by "=",
Expand Down Expand Up @@ -342,46 +461,31 @@ int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, ch
}

LABVIEW_PUBLIC_FUNCTION
int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description)
int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description)
{
amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr;
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /*persistent delivery mode */

int count, count2;
char **headers_key = splitString(cheaders_key, &count);
char **headers_value = splitString(cheaders_value, &count2);
if (headers_key != NULL)

amqp_table_t *table;
if (headerBufferLen != 0)
{
// Update flags to use custom headers
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG;
// Allocate memory for custom headers
amqp_table_t *table = &props.headers;
props.headers.num_entries = count;
props.headers.entries = calloc(props.headers.num_entries, sizeof(amqp_table_entry_t));
// update headers content
for (int i = 0; i < count; i++)
{ (table->entries[i]).key = amqp_cstring_bytes(headers_key[i]);
((table->entries[i]).value).kind = AMQP_FIELD_KIND_BYTES;
((table->entries[i]).value).value.bytes = amqp_cstring_bytes(headers_value[i]);
}
table = &props.headers;
//props.expiration = amqp_cstring_bytes("60000");
parseHeaders(table, headerBuffer, headerBufferLen);
}

int error_code = error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody));
int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody));

// Dereference headers
if (headers_key != NULL)
if (headerBufferLen != 0)
{
for (int i = 0; i < count; i++)
{
free(headers_key[i]);
free(headers_value[i]);
}

free(headers_key);
free(headers_value);
free(table->entries);
}

return error_code;
Expand Down Expand Up @@ -463,4 +567,30 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out

amqp_destroy_envelope(&envelope);
return status;
}

LABVIEW_PUBLIC_FUNCTION
void lv_test(LStrHandle outputA, LStrHandle outputB, LStrHandle inputA, uint8_t *headerBuffer, int32_t headerBufferLen){
char outputA_str[100];
char outputB_str[100];

amqp_table_t headers;
amqp_table_t* headerPtr = &headers;
//parseHeaders(headerPtr, intputB, intputBlen);
char* delimiter = findInBuffer(headerBuffer, headerBufferLen, 0, 0x3B);
char* equal_char = findInBuffer(headerBuffer, headerBufferLen, 0, 0x3D);

int pos1 = delimiter - (char*)headerBuffer;
int pos2 = equal_char - (char*)headerBuffer;

//char valueTempBuffer [8];
int valueLength = delimiter - equal_char - 1;
memcpy(outputA_str, equal_char + 1, valueLength);

//sprintf(outputA_str, "%d", pos1);
sprintf(outputB_str, "%d", pos2);

copyStringToLStrHandle(outputA_str, outputA);
copyStringToLStrHandle(outputB_str, outputB);
free(headerPtr->entries);
}
2 changes: 1 addition & 1 deletion labview/labview_rabbitmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char* exchan

int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description);

int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description);
int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description);

int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description);

Expand Down

0 comments on commit d7e5c26

Please sign in to comment.