Skip to content

Commit

Permalink
Improve throughput performance #2 (refcount focus) (#737)
Browse files Browse the repository at this point in the history
* feat: more static inline

* feat: skip null string copy in encoding

* feat: add no-weak refcount

* feat: streamline vec_make

* feat: add publisher check session config token

* fix: simple rc memory leak

* feat: switch arc_slice to simple rc

* feat: add valid flag to timestamp

* feat: add timestamp_move function

* feat: switch sample_create to pass by reference

* fix: missing token on publisher_delete

* feat: update trigger subscription calls

* fix: dummy sample_create prototype

* feat: remove null timestamp value in trigger subs

* feat: optimize timestamps cost

* feat: check string before encoding move

* feat: alias instead of copy payload on decode

* doc: explicit read task errors

* doc: rework transport/codec logs

* feat: keep going on multicast message processing error

* fix: revert payload aliasing

* fix: set timestamp valid only if decode successful

* fix: review comment
  • Loading branch information
jean-roland authored Oct 14, 2024
1 parent f8df508 commit 4bd2ca2
Show file tree
Hide file tree
Showing 36 changed files with 410 additions and 163 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ set(Z_FEATURE_UNICAST_TRANSPORT 1 CACHE STRING "Toggle unicast transport")
set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport")
set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY")
set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")

add_compile_definitions("Z_BUILD_DEBUG=$<CONFIG:Debug>")
message(STATUS "Building with feature confing:\n\
Expand Down
12 changes: 6 additions & 6 deletions include/zenoh-pico/collections/arc_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,29 @@
#include "slice.h"
#include "zenoh-pico/system/platform_common.h"

_Z_REFCOUNT_DEFINE(_z_slice, _z_slice)
_Z_SIMPLE_REFCOUNT_DEFINE(_z_slice, _z_slice)

/*-------- ArcSlice --------*/
/**
* An atomically reference counted subslice.
*
* Members:
* _z_slice_rc_t len: Rc counted slice.
* _z_slice_simple_rc_t len: Rc counted slice.
* size_t start: Offset to the subslice start.
* size_t len: Length of the subslice.
*/

typedef struct {
_z_slice_rc_t slice;
_z_slice_simple_rc_t slice;
size_t start;
size_t len;
} _z_arc_slice_t;

_z_arc_slice_t _z_arc_slice_empty(void);
static inline _z_arc_slice_t _z_arc_slice_empty(void) { return (_z_arc_slice_t){0}; }
static inline size_t _z_arc_slice_len(const _z_arc_slice_t* s) { return s->len; }
static inline bool _z_arc_slice_is_empty(const _z_arc_slice_t* s) { return _z_arc_slice_len(s) == 0; }
_z_arc_slice_t _z_arc_slice_wrap(_z_slice_t s, size_t offset, size_t len);
_z_arc_slice_t _z_arc_slice_get_subslice(const _z_arc_slice_t* s, size_t offset, size_t len);
size_t _z_arc_slice_len(const _z_arc_slice_t* s);
bool _z_arc_slice_is_empty(const _z_arc_slice_t* s);
const uint8_t* _z_arc_slice_data(const _z_arc_slice_t* s);
z_result_t _z_arc_slice_copy(_z_arc_slice_t* dst, const _z_arc_slice_t* src);
z_result_t _z_arc_slice_move(_z_arc_slice_t* dst, _z_arc_slice_t* src);
Expand Down
98 changes: 86 additions & 12 deletions include/zenoh-pico/collections/refcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ z_result_t _z_rc_weak_upgrade(void *cnt);
size_t _z_rc_weak_count(void *cnt);
size_t _z_rc_strong_count(void *cnt);

z_result_t _z_simple_rc_init(void **cnt);
z_result_t _z_simple_rc_increase(void *cnt);
bool _z_simple_rc_decrease(void **cnt);

size_t _z_simple_rc_strong_count(void *cnt);

/*------------------ Internal Array Macros ------------------*/
#define _Z_REFCOUNT_DEFINE(name, type) \
typedef struct name##_rc_t { \
Expand All @@ -46,18 +52,8 @@ size_t _z_rc_strong_count(void *cnt);
void *_cnt; \
} name##_weak_t; \
\
static inline name##_rc_t name##_rc_null(void) { \
name##_rc_t p; \
p._val = NULL; \
p._cnt = NULL; \
return p; \
} \
static inline name##_weak_t name##_weak_null(void) { \
name##_weak_t p; \
p._val = NULL; \
p._cnt = NULL; \
return p; \
} \
static inline name##_rc_t name##_rc_null(void) { return (name##_rc_t){0}; } \
static inline name##_weak_t name##_weak_null(void) { return (name##_weak_t){0}; } \
\
static inline name##_rc_t name##_rc_new(type##_t *val) { \
name##_rc_t p = name##_rc_null(); \
Expand Down Expand Up @@ -175,6 +171,84 @@ size_t _z_rc_strong_count(void *cnt);
return sizeof(name##_rc_t); \
}

