From 78da22f0be401e35690c84352f1c325c6c526235 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Sun, 22 Oct 2023 18:21:49 +0200 Subject: [PATCH 01/20] message headers supported --- .vscode/settings.json | 18 ++++++ labview/labview_rabbitmq.c | 118 ++++++++++++++++++++++++++++++++++++- labview/labview_rabbitmq.h | 4 +- 3 files changed, 135 insertions(+), 5 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..d89833d0 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,18 @@ +{ + "files.associations": { + "optional": "c", + "istream": "c", + "ostream": "c", + "ratio": "c", + "system_error": "c", + "array": "c", + "functional": "c", + "tuple": "c", + "type_traits": "c", + "utility": "c", + "chrono": "c", + "algorithm": "c", + "sstream": "c", + "random": "c" + } +} \ No newline at end of file diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 359f3683..2ee4f47e 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -34,6 +34,81 @@ MgErr copyBufferToLStrHandle(const void* buffer, int len, LStrHandle LVString) return err; } +// Function to split a string into multiple strings based on the ";" separator +char** splitString(char* input, int* count) { + char** tokens = NULL; + char* token = strtok(input, ";"); + *count = 0; + + while (token != NULL) { + tokens = (char**)realloc(tokens, ((*count) + 1) * sizeof(char*)); + if (tokens == NULL) { + perror("Memory allocation error"); + exit(1); + } + tokens[(*count)] = strdup(token); + (*count)++; + token = strtok(NULL, ";"); + } + + return tokens; +} + +void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders_key, LStrHandle cheaders_value) { + // Calculate required buffer size for concatenated headers keys and values, + // following strings will be separated by ";". Buffer will be finished with Null-terminate the C string. + // Init counters with num of elements-1 (number of separators) + 1 ('\0') + int required_buffer_size_keys = (table->num_entries-1) +1; + int required_buffer_size_values = (table->num_entries-1) +1; + for (int i = 0; i < table->num_entries; i++) { + required_buffer_size_keys += table->entries[i].key.len; + required_buffer_size_values += table->entries[i].value.value.bytes.len; + } + // Allocate memory for temp strings + char *keys = (char *)malloc(required_buffer_size_keys * sizeof(char)); + char *values = (char *)malloc(required_buffer_size_values * sizeof(char)); + + int keys_offset=0; + int values_offset=0; + for (int i = 0; i < table->num_entries; i++) { + memcpy(keys+keys_offset, table->entries[i].key.bytes, table->entries[i].key.len); + keys_offset += table->entries[i].key.len; + keys[keys_offset]=';'; + keys_offset++; + memcpy(values+values_offset, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); + values_offset += table->entries[i].value.value.bytes.len; + values[values_offset]=';'; + values_offset++; + } + // add Null-terminate the C string + keys[keys_offset-1]='\0'; + values[values_offset-1]='\0'; + + copyStringToLStrHandle(keys, cheaders_key); + copyStringToLStrHandle(values, cheaders_value); + + free(keys); + free(values); +} + +char* amqpBytesToString(amqp_bytes_t input) { + // Allocate memory for the C string and a null terminator + char* result = (char*)malloc(input.len + 1); + + if (result == NULL) { + fprintf(stderr, "Memory allocation failed\n"); + exit(1); + } + + // Copy the data from amqp_bytes_t to the C string + memcpy(result, input.bytes, input.len); + + // Null-terminate the C string + result[input.len] = '\0'; + + return result; +} + LABVIEW_PUBLIC_FUNCTION char* lv_rabbitmq_version(void) { char* VERSION = "0.0.1"; @@ -196,13 +271,43 @@ int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, ch LABVIEW_PUBLIC_FUNCTION -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *messagebody, LStrHandle error_description) { +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description) { amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("text/plain"); props.delivery_mode = 2; /* persistent delivery mode */ - return amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange),amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); + + int count, count2; + char** headers_key = splitString(cheaders_key, &count); + char** headers_value = splitString(cheaders_value, &count2); + if (headers_key != NULL) { + // Update flags to use custom headers + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; + // Allocate memory for custom headers + amqp_table_t *table=&props.headers; + props.headers.num_entries=count; + props.headers.entries=calloc(props.headers.num_entries, sizeof(amqp_table_entry_t)); + // update headers content + for (int i = 0; i < count; i++) { + (table->entries[i]).key=amqp_cstring_bytes(headers_key[i]); + ((table->entries[i]).value).kind=AMQP_FIELD_KIND_BYTES; + ((table->entries[i]).value).value.bytes=amqp_cstring_bytes(headers_value[i]); + } + } + + int error_code = error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange),amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); + + // Dereference headers + if (headers_key != NULL) { + for (int i = 0; i < count; i++) { + free(headers_key[i]); + free(headers_value[i]); + } + free(headers_key); + free(headers_value); + } + return error_code; //this function returns amqp_status_enum thats different from amqp_rpc_reply_t } @@ -243,12 +348,13 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange, /* amqp_basic_consume is used to register a consumer on the queue, so that the broker will start delivering messages to it.*/ status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Basic consume", error_description); + amqp_bytes_free(queuename); return status; } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle error_description) { +int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders_key, LStrHandle cheaders_value, LStrHandle error_description) { amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; int status; @@ -269,6 +375,12 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out copyBufferToLStrHandle(envelope.message.body.bytes, envelope.message.body.len, output); } + // Check message headers + amqp_table_t *headers = &envelope.message.properties.headers; + if (headers->num_entries > 0) { + getConcatenatedMessageHeaders(headers, cheaders_key, cheaders_value); + } + amqp_destroy_envelope(&envelope); return status; } diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index a8058459..021a4157 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -44,11 +44,11 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char* exchan int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description); -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char* exchange, char* routingkey, char* messagebody, LStrHandle error_description); +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description); int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* exchange, char* bindingkey, LStrHandle error_description); -int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle error_description); +int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders_key, LStrHandle cheaders_value, LStrHandle error_description); #endif /* LABVIEW_RABBITMQ_H */ From 75487600c5246b2d36c70e022ad82b1afb3a4f35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Sun, 22 Oct 2023 19:09:47 +0200 Subject: [PATCH 02/20] queue create and bind split --- labview/labview_rabbitmq.c | 24 +++++++++++------------- labview/labview_rabbitmq.h | 4 +++- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 2ee4f47e..a575cc74 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -313,7 +313,7 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange LABVIEW_PUBLIC_FUNCTION -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *bindingkey, LStrHandle error_description) { +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description) { amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; int status; amqp_bytes_t queuename; @@ -325,17 +325,16 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange, amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_empty_bytes, PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table); status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", error_description); - if (status != 1) { - return status; - } - - queuename = amqp_bytes_malloc_dup(r->queue); - if (queuename.bytes == NULL) { - copyStringToLStrHandle("Out of memory while copying queue name", error_description); - return _OUT_OF_MEMORY; - } + copyBufferToLStrHandle(r->queue.bytes, r->queue.len, queue_name_out); + return status; +} + +LABVIEW_PUBLIC_FUNCTION +int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description) { + amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; + int status; - amqp_queue_bind(conn, channel, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); + amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Binding queue", error_description); if (status != 1) { return status; @@ -344,11 +343,10 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange, amqp_boolean_t NO_LOCAL = 0; amqp_boolean_t NO_ACK = 1; amqp_boolean_t EXCLUSIVE2 = 0; - amqp_basic_consume(conn, channel, queuename, amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table); + amqp_basic_consume(conn, channel, amqp_cstring_bytes(queuename), amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table); /* amqp_basic_consume is used to register a consumer on the queue, so that the broker will start delivering messages to it.*/ status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Basic consume", error_description); - amqp_bytes_free(queuename); return status; } diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index 021a4157..59d632a3 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -46,7 +46,9 @@ int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, ch int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description); -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* exchange, char* bindingkey, LStrHandle error_description); +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description); + +int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description); int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders_key, LStrHandle cheaders_value, LStrHandle error_description); From 6642c9fd17f3936f879fd2fad34424505493d069 Mon Sep 17 00:00:00 2001 From: kwitekrac Date: Sun, 22 Oct 2023 18:27:00 +0000 Subject: [PATCH 03/20] docs(release_notes): update RELEASE_NOTES.md --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 127edaa3..50528969 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-08-07 +## 0.0.1-dev - 2023-10-22 ### Features From 24560889c83c5c70e217eb6cd4c647b03e8e0a02 Mon Sep 17 00:00:00 2001 From: kwitekrac Date: Sun, 22 Oct 2023 18:32:33 +0000 Subject: [PATCH 04/20] docs(release_notes): update RELEASE_NOTES.md --- RELEASE_NOTES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 50528969..328f1616 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -4,6 +4,7 @@ ### Features +- Support message headers (PR #10 by @kwitekrac) - LabVIEW support (PR #8 by @kwitekrac) ### Continuous Integration From e69b42a9bf5ea9fd1266e4220b387977d5c5628b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Mon, 23 Oct 2023 12:00:36 +0200 Subject: [PATCH 05/20] code formatted --- labview/labview_rabbitmq.c | 408 ++++++++++++++++++++++--------------- 1 file changed, 239 insertions(+), 169 deletions(-) diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index a575cc74..8321bbde 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -10,79 +10,99 @@ #include -MgErr copyStringToLStrHandle(char* cpString, LStrHandle LVString) +MgErr copyStringToLStrHandle(char *cpString, LStrHandle LVString) { - int32 len = strlen(cpString); - MgErr err = NumericArrayResize(uB, 1, (UHandle*)&LVString, len); - if (!err) - { - strncpy((*LVString)->str, cpString, len); // copying the string into the handle - (*LVString)->cnt = len; // telling the Handle what string size to expect - } - return err; + int32 len = strlen(cpString); + MgErr err = NumericArrayResize(uB, 1, (UHandle*) &LVString, len); + if (!err) + { + strncpy((*LVString)->str, cpString, len); // copying the string into the handle + (*LVString)->cnt = len; // telling the Handle what string size to expect + } + + return err; } -MgErr copyBufferToLStrHandle(const void* buffer, int len, LStrHandle LVString) +MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString) { + MgErr err = NumericArrayResize(uB, 1, (UHandle*) &LVString, len); + if (!err) + { + memcpy((*LVString)->str, buffer, len); // copying the string into the handle + (*LVString)->cnt = len; // telling the Handle what string size to expect + } - MgErr err = NumericArrayResize(uB, 1, (UHandle*)&LVString, len); - if (!err) - { - memcpy((*LVString)->str, buffer, len); // copying the string into the handle - (*LVString)->cnt = len; // telling the Handle what string size to expect - } - return err; + + + + + + + return err; } // Function to split a string into multiple strings based on the ";" separator -char** splitString(char* input, int* count) { - char** tokens = NULL; - char* token = strtok(input, ";"); - *count = 0; - - while (token != NULL) { - tokens = (char**)realloc(tokens, ((*count) + 1) * sizeof(char*)); - if (tokens == NULL) { - perror("Memory allocation error"); - exit(1); - } - tokens[(*count)] = strdup(token); - (*count)++; - token = strtok(NULL, ";"); - } - - return tokens; +char **splitString(char *input, int *count) +{ + char **tokens = NULL; + char *token = strtok(input, ";"); + *count = 0; + + while (token != NULL) + { + tokens = (char **) realloc(tokens, ((*count) + 1) *sizeof(char*)); + if (tokens == NULL) + { + perror("Memory allocation error"); + exit(1); + + + + + } + + tokens[(*count)] = strdup(token); + (*count) ++; + token = strtok(NULL, ";"); + } + + return tokens; } -void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders_key, LStrHandle cheaders_value) { +void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders_key, LStrHandle cheaders_value) +{ // Calculate required buffer size for concatenated headers keys and values, // following strings will be separated by ";". Buffer will be finished with Null-terminate the C string. // Init counters with num of elements-1 (number of separators) + 1 ('\0') - int required_buffer_size_keys = (table->num_entries-1) +1; - int required_buffer_size_values = (table->num_entries-1) +1; - for (int i = 0; i < table->num_entries; i++) { + int required_buffer_size_keys = (table->num_entries - 1) + 1; + int required_buffer_size_values = (table->num_entries - 1) + 1; + for (int i = 0; i < table->num_entries; i++) + { required_buffer_size_keys += table->entries[i].key.len; required_buffer_size_values += table->entries[i].value.value.bytes.len; } - // Allocate memory for temp strings - char *keys = (char *)malloc(required_buffer_size_keys * sizeof(char)); - char *values = (char *)malloc(required_buffer_size_values * sizeof(char)); - int keys_offset=0; - int values_offset=0; - for (int i = 0; i < table->num_entries; i++) { - memcpy(keys+keys_offset, table->entries[i].key.bytes, table->entries[i].key.len); + // Allocate memory for temp strings + char *keys = (char*) malloc(required_buffer_size_keys* sizeof(char)); + char *values = (char*) malloc(required_buffer_size_values* sizeof(char)); + + int keys_offset = 0; + int values_offset = 0; + for (int i = 0; i < table->num_entries; i++) + { + memcpy(keys + keys_offset, table->entries[i].key.bytes, table->entries[i].key.len); keys_offset += table->entries[i].key.len; - keys[keys_offset]=';'; + keys[keys_offset] = ';'; keys_offset++; - memcpy(values+values_offset, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); + memcpy(values + values_offset, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); values_offset += table->entries[i].value.value.bytes.len; - values[values_offset]=';'; + values[values_offset] = ';'; values_offset++; } + // add Null-terminate the C string - keys[keys_offset-1]='\0'; - values[values_offset-1]='\0'; + keys[keys_offset - 1] = '\0'; + values[values_offset - 1] = '\0'; copyStringToLStrHandle(keys, cheaders_key); copyStringToLStrHandle(values, cheaders_value); @@ -91,138 +111,167 @@ void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders_key, free(values); } -char* amqpBytesToString(amqp_bytes_t input) { - // Allocate memory for the C string and a null terminator - char* result = (char*)malloc(input.len + 1); +char *amqpBytesToString(amqp_bytes_t input) +{ + // Allocate memory for the C string and a null terminator + char *result = (char*) malloc(input.len + 1); - if (result == NULL) { - fprintf(stderr, "Memory allocation failed\n"); - exit(1); - } + if (result == NULL) + { + fprintf(stderr, "Memory allocation failed\n"); + exit(1); + } - // Copy the data from amqp_bytes_t to the C string - memcpy(result, input.bytes, input.len); + // Copy the data from amqp_bytes_t to the C string + memcpy(result, input.bytes, input.len); - // Null-terminate the C string - result[input.len] = '\0'; + // Null-terminate the C string + result[input.len] = '\0'; - return result; + return result; } LABVIEW_PUBLIC_FUNCTION -char* lv_rabbitmq_version(void) { - char* VERSION = "0.0.1"; +char *lv_rabbitmq_version(void) +{ + char *VERSION = "0.0.1"; return VERSION; } -/* This function is a modified version of the `die_on_amqp_error` function used in examples, +/*This function is a modified version of the `die_on_amqp_error` function used in examples, enhanced with LabVIEW string support.*/ -int lv_report_amqp_error(amqp_rpc_reply_t x, char const *context, LStrHandle error_description) { +int lv_report_amqp_error(amqp_rpc_reply_t x, char + const *context, LStrHandle error_description) +{ unsigned char temp_str[MAX_ERROR_DESCRIPTION_LENGTH]; int err; - switch (x.reply_type) { + switch (x.reply_type) + { case AMQP_RESPONSE_NORMAL: return _AMQP_RESPONSE_NORMAL; case AMQP_RESPONSE_NONE: snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: missing RPC reply type!", context); err = copyStringToLStrHandle(temp_str, error_description); - if (err){ + if (err) + { return err; } + return _AMQP_RESPONSE_NONE; case AMQP_RESPONSE_LIBRARY_EXCEPTION: snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: %s ", context, amqp_error_string2(x.library_error)); err = copyStringToLStrHandle(temp_str, error_description); - if (err){ + if (err) + { return err; } + return _AMQP_RESPONSE_LIBRARY_EXCEPTION; case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m = - (amqp_connection_close_t *)x.reply.decoded; - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server connection error %uh, message: %.*s ", - context, m->reply_code, (int)m->reply_text.len, - (char *)m->reply_text.bytes); - err = copyStringToLStrHandle(temp_str, error_description); - if (err){ - return err; - } - return _AMQP_RESPONSE_SERVER_EXCEPTION; + switch (x.reply.id) + { + case AMQP_CONNECTION_CLOSE_METHOD: + { + amqp_connection_close_t *m = + (amqp_connection_close_t*) x.reply.decoded; + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server connection error %uh, message: %.*s ", + context, m->reply_code, (int) m->reply_text.len, + (char*) m->reply_text.bytes); + err = copyStringToLStrHandle(temp_str, error_description); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_SERVER_EXCEPTION; + } + + case AMQP_CHANNEL_CLOSE_METHOD: + { + amqp_channel_close_t *m = (amqp_channel_close_t*) x.reply.decoded; + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server channel error %uh, message: %.*s ", + context, m->reply_code, (int) m->reply_text.len, + (char*) m->reply_text.bytes); + err = copyStringToLStrHandle(temp_str, error_description); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_SERVER_EXCEPTION; + } + + default: + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: unknown server error, method id 0x%08X ", + context, x.reply.id); + err = copyStringToLStrHandle(temp_str, error_description); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_SERVER_EXCEPTION; } - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded; - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server channel error %uh, message: %.*s ", - context, m->reply_code, (int)m->reply_text.len, - (char *)m->reply_text.bytes); - err = copyStringToLStrHandle(temp_str, error_description); - if (err){ - return err; - } - return _AMQP_RESPONSE_SERVER_EXCEPTION; - } - default: - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: unknown server error, method id 0x%08X ", - context, x.reply.id); - err = copyStringToLStrHandle(temp_str, error_description); - if (err){ - return err; - } - return _AMQP_RESPONSE_SERVER_EXCEPTION; - } - break; + + + break; } } LABVIEW_PUBLIC_FUNCTION -int64_t lv_amqp_new_connection() { +int64_t lv_amqp_new_connection() +{ // conn is an opaque struct pointer amqp_connection_state_t conn = amqp_new_connection(); // cast to int64_t so LabVIEW will receive it as // a 64-bit integer transfered by value - int64_t conn_intptr = (int64_t)conn; - return conn_intptr; + int64_t conn_intptr = (int64_t) conn; + return conn_intptr; } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_close_connection(int64_t conn_intptr, LStrHandle error_description) { +int lv_amqp_close_connection(int64_t conn_intptr, LStrHandle error_description) +{ // cast back to amqp_connection_state_t opaque struct pointer - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; - return lv_report_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),"Connection close", error_description); + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; + return lv_report_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Connection close", error_description); } LABVIEW_PUBLIC_FUNCTION -void lv_amqp_destroy_connection(int64_t conn_intptr) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +void lv_amqp_destroy_connection(int64_t conn_intptr) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_destroy_connection(conn); } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_channel_open(conn, channel); return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Opening channel", error_description); } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; return lv_report_amqp_error(amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), "Closing channel", error_description); } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, LStrHandle error_description) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_boolean_t PASSIVE = 0; amqp_boolean_t DURABLE = 0; @@ -232,111 +281,128 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *excha amqp_cstring_bytes(exchangetype), PASSIVE, DURABLE, AUTO_DELETE, INTERNAL, amqp_empty_table); - return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Exchange declare", error_description); + return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Exchange declare", error_description); } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, char *username, char *password, LStrHandle error_description) { - +int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, char *username, char *password, LStrHandle error_description) +{ int status; amqp_socket_t *socket = NULL; - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; struct timeval tval; tval.tv_sec = timeout_sec; tval.tv_usec = 0; socket = amqp_tcp_socket_new(conn); - if (!socket) { + if (!socket) + { return _CREATING_TCP_SOCKET; - } + } + status = amqp_socket_open_noblock(socket, host, port, &tval); - if (status<0) { - return _OPENING_TCP_SOCKET; - } + if (status < 0) + { + return _OPENING_TCP_SOCKET; + } + /*Code explanation: socket is set/stored in the connection state, so we dont need to destroy this socket because - it will be destroyed along with connection state destroy function*/ - - char const *VHOST = "/"; // the virtual host to connect to on the broker. The default on most brokers is "/" - int const CHANNEL_MAX = 0; // the limit for number of channels for the connection. 0 means no limit. - int const FRAME_MAX = 131072; // the maximum size of an AMQP frame. 131072 is the default. - // 4096 is the minimum size, 2^31-1 is the maximum, a good default is 131072 (128KB), - int const HEARTBEAT = 0; // the number of seconds between heartbeat frames to request of the broker. A value of 0 disables heartbeats. - - return lv_report_amqp_error(amqp_login(conn, VHOST, CHANNEL_MAX, FRAME_MAX, HEARTBEAT, AMQP_SASL_METHOD_PLAIN, username, password), "Logging in", error_description); + it will be destroyed along with connection state destroy function*/ + + char + const *VHOST = "/"; // the virtual host to connect to on the broker. The default on most brokers is "/" + int + const CHANNEL_MAX = 0; // the limit for number of channels for the connection. 0 means no limit. + int + const FRAME_MAX = 131072; // the maximum size of an AMQP frame. 131072 is the default. + // 4096 is the minimum size, 2^31-1 is the maximum, a good default is 131072 (128KB), + int + const HEARTBEAT = 0; // the number of seconds between heartbeat frames to request of the broker. A value of 0 disables heartbeats. + + return lv_report_amqp_error(amqp_login(conn, VHOST, CHANNEL_MAX, FRAME_MAX, HEARTBEAT, AMQP_SASL_METHOD_PLAIN, username, password), "Logging in", error_description); } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("text/plain"); - props.delivery_mode = 2; /* persistent delivery mode */ + props.delivery_mode = 2; /*persistent delivery mode */ int count, count2; - char** headers_key = splitString(cheaders_key, &count); - char** headers_value = splitString(cheaders_value, &count2); - if (headers_key != NULL) { + char **headers_key = splitString(cheaders_key, &count); + char **headers_value = splitString(cheaders_value, &count2); + if (headers_key != NULL) + { // Update flags to use custom headers props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; // Allocate memory for custom headers - amqp_table_t *table=&props.headers; - props.headers.num_entries=count; - props.headers.entries=calloc(props.headers.num_entries, sizeof(amqp_table_entry_t)); + amqp_table_t *table = &props.headers; + props.headers.num_entries = count; + props.headers.entries = calloc(props.headers.num_entries, sizeof(amqp_table_entry_t)); // update headers content - for (int i = 0; i < count; i++) { - (table->entries[i]).key=amqp_cstring_bytes(headers_key[i]); - ((table->entries[i]).value).kind=AMQP_FIELD_KIND_BYTES; - ((table->entries[i]).value).value.bytes=amqp_cstring_bytes(headers_value[i]); - } + for (int i = 0; i < count; i++) + { (table->entries[i]).key = amqp_cstring_bytes(headers_key[i]); + ((table->entries[i]).value).kind = AMQP_FIELD_KIND_BYTES; + ((table->entries[i]).value).value.bytes = amqp_cstring_bytes(headers_value[i]); + } } - int error_code = error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange),amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); + int error_code = error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); // Dereference headers - if (headers_key != NULL) { - for (int i = 0; i < count; i++) { - free(headers_key[i]); + if (headers_key != NULL) + { + for (int i = 0; i < count; i++) + { + free(headers_key[i]); free(headers_value[i]); - } - free(headers_key); + } + + free(headers_key); free(headers_value); } + return error_code; //this function returns amqp_status_enum thats different from amqp_rpc_reply_t } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; int status; amqp_bytes_t queuename; - amqp_boolean_t PASSIVE = 0; + amqp_boolean_t PASSIVE = 0; amqp_boolean_t DURABLE = 0; amqp_boolean_t EXCLUSIVE = 0; amqp_boolean_t AUTO_DELETE = 1; amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_empty_bytes, PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table); - status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", error_description); + status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", error_description); copyBufferToLStrHandle(r->queue.bytes, r->queue.len, queue_name_out); return status; } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; int status; amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Binding queue", error_description); - if (status != 1) { + if (status != 1) + { return status; } @@ -344,16 +410,17 @@ int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, c amqp_boolean_t NO_ACK = 1; amqp_boolean_t EXCLUSIVE2 = 0; amqp_basic_consume(conn, channel, amqp_cstring_bytes(queuename), amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table); - /* amqp_basic_consume is used to register a consumer on the queue, + /*amqp_basic_consume is used to register a consumer on the queue, so that the broker will start delivering messages to it.*/ status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Basic consume", error_description); - return status; + return status; } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders_key, LStrHandle cheaders_value, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders_key, LStrHandle cheaders_value, LStrHandle error_description) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; int status; struct timeval tval; @@ -364,18 +431,21 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out //amqp_maybe_release_buffers(conn); - status = lv_report_amqp_error(amqp_consume_message(conn, &envelope, &tval, 0),"Consuming message", error_description); - if (status != 1) { + status = lv_report_amqp_error(amqp_consume_message(conn, &envelope, &tval, 0), "Consuming message", error_description); + if (status != 1) + { return status; } - if (envelope.message.body.len > 0) { + if (envelope.message.body.len > 0) + { copyBufferToLStrHandle(envelope.message.body.bytes, envelope.message.body.len, output); } // Check message headers amqp_table_t *headers = &envelope.message.properties.headers; - if (headers->num_entries > 0) { + if (headers->num_entries > 0) + { getConcatenatedMessageHeaders(headers, cheaders_key, cheaders_value); } From 23181329b242ff939fd4255ea25f62f4c93c50f9 Mon Sep 17 00:00:00 2001 From: kwitekrac Date: Mon, 23 Oct 2023 10:01:36 +0000 Subject: [PATCH 06/20] docs(release_notes): update RELEASE_NOTES.md --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 328f1616..3e4be45c 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-10-22 +## 0.0.1-dev - 2023-10-23 ### Features From d74ea7abb07d26bf1f3090d256acbc0ae6718333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Tue, 24 Oct 2023 16:37:55 +0200 Subject: [PATCH 07/20] received headers type decode --- .vscode/settings.json | 6 +++++- labview/labview_rabbitmq.c | 42 +++++++++++++++++++++++++++++++++----- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index d89833d0..3ea57581 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -13,6 +13,10 @@ "chrono": "c", "algorithm": "c", "sstream": "c", - "random": "c" + "random": "c", + "*.tcc": "c", + "memory_resource": "c", + "fstream": "c", + "memory": "c" } } \ No newline at end of file diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 8321bbde..3c21aa6e 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -79,13 +79,27 @@ void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders_key, for (int i = 0; i < table->num_entries; i++) { required_buffer_size_keys += table->entries[i].key.len; - required_buffer_size_values += table->entries[i].value.value.bytes.len; + switch (table->entries[i].value.kind) { + case AMQP_FIELD_KIND_I8: + required_buffer_size_values += 1; + break; + case AMQP_FIELD_KIND_I64: + required_buffer_size_values += 8; + break; + case AMQP_FIELD_KIND_UTF8: + required_buffer_size_values += table->entries[i].value.value.bytes.len; + break; + default: + required_buffer_size_values += 0; + break; + } } // Allocate memory for temp strings char *keys = (char*) malloc(required_buffer_size_keys* sizeof(char)); char *values = (char*) malloc(required_buffer_size_values* sizeof(char)); + char *byteArray; int keys_offset = 0; int values_offset = 0; for (int i = 0; i < table->num_entries; i++) @@ -94,18 +108,36 @@ void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders_key, keys_offset += table->entries[i].key.len; keys[keys_offset] = ';'; keys_offset++; - memcpy(values + values_offset, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); - values_offset += table->entries[i].value.value.bytes.len; + + switch (table->entries[i].value.kind) { + case AMQP_FIELD_KIND_I8: + values[values_offset]=table->entries[i].value.value.i8; //check if not zero! + values_offset++; + break; + case AMQP_FIELD_KIND_I64: + byteArray=&(table->entries[i].value.value.i64); + memcpy(values + values_offset, byteArray, 8); + values_offset+=8; + break; + case AMQP_FIELD_KIND_UTF8: + memcpy(values + values_offset, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); + values_offset+=table->entries[i].value.value.bytes.len; + break; + default: + required_buffer_size_values += 0; + break; + } values[values_offset] = ';'; values_offset++; } // add Null-terminate the C string keys[keys_offset - 1] = '\0'; - values[values_offset - 1] = '\0'; + values[values_offset -1] = '\0'; copyStringToLStrHandle(keys, cheaders_key); - copyStringToLStrHandle(values, cheaders_value); + //copyStringToLStrHandle(values, cheaders_value); + copyBufferToLStrHandle(values, required_buffer_size_values, cheaders_value); free(keys); free(values); From 36632e4addf8698a0aee49a0d411ccb39421744a Mon Sep 17 00:00:00 2001 From: kwitekrac Date: Tue, 24 Oct 2023 14:38:47 +0000 Subject: [PATCH 08/20] docs(release_notes): update RELEASE_NOTES.md --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 3e4be45c..c5959d4c 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-10-23 +## 0.0.1-dev - 2023-10-24 ### Features From f7541d98f84fb174885758f6e765405f80e595c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Tue, 24 Oct 2023 22:15:13 +0200 Subject: [PATCH 09/20] more types of message headers supported in consume --- labview/labview_rabbitmq.c | 117 +++++++++++++++---------------------- labview/labview_rabbitmq.h | 2 +- 2 files changed, 49 insertions(+), 70 deletions(-) diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 3c21aa6e..21a077ec 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -9,7 +9,6 @@ #include #include - MgErr copyStringToLStrHandle(char *cpString, LStrHandle LVString) { int32 len = strlen(cpString); @@ -32,12 +31,6 @@ MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString) (*LVString)->cnt = len; // telling the Handle what string size to expect } - - - - - - return err; } @@ -55,10 +48,6 @@ char **splitString(char *input, int *count) { perror("Memory allocation error"); exit(1); - - - - } tokens[(*count)] = strdup(token); @@ -69,78 +58,80 @@ char **splitString(char *input, int *count) return tokens; } -void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders_key, LStrHandle cheaders_value) +void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders) { - // Calculate required buffer size for concatenated headers keys and values, - // following strings will be separated by ";". Buffer will be finished with Null-terminate the C string. - // Init counters with num of elements-1 (number of separators) + 1 ('\0') - int required_buffer_size_keys = (table->num_entries - 1) + 1; - int required_buffer_size_values = (table->num_entries - 1) + 1; + // Calculate required buffer size for concatenated headers keys and values separated by "=", + // following headers will be separated by ";". Buffer will be finished with Null-terminate the C string. + int required_buffer_size = 0; for (int i = 0; i < table->num_entries; i++) { - required_buffer_size_keys += table->entries[i].key.len; - switch (table->entries[i].value.kind) { + required_buffer_size++; // character that indicates a type + required_buffer_size += table->entries[i].key.len; //key + required_buffer_size++; //'='separator + switch (table->entries[i].value.kind) + { case AMQP_FIELD_KIND_I8: - required_buffer_size_values += 1; + required_buffer_size += 1; break; case AMQP_FIELD_KIND_I64: - required_buffer_size_values += 8; + required_buffer_size += 8; break; case AMQP_FIELD_KIND_UTF8: - required_buffer_size_values += table->entries[i].value.value.bytes.len; + required_buffer_size += table->entries[i].value.value.bytes.len; break; default: - required_buffer_size_values += 0; + required_buffer_size += 0; break; } + + required_buffer_size++; //';'separator or Null-terminate } - // Allocate memory for temp strings - char *keys = (char*) malloc(required_buffer_size_keys* sizeof(char)); - char *values = (char*) malloc(required_buffer_size_values* sizeof(char)); + // Allocate memory for temp buffer + char *headers = (char*) malloc(required_buffer_size* sizeof(char)); char *byteArray; - int keys_offset = 0; - int values_offset = 0; + int index = 0; for (int i = 0; i < table->num_entries; i++) { - memcpy(keys + keys_offset, table->entries[i].key.bytes, table->entries[i].key.len); - keys_offset += table->entries[i].key.len; - keys[keys_offset] = ';'; - keys_offset++; - - switch (table->entries[i].value.kind) { + headers[index] = table->entries[i].value.kind; // character that indicates a type + index++; + + memcpy(headers + index, table->entries[i].key.bytes, table->entries[i].key.len); + index += table->entries[i].key.len; + headers[index] = '='; + index++; + + switch (table->entries[i].value.kind) + { case AMQP_FIELD_KIND_I8: - values[values_offset]=table->entries[i].value.value.i8; //check if not zero! - values_offset++; + headers[index] = table->entries[i].value.value.i8; + index++; break; case AMQP_FIELD_KIND_I64: - byteArray=&(table->entries[i].value.value.i64); - memcpy(values + values_offset, byteArray, 8); - values_offset+=8; + for (int j = 0; j < 8; j++) + { + headers[index + j] = (table->entries[i].value.value.i64 >> (j *8)) &0xFF; + } + + index += 8; break; case AMQP_FIELD_KIND_UTF8: - memcpy(values + values_offset, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); - values_offset+=table->entries[i].value.value.bytes.len; + memcpy(headers + index, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); + index += table->entries[i].value.value.bytes.len; break; default: - required_buffer_size_values += 0; break; } - values[values_offset] = ';'; - values_offset++; - } - - // add Null-terminate the C string - keys[keys_offset - 1] = '\0'; - values[values_offset -1] = '\0'; - copyStringToLStrHandle(keys, cheaders_key); - //copyStringToLStrHandle(values, cheaders_value); - copyBufferToLStrHandle(values, required_buffer_size_values, cheaders_value); + headers[index] = ';'; + index++; + } - free(keys); - free(values); + // replace last ';' with Null-terminate the C string + headers[index - 1] = '\0'; + copyBufferToLStrHandle(headers, required_buffer_size, cheaders); + free(headers); } char *amqpBytesToString(amqp_bytes_t input) @@ -248,12 +239,10 @@ int lv_report_amqp_error(amqp_rpc_reply_t x, char return _AMQP_RESPONSE_SERVER_EXCEPTION; } - break; } } - LABVIEW_PUBLIC_FUNCTION int64_t lv_amqp_new_connection() { @@ -265,7 +254,6 @@ int64_t lv_amqp_new_connection() return conn_intptr; } - LABVIEW_PUBLIC_FUNCTION int lv_amqp_close_connection(int64_t conn_intptr, LStrHandle error_description) { @@ -274,7 +262,6 @@ int lv_amqp_close_connection(int64_t conn_intptr, LStrHandle error_description) return lv_report_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Connection close", error_description); } - LABVIEW_PUBLIC_FUNCTION void lv_amqp_destroy_connection(int64_t conn_intptr) { @@ -282,7 +269,6 @@ void lv_amqp_destroy_connection(int64_t conn_intptr) amqp_destroy_connection(conn); } - LABVIEW_PUBLIC_FUNCTION int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) { @@ -291,7 +277,6 @@ int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle error return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Opening channel", error_description); } - LABVIEW_PUBLIC_FUNCTION int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) { @@ -299,7 +284,6 @@ int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle erro return lv_report_amqp_error(amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), "Closing channel", error_description); } - LABVIEW_PUBLIC_FUNCTION int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, LStrHandle error_description) { @@ -316,7 +300,6 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchan return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Exchange declare", error_description); } - LABVIEW_PUBLIC_FUNCTION int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, char *username, char *password, LStrHandle error_description) { @@ -358,7 +341,6 @@ int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, ch return lv_report_amqp_error(amqp_login(conn, VHOST, CHANNEL_MAX, FRAME_MAX, HEARTBEAT, AMQP_SASL_METHOD_PLAIN, username, password), "Logging in", error_description); } - LABVIEW_PUBLIC_FUNCTION int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description) { @@ -406,7 +388,6 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, //this function returns amqp_status_enum thats different from amqp_rpc_reply_t } - LABVIEW_PUBLIC_FUNCTION int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description) { @@ -448,9 +429,8 @@ int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, ch return status; } - LABVIEW_PUBLIC_FUNCTION -int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders_key, LStrHandle cheaders_value, LStrHandle error_description) +int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders, LStrHandle error_description) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; @@ -478,10 +458,9 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out amqp_table_t *headers = &envelope.message.properties.headers; if (headers->num_entries > 0) { - getConcatenatedMessageHeaders(headers, cheaders_key, cheaders_value); + getConcatenatedMessageHeaders(headers, cheaders); } amqp_destroy_envelope(&envelope); return status; -} - +} \ No newline at end of file diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index 59d632a3..06eea1cb 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -50,7 +50,7 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queu int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description); -int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders_key, LStrHandle cheaders_value, LStrHandle error_description); +int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders, LStrHandle error_description); #endif /* LABVIEW_RABBITMQ_H */ From d7e5c26e9d70fea931542e0df5d8304378142c62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Sat, 28 Oct 2023 13:12:15 +0200 Subject: [PATCH 10/20] publish message with headers works --- labview/labview_rabbitmq.c | 180 +++++++++++++++++++++++++++++++------ labview/labview_rabbitmq.h | 2 +- 2 files changed, 156 insertions(+), 26 deletions(-) diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 21a077ec..c4e1f0d3 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -34,6 +34,18 @@ MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString) return err; } +char* findInBuffer(const uint8_t* headerBuffer, uint32_t headerBufferLen, uint32_t offset, char toFind){ + uint32_t i=offset; + for (i; inum_entries = numEntries; + table->entries = entries; +} + + void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders) { // Calculate required buffer size for concatenated headers keys and values separated by "=", @@ -342,7 +461,7 @@ int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, ch } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description) +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_basic_properties_t props; @@ -350,38 +469,23 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, props.content_type = amqp_cstring_bytes("text/plain"); props.delivery_mode = 2; /*persistent delivery mode */ - int count, count2; - char **headers_key = splitString(cheaders_key, &count); - char **headers_value = splitString(cheaders_value, &count2); - if (headers_key != NULL) + + amqp_table_t *table; + if (headerBufferLen != 0) { // Update flags to use custom headers props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; - // Allocate memory for custom headers - amqp_table_t *table = &props.headers; - props.headers.num_entries = count; - props.headers.entries = calloc(props.headers.num_entries, sizeof(amqp_table_entry_t)); - // update headers content - for (int i = 0; i < count; i++) - { (table->entries[i]).key = amqp_cstring_bytes(headers_key[i]); - ((table->entries[i]).value).kind = AMQP_FIELD_KIND_BYTES; - ((table->entries[i]).value).value.bytes = amqp_cstring_bytes(headers_value[i]); - } + table = &props.headers; + //props.expiration = amqp_cstring_bytes("60000"); + parseHeaders(table, headerBuffer, headerBufferLen); } - int error_code = error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); + int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); // Dereference headers - if (headers_key != NULL) + if (headerBufferLen != 0) { - for (int i = 0; i < count; i++) - { - free(headers_key[i]); - free(headers_value[i]); - } - - free(headers_key); - free(headers_value); + free(table->entries); } return error_code; @@ -463,4 +567,30 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out amqp_destroy_envelope(&envelope); return status; +} + +LABVIEW_PUBLIC_FUNCTION +void lv_test(LStrHandle outputA, LStrHandle outputB, LStrHandle inputA, uint8_t *headerBuffer, int32_t headerBufferLen){ + char outputA_str[100]; + char outputB_str[100]; + + amqp_table_t headers; + amqp_table_t* headerPtr = &headers; + //parseHeaders(headerPtr, intputB, intputBlen); + char* delimiter = findInBuffer(headerBuffer, headerBufferLen, 0, 0x3B); + char* equal_char = findInBuffer(headerBuffer, headerBufferLen, 0, 0x3D); + + int pos1 = delimiter - (char*)headerBuffer; + int pos2 = equal_char - (char*)headerBuffer; + + //char valueTempBuffer [8]; + int valueLength = delimiter - equal_char - 1; + memcpy(outputA_str, equal_char + 1, valueLength); + + //sprintf(outputA_str, "%d", pos1); + sprintf(outputB_str, "%d", pos2); + + copyStringToLStrHandle(outputA_str, outputA); + copyStringToLStrHandle(outputB_str, outputB); + free(headerPtr->entries); } \ No newline at end of file diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index 06eea1cb..ded0fd0d 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -44,7 +44,7 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char* exchan int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description); -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description); +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description); int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description); From 736887197cefa6a69c844ef99a54ced0dc287ab9 Mon Sep 17 00:00:00 2001 From: kwitekrac Date: Sat, 28 Oct 2023 11:13:07 +0000 Subject: [PATCH 11/20] docs(release_notes): update RELEASE_NOTES.md --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c5959d4c..ab6b7ae2 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-10-24 +## 0.0.1-dev - 2023-10-28 ### Features From 70c124416dbba69fe46c1f2efa0f5d6344643981 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Sat, 28 Oct 2023 21:33:39 +0200 Subject: [PATCH 12/20] code restructured --- .vscode/settings.json | 3 +- labview/CMakeLists.txt | 2 + labview/labview_rabbitmq.c | 550 ++++++++++--------------------------- labview/labview_types.c | 92 +++++++ labview/labview_types.h | 39 +++ labview/msg_headers.c | 270 ++++++++++++++++++ labview/msg_headers.h | 28 ++ 7 files changed, 578 insertions(+), 406 deletions(-) create mode 100644 labview/labview_types.c create mode 100644 labview/labview_types.h create mode 100644 labview/msg_headers.c create mode 100644 labview/msg_headers.h diff --git a/.vscode/settings.json b/.vscode/settings.json index 3ea57581..ba92d194 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -17,6 +17,7 @@ "*.tcc": "c", "memory_resource": "c", "fstream": "c", - "memory": "c" + "memory": "c", + "labview_types.h": "c" } } \ No newline at end of file diff --git a/labview/CMakeLists.txt b/labview/CMakeLists.txt index ddca6b77..608484e1 100644 --- a/labview/CMakeLists.txt +++ b/labview/CMakeLists.txt @@ -3,6 +3,8 @@ set(RABBITMQ_LV_SOURCES labview_rabbitmq.c + labview_types.c labview_types.h + msg_headers.c msg_headers.h utils.c utils.h unix/platform_utils.c ) diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index c4e1f0d3..856d0c12 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -5,273 +5,12 @@ #include "utils.h" #include "extcode.h" #include "labview_rabbitmq.h" +#include "labview_types.h" +#include "msg_headers.h" #include #include #include -MgErr copyStringToLStrHandle(char *cpString, LStrHandle LVString) -{ - int32 len = strlen(cpString); - MgErr err = NumericArrayResize(uB, 1, (UHandle*) &LVString, len); - if (!err) - { - strncpy((*LVString)->str, cpString, len); // copying the string into the handle - (*LVString)->cnt = len; // telling the Handle what string size to expect - } - - return err; -} - -MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString) -{ - MgErr err = NumericArrayResize(uB, 1, (UHandle*) &LVString, len); - if (!err) - { - memcpy((*LVString)->str, buffer, len); // copying the string into the handle - (*LVString)->cnt = len; // telling the Handle what string size to expect - } - - return err; -} - -char* findInBuffer(const uint8_t* headerBuffer, uint32_t headerBufferLen, uint32_t offset, char toFind){ - uint32_t i=offset; - for (i; inum_entries = numEntries; - table->entries = entries; -} - - -void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders) -{ - // Calculate required buffer size for concatenated headers keys and values separated by "=", - // following headers will be separated by ";". Buffer will be finished with Null-terminate the C string. - int required_buffer_size = 0; - for (int i = 0; i < table->num_entries; i++) - { - required_buffer_size++; // character that indicates a type - required_buffer_size += table->entries[i].key.len; //key - required_buffer_size++; //'='separator - switch (table->entries[i].value.kind) - { - case AMQP_FIELD_KIND_I8: - required_buffer_size += 1; - break; - case AMQP_FIELD_KIND_I64: - required_buffer_size += 8; - break; - case AMQP_FIELD_KIND_UTF8: - required_buffer_size += table->entries[i].value.value.bytes.len; - break; - default: - required_buffer_size += 0; - break; - } - - required_buffer_size++; //';'separator or Null-terminate - } - - // Allocate memory for temp buffer - char *headers = (char*) malloc(required_buffer_size* sizeof(char)); - - char *byteArray; - int index = 0; - for (int i = 0; i < table->num_entries; i++) - { - headers[index] = table->entries[i].value.kind; // character that indicates a type - index++; - - memcpy(headers + index, table->entries[i].key.bytes, table->entries[i].key.len); - index += table->entries[i].key.len; - headers[index] = '='; - index++; - - switch (table->entries[i].value.kind) - { - case AMQP_FIELD_KIND_I8: - headers[index] = table->entries[i].value.value.i8; - index++; - break; - case AMQP_FIELD_KIND_I64: - for (int j = 0; j < 8; j++) - { - headers[index + j] = (table->entries[i].value.value.i64 >> (j *8)) &0xFF; - } - - index += 8; - break; - case AMQP_FIELD_KIND_UTF8: - memcpy(headers + index, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); - index += table->entries[i].value.value.bytes.len; - break; - default: - break; - } - - headers[index] = ';'; - index++; - } - - // replace last ';' with Null-terminate the C string - headers[index - 1] = '\0'; - copyBufferToLStrHandle(headers, required_buffer_size, cheaders); - free(headers); -} - -char *amqpBytesToString(amqp_bytes_t input) -{ - // Allocate memory for the C string and a null terminator - char *result = (char*) malloc(input.len + 1); - - if (result == NULL) - { - fprintf(stderr, "Memory allocation failed\n"); - exit(1); - } - - // Copy the data from amqp_bytes_t to the C string - memcpy(result, input.bytes, input.len); - - // Null-terminate the C string - result[input.len] = '\0'; - - return result; -} LABVIEW_PUBLIC_FUNCTION char *lv_rabbitmq_version(void) @@ -280,88 +19,6 @@ char *lv_rabbitmq_version(void) return VERSION; } -/*This function is a modified version of the `die_on_amqp_error` function used in examples, -enhanced with LabVIEW string support.*/ -int lv_report_amqp_error(amqp_rpc_reply_t x, char - const *context, LStrHandle error_description) -{ - unsigned char temp_str[MAX_ERROR_DESCRIPTION_LENGTH]; - int err; - switch (x.reply_type) - { - case AMQP_RESPONSE_NORMAL: - return _AMQP_RESPONSE_NORMAL; - - case AMQP_RESPONSE_NONE: - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: missing RPC reply type!", context); - err = copyStringToLStrHandle(temp_str, error_description); - if (err) - { - return err; - } - - return _AMQP_RESPONSE_NONE; - - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: %s ", context, amqp_error_string2(x.library_error)); - err = copyStringToLStrHandle(temp_str, error_description); - if (err) - { - return err; - } - - return _AMQP_RESPONSE_LIBRARY_EXCEPTION; - - case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) - { - case AMQP_CONNECTION_CLOSE_METHOD: - { - amqp_connection_close_t *m = - (amqp_connection_close_t*) x.reply.decoded; - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server connection error %uh, message: %.*s ", - context, m->reply_code, (int) m->reply_text.len, - (char*) m->reply_text.bytes); - err = copyStringToLStrHandle(temp_str, error_description); - if (err) - { - return err; - } - - return _AMQP_RESPONSE_SERVER_EXCEPTION; - } - - case AMQP_CHANNEL_CLOSE_METHOD: - { - amqp_channel_close_t *m = (amqp_channel_close_t*) x.reply.decoded; - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server channel error %uh, message: %.*s ", - context, m->reply_code, (int) m->reply_text.len, - (char*) m->reply_text.bytes); - err = copyStringToLStrHandle(temp_str, error_description); - if (err) - { - return err; - } - - return _AMQP_RESPONSE_SERVER_EXCEPTION; - } - - default: - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: unknown server error, method id 0x%08X ", - context, x.reply.id); - err = copyStringToLStrHandle(temp_str, error_description); - if (err) - { - return err; - } - - return _AMQP_RESPONSE_SERVER_EXCEPTION; - } - - break; - } -} - LABVIEW_PUBLIC_FUNCTION int64_t lv_amqp_new_connection() { @@ -374,11 +31,11 @@ int64_t lv_amqp_new_connection() } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_close_connection(int64_t conn_intptr, LStrHandle error_description) +int lv_amqp_close_connection(int64_t conn_intptr, LStrHandle errorDescription) { // cast back to amqp_connection_state_t opaque struct pointer amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; - return lv_report_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Connection close", error_description); + return lv_report_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Connection close", errorDescription); } LABVIEW_PUBLIC_FUNCTION @@ -389,22 +46,22 @@ void lv_amqp_destroy_connection(int64_t conn_intptr) } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) +int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_channel_open(conn, channel); - return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Opening channel", error_description); + return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Opening channel", errorDescription); } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) +int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; - return lv_report_amqp_error(amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), "Closing channel", error_description); + return lv_report_amqp_error(amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), "Closing channel", errorDescription); } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, LStrHandle error_description) +int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; @@ -416,11 +73,11 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchan amqp_cstring_bytes(exchangetype), PASSIVE, DURABLE, AUTO_DELETE, INTERNAL, amqp_empty_table); - return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Exchange declare", error_description); + return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Exchange declare", errorDescription); } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, char *username, char *password, LStrHandle error_description) +int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, char *username, char *password, LStrHandle errorDescription) { int status; amqp_socket_t *socket = NULL; @@ -447,53 +104,20 @@ int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, ch so we dont need to destroy this socket because it will be destroyed along with connection state destroy function*/ - char - const *VHOST = "/"; // the virtual host to connect to on the broker. The default on most brokers is "/" - int - const CHANNEL_MAX = 0; // the limit for number of channels for the connection. 0 means no limit. - int - const FRAME_MAX = 131072; // the maximum size of an AMQP frame. 131072 is the default. - // 4096 is the minimum size, 2^31-1 is the maximum, a good default is 131072 (128KB), - int - const HEARTBEAT = 0; // the number of seconds between heartbeat frames to request of the broker. A value of 0 disables heartbeats. - - return lv_report_amqp_error(amqp_login(conn, VHOST, CHANNEL_MAX, FRAME_MAX, HEARTBEAT, AMQP_SASL_METHOD_PLAIN, username, password), "Logging in", error_description); -} + char const *VHOST = "/"; // the virtual host to connect to on the broker. + // The default on most brokers is "/" + int const CHANNEL_MAX = 0; // the limit for number of channels for the connection. 0 means no limit. + int const FRAME_MAX = 131072; // the maximum size of an AMQP frame. 131072 is the default. + // 4096 is the minimum size, 2^31-1 is the maximum, + // a good default is 131072 (128KB), + int const HEARTBEAT = 0; // the number of seconds between heartbeat frames to request + // of the broker. A value of 0 disables heartbeats. -LABVIEW_PUBLIC_FUNCTION -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description) -{ - amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; - amqp_basic_properties_t props; - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; - props.content_type = amqp_cstring_bytes("text/plain"); - props.delivery_mode = 2; /*persistent delivery mode */ - - - amqp_table_t *table; - if (headerBufferLen != 0) - { - // Update flags to use custom headers - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; - table = &props.headers; - //props.expiration = amqp_cstring_bytes("60000"); - parseHeaders(table, headerBuffer, headerBufferLen); - } - - int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); - - // Dereference headers - if (headerBufferLen != 0) - { - free(table->entries); - } - - return error_code; - //this function returns amqp_status_enum thats different from amqp_rpc_reply_t + return lv_report_amqp_error(amqp_login(conn, VHOST, CHANNEL_MAX, FRAME_MAX, HEARTBEAT, AMQP_SASL_METHOD_PLAIN, username, password), "Logging in", errorDescription); } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description) +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; int status; @@ -505,19 +129,19 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue amqp_boolean_t AUTO_DELETE = 1; amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_empty_bytes, PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table); - status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", error_description); + status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", errorDescription); copyBufferToLStrHandle(r->queue.bytes, r->queue.len, queue_name_out); return status; } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description) +int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; int status; amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); - status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Binding queue", error_description); + status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Binding queue", errorDescription); if (status != 1) { return status; @@ -529,12 +153,12 @@ int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, ch amqp_basic_consume(conn, channel, amqp_cstring_bytes(queuename), amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table); /*amqp_basic_consume is used to register a consumer on the queue, so that the broker will start delivering messages to it.*/ - status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Basic consume", error_description); + status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Basic consume", errorDescription); return status; } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders, LStrHandle error_description) +int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle concatenatedHeaders, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; @@ -547,7 +171,7 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out //amqp_maybe_release_buffers(conn); - status = lv_report_amqp_error(amqp_consume_message(conn, &envelope, &tval, 0), "Consuming message", error_description); + status = lv_report_amqp_error(amqp_consume_message(conn, &envelope, &tval, 0), "Consuming message", errorDescription); if (status != 1) { return status; @@ -562,13 +186,47 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out amqp_table_t *headers = &envelope.message.properties.headers; if (headers->num_entries > 0) { - getConcatenatedMessageHeaders(headers, cheaders); + headersToString(headers, concatenatedHeaders); } amqp_destroy_envelope(&envelope); return status; } +LABVIEW_PUBLIC_FUNCTION +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, uint8_t* msgHeaderBuf, uint64_t msgHeaderBufLen, char *messageBody, LStrHandle errorDescription) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = 2; /*persistent delivery mode */ + + + amqp_table_t *table; + if (msgHeaderBufLen > 0) + { + // Update flags to use custom headers + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; + table = &props.headers; + stringToHeaders(table, msgHeaderBuf, msgHeaderBufLen); + } + + int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingKey), 0, 0, &props, amqp_cstring_bytes(messageBody)); + + // Free allocated headers + if (msgHeaderBufLen > 0) + { + free(table->entries); + } + + return error_code; + //this function returns amqp_status_enum thats different from amqp_rpc_reply_t +} + +/** + * This function is defined for testing purposes only. + */ LABVIEW_PUBLIC_FUNCTION void lv_test(LStrHandle outputA, LStrHandle outputB, LStrHandle inputA, uint8_t *headerBuffer, int32_t headerBufferLen){ char outputA_str[100]; @@ -593,4 +251,86 @@ void lv_test(LStrHandle outputA, LStrHandle outputB, LStrHandle inputA, uint8_t copyStringToLStrHandle(outputA_str, outputA); copyStringToLStrHandle(outputB_str, outputB); free(headerPtr->entries); -} \ No newline at end of file +} + +/*This function is a modified version of the `die_on_amqp_error` function used in examples, +enhanced with LabVIEW string support.*/ +int lv_report_amqp_error(amqp_rpc_reply_t x, char + const *context, LStrHandle errorDescription) +{ + unsigned char temp_str[MAX_ERROR_DESCRIPTION_LENGTH]; + int err; + switch (x.reply_type) + { + case AMQP_RESPONSE_NORMAL: + return _AMQP_RESPONSE_NORMAL; + + case AMQP_RESPONSE_NONE: + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: missing RPC reply type!", context); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_NONE; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: %s ", context, amqp_error_string2(x.library_error)); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_LIBRARY_EXCEPTION; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (x.reply.id) + { + case AMQP_CONNECTION_CLOSE_METHOD: + { + amqp_connection_close_t *m = + (amqp_connection_close_t*) x.reply.decoded; + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server connection error %uh, message: %.*s ", + context, m->reply_code, (int) m->reply_text.len, + (char*) m->reply_text.bytes); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_SERVER_EXCEPTION; + } + + case AMQP_CHANNEL_CLOSE_METHOD: + { + amqp_channel_close_t *m = (amqp_channel_close_t*) x.reply.decoded; + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server channel error %uh, message: %.*s ", + context, m->reply_code, (int) m->reply_text.len, + (char*) m->reply_text.bytes); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_SERVER_EXCEPTION; + } + + default: + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: unknown server error, method id 0x%08X ", + context, x.reply.id); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_SERVER_EXCEPTION; + } + + break; + } +} diff --git a/labview/labview_types.c b/labview/labview_types.c new file mode 100644 index 00000000..13d8c5fd --- /dev/null +++ b/labview/labview_types.c @@ -0,0 +1,92 @@ +#include + +#include "extcode.h" + +/** + * This function copies the contents of a C string into a LabVIEW LStrHandle, + * resizing the handle if necessary. + */ +MgErr copyStringToLStrHandle(char *cpString, LStrHandle LVString) { + int32 len = strlen(cpString); + MgErr err = NumericArrayResize(uB, 1, (UHandle *)&LVString, len); + if (!err) { + strncpy((*LVString)->str, cpString, + len); // copying the string into the handle + (*LVString)->cnt = len; // telling the Handle what string size to expect + } + + return err; +} + +/** + * This function copies the contents of a binary buffer into a LabVIEW + * LStrHandle, resizing the handle if necessary. The function is designed for + * handling binary data. + */ +MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString) { + MgErr err = NumericArrayResize(uB, 1, (UHandle *)&LVString, len); + if (!err) { + memcpy((*LVString)->str, buffer, + len); // copying the string into the handle + (*LVString)->cnt = len; // telling the Handle what string size to expect + } + + return err; +} + +/** + * This function searches for a specified character within a binary buffer and + * returns a pointer to the first occurrence of the character, starting from the + * given offset. + */ +char *findInBuffer(const uint8_t *buffer, uint32_t len, uint32_t offset, + char toFind) { + uint32_t i = offset; + for (i; i < len; i++) { + if (buffer[i] == toFind) { + return buffer + i; + } + } + return NULL; +} + +/** + * Split a string into an array of tokens based on a delimiter. + * Memory is allocated for the token array, and the caller is responsible for + * freeing the memory when it's no longer needed. + */ +char **splitString(char *input, int *count, char delimiter) { + char **tokens = NULL; + char *token = strtok(input, &delimiter); + *count = 0; + + while (token != NULL) { + tokens = (char **)realloc(tokens, ((*count) + 1) * sizeof(char *)); + if (tokens == NULL) { + perror("Memory allocation error"); + exit(1); + } + + tokens[(*count)] = strdup(token); + (*count)++; + token = strtok(NULL, &delimiter); + } + + return tokens; +} + +/** + * This function frees the memory allocated for an array of tokens created by + * 'splitString'. + */ +void freeTokens(char **tokens, int count) { + if (tokens == NULL) { + return; // Handle NULL input. + } + + for (int i = 0; i < count; i++) { + free(tokens[i]); // Free memory for each token. + } + + free(tokens); // Free memory for the token array itself. +} diff --git a/labview/labview_types.h b/labview/labview_types.h new file mode 100644 index 00000000..24fb1945 --- /dev/null +++ b/labview/labview_types.h @@ -0,0 +1,39 @@ +#include "extcode.h" + +#ifndef LABVIEW_TYPES_H +#define LABVIEW_TYPES_H + +/** + * This function copies the contents of a C string into a LabVIEW LStrHandle, + * resizing the handle if necessary. + */ +MgErr copyStringToLStrHandle(char *cpString, LStrHandle LVString); + +/** + * This function copies the contents of a binary buffer into a LabVIEW + * LStrHandle, resizing the handle if necessary. The function is designed for + * handling binary data. + */ +MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString); + +/** + * This function searches for a specified character within a binary buffer and + * returns a pointer to the first occurrence of the character, starting from the + * given offset. + */ +char* findInBuffer(const uint8_t* headerBuffer, uint32_t headerBufferLen, uint32_t offset, char toFind); + +/** + * Split a string into an array of tokens based on a delimiter. + * Memory is allocated for the token array, and the caller is responsible for + * freeing the memory when it's no longer needed. + */ +char **splitString(char *input, int *count); + +/** + * This function frees the memory allocated for an array of tokens created by + * 'splitString'. + */ +void freeTokens(char** tokens, int count); + +#endif diff --git a/labview/msg_headers.c b/labview/msg_headers.c new file mode 100644 index 00000000..f7a6c31d --- /dev/null +++ b/labview/msg_headers.c @@ -0,0 +1,270 @@ +#include + +#include "extcode.h" +#include "labview_types.h" + +/** + * This function parses a string representation of headers and populates an AMQP + * table with the parsed header entries. The headers in the string are expected + * to be in a specific format where key-value pairs are separated by '=' and + * multiple headers are separated by ';'. + * + * @note The `table` parameter should be a initialized `amqp_table_t` structure. + * The `headerBuffer` should point to a null-terminated C string. + * The function modifies the `table` structure in place with the parsed + * entries. + */ +void stringToHeaders(amqp_table_t* table, const uint8_t* headerBuffer, + uint64_t headerBufferLen) { + amqp_table_entry_t* entries = NULL; + int numEntries = 0; + int finished = FALSE; + int index = 0; + + while (finished == 0) { + // Find the delimiter ';' + char* delimiter = findInBuffer(headerBuffer, headerBufferLen, index, 0x3B); + if (delimiter == NULL) { + delimiter = + headerBuffer + + headerBufferLen; // If no delimiter found, use the end of the string + } + if (delimiter == headerBuffer + headerBufferLen) { + finished = TRUE; + } + // Find the equal_char '=' + char* equal_char = findInBuffer(headerBuffer, headerBufferLen, index, 0x3D); + + if (equal_char != NULL && equal_char < delimiter) { + // Add the entry to the entries array + amqp_table_entry_t* newEntries = + realloc(entries, (numEntries + 1) * sizeof(amqp_table_entry_t)); + if (newEntries == NULL) { + // Error handling for memory allocation failure + perror("Memory allocation failed"); + free(entries); + return table; + } + entries = newEntries; + amqp_field_value_t value; + + // Get the kind (first byte) + value.kind = headerBuffer[index]; + index++; // Move to the next character + + // Parse the key + entries[numEntries].key.bytes = headerBuffer + index; + entries[numEntries].key.len = equal_char - ((char*)headerBuffer + index); + + // Init and parse the value + char valueTempBuffer[8]; + int valueLength = delimiter - equal_char - 1; + memcpy(valueTempBuffer, equal_char + 1, valueLength); + + // Parse the value based on the kind + switch (value.kind) { + case AMQP_FIELD_KIND_UTF8: + case AMQP_FIELD_KIND_BYTES: + value.value.bytes.bytes = equal_char + 1; // fist char after "=" + value.value.bytes.len = valueLength; + break; + case AMQP_FIELD_KIND_I8: + value.value.i8 = (int8_t)valueTempBuffer[0]; + break; + case AMQP_FIELD_KIND_U8: + value.value.u8 = (uint8_t)valueTempBuffer[0]; + break; + case AMQP_FIELD_KIND_I16: + value.value.i16 = *((int16_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_U16: + value.value.u16 = *((uint16_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_I32: + value.value.i32 = *((int32_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_U32: + value.value.u32 = *((uint32_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_I64: + value.value.i64 = *((int64_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_U64: + case AMQP_FIELD_KIND_TIMESTAMP: + value.value.u64 = *((uint64_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_F32: + value.value.f32 = *((float*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_F64: + value.value.f64 = *((double*)valueTempBuffer); + break; + default: + // Unsupported kind, ignore this header + break; + } + + entries[numEntries].value = value; + numEntries++; + } + + // Move to the next header (skip the delimiter ';') + index = delimiter - (char*)headerBuffer + 1; + } + + // Assign the entries to the table + table->num_entries = numEntries; + table->entries = entries; +} + +/** + * This function takes an AMQP table containing headers and converts them into + * a string format where the headers' keys and values are concatenated together. + * Key-value pairs are separated by '=' and multiple headers are separated by + * ';'. + */ +void headersToString(amqp_table_t* table, LStrHandle concatenatedHeaders) { + // Calculate required string (char buffer) size + int required_buffer_size = 0; + for (int i = 0; i < table->num_entries; i++) { + required_buffer_size++; // character that indicates a type + required_buffer_size += table->entries[i].key.len; // key + required_buffer_size++; // '=' separator + + switch (table->entries[i].value.kind) { + case AMQP_FIELD_KIND_I8: + required_buffer_size += 1; + break; + case AMQP_FIELD_KIND_I64: + required_buffer_size += 8; + break; + case AMQP_FIELD_KIND_UTF8: + case AMQP_FIELD_KIND_BYTES: + required_buffer_size += table->entries[i].value.value.bytes.len; + break; + case AMQP_FIELD_KIND_U8: + required_buffer_size += 1; + break; + case AMQP_FIELD_KIND_I16: + required_buffer_size += 2; + break; + case AMQP_FIELD_KIND_U16: + required_buffer_size += 2; + break; + case AMQP_FIELD_KIND_I32: + required_buffer_size += 4; + break; + case AMQP_FIELD_KIND_U32: + required_buffer_size += 4; + break; + case AMQP_FIELD_KIND_U64: + required_buffer_size += 8; + break; + case AMQP_FIELD_KIND_TIMESTAMP: + required_buffer_size += 8; + break; + case AMQP_FIELD_KIND_F32: + required_buffer_size += 4; + break; + case AMQP_FIELD_KIND_F64: + required_buffer_size += 8; + break; + default: + required_buffer_size += 0; // Unsupported kind, ignore + break; + } + + required_buffer_size++; // ';' separator or Null-terminate + } + + // Allocate memory for temp buffer + char* headers = (char*)malloc(required_buffer_size * sizeof(char)); + float f32_value; + double f64_value; + + int index = 0; + for (int i = 0; i < table->num_entries; i++) { + headers[index] = table->entries[i].value.kind; // Character that indicates a type + index++; + + memcpy(headers + index, table->entries[i].key.bytes, table->entries[i].key.len); + index += table->entries[i].key.len; + headers[index] = '='; // Separator + index++; + + switch (table->entries[i].value.kind) { + case AMQP_FIELD_KIND_I8: + headers[index] = table->entries[i].value.value.i8; + index++; + break; + case AMQP_FIELD_KIND_I64: + for (int j = 0; j < 8; j++) { + headers[index + j] = (table->entries[i].value.value.i64 >> (j * 8)) & 0xFF; + } + index += 8; + break; + case AMQP_FIELD_KIND_UTF8: + case AMQP_FIELD_KIND_BYTES: + memcpy(headers + index, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); + index += table->entries[i].value.value.bytes.len; + break; + case AMQP_FIELD_KIND_U8: + headers[index] = table->entries[i].value.value.u8; + index++; + break; + case AMQP_FIELD_KIND_I16: + headers[index] = table->entries[i].value.value.i16 & 0xFF; + headers[index + 1] = (table->entries[i].value.value.i16 >> 8) & 0xFF; + index += 2; + break; + case AMQP_FIELD_KIND_U16: + headers[index] = table->entries[i].value.value.i16 & 0xFF; + headers[index + 1] = (table->entries[i].value.value.i16 >> 8) & 0xFF; + index += 2; + break; + case AMQP_FIELD_KIND_I32: + for (int j = 0; j < 4; j++) { + headers[index + j] = (table->entries[i].value.value.i32 >> (j * 8)) & 0xFF; + } + index += 4; + break; + case AMQP_FIELD_KIND_U32: + for (int j = 0; j < 4; j++) { + headers[index + j] = (table->entries[i].value.value.u32 >> (j * 8)) & 0xFF; + } + index += 4; + break; + case AMQP_FIELD_KIND_U64: + case AMQP_FIELD_KIND_TIMESTAMP: + for (int j = 0; j < 8; j++) { + headers[index + j] = (table->entries[i].value.value.u64 >> (j * 8)) & 0xFF; + } + index += 8; + break; + case AMQP_FIELD_KIND_F32: + f32_value = table->entries[i].value.value.f32; + memcpy(headers + index, &f32_value, 4); + index += 4; + break; + case AMQP_FIELD_KIND_F64: + f64_value = table->entries[i].value.value.f64; + memcpy(headers + index, &f64_value, 8); + index += 8; + break; + default: + break; // Unsupported kind, ignore + } + + headers[index] = ';'; // Separator + index++; + } + + // Replace the last ';' with Null-terminate the C string + //headers[index - 1] = '\0'; + + // Copy the resulting string to the LStrHandle + copyBufferToLStrHandle(headers, required_buffer_size, concatenatedHeaders); + + // Free the temporary buffer + free(headers); +} \ No newline at end of file diff --git a/labview/msg_headers.h b/labview/msg_headers.h new file mode 100644 index 00000000..d72594d6 --- /dev/null +++ b/labview/msg_headers.h @@ -0,0 +1,28 @@ +#include +#include "extcode.h" + +#ifndef MSG_HEADERS_H +#define MSG_HEADERS_H + +/** + * This function takes an AMQP table containing headers and converts them into + * a string format where the headers' keys and values are concatenated together. + * Key-value pairs are separated by '=' and multiple headers are separated by + * ';'. + */ +void parseHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t headerBufferLen); + +/** + * This function parses a string representation of headers and populates an AMQP + * table with the parsed header entries. The headers in the string are expected + * to be in a specific format where key-value pairs are separated by '=' and + * multiple headers are separated by ';'. + * + * @note The `table` parameter should be a initialized `amqp_table_t` structure. + * The `headerBuffer` should point to a null-terminated C string. + * The function modifies the `table` structure in place with the parsed + * entries. + */ +void headersToString(amqp_table_t *table, LStrHandle concatenatedHeaders); + +#endif From 04ad862007735edf506d586ceb83b3cb8de3361c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Mon, 30 Oct 2023 15:40:44 +0100 Subject: [PATCH 13/20] added passive parameter exported functions --- labview/labview_rabbitmq.c | 20 ++++++++++++++------ labview/labview_rabbitmq.h | 4 ++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 856d0c12..0f93c8ff 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -61,11 +61,11 @@ int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle erro } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, LStrHandle errorDescription) +int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, uint8_t passive, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; - amqp_boolean_t PASSIVE = 0; + amqp_boolean_t PASSIVE = passive; amqp_boolean_t DURABLE = 0; amqp_boolean_t AUTO_DELETE = 0; amqp_boolean_t INTERNAL = 0; @@ -117,20 +117,28 @@ int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, ch } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle errorDescription) +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name_in, LStrHandle queue_name_out, uint8_t passive, LStrHandle errorDescription) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; int status; amqp_bytes_t queuename; - amqp_boolean_t PASSIVE = 0; + if (queue_name_in!=NULL){ + queuename = amqp_cstring_bytes(queue_name_in); + } else { + queuename = amqp_empty_bytes; + } + + amqp_boolean_t PASSIVE = 1; amqp_boolean_t DURABLE = 0; amqp_boolean_t EXCLUSIVE = 0; amqp_boolean_t AUTO_DELETE = 1; - amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_empty_bytes, PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table); + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, queuename, PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table); status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", errorDescription); - copyBufferToLStrHandle(r->queue.bytes, r->queue.len, queue_name_out); + if (status==1){ + copyBufferToLStrHandle(r->queue.bytes, r->queue.len, queue_name_out); + } return status; } diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index ded0fd0d..70ea9c42 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -40,13 +40,13 @@ int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle error int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle error_description); -int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char* exchange, char* exchangetype, LStrHandle error_description); +int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, uint8_t passive, LStrHandle errorDescription); int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description); int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description); -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description); +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name_in, LStrHandle queue_name_out, uint8_t passive, LStrHandle errorDescription); int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description); From 5800539d3ac12c74db164059839e1aa04739b542 Mon Sep 17 00:00:00 2001 From: kwitekrac Date: Mon, 30 Oct 2023 14:41:42 +0000 Subject: [PATCH 14/20] docs(release_notes): update RELEASE_NOTES.md --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index ab6b7ae2..72928cda 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-10-28 +## 0.0.1-dev - 2023-10-30 ### Features From 7fdb5f333b8c8b082cc5e94de7373b476f8cd60b Mon Sep 17 00:00:00 2001 From: kdevelle Date: Tue, 31 Oct 2023 11:58:42 +0100 Subject: [PATCH 15/20] Update CMakeLists.txt build the rabbit mq example when building the library --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 34e1b8bb..b24b26a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -132,7 +132,7 @@ option(BUILD_SHARED_LIBS "Build rabbitmq-c as a shared library" ON) option(BUILD_STATIC_LIBS "Build rabbitmq-c as a static library" ON) option(INSTALL_STATIC_LIBS "Install rabbitmq-c static library" ON) -option(BUILD_EXAMPLES "Build Examples" OFF) +option(BUILD_EXAMPLES "Build Examples" ON) option(BUILD_LABVIEW "Build LabVIEW shared library" ON) option(BUILD_TOOLS "Build Tools (requires POPT Library)" OFF) cmake_dependent_option(BUILD_TOOLS_DOCS "Build man pages for tools (requires xmlto)" OFF "BUILD_TOOLS" OFF) From c65569f6abb2217cb46af12850edd6e985a0139a Mon Sep 17 00:00:00 2001 From: kdevelleZ Date: Tue, 31 Oct 2023 10:59:43 +0000 Subject: [PATCH 16/20] docs(release_notes): update RELEASE_NOTES.md --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 72928cda..c160fa42 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-10-30 +## 0.0.1-dev - 2023-10-31 ### Features From 011471d15fc69d354cf2c6d604911dd8d1ca4083 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Thu, 2 Nov 2023 08:34:33 +0100 Subject: [PATCH 17/20] Update labview_rabbitmq.c --- labview/labview_rabbitmq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 0f93c8ff..df14e354 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -129,7 +129,7 @@ int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name queuename = amqp_empty_bytes; } - amqp_boolean_t PASSIVE = 1; + amqp_boolean_t PASSIVE = passive; amqp_boolean_t DURABLE = 0; amqp_boolean_t EXCLUSIVE = 0; amqp_boolean_t AUTO_DELETE = 1; From c286d07638155b6499a8f83c11fdd6997907ffbd Mon Sep 17 00:00:00 2001 From: kwitekrac Date: Thu, 2 Nov 2023 07:35:17 +0000 Subject: [PATCH 18/20] docs(release_notes): update RELEASE_NOTES.md --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c160fa42..e27cef3c 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-10-31 +## 0.0.1-dev - 2023-11-02 ### Features From 46ffb2f50bab5df8878574ef13e5190d4b1fcff8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kwiatkowski?= Date: Sun, 5 Nov 2023 11:08:19 +0100 Subject: [PATCH 19/20] fixed style issues reported in PR review --- labview/labview_types.h | 2 +- labview/msg_headers.c | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/labview/labview_types.h b/labview/labview_types.h index 24fb1945..496df44b 100644 --- a/labview/labview_types.h +++ b/labview/labview_types.h @@ -28,7 +28,7 @@ char* findInBuffer(const uint8_t* headerBuffer, uint32_t headerBufferLen, uint32 * Memory is allocated for the token array, and the caller is responsible for * freeing the memory when it's no longer needed. */ -char **splitString(char *input, int *count); +char **splitString(char *input, int *count, char delimiter); /** * This function frees the memory allocated for an array of tokens created by diff --git a/labview/msg_headers.c b/labview/msg_headers.c index f7a6c31d..f2a5dac3 100644 --- a/labview/msg_headers.c +++ b/labview/msg_headers.c @@ -174,7 +174,7 @@ void headersToString(amqp_table_t* table, LStrHandle concatenatedHeaders) { break; } - required_buffer_size++; // ';' separator or Null-terminate + required_buffer_size++; // ';' separator } // Allocate memory for temp buffer @@ -259,9 +259,6 @@ void headersToString(amqp_table_t* table, LStrHandle concatenatedHeaders) { index++; } - // Replace the last ';' with Null-terminate the C string - //headers[index - 1] = '\0'; - // Copy the resulting string to the LStrHandle copyBufferToLStrHandle(headers, required_buffer_size, concatenatedHeaders); From e2a9bd1669db610211499b7618dbde622adbe8ee Mon Sep 17 00:00:00 2001 From: kwitekrac Date: Sun, 5 Nov 2023 10:09:06 +0000 Subject: [PATCH 20/20] docs(release_notes): update RELEASE_NOTES.md --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index e27cef3c..d5b78b95 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-11-02 +## 0.0.1-dev - 2023-11-05 ### Features