Skip to content

Commit

Permalink
feat: fuse multicast transport code
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Dec 5, 2023
1 parent 40a35b1 commit a7b9b7f
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 321 deletions.
28 changes: 0 additions & 28 deletions include/zenoh-pico/transport/raweth/transport.h

This file was deleted.

23 changes: 2 additions & 21 deletions src/transport/manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <stdlib.h>

#include "zenoh-pico/transport/multicast/transport.h"
#include "zenoh-pico/transport/raweth/transport.h"
#include "zenoh-pico/transport/unicast/transport.h"

int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local_zid) {
Expand All @@ -45,6 +44,7 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local
break;
}
// Multicast transport
case Z_LINK_CAP_TRANSPORT_RAWETH:
case Z_LINK_CAP_TRANSPORT_MULTICAST: {
_z_transport_multicast_establish_param_t tp_param;
ret = _z_multicast_open_client(&tp_param, &zl, local_zid);
Expand All @@ -55,16 +55,6 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local
ret = _z_multicast_transport_create(zt, &zl, &tp_param);
break;
}
case Z_LINK_CAP_TRANSPORT_RAWETH: {
_z_transport_multicast_establish_param_t tp_param;
ret = _z_raweth_open_client(&tp_param, &zl, local_zid);
if (ret != _Z_RES_OK) {
_z_link_clear(&zl);
return ret;
}
ret = _z_raweth_transport_create(zt, &zl, &tp_param);
break;
}
default:
ret = _Z_ERR_GENERIC;
break;
Expand Down Expand Up @@ -93,6 +83,7 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z
ret = _z_unicast_transport_create(zt, &zl, &tp_param);
break;
}
case Z_LINK_CAP_TRANSPORT_RAWETH:
case Z_LINK_CAP_TRANSPORT_MULTICAST: {
_z_transport_multicast_establish_param_t tp_param;
ret = _z_multicast_open_peer(&tp_param, &zl, local_zid);
Expand All @@ -103,16 +94,6 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z
ret = _z_multicast_transport_create(zt, &zl, &tp_param);
break;
}
case Z_LINK_CAP_TRANSPORT_RAWETH: {
_z_transport_multicast_establish_param_t tp_param;
ret = _z_raweth_open_peer(&tp_param, &zl, local_zid);
if (ret != _Z_RES_OK) {
_z_link_clear(&zl);
return ret;
}
ret = _z_raweth_transport_create(zt, &zl, &tp_param);
break;
}
default:
ret = _Z_ERR_GENERIC;
break;
Expand Down
92 changes: 55 additions & 37 deletions src/transport/multicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,86 +24,97 @@
#include "zenoh-pico/transport/multicast.h"
#include "zenoh-pico/transport/multicast/rx.h"
#include "zenoh-pico/transport/multicast/tx.h"
#include "zenoh-pico/transport/raweth/tx.h"
#include "zenoh-pico/transport/unicast/rx.h"
#include "zenoh-pico/transport/utils.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_MULTICAST_TRANSPORT == 1
#if Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1

int8_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl,
_z_transport_multicast_establish_param_t *param) {
int8_t ret = _Z_RES_OK;

zt->_type = _Z_TRANSPORT_MULTICAST_TYPE;
zt->_transport._multicast._send_f = _z_multicast_send_t_msg;

// Transport specific information
_z_transport_multicast_t *ztm = NULL;
switch (zl->_cap._transport) {
case Z_LINK_CAP_TRANSPORT_MULTICAST:
zt->_type = _Z_TRANSPORT_MULTICAST_TYPE;
ztm = &zt->_transport._multicast;
ztm->_send_f = _z_multicast_send_t_msg;
break;
case Z_LINK_CAP_TRANSPORT_RAWETH:
zt->_type = _Z_TRANSPORT_RAWETH_TYPE;
ztm = &zt->_transport._raweth;
ztm->_send_f = _z_raweth_send_t_msg;
break;
default:
return _Z_ERR_GENERIC;
}
#if Z_FEATURE_MULTI_THREAD == 1
// Initialize the mutexes
ret = _z_mutex_init(&zt->_transport._multicast._mutex_tx);
ret = _z_mutex_init(&ztm->_mutex_tx);
if (ret == _Z_RES_OK) {
ret = _z_mutex_init(&zt->_transport._multicast._mutex_rx);
ret = _z_mutex_init(&ztm->_mutex_rx);
if (ret == _Z_RES_OK) {
ret = _z_mutex_init(&zt->_transport._multicast._mutex_peer);
ret = _z_mutex_init(&ztm->_mutex_peer);
if (ret != _Z_RES_OK) {
_z_mutex_free(&zt->_transport._multicast._mutex_tx);
_z_mutex_free(&zt->_transport._multicast._mutex_rx);
_z_mutex_free(&ztm->_mutex_tx);
_z_mutex_free(&ztm->_mutex_rx);
}
} else {
_z_mutex_free(&zt->_transport._multicast._mutex_tx);
_z_mutex_free(&ztm->_mutex_tx);
}
}
#endif // Z_FEATURE_MULTI_THREAD == 1

// Initialize the read and write buffers
if (ret == _Z_RES_OK) {
uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE;
zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, false);
zt->_transport._multicast._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE);
ztm->_wbuf = _z_wbuf_make(mtu, false);
ztm->_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE);