#define _Z_SIMPLE_REFCOUNT_DEFINE(name, type) \
typedef struct name##_simple_rc_t { \
type##_t *_val; \
void *_cnt; \
} name##_simple_rc_t; \
\
static inline name##_simple_rc_t name##_simple_rc_null(void) { return (name##_simple_rc_t){0}; } \
\
static inline name##_simple_rc_t name##_simple_rc_new(type##_t *val) { \
name##_simple_rc_t p = name##_simple_rc_null(); \
if (_z_simple_rc_init(&p._cnt) == _Z_RES_OK) { \
p._val = val; \
} \
return p; \
} \
static inline name##_simple_rc_t name##_simple_rc_new_from_val(const type##_t *val) { \
type##_t *v = (type##_t *)z_malloc(sizeof(type##_t)); \
if (v == NULL) { \
return name##_simple_rc_null(); \
} \
*v = *val; \
name##_simple_rc_t p = name##_simple_rc_new(v); \
if (p._cnt == NULL) { \
z_free(v); \
return name##_simple_rc_null(); \
} \
return p; \
} \
static inline name##_simple_rc_t name##_simple_rc_clone(const name##_simple_rc_t *p) { \
name##_simple_rc_t c = name##_simple_rc_null(); \
if (_z_simple_rc_increase(p->_cnt) == _Z_RES_OK) { \
c = *p; \
} \
return c; \
} \
static inline name##_simple_rc_t *name##_simple_rc_clone_as_ptr(const name##_simple_rc_t *p) { \
name##_simple_rc_t *c = (name##_simple_rc_t *)z_malloc(sizeof(name##_simple_rc_t)); \
if (c != NULL) { \
*c = name##_simple_rc_clone(p); \
if (c->_cnt == NULL) { \
z_free(c); \
} \
} \
return c; \
} \
static inline void name##_simple_rc_copy(name##_simple_rc_t *dst, const name##_simple_rc_t *p) { \
*dst = name##_simple_rc_clone(p); \
} \
static inline bool name##_simple_rc_eq(const name##_simple_rc_t *left, const name##_simple_rc_t *right) { \
return (left->_val == right->_val); \
} \
static inline bool name##_simple_rc_decr(name##_simple_rc_t *p) { \
if ((p == NULL) || (p->_cnt == NULL)) { \
return false; \
} \
if (_z_simple_rc_decrease(&p->_cnt)) { \
return true; \
} \
return false; \
} \
static inline bool name##_simple_rc_drop(name##_simple_rc_t *p) { \
if (p == NULL) { \
return false; \
} \
bool res = false; \
if (name##_simple_rc_decr(p) && p->_val != NULL) { \
type##_clear(p->_val); \
z_free(p->_val); \
res = true; \
} \
*p = name##_simple_rc_null(); \
return res; \
} \
static inline size_t name##_simple_rc_size(name##_simple_rc_t *p) { \
_ZP_UNUSED(p); \
return sizeof(name##_simple_rc_t); \
}

