Skip to content

Commit

Permalink
fix: free mutex when early return
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Dec 15, 2023
1 parent f4d6807 commit dae9f87
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions src/transport/raweth/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@

#if Z_FEATURE_RAWETH_TRANSPORT == 1

int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) {
#if Z_FEATURE_MULTI_THREAD == 1
static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_tx); }
#else
static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
#endif

static int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) {
int8_t ret = _Z_RES_OK;

if (_ZP_RAWETH_CFG_SIZE < 1) {
Expand Down Expand Up @@ -186,15 +192,15 @@ int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_me
// Reset wbuf
_z_wbuf_reset(&ztm->_wbuf);
// Set socket info
_Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth));
_Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth), _zp_raweth_unlock_tx_mutex(ztm));
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf);
// Encode the session message
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg));
_Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Write the message header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Mark the session that we have transmitted data
ztm->_transmitted = true;

Expand Down Expand Up @@ -245,27 +251,29 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
// Reset wbuf
_z_wbuf_reset(&ztm->_wbuf);
// Set socket info
_Z_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth));
_Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth),
_zp_raweth_unlock_tx_mutex(ztm));
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf);
// Set the frame header
_z_zint_t sn = __unsafe_z_raweth_get_sn(ztm, reliability);
_z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability);
// Encode the frame header
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg));
_Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Encode the network message
if (_z_network_message_encode(&ztm->_wbuf, n_msg) == _Z_RES_OK) {
// Write the eth header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf),
_zp_raweth_unlock_tx_mutex(ztm));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Mark the session that we have transmitted data
ztm->_transmitted = true;
} else { // The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
// Encode the message on the expandable wbuf
_Z_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg));
_Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Fragment and send the message
_Bool is_first = true;
while (_z_wbuf_len(&fbf) > 0) {
Expand All @@ -279,11 +287,13 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf);
// Serialize one fragment
_Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn));
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn),
_zp_raweth_unlock_tx_mutex(ztm));
// Write the eth header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf),
_zp_raweth_unlock_tx_mutex(ztm));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Mark the session that we have transmitted data
ztm->_transmitted = true;
}
Expand Down

0 comments on commit dae9f87

Please sign in to comment.