// Clean up the buffers if one of them failed to be allocated
if ((_z_wbuf_capacity(&zt->_transport._multicast._wbuf) != mtu) ||
(_z_zbuf_capacity(&zt->_transport._multicast._zbuf) != Z_BATCH_MULTICAST_SIZE)) {
if ((_z_wbuf_capacity(&ztm->_wbuf) != mtu) || (_z_zbuf_capacity(&ztm->_zbuf) != Z_BATCH_MULTICAST_SIZE)) {
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;

#if Z_FEATURE_MULTI_THREAD == 1
_z_mutex_free(&zt->_transport._multicast._mutex_tx);
_z_mutex_free(&zt->_transport._multicast._mutex_rx);
_z_mutex_free(&zt->_transport._multicast._mutex_peer);
_z_mutex_free(&ztm->_mutex_tx);
_z_mutex_free(&ztm->_mutex_rx);
_z_mutex_free(&ztm->_mutex_peer);
#endif // Z_FEATURE_MULTI_THREAD == 1

_z_wbuf_clear(&zt->_transport._multicast._wbuf);
_z_zbuf_clear(&zt->_transport._multicast._zbuf);
_z_wbuf_clear(&ztm->_wbuf);
_z_zbuf_clear(&ztm->_zbuf);
}
}

if (ret == _Z_RES_OK) {
// Set default SN resolution
zt->_transport._multicast._sn_res = _z_sn_max(param->_seq_num_res);
ztm->_sn_res = _z_sn_max(param->_seq_num_res);

// The initial SN at TX side
zt->_transport._multicast._sn_tx_reliable = param->_initial_sn_tx._val._plain._reliable;
zt->_transport._multicast._sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort;
ztm->_sn_tx_reliable = param->_initial_sn_tx._val._plain._reliable;
ztm->_sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort;

// Initialize peer list
zt->_transport._multicast._peers = _z_transport_peer_entry_list_new();
ztm->_peers = _z_transport_peer_entry_list_new();

#if Z_FEATURE_MULTI_THREAD == 1
// Tasks
zt->_transport._multicast._read_task_running = false;
zt->_transport._multicast._read_task = NULL;
zt->_transport._multicast._lease_task_running = false;
zt->_transport._multicast._lease_task = NULL;
ztm->_read_task_running = false;
ztm->_read_task = NULL;
ztm->_lease_task_running = false;
ztm->_lease_task = NULL;
#endif // Z_FEATURE_MULTI_THREAD == 1

zt->_transport._multicast._lease = Z_TRANSPORT_LEASE;
ztm->_lease = Z_TRANSPORT_LEASE;

// Notifiers
zt->_transport._multicast._transmitted = false;
ztm->_transmitted = false;

// Transport link for multicast
zt->_transport._multicast._link = *zl;
ztm->_link = *zl;
}

return ret;
}

Expand All @@ -125,14 +136,22 @@ int8_t _z_multicast_open_peer(_z_transport_multicast_establish_param_t *param, c

// Encode and send the message
_Z_INFO("Sending Z_JOIN message\n");
ret = _z_link_send_t_msg(zl, &jsm);
switch (zl->_cap._transport) {
case Z_LINK_CAP_TRANSPORT_MULTICAST:
ret = _z_link_send_t_msg(zl, &jsm);
break;
case Z_LINK_CAP_TRANSPORT_RAWETH:
ret = _z_raweth_link_send_t_msg(zl, &jsm);
break;
default:
return _Z_ERR_GENERIC;
}
_z_t_msg_clear(&jsm);

if (ret == _Z_RES_OK) {
param->_seq_num_res = jsm._body._join._seq_num_res;
param->_initial_sn_tx = next_sn;
}

return ret;
}

Expand All @@ -150,7 +169,7 @@ int8_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _B
int8_t ret = _Z_RES_OK;
// Send and clear message
_z_transport_message_t cm = _z_t_msg_make_close(reason, link_only);
ret = _z_multicast_send_t_msg(ztm, &cm);
ret = ztm->_send_f(ztm, &cm);
_z_t_msg_clear(&cm);
return ret;
}
Expand All @@ -171,7 +190,6 @@ void _z_multicast_transport_clear(_z_transport_t *zt) {
_z_task_join(ztm->_lease_task);
_z_task_free(&ztm->_lease_task);
}

// Clean up the mutexes
_z_mutex_free(&ztm->_mutex_tx);
_z_mutex_free(&ztm->_mutex_rx);
Expand Down Expand Up @@ -228,4 +246,4 @@ int8_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reaso
}

void _z_multicast_transport_clear(_z_transport_t *zt) { _ZP_UNUSED(zt); }
#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1
#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1
Loading

0 comments on commit a7b9b7f

Please sign in to comment.