#ifdef __cplusplus
}
#endif
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/collections/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ static inline _z_delete_context_t _z_delete_context_null(void) { return (_z_dele
static inline _z_delete_context_t _z_delete_context_create(void (*deleter)(void *context, void *data), void *context) {
return (_z_delete_context_t){.deleter = deleter, .context = context};
}
bool _z_delete_context_is_null(const _z_delete_context_t *c);
static inline bool _z_delete_context_is_null(const _z_delete_context_t *c) { return c->deleter == NULL; }
_z_delete_context_t _z_delete_context_default(void);
void _z_delete_context_delete(_z_delete_context_t *c, void *data);

Expand All @@ -52,6 +52,7 @@ typedef struct {
} _z_slice_t;

static inline _z_slice_t _z_slice_empty(void) { return (_z_slice_t){0}; }
static inline void _z_slice_reset(_z_slice_t *bs) { *bs = _z_slice_empty(); }
static inline bool _z_slice_is_empty(const _z_slice_t *bs) { return bs->len == 0; }
static inline bool _z_slice_check(const _z_slice_t *slice) { return slice->start != NULL; }
static inline _z_slice_t _z_slice_alias(const _z_slice_t bs) {
Expand All @@ -67,7 +68,6 @@ z_result_t _z_slice_copy(_z_slice_t *dst, const _z_slice_t *src);
z_result_t _z_slice_n_copy(_z_slice_t *dst, const _z_slice_t *src, size_t offset, size_t len);
_z_slice_t _z_slice_duplicate(const _z_slice_t *src);
void _z_slice_move(_z_slice_t *dst, _z_slice_t *src);
void _z_slice_reset(_z_slice_t *bs);
bool _z_slice_eq(const _z_slice_t *left, const _z_slice_t *right);
void _z_slice_clear(_z_slice_t *bs);
void _z_slice_free(_z_slice_t **bs);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#define Z_FEATURE_ENCODING_VALUES 1
#define Z_FEATURE_TCP_NODELAY 1
#define Z_FEATURE_LOCAL_SUBSCRIBER 0
#define Z_FEATURE_PUBLISHER_SESSION_CHECK 1
// End of CMake generation

/*------------------ Runtime configuration properties ------------------*/
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#define Z_FEATURE_ENCODING_VALUES @Z_FEATURE_ENCODING_VALUES@
#define Z_FEATURE_TCP_NODELAY @Z_FEATURE_TCP_NODELAY@
#define Z_FEATURE_LOCAL_SUBSCRIBER @Z_FEATURE_LOCAL_SUBSCRIBER@
#define Z_FEATURE_PUBLISHER_SESSION_CHECK @Z_FEATURE_PUBLISHER_SESSION_CHECK@
// End of CMake generation

/*------------------ Runtime configuration properties ------------------*/
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/net/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ void _z_sample_free(_z_sample_t **sample);
z_result_t _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src);
_z_sample_t _z_sample_duplicate(const _z_sample_t *src);

_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t *payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment, z_reliability_t reliability);
const _z_bytes_t *attachment, z_reliability_t reliability);

#endif /* ZENOH_PICO_SAMPLE_NETAPI_H */
4 changes: 3 additions & 1 deletion include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ _z_id_t _z_id_empty(void);
* A zenoh timestamp.
*/
typedef struct {
bool valid;
_z_id_t id;
uint64_t time;
} _z_timestamp_t;

// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_timestamp_t _z_timestamp_null(void) { return (_z_timestamp_t){0}; }
static inline bool _z_timestamp_check(const _z_timestamp_t *stamp) { return stamp->valid; }
_z_timestamp_t _z_timestamp_duplicate(const _z_timestamp_t *tstamp);
void _z_timestamp_clear(_z_timestamp_t *tstamp);
bool _z_timestamp_check(const _z_timestamp_t *stamp);
void _z_timestamp_move(_z_timestamp_t *dst, _z_timestamp_t *src);
uint64_t _z_timestamp_ntp64_from_time(uint32_t seconds, uint32_t nanos);

/**
Expand Down
8 changes: 4 additions & 4 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
#include "zenoh-pico/net/session.h"

/*------------------ Subscription ------------------*/
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload,
_z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment, z_reliability_t reliability);
const _z_bytes_t *attachment, z_reliability_t reliability);

#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id);
_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *keyexpr);

_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability);
const _z_n_qos_t qos, const _z_bytes_t *attachment, z_reliability_t reliability);
void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub);
void _z_flush_subscriptions(_z_session_t *zn);
#endif
Expand Down
Loading

0 comments on commit 4bd2ca2

Please sign in to comment.