Skip to content

Commit

Permalink
client: Add support for user events
Browse files Browse the repository at this point in the history
Also supporting configurable queue size for the internal event loop.

Closes #230
  • Loading branch information
david-cermak committed Jul 29, 2022
1 parent 9186e5f commit 97503cc
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 4 deletions.
14 changes: 14 additions & 0 deletions include/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ typedef enum esp_mqtt_event_id_t {
- Additional context: msg_id (id of the deleted
message).
*/
MQTT_USER_EVENT, /*!< Custom event used to queue tasks into mqtt event handler
All fields from the esp_mqtt_event_t type could be used to pass
an additional context data to the handler.
*/
} esp_mqtt_event_id_t;

/**
Expand Down Expand Up @@ -569,6 +573,16 @@ esp_err_t esp_mqtt_client_unregister_event(esp_mqtt_client_handle_t client, esp_
*/
int esp_mqtt_client_get_outbox_size(esp_mqtt_client_handle_t client);

/**
* @brief Dispatch user event to the mqtt internal event loop
*
* @param client *MQTT* client handle
* @param event *MQTT* event handle structure
* @return ESP_OK on success
* ESP_ERR_TIMEOUT if the event couldn't be queued (ref also CONFIG_MQTT_EVENT_QUEUE_SIZE)
*/
esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event);

#ifdef __cplusplus
}
#endif //__cplusplus
Expand Down
4 changes: 4 additions & 0 deletions lib/include/mqtt_client_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include "esp_err.h"
#include "platform.h"

Expand Down Expand Up @@ -123,6 +124,9 @@ struct esp_mqtt_client {
EventGroupHandle_t status_bits;
SemaphoreHandle_t api_lock;
TaskHandle_t task_handle;
#if MQTT_EVENT_QUEUE_SIZE > 1
atomic_int queued_events;
#endif
};

bool esp_mqtt_set_if_config(char const *const new_config, char **old_config);
Expand Down
5 changes: 5 additions & 0 deletions lib/include/mqtt_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
#define MQTT_ENABLE_WS CONFIG_MQTT_TRANSPORT_WEBSOCKET
#define MQTT_ENABLE_WSS CONFIG_MQTT_TRANSPORT_WEBSOCKET_SECURE

#ifdef CONFIG_MQTT_EVENT_QUEUE_SIZE
#define MQTT_EVENT_QUEUE_SIZE CONFIG_MQTT_EVENT_QUEUE_SIZE
#else
#define MQTT_EVENT_QUEUE_SIZE 10
#endif

#define OUTBOX_MAX_SIZE (4*1024)
#endif
58 changes: 54 additions & 4 deletions mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,14 @@ static bool create_client_data(esp_mqtt_client_handle_t client)

esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config)
{
esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client));
esp_mqtt_client_handle_t client = heap_caps_calloc(1, sizeof(struct esp_mqtt_client),
#if MQTT_EVENT_QUEUE_SIZE > 1
// if supporting multiple queued events, we keep track of them
// using atomic variable, so need to make sure it won't get allocated in PSRAM
MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);
#else
MALLOC_CAP_DEFAULT);
#endif
ESP_MEM_CHECK(TAG, client, return NULL);
if (!create_client_data(client)) {
goto _mqtt_init_failed;
Expand All @@ -776,10 +783,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_args_t no_task_loop = {
.queue_size = 1,
.queue_size = MQTT_EVENT_QUEUE_SIZE,
.task_name = NULL,
};
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
#if MQTT_EVENT_QUEUE_SIZE > 1
atomic_init(&client->queued_events, 0);
#endif
#endif

client->keepalive_tick = platform_tick_get_ms();
Expand Down Expand Up @@ -939,6 +949,17 @@ static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t cli
return esp_mqtt_dispatch_event(client);
}

esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event)
{
esp_err_t ret = esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, MQTT_USER_EVENT, event, sizeof(*event), 0);
#if MQTT_EVENT_QUEUE_SIZE > 1
if (ret == ESP_OK) {
atomic_fetch_add(&client->queued_events, 1);
}
#endif
return ret;
}

static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
{
client->event.client = client;
Expand Down Expand Up @@ -1447,6 +1468,34 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
}
}

/**
* @brief When using multiple queued item, we'd like to reduce the poll timeout to proceed with event loop exacution
*/
static inline int max_poll_timeout(esp_mqtt_client_handle_t client, int max_timeout)
{
return
#if MQTT_EVENT_QUEUE_SIZE > 1
atomic_load(&client->queued_events) > 0 ? 10: max_timeout;
#else
max_timeout;
#endif
}

static inline void run_event_loop(esp_mqtt_client_handle_t client)
{
#if MQTT_EVENT_QUEUE_SIZE > 1
if (atomic_load(&client->queued_events) > 0) {
atomic_fetch_sub(&client->queued_events, 1);
#else
{
#endif
esp_err_t ret = esp_event_loop_run(client->config->event_loop_handle, 0);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "Error in running event_loop %d", ret);
}
}
}

static void esp_mqtt_task(void *pv)
{
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
Expand All @@ -1470,6 +1519,7 @@ static void esp_mqtt_task(void *pv)
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
while (client->run) {
MQTT_API_LOCK(client);
run_event_loop(client);
switch (client->state) {
case MQTT_STATE_DISCONNECTED:
break;
Expand Down Expand Up @@ -1571,7 +1621,7 @@ static void esp_mqtt_task(void *pv)
}
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
client->wait_timeout_ms / 2 / portTICK_PERIOD_MS);
max_poll_timeout(client, client->wait_timeout_ms / 2 / portTICK_PERIOD_MS));
// continue the while loop instead of break, as the mutex is unlocked
continue;
default:
Expand All @@ -1580,7 +1630,7 @@ static void esp_mqtt_task(void *pv)
}
MQTT_API_UNLOCK(client);
if (MQTT_STATE_CONNECTED == client->state) {
if (esp_transport_poll_read(client->transport, MQTT_POLL_READ_TIMEOUT_MS) < 0) {
if (esp_transport_poll_read(client->transport, max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS)) < 0) {
ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
esp_mqtt_abort_connection(client);
}
Expand Down

0 comments on commit 97503cc

Please sign in to comment.