Skip to content

Commit

Permalink
fix: merge shenanigans (eclipse-zenoh#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland authored Dec 5, 2024
1 parent 0e5797d commit 08d1f57
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 69 deletions.
3 changes: 1 addition & 2 deletions src/protocol/definitions/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,7 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t
/*------------------ Fragment Message ------------------*/
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last,
bool first, bool drop) {
return _z_t_msg_make_fragment(sn, _z_slice_null(), reliability, is_last);
return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, first, drop);
return _z_t_msg_make_fragment(sn, _z_slice_null(), reliability, is_last, first, drop);
}
_z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability,
bool is_last, bool first, bool drop) {
Expand Down
4 changes: 2 additions & 2 deletions src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ static z_result_t _z_transport_tx_send_fragment_inner(_z_transport_common_t *ztc
if (!is_first) {
sn = _z_transport_tx_get_sn(ztc, reliability);
}
is_first = false;
// Serialize fragment
__unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow);
z_result_t ret = __unsafe_z_serialize_zenoh_fragment(&ztc->_wbuf, frag_buff, reliability, sn);
z_result_t ret = __unsafe_z_serialize_zenoh_fragment(&ztc->_wbuf, frag_buff, reliability, sn, is_first);
if (ret != _Z_RES_OK) {
_Z_ERROR("Fragment serialization failed with err %d", ret);
return ret;
Expand All @@ -64,6 +63,7 @@ static z_result_t _z_transport_tx_send_fragment_inner(_z_transport_common_t *ztc
__unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow);
_Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztc->_link, &ztc->_wbuf));
ztc->_transmitted = true; // Tell session we transmitted data
is_first = false;
}
return _Z_RES_OK;
}
Expand Down
41 changes: 16 additions & 25 deletions src/transport/multicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,14 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm,
// Note that we receive data from the peer
entry->_received = true;

bool consecutive;
_z_wbuf_t *dbuf;
// Check if the SN is correct and select the right defragmentation buffer
uint8_t *dbuf_state;
z_reliability_t tmsg_reliability;
bool consecutive;
// Select the right defragmentation buffer
if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) {
tmsg_reliability = Z_RELIABILITY_RELIABLE;
// Check SN
// @TODO: amend once reliability is in place. For the time being only
// monotonic SNs are ensured
if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable,
Expand All @@ -198,62 +202,49 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm,
t_msg->_body._fragment._sn);
entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._fragment._sn;
dbuf = &entry->_dbuf_reliable;
dbuf_state = &entry->_state_reliable;
} else {
_z_wbuf_clear(&entry->_dbuf_reliable);
entry->_state_reliable = _Z_DBUF_STATE_NULL;
_Z_INFO("Reliable message dropped because it is out of order");
break;
}
} else {
tmsg_reliability = Z_RELIABILITY_BEST_EFFORT;
// Check SN
if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort,
t_msg->_body._fragment._sn)) {
consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort,
t_msg->_body._fragment._sn);
entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._fragment._sn;
dbuf = &entry->_dbuf_best_effort;
dbuf_state = &entry->_state_best_effort;
} else {
_z_wbuf_clear(&entry->_dbuf_best_effort);
entry->_state_best_effort = _Z_DBUF_STATE_NULL;
_Z_INFO("Best effort message dropped because it is out of order");
break;
}
}
if (!consecutive && _z_wbuf_len(dbuf) > 0) {
_Z_DEBUG("Non-consecutive fragments received");
_z_wbuf_reset(dbuf);
_z_wbuf_clear(dbuf);
*dbuf_state = _Z_DBUF_STATE_NULL;
_Z_INFO("Defragmentation buffer dropped because non-consecutive fragments received");
break;
}
// Handle fragment markers
if (_Z_PATCH_HAS_FRAGMENT_MARKERS(entry->_patch)) {
if (t_msg->_body._fragment.first) {
_z_wbuf_reset(dbuf);
} else if (_z_wbuf_len(dbuf) == 0) {
_Z_DEBUG("First fragment received without the first marker");
_Z_INFO("First fragment received without the first marker");
break;
}
if (t_msg->_body._fragment.drop) {
_z_wbuf_reset(dbuf);
break;
}
}

