From 08d1f574a641fdf9692555c220f1a55967970bb7 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Thu, 5 Dec 2024 13:02:33 +0100 Subject: [PATCH] fix: merge shenanigans (#25) --- src/protocol/definitions/transport.c | 3 +- src/transport/common/tx.c | 4 +-- src/transport/multicast/rx.c | 41 +++++++++------------ src/transport/peer_entry.c | 1 - src/transport/raweth/tx.c | 6 ++-- src/transport/unicast/rx.c | 54 +++++++++++++--------------- src/transport/unicast/transport.c | 20 +++++++---- 7 files changed, 60 insertions(+), 69 deletions(-) diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index cd74a8ded..69b1cc502 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -279,8 +279,7 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t /*------------------ Fragment Message ------------------*/ _z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool first, bool drop) { - return _z_t_msg_make_fragment(sn, _z_slice_null(), reliability, is_last); - return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, first, drop); + return _z_t_msg_make_fragment(sn, _z_slice_null(), reliability, is_last, first, drop); } _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability, bool is_last, bool first, bool drop) { diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index aeebe1daa..9479059f3 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -52,10 +52,9 @@ static z_result_t _z_transport_tx_send_fragment_inner(_z_transport_common_t *ztc if (!is_first) { sn = _z_transport_tx_get_sn(ztc, reliability); } - is_first = false; // Serialize fragment __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); - z_result_t ret = __unsafe_z_serialize_zenoh_fragment(&ztc->_wbuf, frag_buff, reliability, sn); + z_result_t ret = __unsafe_z_serialize_zenoh_fragment(&ztc->_wbuf, frag_buff, reliability, sn, is_first); if (ret != _Z_RES_OK) { _Z_ERROR("Fragment serialization failed with err %d", ret); return ret; @@ -64,6 +63,7 @@ static z_result_t _z_transport_tx_send_fragment_inner(_z_transport_common_t *ztc __unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztc->_link, &ztc->_wbuf)); ztc->_transmitted = true; // Tell session we transmitted data + is_first = false; } return _Z_RES_OK; } diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index cb047bf54..26170ea02 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -186,10 +186,14 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, // Note that we receive data from the peer entry->_received = true; - bool consecutive; _z_wbuf_t *dbuf; - // Check if the SN is correct and select the right defragmentation buffer + uint8_t *dbuf_state; + z_reliability_t tmsg_reliability; + bool consecutive; + // Select the right defragmentation buffer if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { + tmsg_reliability = Z_RELIABILITY_RELIABLE; + // Check SN // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, @@ -198,27 +202,34 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._fragment._sn; dbuf = &entry->_dbuf_reliable; + dbuf_state = &entry->_state_reliable; } else { _z_wbuf_clear(&entry->_dbuf_reliable); + entry->_state_reliable = _Z_DBUF_STATE_NULL; _Z_INFO("Reliable message dropped because it is out of order"); break; } } else { + tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; + // Check SN if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn)) { consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._fragment._sn; dbuf = &entry->_dbuf_best_effort; + dbuf_state = &entry->_state_best_effort; } else { _z_wbuf_clear(&entry->_dbuf_best_effort); + entry->_state_best_effort = _Z_DBUF_STATE_NULL; _Z_INFO("Best effort message dropped because it is out of order"); break; } } if (!consecutive && _z_wbuf_len(dbuf) > 0) { - _Z_DEBUG("Non-consecutive fragments received"); - _z_wbuf_reset(dbuf); + _z_wbuf_clear(dbuf); + *dbuf_state = _Z_DBUF_STATE_NULL; + _Z_INFO("Defragmentation buffer dropped because non-consecutive fragments received"); break; } // Handle fragment markers @@ -226,7 +237,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, if (t_msg->_body._fragment.first) { _z_wbuf_reset(dbuf); } else if (_z_wbuf_len(dbuf) == 0) { - _Z_DEBUG("First fragment received without the first marker"); + _Z_INFO("First fragment received without the first marker"); break; } if (t_msg->_body._fragment.drop) { @@ -234,26 +245,6 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, break; } } - - bool drop = false; - if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { - // Filling the wbuf capacity as a way to signaling the last fragment to reset the dbuf - // Otherwise, last (smaller) fragments can be understood as a complete message - _z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0, _z_wbuf_space_left(dbuf)); - drop = true; - _z_wbuf_t *dbuf; - uint8_t *dbuf_state; - z_reliability_t tmsg_reliability; - // Select the right defragmentation buffer - if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) { - tmsg_reliability = Z_RELIABILITY_RELIABLE; - dbuf = &entry->_dbuf_reliable; - dbuf_state = &entry->_state_reliable; - } else { - tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; - dbuf = &entry->_dbuf_best_effort; - dbuf_state = &entry->_state_best_effort; - } // Allocate buffer if needed if (*dbuf_state == _Z_DBUF_STATE_NULL) { *dbuf = _z_wbuf_make(Z_FRAG_MAX_SIZE, false); diff --git a/src/transport/peer_entry.c b/src/transport/peer_entry.c index 4dc1ceeaa..38180e5ed 100644 --- a/src/transport/peer_entry.c +++ b/src/transport/peer_entry.c @@ -32,7 +32,6 @@ void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_trans dst->_state_best_effort = src->_state_best_effort; _z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable); _z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort); - dst->_patch = src->_patch; #endif diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index f78399e3a..e8f9be4c4 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -279,9 +279,9 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Prepare buff __unsafe_z_raweth_prepare_header(&ztm->_common._link, &ztm->_common._wbuf); // Serialize one fragment - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_common._wbuf, &fbf, reliability, sn), - _z_transport_tx_mutex_unlock(&ztm->_common)); - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn, is_first), + _Z_CLEAN_RETURN_IF_ERR( + __unsafe_z_serialize_zenoh_fragment(&ztm->_common._wbuf, &fbf, reliability, sn, is_first), + _z_transport_tx_mutex_unlock(&ztm->_common)); // Write the eth header _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_common._link, &ztm->_common._wbuf), _z_transport_tx_mutex_unlock(&ztm->_common)); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 515ee7091..3f374f5af 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -140,35 +140,50 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t case _Z_MID_T_FRAGMENT: { _Z_DEBUG("Received Z_FRAGMENT message"); #if Z_FEATURE_FRAGMENTATION == 1 - bool consecutive; _z_wbuf_t *dbuf; - // Check if the SN is correct and select the right defragmentation buffer - if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { + uint8_t *dbuf_state; + z_reliability_t tmsg_reliability; + bool consecutive; + + // Select the right defragmentation buffer + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) { + tmsg_reliability = Z_RELIABILITY_RELIABLE; + // Check SN // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured - if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn)) { - consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); + if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn)) { + consecutive = + _z_sn_consecutive(ztu->_common._sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); ztu->_sn_rx_reliable = t_msg->_body._fragment._sn; dbuf = &ztu->_dbuf_reliable; + dbuf_state = &ztu->_state_reliable; } else { _z_wbuf_clear(&ztu->_dbuf_reliable); + ztu->_state_reliable = _Z_DBUF_STATE_NULL; _Z_INFO("Reliable message dropped because it is out of order"); break; } } else { - if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn)) { - consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); + tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; + // Check SN + if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn)) { + consecutive = + _z_sn_consecutive(ztu->_common._sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); ztu->_sn_rx_best_effort = t_msg->_body._fragment._sn; dbuf = &ztu->_dbuf_best_effort; + dbuf_state = &ztu->_state_best_effort; } else { _z_wbuf_clear(&ztu->_dbuf_best_effort); + ztu->_state_best_effort = _Z_DBUF_STATE_NULL; _Z_INFO("Best effort message dropped because it is out of order"); break; } } + // Check consecutive SN if (!consecutive && _z_wbuf_len(dbuf) > 0) { - _Z_DEBUG("Non-consecutive fragments received"); - _z_wbuf_reset(dbuf); + _z_wbuf_clear(dbuf); + *dbuf_state = _Z_DBUF_STATE_NULL; + _Z_INFO("Defragmentation buffer dropped because non-consecutive fragments received"); break; } // Handle fragment markers @@ -176,7 +191,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t if (t_msg->_body._fragment.first) { _z_wbuf_reset(dbuf); } else if (_z_wbuf_len(dbuf) == 0) { - _Z_DEBUG("First fragment received without the start marker"); + _Z_INFO("First fragment received without the start marker"); break; } if (t_msg->_body._fragment.drop) { @@ -184,25 +199,6 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t break; } } - bool drop = false; - if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { - // Filling the wbuf capacity as a way to signal the last fragment to reset the dbuf - // Otherwise, last (smaller) fragments can be understood as a complete message - _z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0, _z_wbuf_space_left(dbuf)); - drop = true; - _z_wbuf_t *dbuf; - uint8_t *dbuf_state; - z_reliability_t tmsg_reliability; - // Select the right defragmentation buffer - if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) { - tmsg_reliability = Z_RELIABILITY_RELIABLE; - dbuf = &ztu->_dbuf_reliable; - dbuf_state = &ztu->_state_reliable; - } else { - tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; - dbuf = &ztu->_dbuf_best_effort; - dbuf_state = &ztu->_state_best_effort; - } // Allocate buffer if needed if (*dbuf_state == _Z_DBUF_STATE_NULL) { *dbuf = _z_wbuf_make(Z_FRAG_MAX_SIZE, false); diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index b6c62c46f..7ab1578fa 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -177,6 +177,14 @@ static z_result_t _z_unicast_handshake_client(_z_transport_unicast_establish_par } else { ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; } +#if Z_FEATURE_FRAGMENTATION == 1 + if (iam._body._init._patch <= ism._body._init._patch) { + param->_patch = iam._body._init._patch; + } else { + // TODO: Use a better error code? + ret = _Z_ERR_GENERIC; + } +#endif if (ret != _Z_RES_OK) { _z_t_msg_clear(&iam); return ret; @@ -220,13 +228,6 @@ static z_result_t _z_unicast_handshake_client(_z_transport_unicast_establish_par _z_t_msg_clear(&oam); return _Z_RES_OK; } -#if Z_FEATURE_FRAGMENTATION == 1 - if (iam._body._init._patch > ism._body._init._patch) { - // TODO: Use a better error code? - ret = _Z_ERR_GENERIC; - } - param->_patch = iam._body._init._patch; -#endif // TODO: Activate if we add peer unicast support #if 0 @@ -257,6 +258,11 @@ static z_result_t _z_unicast_handshake_listener(_z_transport_unicast_establish_p if (iam._body._init._batch_size > tmsg._body._init._batch_size) { iam._body._init._batch_size = tmsg._body._init._batch_size; } +#if Z_FEATURE_FRAGMENTATION == 1 + if (iam._body._init._patch > tmsg._body._init._patch) { + iam._body._init._patch = tmsg._body._init._patch; + } +#endif param->_remote_zid = tmsg._body._init._zid; param->_seq_num_res = iam._body._init._seq_num_res; param->_req_id_res = iam._body._init._req_id_res;