diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..ba92d194 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,23 @@ +{ + "files.associations": { + "optional": "c", + "istream": "c", + "ostream": "c", + "ratio": "c", + "system_error": "c", + "array": "c", + "functional": "c", + "tuple": "c", + "type_traits": "c", + "utility": "c", + "chrono": "c", + "algorithm": "c", + "sstream": "c", + "random": "c", + "*.tcc": "c", + "memory_resource": "c", + "fstream": "c", + "memory": "c", + "labview_types.h": "c" + } +} \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 34e1b8bb..b24b26a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -132,7 +132,7 @@ option(BUILD_SHARED_LIBS "Build rabbitmq-c as a shared library" ON) option(BUILD_STATIC_LIBS "Build rabbitmq-c as a static library" ON) option(INSTALL_STATIC_LIBS "Install rabbitmq-c static library" ON) -option(BUILD_EXAMPLES "Build Examples" OFF) +option(BUILD_EXAMPLES "Build Examples" ON) option(BUILD_LABVIEW "Build LabVIEW shared library" ON) option(BUILD_TOOLS "Build Tools (requires POPT Library)" OFF) cmake_dependent_option(BUILD_TOOLS_DOCS "Build man pages for tools (requires xmlto)" OFF "BUILD_TOOLS" OFF) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 127edaa3..d5b78b95 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,9 +1,10 @@ # rabbitmq-c Release Notes -## 0.0.1-dev - 2023-08-07 +## 0.0.1-dev - 2023-11-05 ### Features +- Support message headers (PR #10 by @kwitekrac) - LabVIEW support (PR #8 by @kwitekrac) ### Continuous Integration diff --git a/labview/CMakeLists.txt b/labview/CMakeLists.txt index ddca6b77..608484e1 100644 --- a/labview/CMakeLists.txt +++ b/labview/CMakeLists.txt @@ -3,6 +3,8 @@ set(RABBITMQ_LV_SOURCES labview_rabbitmq.c + labview_types.c labview_types.h + msg_headers.c msg_headers.h utils.c utils.h unix/platform_utils.c ) diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 359f3683..df14e354 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -5,151 +5,67 @@ #include "utils.h" #include "extcode.h" #include "labview_rabbitmq.h" +#include "labview_types.h" +#include "msg_headers.h" #include #include #include -MgErr copyStringToLStrHandle(char* cpString, LStrHandle LVString) -{ - int32 len = strlen(cpString); - MgErr err = NumericArrayResize(uB, 1, (UHandle*)&LVString, len); - if (!err) - { - strncpy((*LVString)->str, cpString, len); // copying the string into the handle - (*LVString)->cnt = len; // telling the Handle what string size to expect - } - return err; -} - -MgErr copyBufferToLStrHandle(const void* buffer, int len, LStrHandle LVString) -{ - - MgErr err = NumericArrayResize(uB, 1, (UHandle*)&LVString, len); - if (!err) - { - memcpy((*LVString)->str, buffer, len); // copying the string into the handle - (*LVString)->cnt = len; // telling the Handle what string size to expect - } - return err; -} - LABVIEW_PUBLIC_FUNCTION -char* lv_rabbitmq_version(void) { - char* VERSION = "0.0.1"; +char *lv_rabbitmq_version(void) +{ + char *VERSION = "0.0.1"; return VERSION; } -/* This function is a modified version of the `die_on_amqp_error` function used in examples, -enhanced with LabVIEW string support.*/ -int lv_report_amqp_error(amqp_rpc_reply_t x, char const *context, LStrHandle error_description) { - unsigned char temp_str[MAX_ERROR_DESCRIPTION_LENGTH]; - int err; - switch (x.reply_type) { - case AMQP_RESPONSE_NORMAL: - return _AMQP_RESPONSE_NORMAL; - - case AMQP_RESPONSE_NONE: - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: missing RPC reply type!", context); - err = copyStringToLStrHandle(temp_str, error_description); - if (err){ - return err; - } - return _AMQP_RESPONSE_NONE; - - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: %s ", context, amqp_error_string2(x.library_error)); - err = copyStringToLStrHandle(temp_str, error_description); - if (err){ - return err; - } - return _AMQP_RESPONSE_LIBRARY_EXCEPTION; - - case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m = - (amqp_connection_close_t *)x.reply.decoded; - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server connection error %uh, message: %.*s ", - context, m->reply_code, (int)m->reply_text.len, - (char *)m->reply_text.bytes); - err = copyStringToLStrHandle(temp_str, error_description); - if (err){ - return err; - } - return _AMQP_RESPONSE_SERVER_EXCEPTION; - } - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded; - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server channel error %uh, message: %.*s ", - context, m->reply_code, (int)m->reply_text.len, - (char *)m->reply_text.bytes); - err = copyStringToLStrHandle(temp_str, error_description); - if (err){ - return err; - } - return _AMQP_RESPONSE_SERVER_EXCEPTION; - } - default: - snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: unknown server error, method id 0x%08X ", - context, x.reply.id); - err = copyStringToLStrHandle(temp_str, error_description); - if (err){ - return err; - } - return _AMQP_RESPONSE_SERVER_EXCEPTION; - } - break; - } -} - - LABVIEW_PUBLIC_FUNCTION -int64_t lv_amqp_new_connection() { +int64_t lv_amqp_new_connection() +{ // conn is an opaque struct pointer amqp_connection_state_t conn = amqp_new_connection(); // cast to int64_t so LabVIEW will receive it as // a 64-bit integer transfered by value - int64_t conn_intptr = (int64_t)conn; - return conn_intptr; + int64_t conn_intptr = (int64_t) conn; + return conn_intptr; } - LABVIEW_PUBLIC_FUNCTION -int lv_amqp_close_connection(int64_t conn_intptr, LStrHandle error_description) { +int lv_amqp_close_connection(int64_t conn_intptr, LStrHandle errorDescription) +{ // cast back to amqp_connection_state_t opaque struct pointer - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; - return lv_report_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),"Connection close", error_description); + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; + return lv_report_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Connection close", errorDescription); } - LABVIEW_PUBLIC_FUNCTION -void lv_amqp_destroy_connection(int64_t conn_intptr) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +void lv_amqp_destroy_connection(int64_t conn_intptr) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_destroy_connection(conn); } - LABVIEW_PUBLIC_FUNCTION -int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle errorDescription) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_channel_open(conn, channel); - return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Opening channel", error_description); + return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Opening channel", errorDescription); } - LABVIEW_PUBLIC_FUNCTION -int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; - return lv_report_amqp_error(amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), "Closing channel", error_description); +int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle errorDescription) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; + return lv_report_amqp_error(amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), "Closing channel", errorDescription); } - LABVIEW_PUBLIC_FUNCTION -int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, uint8_t passive, LStrHandle errorDescription) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; - amqp_boolean_t PASSIVE = 0; + amqp_boolean_t PASSIVE = passive; amqp_boolean_t DURABLE = 0; amqp_boolean_t AUTO_DELETE = 0; amqp_boolean_t INTERNAL = 0; @@ -157,99 +73,102 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *excha amqp_cstring_bytes(exchangetype), PASSIVE, DURABLE, AUTO_DELETE, INTERNAL, amqp_empty_table); - return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Exchange declare", error_description); + return lv_report_amqp_error(amqp_get_rpc_reply(conn), "Exchange declare", errorDescription); } - LABVIEW_PUBLIC_FUNCTION -int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, char *username, char *password, LStrHandle error_description) { - +int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, char *username, char *password, LStrHandle errorDescription) +{ int status; amqp_socket_t *socket = NULL; - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; struct timeval tval; tval.tv_sec = timeout_sec; tval.tv_usec = 0; socket = amqp_tcp_socket_new(conn); - if (!socket) { + if (!socket) + { return _CREATING_TCP_SOCKET; - } + } + status = amqp_socket_open_noblock(socket, host, port, &tval); - if (status<0) { - return _OPENING_TCP_SOCKET; - } + if (status < 0) + { + return _OPENING_TCP_SOCKET; + } + /*Code explanation: socket is set/stored in the connection state, so we dont need to destroy this socket because - it will be destroyed along with connection state destroy function*/ - - char const *VHOST = "/"; // the virtual host to connect to on the broker. The default on most brokers is "/" - int const CHANNEL_MAX = 0; // the limit for number of channels for the connection. 0 means no limit. - int const FRAME_MAX = 131072; // the maximum size of an AMQP frame. 131072 is the default. - // 4096 is the minimum size, 2^31-1 is the maximum, a good default is 131072 (128KB), - int const HEARTBEAT = 0; // the number of seconds between heartbeat frames to request of the broker. A value of 0 disables heartbeats. - - return lv_report_amqp_error(amqp_login(conn, VHOST, CHANNEL_MAX, FRAME_MAX, HEARTBEAT, AMQP_SASL_METHOD_PLAIN, username, password), "Logging in", error_description); -} - - -LABVIEW_PUBLIC_FUNCTION -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *messagebody, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; - amqp_basic_properties_t props; - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; - props.content_type = amqp_cstring_bytes("text/plain"); - props.delivery_mode = 2; /* persistent delivery mode */ - return amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange),amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); - //this function returns amqp_status_enum thats different from amqp_rpc_reply_t + it will be destroyed along with connection state destroy function*/ + + char const *VHOST = "/"; // the virtual host to connect to on the broker. + // The default on most brokers is "/" + int const CHANNEL_MAX = 0; // the limit for number of channels for the connection. 0 means no limit. + int const FRAME_MAX = 131072; // the maximum size of an AMQP frame. 131072 is the default. + // 4096 is the minimum size, 2^31-1 is the maximum, + // a good default is 131072 (128KB), + int const HEARTBEAT = 0; // the number of seconds between heartbeat frames to request + // of the broker. A value of 0 disables heartbeats. + + return lv_report_amqp_error(amqp_login(conn, VHOST, CHANNEL_MAX, FRAME_MAX, HEARTBEAT, AMQP_SASL_METHOD_PLAIN, username, password), "Logging in", errorDescription); } - LABVIEW_PUBLIC_FUNCTION -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *bindingkey, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name_in, LStrHandle queue_name_out, uint8_t passive, LStrHandle errorDescription) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; int status; amqp_bytes_t queuename; - amqp_boolean_t PASSIVE = 0; + if (queue_name_in!=NULL){ + queuename = amqp_cstring_bytes(queue_name_in); + } else { + queuename = amqp_empty_bytes; + } + + amqp_boolean_t PASSIVE = passive; amqp_boolean_t DURABLE = 0; amqp_boolean_t EXCLUSIVE = 0; amqp_boolean_t AUTO_DELETE = 1; - amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_empty_bytes, PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table); + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, queuename, PASSIVE, DURABLE, EXCLUSIVE, AUTO_DELETE, amqp_empty_table); - status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", error_description); - if (status != 1) { - return status; - } - - queuename = amqp_bytes_malloc_dup(r->queue); - if (queuename.bytes == NULL) { - copyStringToLStrHandle("Out of memory while copying queue name", error_description); - return _OUT_OF_MEMORY; + status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue", errorDescription); + if (status==1){ + copyBufferToLStrHandle(r->queue.bytes, r->queue.len, queue_name_out); } + return status; +} - amqp_queue_bind(conn, channel, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); - status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Binding queue", error_description); - if (status != 1) { +LABVIEW_PUBLIC_FUNCTION +int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle errorDescription) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; + int status; + + amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); + status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Binding queue", errorDescription); + if (status != 1) + { return status; } amqp_boolean_t NO_LOCAL = 0; amqp_boolean_t NO_ACK = 1; amqp_boolean_t EXCLUSIVE2 = 0; - amqp_basic_consume(conn, channel, queuename, amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table); - /* amqp_basic_consume is used to register a consumer on the queue, + amqp_basic_consume(conn, channel, amqp_cstring_bytes(queuename), amqp_empty_bytes, NO_LOCAL, NO_ACK, EXCLUSIVE2, amqp_empty_table); + /*amqp_basic_consume is used to register a consumer on the queue, so that the broker will start delivering messages to it.*/ - status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Basic consume", error_description); - return status; + status = lv_report_amqp_error(amqp_get_rpc_reply(conn), "Basic consume", errorDescription); + return status; } - LABVIEW_PUBLIC_FUNCTION -int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle error_description) { - amqp_connection_state_t conn = (amqp_connection_state_t)conn_intptr; +int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle concatenatedHeaders, LStrHandle errorDescription) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; int status; struct timeval tval; @@ -260,16 +179,166 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out //amqp_maybe_release_buffers(conn); - status = lv_report_amqp_error(amqp_consume_message(conn, &envelope, &tval, 0),"Consuming message", error_description); - if (status != 1) { + status = lv_report_amqp_error(amqp_consume_message(conn, &envelope, &tval, 0), "Consuming message", errorDescription); + if (status != 1) + { return status; } - if (envelope.message.body.len > 0) { + if (envelope.message.body.len > 0) + { copyBufferToLStrHandle(envelope.message.body.bytes, envelope.message.body.len, output); } + // Check message headers + amqp_table_t *headers = &envelope.message.properties.headers; + if (headers->num_entries > 0) + { + headersToString(headers, concatenatedHeaders); + } + amqp_destroy_envelope(&envelope); return status; } +LABVIEW_PUBLIC_FUNCTION +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, uint8_t* msgHeaderBuf, uint64_t msgHeaderBufLen, char *messageBody, LStrHandle errorDescription) +{ + amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = 2; /*persistent delivery mode */ + + + amqp_table_t *table; + if (msgHeaderBufLen > 0) + { + // Update flags to use custom headers + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; + table = &props.headers; + stringToHeaders(table, msgHeaderBuf, msgHeaderBufLen); + } + + int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingKey), 0, 0, &props, amqp_cstring_bytes(messageBody)); + + // Free allocated headers + if (msgHeaderBufLen > 0) + { + free(table->entries); + } + + return error_code; + //this function returns amqp_status_enum thats different from amqp_rpc_reply_t +} + +/** + * This function is defined for testing purposes only. + */ +LABVIEW_PUBLIC_FUNCTION +void lv_test(LStrHandle outputA, LStrHandle outputB, LStrHandle inputA, uint8_t *headerBuffer, int32_t headerBufferLen){ + char outputA_str[100]; + char outputB_str[100]; + + amqp_table_t headers; + amqp_table_t* headerPtr = &headers; + //parseHeaders(headerPtr, intputB, intputBlen); + char* delimiter = findInBuffer(headerBuffer, headerBufferLen, 0, 0x3B); + char* equal_char = findInBuffer(headerBuffer, headerBufferLen, 0, 0x3D); + + int pos1 = delimiter - (char*)headerBuffer; + int pos2 = equal_char - (char*)headerBuffer; + + //char valueTempBuffer [8]; + int valueLength = delimiter - equal_char - 1; + memcpy(outputA_str, equal_char + 1, valueLength); + + //sprintf(outputA_str, "%d", pos1); + sprintf(outputB_str, "%d", pos2); + + copyStringToLStrHandle(outputA_str, outputA); + copyStringToLStrHandle(outputB_str, outputB); + free(headerPtr->entries); +} + +/*This function is a modified version of the `die_on_amqp_error` function used in examples, +enhanced with LabVIEW string support.*/ +int lv_report_amqp_error(amqp_rpc_reply_t x, char + const *context, LStrHandle errorDescription) +{ + unsigned char temp_str[MAX_ERROR_DESCRIPTION_LENGTH]; + int err; + switch (x.reply_type) + { + case AMQP_RESPONSE_NORMAL: + return _AMQP_RESPONSE_NORMAL; + + case AMQP_RESPONSE_NONE: + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: missing RPC reply type!", context); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_NONE; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: %s ", context, amqp_error_string2(x.library_error)); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_LIBRARY_EXCEPTION; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (x.reply.id) + { + case AMQP_CONNECTION_CLOSE_METHOD: + { + amqp_connection_close_t *m = + (amqp_connection_close_t*) x.reply.decoded; + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server connection error %uh, message: %.*s ", + context, m->reply_code, (int) m->reply_text.len, + (char*) m->reply_text.bytes); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_SERVER_EXCEPTION; + } + + case AMQP_CHANNEL_CLOSE_METHOD: + { + amqp_channel_close_t *m = (amqp_channel_close_t*) x.reply.decoded; + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: server channel error %uh, message: %.*s ", + context, m->reply_code, (int) m->reply_text.len, + (char*) m->reply_text.bytes); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_SERVER_EXCEPTION; + } + + default: + snprintf(temp_str, MAX_ERROR_DESCRIPTION_LENGTH, "%s: unknown server error, method id 0x%08X ", + context, x.reply.id); + err = copyStringToLStrHandle(temp_str, errorDescription); + if (err) + { + return err; + } + + return _AMQP_RESPONSE_SERVER_EXCEPTION; + } + + break; + } +} diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index a8058459..70ea9c42 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -40,15 +40,17 @@ int lv_amqp_channel_open(int64_t conn_intptr, uint16_t channel, LStrHandle error int lv_amqp_channel_close(int64_t conn_intptr, uint16_t channel, LStrHandle error_description); -int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char* exchange, char* exchangetype, LStrHandle error_description); +int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchange, char *exchangetype, uint8_t passive, LStrHandle errorDescription); int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description); -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char* exchange, char* routingkey, char* messagebody, LStrHandle error_description); +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description); -int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* exchange, char* bindingkey, LStrHandle error_description); +int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name_in, LStrHandle queue_name_out, uint8_t passive, LStrHandle errorDescription); -int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle error_description); +int lv_amqp_bind_queue(int64_t conn_intptr, uint16_t channel, char *exchange, char *queuename, char *bindingkey, LStrHandle error_description); + +int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle output, LStrHandle cheaders, LStrHandle error_description); #endif /* LABVIEW_RABBITMQ_H */ diff --git a/labview/labview_types.c b/labview/labview_types.c new file mode 100644 index 00000000..13d8c5fd --- /dev/null +++ b/labview/labview_types.c @@ -0,0 +1,92 @@ +#include + +#include "extcode.h" + +/** + * This function copies the contents of a C string into a LabVIEW LStrHandle, + * resizing the handle if necessary. + */ +MgErr copyStringToLStrHandle(char *cpString, LStrHandle LVString) { + int32 len = strlen(cpString); + MgErr err = NumericArrayResize(uB, 1, (UHandle *)&LVString, len); + if (!err) { + strncpy((*LVString)->str, cpString, + len); // copying the string into the handle + (*LVString)->cnt = len; // telling the Handle what string size to expect + } + + return err; +} + +/** + * This function copies the contents of a binary buffer into a LabVIEW + * LStrHandle, resizing the handle if necessary. The function is designed for + * handling binary data. + */ +MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString) { + MgErr err = NumericArrayResize(uB, 1, (UHandle *)&LVString, len); + if (!err) { + memcpy((*LVString)->str, buffer, + len); // copying the string into the handle + (*LVString)->cnt = len; // telling the Handle what string size to expect + } + + return err; +} + +/** + * This function searches for a specified character within a binary buffer and + * returns a pointer to the first occurrence of the character, starting from the + * given offset. + */ +char *findInBuffer(const uint8_t *buffer, uint32_t len, uint32_t offset, + char toFind) { + uint32_t i = offset; + for (i; i < len; i++) { + if (buffer[i] == toFind) { + return buffer + i; + } + } + return NULL; +} + +/** + * Split a string into an array of tokens based on a delimiter. + * Memory is allocated for the token array, and the caller is responsible for + * freeing the memory when it's no longer needed. + */ +char **splitString(char *input, int *count, char delimiter) { + char **tokens = NULL; + char *token = strtok(input, &delimiter); + *count = 0; + + while (token != NULL) { + tokens = (char **)realloc(tokens, ((*count) + 1) * sizeof(char *)); + if (tokens == NULL) { + perror("Memory allocation error"); + exit(1); + } + + tokens[(*count)] = strdup(token); + (*count)++; + token = strtok(NULL, &delimiter); + } + + return tokens; +} + +/** + * This function frees the memory allocated for an array of tokens created by + * 'splitString'. + */ +void freeTokens(char **tokens, int count) { + if (tokens == NULL) { + return; // Handle NULL input. + } + + for (int i = 0; i < count; i++) { + free(tokens[i]); // Free memory for each token. + } + + free(tokens); // Free memory for the token array itself. +} diff --git a/labview/labview_types.h b/labview/labview_types.h new file mode 100644 index 00000000..496df44b --- /dev/null +++ b/labview/labview_types.h @@ -0,0 +1,39 @@ +#include "extcode.h" + +#ifndef LABVIEW_TYPES_H +#define LABVIEW_TYPES_H + +/** + * This function copies the contents of a C string into a LabVIEW LStrHandle, + * resizing the handle if necessary. + */ +MgErr copyStringToLStrHandle(char *cpString, LStrHandle LVString); + +/** + * This function copies the contents of a binary buffer into a LabVIEW + * LStrHandle, resizing the handle if necessary. The function is designed for + * handling binary data. + */ +MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString); + +/** + * This function searches for a specified character within a binary buffer and + * returns a pointer to the first occurrence of the character, starting from the + * given offset. + */ +char* findInBuffer(const uint8_t* headerBuffer, uint32_t headerBufferLen, uint32_t offset, char toFind); + +/** + * Split a string into an array of tokens based on a delimiter. + * Memory is allocated for the token array, and the caller is responsible for + * freeing the memory when it's no longer needed. + */ +char **splitString(char *input, int *count, char delimiter); + +/** + * This function frees the memory allocated for an array of tokens created by + * 'splitString'. + */ +void freeTokens(char** tokens, int count); + +#endif diff --git a/labview/msg_headers.c b/labview/msg_headers.c new file mode 100644 index 00000000..f2a5dac3 --- /dev/null +++ b/labview/msg_headers.c @@ -0,0 +1,267 @@ +#include + +#include "extcode.h" +#include "labview_types.h" + +/** + * This function parses a string representation of headers and populates an AMQP + * table with the parsed header entries. The headers in the string are expected + * to be in a specific format where key-value pairs are separated by '=' and + * multiple headers are separated by ';'. + * + * @note The `table` parameter should be a initialized `amqp_table_t` structure. + * The `headerBuffer` should point to a null-terminated C string. + * The function modifies the `table` structure in place with the parsed + * entries. + */ +void stringToHeaders(amqp_table_t* table, const uint8_t* headerBuffer, + uint64_t headerBufferLen) { + amqp_table_entry_t* entries = NULL; + int numEntries = 0; + int finished = FALSE; + int index = 0; + + while (finished == 0) { + // Find the delimiter ';' + char* delimiter = findInBuffer(headerBuffer, headerBufferLen, index, 0x3B); + if (delimiter == NULL) { + delimiter = + headerBuffer + + headerBufferLen; // If no delimiter found, use the end of the string + } + if (delimiter == headerBuffer + headerBufferLen) { + finished = TRUE; + } + // Find the equal_char '=' + char* equal_char = findInBuffer(headerBuffer, headerBufferLen, index, 0x3D); + + if (equal_char != NULL && equal_char < delimiter) { + // Add the entry to the entries array + amqp_table_entry_t* newEntries = + realloc(entries, (numEntries + 1) * sizeof(amqp_table_entry_t)); + if (newEntries == NULL) { + // Error handling for memory allocation failure + perror("Memory allocation failed"); + free(entries); + return table; + } + entries = newEntries; + amqp_field_value_t value; + + // Get the kind (first byte) + value.kind = headerBuffer[index]; + index++; // Move to the next character + + // Parse the key + entries[numEntries].key.bytes = headerBuffer + index; + entries[numEntries].key.len = equal_char - ((char*)headerBuffer + index); + + // Init and parse the value + char valueTempBuffer[8]; + int valueLength = delimiter - equal_char - 1; + memcpy(valueTempBuffer, equal_char + 1, valueLength); + + // Parse the value based on the kind + switch (value.kind) { + case AMQP_FIELD_KIND_UTF8: + case AMQP_FIELD_KIND_BYTES: + value.value.bytes.bytes = equal_char + 1; // fist char after "=" + value.value.bytes.len = valueLength; + break; + case AMQP_FIELD_KIND_I8: + value.value.i8 = (int8_t)valueTempBuffer[0]; + break; + case AMQP_FIELD_KIND_U8: + value.value.u8 = (uint8_t)valueTempBuffer[0]; + break; + case AMQP_FIELD_KIND_I16: + value.value.i16 = *((int16_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_U16: + value.value.u16 = *((uint16_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_I32: + value.value.i32 = *((int32_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_U32: + value.value.u32 = *((uint32_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_I64: + value.value.i64 = *((int64_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_U64: + case AMQP_FIELD_KIND_TIMESTAMP: + value.value.u64 = *((uint64_t*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_F32: + value.value.f32 = *((float*)valueTempBuffer); + break; + case AMQP_FIELD_KIND_F64: + value.value.f64 = *((double*)valueTempBuffer); + break; + default: + // Unsupported kind, ignore this header + break; + } + + entries[numEntries].value = value; + numEntries++; + } + + // Move to the next header (skip the delimiter ';') + index = delimiter - (char*)headerBuffer + 1; + } + + // Assign the entries to the table + table->num_entries = numEntries; + table->entries = entries; +} + +/** + * This function takes an AMQP table containing headers and converts them into + * a string format where the headers' keys and values are concatenated together. + * Key-value pairs are separated by '=' and multiple headers are separated by + * ';'. + */ +void headersToString(amqp_table_t* table, LStrHandle concatenatedHeaders) { + // Calculate required string (char buffer) size + int required_buffer_size = 0; + for (int i = 0; i < table->num_entries; i++) { + required_buffer_size++; // character that indicates a type + required_buffer_size += table->entries[i].key.len; // key + required_buffer_size++; // '=' separator + + switch (table->entries[i].value.kind) { + case AMQP_FIELD_KIND_I8: + required_buffer_size += 1; + break; + case AMQP_FIELD_KIND_I64: + required_buffer_size += 8; + break; + case AMQP_FIELD_KIND_UTF8: + case AMQP_FIELD_KIND_BYTES: + required_buffer_size += table->entries[i].value.value.bytes.len; + break; + case AMQP_FIELD_KIND_U8: + required_buffer_size += 1; + break; + case AMQP_FIELD_KIND_I16: + required_buffer_size += 2; + break; + case AMQP_FIELD_KIND_U16: + required_buffer_size += 2; + break; + case AMQP_FIELD_KIND_I32: + required_buffer_size += 4; + break; + case AMQP_FIELD_KIND_U32: + required_buffer_size += 4; + break; + case AMQP_FIELD_KIND_U64: + required_buffer_size += 8; + break; + case AMQP_FIELD_KIND_TIMESTAMP: + required_buffer_size += 8; + break; + case AMQP_FIELD_KIND_F32: + required_buffer_size += 4; + break; + case AMQP_FIELD_KIND_F64: + required_buffer_size += 8; + break; + default: + required_buffer_size += 0; // Unsupported kind, ignore + break; + } + + required_buffer_size++; // ';' separator + } + + // Allocate memory for temp buffer + char* headers = (char*)malloc(required_buffer_size * sizeof(char)); + float f32_value; + double f64_value; + + int index = 0; + for (int i = 0; i < table->num_entries; i++) { + headers[index] = table->entries[i].value.kind; // Character that indicates a type + index++; + + memcpy(headers + index, table->entries[i].key.bytes, table->entries[i].key.len); + index += table->entries[i].key.len; + headers[index] = '='; // Separator + index++; + + switch (table->entries[i].value.kind) { + case AMQP_FIELD_KIND_I8: + headers[index] = table->entries[i].value.value.i8; + index++; + break; + case AMQP_FIELD_KIND_I64: + for (int j = 0; j < 8; j++) { + headers[index + j] = (table->entries[i].value.value.i64 >> (j * 8)) & 0xFF; + } + index += 8; + break; + case AMQP_FIELD_KIND_UTF8: + case AMQP_FIELD_KIND_BYTES: + memcpy(headers + index, table->entries[i].value.value.bytes.bytes, table->entries[i].value.value.bytes.len); + index += table->entries[i].value.value.bytes.len; + break; + case AMQP_FIELD_KIND_U8: + headers[index] = table->entries[i].value.value.u8; + index++; + break; + case AMQP_FIELD_KIND_I16: + headers[index] = table->entries[i].value.value.i16 & 0xFF; + headers[index + 1] = (table->entries[i].value.value.i16 >> 8) & 0xFF; + index += 2; + break; + case AMQP_FIELD_KIND_U16: + headers[index] = table->entries[i].value.value.i16 & 0xFF; + headers[index + 1] = (table->entries[i].value.value.i16 >> 8) & 0xFF; + index += 2; + break; + case AMQP_FIELD_KIND_I32: + for (int j = 0; j < 4; j++) { + headers[index + j] = (table->entries[i].value.value.i32 >> (j * 8)) & 0xFF; + } + index += 4; + break; + case AMQP_FIELD_KIND_U32: + for (int j = 0; j < 4; j++) { + headers[index + j] = (table->entries[i].value.value.u32 >> (j * 8)) & 0xFF; + } + index += 4; + break; + case AMQP_FIELD_KIND_U64: + case AMQP_FIELD_KIND_TIMESTAMP: + for (int j = 0; j < 8; j++) { + headers[index + j] = (table->entries[i].value.value.u64 >> (j * 8)) & 0xFF; + } + index += 8; + break; + case AMQP_FIELD_KIND_F32: + f32_value = table->entries[i].value.value.f32; + memcpy(headers + index, &f32_value, 4); + index += 4; + break; + case AMQP_FIELD_KIND_F64: + f64_value = table->entries[i].value.value.f64; + memcpy(headers + index, &f64_value, 8); + index += 8; + break; + default: + break; // Unsupported kind, ignore + } + + headers[index] = ';'; // Separator + index++; + } + + // Copy the resulting string to the LStrHandle + copyBufferToLStrHandle(headers, required_buffer_size, concatenatedHeaders); + + // Free the temporary buffer + free(headers); +} \ No newline at end of file diff --git a/labview/msg_headers.h b/labview/msg_headers.h new file mode 100644 index 00000000..d72594d6 --- /dev/null +++ b/labview/msg_headers.h @@ -0,0 +1,28 @@ +#include +#include "extcode.h" + +#ifndef MSG_HEADERS_H +#define MSG_HEADERS_H + +/** + * This function takes an AMQP table containing headers and converts them into + * a string format where the headers' keys and values are concatenated together. + * Key-value pairs are separated by '=' and multiple headers are separated by + * ';'. + */ +void parseHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t headerBufferLen); + +/** + * This function parses a string representation of headers and populates an AMQP + * table with the parsed header entries. The headers in the string are expected + * to be in a specific format where key-value pairs are separated by '=' and + * multiple headers are separated by ';'. + * + * @note The `table` parameter should be a initialized `amqp_table_t` structure. + * The `headerBuffer` should point to a null-terminated C string. + * The function modifies the `table` structure in place with the parsed + * entries. + */ +void headersToString(amqp_table_t *table, LStrHandle concatenatedHeaders); + +#endif