bool drop = false;
if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) {
// Filling the wbuf capacity as a way to signaling the last fragment to reset the dbuf
// Otherwise, last (smaller) fragments can be understood as a complete message
_z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0, _z_wbuf_space_left(dbuf));
drop = true;
_z_wbuf_t *dbuf;
uint8_t *dbuf_state;
z_reliability_t tmsg_reliability;
// Select the right defragmentation buffer
if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) {
tmsg_reliability = Z_RELIABILITY_RELIABLE;
dbuf = &entry->_dbuf_reliable;
dbuf_state = &entry->_state_reliable;
} else {
tmsg_reliability = Z_RELIABILITY_BEST_EFFORT;
dbuf = &entry->_dbuf_best_effort;
dbuf_state = &entry->_state_best_effort;
}
// Allocate buffer if needed
if (*dbuf_state == _Z_DBUF_STATE_NULL) {
*dbuf = _z_wbuf_make(Z_FRAG_MAX_SIZE, false);
Expand Down
1 change: 0 additions & 1 deletion src/transport/peer_entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_trans
dst->_state_best_effort = src->_state_best_effort;
_z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable);
_z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort);

dst->_patch = src->_patch;
#endif

Expand Down
6 changes: 3 additions & 3 deletions src/transport/raweth/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,9 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_common._link, &ztm->_common._wbuf);
// Serialize one fragment
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_common._wbuf, &fbf, reliability, sn),
_z_transport_tx_mutex_unlock(&ztm->_common));
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn, is_first),
_Z_CLEAN_RETURN_IF_ERR(
__unsafe_z_serialize_zenoh_fragment(&ztm->_common._wbuf, &fbf, reliability, sn, is_first),
_z_transport_tx_mutex_unlock(&ztm->_common));
// Write the eth header
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_common._link, &ztm->_common._wbuf),
_z_transport_tx_mutex_unlock(&ztm->_common));
Expand Down
54 changes: 25 additions & 29 deletions src/transport/unicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,69 +140,65 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t
case _Z_MID_T_FRAGMENT: {
_Z_DEBUG("Received Z_FRAGMENT message");
#if Z_FEATURE_FRAGMENTATION == 1
bool consecutive;
_z_wbuf_t *dbuf;
// Check if the SN is correct and select the right defragmentation buffer
if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) {
uint8_t *dbuf_state;
z_reliability_t tmsg_reliability;
bool consecutive;

// Select the right defragmentation buffer
if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) {
tmsg_reliability = Z_RELIABILITY_RELIABLE;
// Check SN
// @TODO: amend once reliability is in place. For the time being only
// monotonic SNs are ensured
if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn)) {
consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn);
if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn)) {
consecutive =
_z_sn_consecutive(ztu->_common._sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn);
ztu->_sn_rx_reliable = t_msg->_body._fragment._sn;
dbuf = &ztu->_dbuf_reliable;
dbuf_state = &ztu->_state_reliable;
} else {
_z_wbuf_clear(&ztu->_dbuf_reliable);
ztu->_state_reliable = _Z_DBUF_STATE_NULL;
_Z_INFO("Reliable message dropped because it is out of order");
break;
}
} else {
if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn)) {
consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn);
tmsg_reliability = Z_RELIABILITY_BEST_EFFORT;
// Check SN
if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn)) {
consecutive =
_z_sn_consecutive(ztu->_common._sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn);
ztu->_sn_rx_best_effort = t_msg->_body._fragment._sn;
dbuf = &ztu->_dbuf_best_effort;
dbuf_state = &ztu->_state_best_effort;
} else {
_z_wbuf_clear(&ztu->_dbuf_best_effort);
ztu->_state_best_effort = _Z_DBUF_STATE_NULL;
_Z_INFO("Best effort message dropped because it is out of order");
break;
}
}
// Check consecutive SN
if (!consecutive && _z_wbuf_len(dbuf) > 0) {
_Z_DEBUG("Non-consecutive fragments received");
_z_wbuf_reset(dbuf);
_z_wbuf_clear(dbuf);
*dbuf_state = _Z_DBUF_STATE_NULL;
_Z_INFO("Defragmentation buffer dropped because non-consecutive fragments received");
break;
}
// Handle fragment markers
if (_Z_PATCH_HAS_FRAGMENT_MARKERS(ztu->_patch)) {
if (t_msg->_body._fragment.first) {
_z_wbuf_reset(dbuf);
} else if (_z_wbuf_len(dbuf) == 0) {
_Z_DEBUG("First fragment received without the start marker");
_Z_INFO("First fragment received without the start marker");
break;
}
if (t_msg->_body._fragment.drop) {
_z_wbuf_reset(dbuf);
break;
}
}
bool drop = false;
if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) {
// Filling the wbuf capacity as a way to signal the last fragment to reset the dbuf
// Otherwise, last (smaller) fragments can be understood as a complete message
_z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0, _z_wbuf_space_left(dbuf));
drop = true;
_z_wbuf_t *dbuf;
uint8_t *dbuf_state;
z_reliability_t tmsg_reliability;
// Select the right defragmentation buffer
if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) {
tmsg_reliability = Z_RELIABILITY_RELIABLE;
dbuf = &ztu->_dbuf_reliable;
dbuf_state = &ztu->_state_reliable;
} else {
tmsg_reliability = Z_RELIABILITY_BEST_EFFORT;
dbuf = &ztu->_dbuf_best_effort;
dbuf_state = &ztu->_state_best_effort;
}
// Allocate buffer if needed
if (*dbuf_state == _Z_DBUF_STATE_NULL) {
*dbuf = _z_wbuf_make(Z_FRAG_MAX_SIZE, false);
Expand Down
20 changes: 13 additions & 7 deletions src/transport/unicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ static z_result_t _z_unicast_handshake_client(_z_transport_unicast_establish_par
} else {
ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION;
}
#if Z_FEATURE_FRAGMENTATION == 1
if (iam._body._init._patch <= ism._body._init._patch) {
param->_patch = iam._body._init._patch;
} else {
// TODO: Use a better error code?
ret = _Z_ERR_GENERIC;
}
#endif
if (ret != _Z_RES_OK) {
_z_t_msg_clear(&iam);
return ret;
Expand Down Expand Up @@ -220,13 +228,6 @@ static z_result_t _z_unicast_handshake_client(_z_transport_unicast_establish_par
_z_t_msg_clear(&oam);
return _Z_RES_OK;
}
#if Z_FEATURE_FRAGMENTATION == 1
if (iam._body._init._patch > ism._body._init._patch) {
// TODO: Use a better error code?
ret = _Z_ERR_GENERIC;
}
param->_patch = iam._body._init._patch;
#endif

// TODO: Activate if we add peer unicast support
#if 0
Expand Down Expand Up @@ -257,6 +258,11 @@ static z_result_t _z_unicast_handshake_listener(_z_transport_unicast_establish_p
if (iam._body._init._batch_size > tmsg._body._init._batch_size) {
iam._body._init._batch_size = tmsg._body._init._batch_size;
}
#if Z_FEATURE_FRAGMENTATION == 1
if (iam._body._init._patch > tmsg._body._init._patch) {
iam._body._init._patch = tmsg._body._init._patch;
}
#endif
param->_remote_zid = tmsg._body._init._zid;
param->_seq_num_res = iam._body._init._seq_num_res;
param->_req_id_res = iam._body._init._req_id_res;
Expand Down

0 comments on commit 08d1f57

Please sign in to comment.