Skip to content

Commit

Permalink
espidf serial working correctly , still a small memory leakage at eac…
Browse files Browse the repository at this point in the history
…h lease and read task start top of 2 x 16 bytes

-	-DZ_FEATURE_SUBSCRIPTION=0 didn´t compile previously
- used freeRtos in espidf
- read now times out in 1 sec fixed
  • Loading branch information
vortex314 committed Mar 10, 2024
1 parent e1f4c49 commit e3752b9
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 40 deletions.
2 changes: 2 additions & 0 deletions include/zenoh-pico/api/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,10 @@ template<> inline int8_t z_drop(z_owned_publisher_t* v) { return z_undeclare_pub
template<> inline void z_drop(z_owned_keyexpr_t* v) { z_keyexpr_drop(v); }
template<> inline void z_drop(z_owned_config_t* v) { z_config_drop(v); }
template<> inline void z_drop(z_owned_scouting_config_t* v) { z_scouting_config_drop(v); }
#if Z_FEATURE_SUBSCRIPTION==1
template<> inline int8_t z_drop(z_owned_pull_subscriber_t* v) { return z_undeclare_pull_subscriber(v); }
template<> inline int8_t z_drop(z_owned_subscriber_t* v) { return z_undeclare_subscriber(v); }
#endif
template<> inline int8_t z_drop(z_owned_queryable_t* v) { return z_undeclare_queryable(v); }
template<> inline void z_drop(z_owned_reply_t* v) { z_reply_drop(v); }
template<> inline void z_drop(z_owned_hello_t* v) { z_hello_drop(v); }
Expand Down
17 changes: 15 additions & 2 deletions include/zenoh-pico/system/platform/espidf.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,27 @@
#include <driver/uart.h>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include <freertos/event_groups.h>

#include "zenoh-pico/config.h"

#if Z_FEATURE_MULTI_THREAD == 1
#include <pthread.h>

typedef TaskHandle_t zp_task_t;
typedef void *zp_task_attr_t; // Not used in ESP32
typedef struct {
const char *name;
UBaseType_t priority;
size_t stack_depth;
#if (configSUPPORT_STATIC_ALLOCATION == 1)
_Bool static_allocation;
StackType_t *stack_buffer;
StaticTask_t *task_buffer;
#endif /* SUPPORT_STATIC_ALLOCATION */
} zp_task_attr_t;
typedef struct {
TaskHandle_t handle;
EventGroupHandle_t join_event;
} zp_task_t;
typedef pthread_mutex_t zp_mutex_t;
typedef pthread_cond_t zp_condvar_t;
#endif // Z_FEATURE_MULTI_THREAD == 1
Expand Down
7 changes: 4 additions & 3 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint
#endif
);

#if Z_FEATURE_SUBSCRIPTION == 1
// Trigger local subscriptions
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len,
_z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority)
Expand All @@ -655,7 +656,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint
opt.attachment
#endif
);

#endif
return ret;
}

Expand Down Expand Up @@ -746,15 +747,15 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l
opt.attachment
#endif
);

#if Z_FEATURE_SUBSCRIPTION == 1
// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
#endif
);

#endif
return ret;
}

Expand Down
7 changes: 5 additions & 2 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,13 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
#if Z_FEATURE_SUBSCRIPTION==1

// _Z_DEBUG("Resolving %d - %s on mapping 0x%x", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr));
_Z_DEBUG(" %x - %s", keyexpr._id, keyexpr._suffix);
_Z_DEBUG("Resolving %d - %s on mapping 0x%x", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr));
#endif
_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr);
_Z_DEBUG("Triggering subs for %d - %s", key._id, key._suffix);
_Z_DEBUG("Triggering subs for %d - %s", key._id, key._suffix);
if (key._suffix != NULL) {
_z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key);

Expand Down
15 changes: 5 additions & 10 deletions src/system/espidf/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ int8_t _z_open_serial_from_dev(_z_sys_net_socket_t *sock, char *dev, uint32_t ba

const int uart_buffer_size = (1024 * 2);
QueueHandle_t uart_queue;
uart_driver_install(sock->_serial, uart_buffer_size, 0, 100, &uart_queue, 0);
uart_driver_install(sock->_serial, uart_buffer_size, 0, 100, NULL, 0);
uart_flush_input(sock->_serial);

return ret;
Expand Down Expand Up @@ -634,23 +634,19 @@ size_t _z_read_serial(const _z_sys_net_socket_t sock, uint8_t *ptr, size_t len)

uint8_t *before_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MAX_COBS_BUF_SIZE);
size_t rb = 0;
uint8_t timeout_count = 0;
for (size_t i = 0; i < _Z_SERIAL_MAX_COBS_BUF_SIZE; i++) {
int r = uart_read_bytes(sock._serial, &before_cobs[i], 1, 100);
int r = uart_read_bytes(sock._serial, &before_cobs[i], 1, 1000);
if (r == 0) {
timeout_count++;
if (timeout_count > 10) {
_Z_DEBUG("Timeout reading from serial");
zp_free(before_cobs);
return 0;
}
} else if (r == 1) {
rb = rb + (size_t)1;
if (before_cobs[i] == (uint8_t)0x00) {
break;
}
} else {
_Z_DEBUG("Error reading from serial");
_Z_ERROR("Error reading from serial");
zp_free(before_cobs);
return _Z_ERR_GENERIC;
}
Expand Down Expand Up @@ -681,17 +677,16 @@ size_t _z_read_serial(const _z_sys_net_socket_t sock, uint8_t *ptr, size_t len)

uint32_t c_crc = _z_crc32(ptr, payload_len);
if (c_crc != crc) {
_Z_DEBUG("CRC mismatch: %d != %d ", c_crc, crc);
_Z_ERROR("CRC mismatch: %d != %d ", c_crc, crc);
ret = _Z_ERR_GENERIC;
}
} else {
_Z_DEBUG("length mismatch => %d <> %d ", trb, payload_len + (uint16_t)6);
_Z_ERROR("length mismatch => %d <> %d ", trb, payload_len + (uint16_t)6);
ret = _Z_ERR_GENERIC;
}

