diff --git a/labview/labview_rabbitmq.c b/labview/labview_rabbitmq.c index 21a077ec..c4e1f0d3 100644 --- a/labview/labview_rabbitmq.c +++ b/labview/labview_rabbitmq.c @@ -34,6 +34,18 @@ MgErr copyBufferToLStrHandle(const void *buffer, int len, LStrHandle LVString) return err; } +char* findInBuffer(const uint8_t* headerBuffer, uint32_t headerBufferLen, uint32_t offset, char toFind){ + uint32_t i=offset; + for (i; inum_entries = numEntries; + table->entries = entries; +} + + void getConcatenatedMessageHeaders(amqp_table_t *table, LStrHandle cheaders) { // Calculate required buffer size for concatenated headers keys and values separated by "=", @@ -342,7 +461,7 @@ int lv_amqp_login(int64_t conn_intptr, char *host, int port, int timeout_sec, ch } LABVIEW_PUBLIC_FUNCTION -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description) +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description) { amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr; amqp_basic_properties_t props; @@ -350,38 +469,23 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, props.content_type = amqp_cstring_bytes("text/plain"); props.delivery_mode = 2; /*persistent delivery mode */ - int count, count2; - char **headers_key = splitString(cheaders_key, &count); - char **headers_value = splitString(cheaders_value, &count2); - if (headers_key != NULL) + + amqp_table_t *table; + if (headerBufferLen != 0) { // Update flags to use custom headers props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; - // Allocate memory for custom headers - amqp_table_t *table = &props.headers; - props.headers.num_entries = count; - props.headers.entries = calloc(props.headers.num_entries, sizeof(amqp_table_entry_t)); - // update headers content - for (int i = 0; i < count; i++) - { (table->entries[i]).key = amqp_cstring_bytes(headers_key[i]); - ((table->entries[i]).value).kind = AMQP_FIELD_KIND_BYTES; - ((table->entries[i]).value).value.bytes = amqp_cstring_bytes(headers_value[i]); - } + table = &props.headers; + //props.expiration = amqp_cstring_bytes("60000"); + parseHeaders(table, headerBuffer, headerBufferLen); } - int error_code = error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); + int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey), 0, 0, &props, amqp_cstring_bytes(messagebody)); // Dereference headers - if (headers_key != NULL) + if (headerBufferLen != 0) { - for (int i = 0; i < count; i++) - { - free(headers_key[i]); - free(headers_value[i]); - } - - free(headers_key); - free(headers_value); + free(table->entries); } return error_code; @@ -463,4 +567,30 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out amqp_destroy_envelope(&envelope); return status; +} + +LABVIEW_PUBLIC_FUNCTION +void lv_test(LStrHandle outputA, LStrHandle outputB, LStrHandle inputA, uint8_t *headerBuffer, int32_t headerBufferLen){ + char outputA_str[100]; + char outputB_str[100]; + + amqp_table_t headers; + amqp_table_t* headerPtr = &headers; + //parseHeaders(headerPtr, intputB, intputBlen); + char* delimiter = findInBuffer(headerBuffer, headerBufferLen, 0, 0x3B); + char* equal_char = findInBuffer(headerBuffer, headerBufferLen, 0, 0x3D); + + int pos1 = delimiter - (char*)headerBuffer; + int pos2 = equal_char - (char*)headerBuffer; + + //char valueTempBuffer [8]; + int valueLength = delimiter - equal_char - 1; + memcpy(outputA_str, equal_char + 1, valueLength); + + //sprintf(outputA_str, "%d", pos1); + sprintf(outputB_str, "%d", pos2); + + copyStringToLStrHandle(outputA_str, outputA); + copyStringToLStrHandle(outputB_str, outputB); + free(headerPtr->entries); } \ No newline at end of file diff --git a/labview/labview_rabbitmq.h b/labview/labview_rabbitmq.h index 06eea1cb..ded0fd0d 100644 --- a/labview/labview_rabbitmq.h +++ b/labview/labview_rabbitmq.h @@ -44,7 +44,7 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char* exchan int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description); -int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, char *cheaders_key, char *cheaders_value, char *messagebody, LStrHandle error_description); +int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, char *messagebody, LStrHandle error_description); int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, LStrHandle queue_name_out, LStrHandle error_description);