Skip to content

Commit

Permalink
Adds "try_" overloads that return false if the message could not be q…
Browse files Browse the repository at this point in the history
…ueued for send, and adds an onDisconnect callback (#43)
  • Loading branch information
nkinnan authored Oct 8, 2024
1 parent dd1bbae commit 9410d19
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
44 changes: 37 additions & 7 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,28 @@ AsyncEventSourceClient::~AsyncEventSourceClient(){
}

void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){
if(!_tryQueueMessage(dataMessage)){
ets_printf("AsyncEventSourceClient: ERROR: Queue is full, communications too slow, dropping event");
}
}

bool AsyncEventSourceClient::_tryQueueMessage(AsyncEventSourceMessage *dataMessage){
if(dataMessage == NULL)
return;
return true;
if(!connected()){
delete dataMessage;
return;
return true;
}

bool ret = true;
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
ets_printf("AsyncEventSourceClient: ERROR: Queue is full, communications too slow, dropping event");
delete dataMessage;
ret = false;
} else {
_messageQueue.add(dataMessage);
}
if(_client->canSend())
_runQueue();
return ret;
}

void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
Expand Down Expand Up @@ -233,7 +240,11 @@ void AsyncEventSourceClient::close(){
}

void AsyncEventSourceClient::write(const char * message, size_t len){
_queueMessage(new AsyncEventSourceMessage(message, len));
try_write(message, len);
}

bool AsyncEventSourceClient::try_write(const char * message, size_t len){
return _tryQueueMessage(new AsyncEventSourceMessage(message, len));
}

void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
Expand Down Expand Up @@ -288,6 +299,8 @@ AsyncEventSource::AsyncEventSource(const String& url)
: _url(url)
, _clients(LinkedList<AsyncEventSourceClient *>([](AsyncEventSourceClient *c){ delete c; }))
, _connectcb(NULL)
, _connectcb2(NULL)
, _disconnectcb(NULL)
{}

AsyncEventSource::~AsyncEventSource(){
Expand All @@ -298,6 +311,14 @@ void AsyncEventSource::onConnect(ArEventHandlerFunction cb){
_connectcb = cb;
}

void AsyncEventSource::onConnect(ArEventHandlerFunction2 cb){
_connectcb2 = cb;
}

void AsyncEventSource::onDisconnect(ArEventHandlerFunction2 cb){
_disconnectcb = cb;
}

void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
/*char * temp = (char *)malloc(2054);
if(temp != NULL){
Expand All @@ -315,10 +336,14 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
_clients.add(client);
if(_connectcb)
_connectcb(client);
if(_connectcb2)
_connectcb2(this, client);
}

void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
_clients.remove(client);
if(_disconnectcb)
_disconnectcb(this, client);
}

void AsyncEventSource::close(){
Expand Down Expand Up @@ -347,14 +372,19 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
}

void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
try_send(message, event, id, reconnect);
}


bool AsyncEventSource::try_send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
String ev = generateEventMessage(message, event, id, reconnect);
bool succeeded = false;
for(const auto &c: _clients){
if(c->connected()) {
c->write(ev.c_str(), ev.length());
if(c->try_write(ev.c_str(), ev.length()))
succeeded = true;
}
}
return succeeded;
}

size_t AsyncEventSource::count() const {
Expand Down
8 changes: 8 additions & 0 deletions src/AsyncEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class AsyncEventSource;
class AsyncEventSourceResponse;
class AsyncEventSourceClient;
typedef std::function<void(AsyncEventSourceClient *client)> ArEventHandlerFunction;
typedef std::function<void(AsyncEventSource *source, AsyncEventSourceClient *client)> ArEventHandlerFunction2;

class AsyncEventSourceMessage {
private:
Expand Down Expand Up @@ -88,6 +89,7 @@ class AsyncEventSourceClient {
#endif // ESP32
LinkedList<AsyncEventSourceMessage *> _messageQueue;
void _queueMessage(AsyncEventSourceMessage *dataMessage);
bool _tryQueueMessage(AsyncEventSourceMessage *dataMessage);
void _runQueue();

public:
Expand All @@ -98,6 +100,7 @@ class AsyncEventSourceClient {
AsyncClient* client(){ return _client; }
void close();
void write(const char * message, size_t len);
bool try_write(const char * message, size_t len);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
bool connected() const { return (_client != NULL) && _client->connected(); }
uint32_t lastId() const { return _lastId; }
Expand All @@ -115,14 +118,19 @@ class AsyncEventSource: public AsyncWebHandler {
String _url;
LinkedList<AsyncEventSourceClient *> _clients;
ArEventHandlerFunction _connectcb;
ArEventHandlerFunction2 _connectcb2;
ArEventHandlerFunction2 _disconnectcb;
public:
AsyncEventSource(const String& url);
~AsyncEventSource();

const char * url() const { return _url.c_str(); }
void close();
void onConnect(ArEventHandlerFunction cb);
void onConnect(ArEventHandlerFunction2 cb);
void onDisconnect(ArEventHandlerFunction2 cb);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
bool try_send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
size_t count() const; //number clinets connected
size_t avgPacketsWaiting() const;

Expand Down

0 comments on commit 9410d19

Please sign in to comment.