From e3752b9bf3cf99380810d0ac175bba619a2b81f1 Mon Sep 17 00:00:00 2001 From: lieven Date: Sun, 10 Mar 2024 23:03:59 +0100 Subject: [PATCH] =?UTF-8?q?espidf=20serial=20working=20correctly=20,=20sti?= =?UTF-8?q?ll=20a=20small=20memory=20leakage=20at=20each=20lease=20and=20r?= =?UTF-8?q?ead=20task=20start=20top=20of=202=20x=2016=20bytes=20-=09-DZ=5F?= =?UTF-8?q?FEATURE=5FSUBSCRIPTION=3D0=20didn=C2=B4t=20compile=20previously?= =?UTF-8?q?=20-=20used=20freeRtos=20in=20espidf=20-=20read=20now=20times?= =?UTF-8?q?=20out=20in=201=20sec=20fixed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/zenoh-pico/api/macros.h | 2 + include/zenoh-pico/system/platform/espidf.h | 17 ++++- src/api/api.c | 7 +- src/session/subscription.c | 7 +- src/system/espidf/network.c | 15 ++--- src/system/espidf/system.c | 73 ++++++++++++++------- src/transport/unicast/read.c | 2 +- 7 files changed, 83 insertions(+), 40 deletions(-) diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index c7cb700a9..f40e5dcf8 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -279,8 +279,10 @@ template<> inline int8_t z_drop(z_owned_publisher_t* v) { return z_undeclare_pub template<> inline void z_drop(z_owned_keyexpr_t* v) { z_keyexpr_drop(v); } template<> inline void z_drop(z_owned_config_t* v) { z_config_drop(v); } template<> inline void z_drop(z_owned_scouting_config_t* v) { z_scouting_config_drop(v); } +#if Z_FEATURE_SUBSCRIPTION==1 template<> inline int8_t z_drop(z_owned_pull_subscriber_t* v) { return z_undeclare_pull_subscriber(v); } template<> inline int8_t z_drop(z_owned_subscriber_t* v) { return z_undeclare_subscriber(v); } +#endif template<> inline int8_t z_drop(z_owned_queryable_t* v) { return z_undeclare_queryable(v); } template<> inline void z_drop(z_owned_reply_t* v) { z_reply_drop(v); } template<> inline void z_drop(z_owned_hello_t* v) { z_hello_drop(v); } diff --git a/include/zenoh-pico/system/platform/espidf.h b/include/zenoh-pico/system/platform/espidf.h index 6210a9abc..231473726 100644 --- a/include/zenoh-pico/system/platform/espidf.h +++ b/include/zenoh-pico/system/platform/espidf.h @@ -18,14 +18,27 @@ #include #include #include +#include #include "zenoh-pico/config.h" #if Z_FEATURE_MULTI_THREAD == 1 #include -typedef TaskHandle_t zp_task_t; -typedef void *zp_task_attr_t; // Not used in ESP32 +typedef struct { + const char *name; + UBaseType_t priority; + size_t stack_depth; +#if (configSUPPORT_STATIC_ALLOCATION == 1) + _Bool static_allocation; + StackType_t *stack_buffer; + StaticTask_t *task_buffer; +#endif /* SUPPORT_STATIC_ALLOCATION */ +} zp_task_attr_t; +typedef struct { + TaskHandle_t handle; + EventGroupHandle_t join_event; +} zp_task_t; typedef pthread_mutex_t zp_mutex_t; typedef pthread_cond_t zp_condvar_t; #endif // Z_FEATURE_MULTI_THREAD == 1 diff --git a/src/api/api.c b/src/api/api.c index 73350bd23..a2ded70d4 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -647,6 +647,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint #endif ); +#if Z_FEATURE_SUBSCRIPTION == 1 // Trigger local subscriptions _z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len, _z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority) @@ -655,7 +656,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint opt.attachment #endif ); - +#endif return ret; } @@ -746,7 +747,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l opt.attachment #endif ); - +#if Z_FEATURE_SUBSCRIPTION == 1 // Trigger local subscriptions _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT #if Z_FEATURE_ATTACHMENT == 1 @@ -754,7 +755,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l opt.attachment #endif ); - +#endif return ret; } diff --git a/src/session/subscription.c b/src/session/subscription.c index 2a88a07ff..11373eafc 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -184,10 +184,13 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 +#if Z_FEATURE_SUBSCRIPTION==1 -// _Z_DEBUG("Resolving %d - %s on mapping 0x%x", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr)); + _Z_DEBUG(" %x - %s", keyexpr._id, keyexpr._suffix); + _Z_DEBUG("Resolving %d - %s on mapping 0x%x", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr)); +#endif _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr); - _Z_DEBUG("Triggering subs for %d - %s", key._id, key._suffix); + _Z_DEBUG("Triggering subs for %d - %s", key._id, key._suffix); if (key._suffix != NULL) { _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key); diff --git a/src/system/espidf/network.c b/src/system/espidf/network.c index d5d7a517e..d7f09a960 100644 --- a/src/system/espidf/network.c +++ b/src/system/espidf/network.c @@ -592,7 +592,7 @@ int8_t _z_open_serial_from_dev(_z_sys_net_socket_t *sock, char *dev, uint32_t ba const int uart_buffer_size = (1024 * 2); QueueHandle_t uart_queue; - uart_driver_install(sock->_serial, uart_buffer_size, 0, 100, &uart_queue, 0); + uart_driver_install(sock->_serial, uart_buffer_size, 0, 100, NULL, 0); uart_flush_input(sock->_serial); return ret; @@ -634,23 +634,19 @@ size_t _z_read_serial(const _z_sys_net_socket_t sock, uint8_t *ptr, size_t len) uint8_t *before_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MAX_COBS_BUF_SIZE); size_t rb = 0; - uint8_t timeout_count = 0; for (size_t i = 0; i < _Z_SERIAL_MAX_COBS_BUF_SIZE; i++) { - int r = uart_read_bytes(sock._serial, &before_cobs[i], 1, 100); + int r = uart_read_bytes(sock._serial, &before_cobs[i], 1, 1000); if (r == 0) { - timeout_count++; - if (timeout_count > 10) { _Z_DEBUG("Timeout reading from serial"); zp_free(before_cobs); return 0; - } } else if (r == 1) { rb = rb + (size_t)1; if (before_cobs[i] == (uint8_t)0x00) { break; } } else { - _Z_DEBUG("Error reading from serial"); + _Z_ERROR("Error reading from serial"); zp_free(before_cobs); return _Z_ERR_GENERIC; } @@ -681,17 +677,16 @@ size_t _z_read_serial(const _z_sys_net_socket_t sock, uint8_t *ptr, size_t len) uint32_t c_crc = _z_crc32(ptr, payload_len); if (c_crc != crc) { - _Z_DEBUG("CRC mismatch: %d != %d ", c_crc, crc); + _Z_ERROR("CRC mismatch: %d != %d ", c_crc, crc); ret = _Z_ERR_GENERIC; } } else { - _Z_DEBUG("length mismatch => %d <> %d ", trb, payload_len + (uint16_t)6); + _Z_ERROR("length mismatch => %d <> %d ", trb, payload_len + (uint16_t)6); ret = _Z_ERR_GENERIC; } zp_free(before_cobs); zp_free(after_cobs); - _Z_DEBUG("payload_len = %d ", payload_len); rb = payload_len; if (ret != _Z_RES_OK) { rb = SIZE_MAX; diff --git a/src/system/espidf/system.c b/src/system/espidf/system.c index 51a9be532..47194c31e 100644 --- a/src/system/espidf/system.c +++ b/src/system/espidf/system.c @@ -50,49 +50,77 @@ void zp_free(void *ptr) { heap_caps_free(ptr); } // In FreeRTOS, tasks created using xTaskCreate must end with vTaskDelete. // A task function should __not__ simply return. typedef struct { - void *(*_fun)(void *); - void *_arg; + void *(*fun)(void *); + void *arg; + EventGroupHandle_t join_event; } z_task_arg; -void z_task_wrapper(z_task_arg *targ) { - targ->_fun(targ->_arg); +static void z_task_wrapper(void *arg) { + z_task_arg *targ = (z_task_arg *)arg; + targ->fun(targ->arg); + xEventGroupSetBits(targ->join_event, 1); vTaskDelete(NULL); - zp_free(targ); } -/*------------------ Task ------------------*/ -int8_t zp_task_init(zp_task_t *task, zp_task_attr_t *attr, void *(*fun)(void *), void *arg) { - int ret = 0; +static zp_task_attr_t z_default_task_attr = { + .name = "", + .priority = configMAX_PRIORITIES / 2, + .stack_depth = 5120, +#if (configSUPPORT_STATIC_ALLOCATION == 1) + .static_allocation = false, + .stack_buffer = NULL, + .task_buffer = NULL, +#endif /* SUPPORT_STATIC_ALLOCATION */ +}; +/*------------------ Thread ------------------*/ +int8_t zp_task_init(zp_task_t *task, zp_task_attr_t *attr, void *(*fun)(void *), void *arg) { z_task_arg *z_arg = (z_task_arg *)zp_malloc(sizeof(z_task_arg)); - if (z_arg != NULL) { - z_arg->_fun = fun; - z_arg->_arg = arg; - if (xTaskCreate((void *)z_task_wrapper, "", 5120, z_arg, configMAX_PRIORITIES / 2, task) != pdPASS) { - ret = -1; + if (z_arg == NULL) { + return -1; + } + + z_arg->fun = fun; + z_arg->arg = arg; + z_arg->join_event = task->join_event = xEventGroupCreate(); + + if (attr == NULL) { + attr = &z_default_task_attr; + } + +#if (configSUPPORT_STATIC_ALLOCATION == 1) + if (attr->static_allocation) { + task->handle = xTaskCreateStatic(z_task_wrapper, attr->name, attr->stack_depth, z_arg, attr->priority, + attr->stack_buffer, attr->task_buffer); + if (task->handle == NULL) { + return -1; } } else { - ret = -1; +#endif /* SUPPORT_STATIC_ALLOCATION */ + if (xTaskCreate(z_task_wrapper, attr->name, attr->stack_depth, z_arg, attr->priority, &task->handle) != + pdPASS) { + return -1; + } +#if (configSUPPORT_STATIC_ALLOCATION == 1) } +#endif /* SUPPORT_STATIC_ALLOCATION */ - return ret; + return 0; } int8_t zp_task_join(zp_task_t *task) { - // Note: task/thread join not supported on FreeRTOS API, so we force its deletion instead. - // return zp_task_cancel(task); + xEventGroupWaitBits(task->join_event, 1, pdFALSE, pdFALSE, portMAX_DELAY); return 0; } int8_t zp_task_cancel(zp_task_t *task) { - vTaskDelete(*task); + vTaskDelete(task->handle); return 0; } void zp_task_free(zp_task_t **task) { - zp_task_t *ptr = *task; - zp_free(ptr); - *task = NULL; + zp_free((*task)->join_event); + zp_free(*task); } /*------------------ Mutex ------------------*/ @@ -126,7 +154,8 @@ int zp_sleep_ms(size_t time) { // This may compound, so this approach may make sleeps longer than expected. // This extra check tries to minimize the amount of extra time it might sleep. while (zp_time_elapsed_ms(&start) < time) { - zp_sleep_us(1000); + //zp_sleep_us(1000); + vTaskDelay(1/portTICK_PERIOD_MS); } return 0; diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 3b02ff597..7b5e6083a 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -92,7 +92,7 @@ void *_zp_unicast_read_task(void *ztu_arg) { } // Wrap the main buffer for to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztu->_zbuf, to_read); - + // Mark the session that we have received data ztu->_received = true;