zp_free(before_cobs);
zp_free(after_cobs);
_Z_DEBUG("payload_len = %d ", payload_len);
rb = payload_len;
if (ret != _Z_RES_OK) {
rb = SIZE_MAX;
Expand Down
73 changes: 51 additions & 22 deletions src/system/espidf/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,49 +50,77 @@ void zp_free(void *ptr) { heap_caps_free(ptr); }
// In FreeRTOS, tasks created using xTaskCreate must end with vTaskDelete.
// A task function should __not__ simply return.
typedef struct {
void *(*_fun)(void *);
void *_arg;
void *(*fun)(void *);
void *arg;
EventGroupHandle_t join_event;
} z_task_arg;

void z_task_wrapper(z_task_arg *targ) {
targ->_fun(targ->_arg);
static void z_task_wrapper(void *arg) {
z_task_arg *targ = (z_task_arg *)arg;
targ->fun(targ->arg);
xEventGroupSetBits(targ->join_event, 1);
vTaskDelete(NULL);
zp_free(targ);
}

/*------------------ Task ------------------*/
int8_t zp_task_init(zp_task_t *task, zp_task_attr_t *attr, void *(*fun)(void *), void *arg) {
int ret = 0;
static zp_task_attr_t z_default_task_attr = {
.name = "",
.priority = configMAX_PRIORITIES / 2,
.stack_depth = 5120,
#if (configSUPPORT_STATIC_ALLOCATION == 1)
.static_allocation = false,
.stack_buffer = NULL,
.task_buffer = NULL,
#endif /* SUPPORT_STATIC_ALLOCATION */
};

/*------------------ Thread ------------------*/
int8_t zp_task_init(zp_task_t *task, zp_task_attr_t *attr, void *(*fun)(void *), void *arg) {
z_task_arg *z_arg = (z_task_arg *)zp_malloc(sizeof(z_task_arg));
if (z_arg != NULL) {
z_arg->_fun = fun;
z_arg->_arg = arg;
if (xTaskCreate((void *)z_task_wrapper, "", 5120, z_arg, configMAX_PRIORITIES / 2, task) != pdPASS) {
ret = -1;
if (z_arg == NULL) {
return -1;
}

z_arg->fun = fun;
z_arg->arg = arg;
z_arg->join_event = task->join_event = xEventGroupCreate();

if (attr == NULL) {
attr = &z_default_task_attr;
}

#if (configSUPPORT_STATIC_ALLOCATION == 1)
if (attr->static_allocation) {
task->handle = xTaskCreateStatic(z_task_wrapper, attr->name, attr->stack_depth, z_arg, attr->priority,
attr->stack_buffer, attr->task_buffer);
if (task->handle == NULL) {
return -1;
}
} else {
ret = -1;
#endif /* SUPPORT_STATIC_ALLOCATION */
if (xTaskCreate(z_task_wrapper, attr->name, attr->stack_depth, z_arg, attr->priority, &task->handle) !=
pdPASS) {
return -1;
}
#if (configSUPPORT_STATIC_ALLOCATION == 1)
}
#endif /* SUPPORT_STATIC_ALLOCATION */

return ret;
return 0;
}

int8_t zp_task_join(zp_task_t *task) {
// Note: task/thread join not supported on FreeRTOS API, so we force its deletion instead.
// return zp_task_cancel(task);
xEventGroupWaitBits(task->join_event, 1, pdFALSE, pdFALSE, portMAX_DELAY);
return 0;
}

int8_t zp_task_cancel(zp_task_t *task) {
vTaskDelete(*task);
vTaskDelete(task->handle);
return 0;
}

void zp_task_free(zp_task_t **task) {
zp_task_t *ptr = *task;
zp_free(ptr);
*task = NULL;
zp_free((*task)->join_event);
zp_free(*task);
}

/*------------------ Mutex ------------------*/
Expand Down Expand Up @@ -126,7 +154,8 @@ int zp_sleep_ms(size_t time) {
// This may compound, so this approach may make sleeps longer than expected.
// This extra check tries to minimize the amount of extra time it might sleep.
while (zp_time_elapsed_ms(&start) < time) {
zp_sleep_us(1000);
//zp_sleep_us(1000);
vTaskDelay(1/portTICK_PERIOD_MS);
}

return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/transport/unicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void *_zp_unicast_read_task(void *ztu_arg) {
}
// Wrap the main buffer for to_read bytes
_z_zbuf_t zbuf = _z_zbuf_view(&ztu->_zbuf, to_read);

// Mark the session that we have received data
ztu->_received = true;

Expand Down

0 comments on commit e3752b9

Please sign in to comment.