From da14e5b17004e1924e22b727ca916439a595dff6 Mon Sep 17 00:00:00 2001 From: yixianlv Date: Thu, 18 Nov 2021 15:54:32 +0100 Subject: [PATCH 1/3] Add set_topic_filter & matched_pub, sub & status getters & statistics api Signed-off-by: yixianlv --- clayer/pysertype.c | 140 +++++++++++++++++++++++++ cyclonedds/core.py | 87 +++++++++++++++- cyclonedds/internal.py | 41 ++++++++ cyclonedds/pub.py | 167 +++++++++++++++++++++++++++++- cyclonedds/sub.py | 218 ++++++++++++++++++++++++++++++++++++++- cyclonedds/topic.py | 75 +++++++++++++- tests/test_reader.py | 28 +++++ tests/test_statistics.py | 30 ++++++ tests/test_topic.py | 30 +++++- tests/test_writer.py | 30 ++++++ 10 files changed, 838 insertions(+), 8 deletions(-) create mode 100644 tests/test_statistics.py diff --git a/clayer/pysertype.c b/clayer/pysertype.c index 2ef2f581..177c0212 100644 --- a/clayer/pysertype.c +++ b/clayer/pysertype.c @@ -1670,6 +1670,138 @@ ddspy_take_endpoint(PyObject *self, PyObject *args) /* end builtin topic */ +static PyObject * +ddspy_get_matched_subscription_data(PyObject *self, PyObject *args) +{ + dds_entity_t writer; + dds_instance_handle_t handle; + dds_builtintopic_endpoint_t* endpoint = NULL; + + PyObject* endpoint_constructor; + PyObject* cqos_to_qos; + (void)self; + + if (!PyArg_ParseTuple(args, "iKOO", &writer, &handle, &endpoint_constructor, &cqos_to_qos)) + return NULL; + + endpoint = dds_get_matched_subscription_data(writer, handle); + + if (endpoint == NULL) { + Py_INCREF(Py_None); + return Py_None; + } + + PyObject* qos_p, *qos; + + if (endpoint->qos != NULL) { + qos_p = PyLong_FromVoidPtr(endpoint->qos); + if (PyErr_Occurred()) { + PyErr_Clear(); + PyErr_SetString(PyExc_Exception, "VoidPtr errored."); + return NULL; + } + qos = PyObject_CallFunction(cqos_to_qos, "O", qos_p); + if (PyErr_Occurred()) { + PyErr_Clear(); + PyErr_SetString(PyExc_Exception, "Callfunc cqos errored."); + return NULL; + } + } else { + Py_INCREF(Py_None); + Py_INCREF(Py_None); + qos_p = Py_None; + qos = Py_None; + } + + PyObject* item = PyObject_CallFunction( \ + endpoint_constructor, "y#y#Ks#s#O", \ + endpoint->key.v, (Py_ssize_t) 16, \ + endpoint->participant_key.v, (Py_ssize_t) 16, \ + endpoint->participant_instance_handle, + endpoint->topic_name, endpoint->topic_name == NULL ? 0 : strlen(endpoint->topic_name), + endpoint->type_name, endpoint->type_name == NULL ? 0 : strlen(endpoint->type_name), + qos + ); + if (PyErr_Occurred()) { + PyErr_Clear(); + PyErr_SetString(PyExc_Exception, "Callfunc endpoint constructor errored."); + return NULL; + } + Py_DECREF(qos_p); + Py_DECREF(qos); + + dds_builtintopic_free_endpoint(endpoint); + + return item; +} + + +static PyObject * +ddspy_get_matched_publication_data(PyObject *self, PyObject *args) +{ + dds_entity_t reader; + dds_instance_handle_t handle; + dds_builtintopic_endpoint_t* endpoint = NULL; + + PyObject* endpoint_constructor; + PyObject* cqos_to_qos; + (void)self; + + if (!PyArg_ParseTuple(args, "iKOO", &reader, &handle, &endpoint_constructor, &cqos_to_qos)) + return NULL; + + endpoint = dds_get_matched_publication_data(reader, handle); + + if (endpoint == NULL) { + Py_INCREF(Py_None); + return Py_None; + } + + PyObject* qos_p, *qos; + + if (endpoint->qos != NULL) { + qos_p = PyLong_FromVoidPtr(endpoint->qos); + if (PyErr_Occurred()) { + PyErr_Clear(); + PyErr_SetString(PyExc_Exception, "VoidPtr errored."); + return NULL; + } + qos = PyObject_CallFunction(cqos_to_qos, "O", qos_p); + if (PyErr_Occurred()) { + PyErr_Clear(); + PyErr_SetString(PyExc_Exception, "Callfunc cqos errored."); + return NULL; + } + } else { + Py_INCREF(Py_None); + Py_INCREF(Py_None); + qos_p = Py_None; + qos = Py_None; + } + + PyObject* item = PyObject_CallFunction( \ + endpoint_constructor, "y#y#Ks#s#O", \ + endpoint->key.v, (Py_ssize_t) 16, \ + endpoint->participant_key.v, (Py_ssize_t) 16, \ + endpoint->participant_instance_handle, + endpoint->topic_name, endpoint->topic_name == NULL ? 0 : strlen(endpoint->topic_name), + endpoint->type_name, endpoint->type_name == NULL ? 0 : strlen(endpoint->type_name), + qos + ); + if (PyErr_Occurred()) { + PyErr_Clear(); + PyErr_SetString(PyExc_Exception, "Callfunc endpoint constructor errored."); + return NULL; + } + Py_DECREF(qos_p); + Py_DECREF(qos); + + dds_builtintopic_free_endpoint(endpoint); + + return item; +} + + char ddspy_docs[] = "DDSPY module"; PyMethodDef ddspy_funcs[] = { @@ -1777,6 +1909,14 @@ PyMethodDef ddspy_funcs[] = { (PyCFunction)ddspy_take_endpoint, METH_VARARGS, ddspy_docs}, + { "ddspy_get_matched_subscription_data", + (PyCFunction)ddspy_get_matched_subscription_data, + METH_VARARGS, + ddspy_docs}, + { "ddspy_get_matched_publication_data", + (PyCFunction)ddspy_get_matched_publication_data, + METH_VARARGS, + ddspy_docs}, { NULL} }; diff --git a/cyclonedds/core.py b/cyclonedds/core.py index 60ea5a5f..e1f34fd9 100644 --- a/cyclonedds/core.py +++ b/cyclonedds/core.py @@ -17,8 +17,9 @@ import ctypes as ct from weakref import WeakValueDictionary from typing import Any, Callable, Dict, Optional, List, TYPE_CHECKING +from datetime import datetime, time, timedelta -from .internal import c_call, c_callable, dds_infinity, dds_c_t, DDS +from .internal import c_call, c_callable, dds_infinity, dds_c_t, DDS, stat_keyvalue, stat_kind from .qos import Qos, Policy, _CQos @@ -1529,6 +1530,7 @@ async def wait_async(self, timeout: Optional[int] = None) -> int: with concurrent.futures.ThreadPoolExecutor() as pool: return await loop.run_in_executor(pool, self.wait, timeout) + @c_call("dds_create_waitset") def _create_waitset(self, domain_participant: dds_c_t.entity) -> dds_c_t.entity: pass @@ -1556,6 +1558,87 @@ def _waitset_set_trigger(self, waitset: dds_c_t.entity, value: ct.c_bool) -> dds pass +class Statistics(DDS): + """Statistics object for entity. + + Attributes + ---------- + entity: Entity + The handle of entity to which this set of statistics applies. + opaque: int + The internal data. + time: datatime + Time stamp of lastest call to `Statistics(entity).refresh()` in nanoseconds since epoch. + count: int + Number of key-value pairs. + data: dict + Data. + """ + entity: Entity + opaque: int + time: datetime + count: int + data: Dict[str, int] + + def __init__(self, entity: Entity): + self.entity = entity + self._c_statistics = self._create_statistics(entity._ref) + if not self._c_statistics: + raise DDSException(DDSException.DDS_RETCODE_ERROR, msg="Could not initialize statistics.") + self._c_statistics = ct.cast( + self._c_statistics, ct.POINTER(dds_c_t.stat_factory(self._c_statistics[0].count)) + ) + self._update() + + def __del__(self): + self._delete_statistics(ct.cast(self._c_statistics, ct.POINTER(dds_c_t.statistics))) + + def _update(self): + self.data = {} + self.opaque = self._c_statistics[0].opaque + self.time = self._c_statistics[0].time + self.count = self._c_statistics[0].count + self.kv = self._c_statistics[0].kv + + for i in range(self.count): + name = self.kv[i].name.decode('utf8') # ct.c_char_p + value = None + if self.kv[i].kind == stat_kind.DDS_STAT_KIND_UINT32: + value = self.kv[i].u.u32 + elif self.kv[i].kind == stat_kind.DDS_STAT_KIND_UINT64: + value = self.kv[i].u.u64 + elif self.kv[i].kind == stat_kind.DDS_STAT_KIND_LENGTHTIME: + value = self.kv[i].u.lengthtime + self.data[name] = value + + def refresh(self): + """Update a previously created statistics structure with current values. + + Only the time stamp and the values (and "opaque") may change. + The set of keys and the types of the values do not change. + """ + self._refresh_statistics(ct.cast(self._c_statistics, ct.POINTER(dds_c_t.statistics))) + self._c_statistics = ct.cast(self._c_statistics, ct.POINTER(dds_c_t.stat_factory(self._c_statistics[0].count))) + self._update() + + def __str__(self): + return f"Statistics({self.entity}, opaque={self.opaque}, time={self.time}, data={self.data})" + + @c_call("dds_create_statistics") + def _create_statistics(self, entity: dds_c_t.entity) -> ct.POINTER(dds_c_t.statistics): + pass + + @c_call("dds_refresh_statistics") + def _refresh_statistics(self, stat: ct.POINTER(dds_c_t.statistics)) -> dds_c_t.returnv: + pass + + @c_call("dds_delete_statistics") + def _delete_statistics(self, stat: ct.POINTER(dds_c_t.statistics)) -> None: + pass + + __repr__ = __str__ + + __all__ = ["DDSException", "Entity", "Qos", "Policy", "Listener", "DDSStatus", "ViewState", "InstanceState", "SampleState", "ReadCondition", "QueryCondition", "GuardCondition", - "WaitSet"] + "WaitSet", "Statistics"] diff --git a/cyclonedds/internal.py b/cyclonedds/internal.py index 4651e10c..8b2e9960 100644 --- a/cyclonedds/internal.py +++ b/cyclonedds/internal.py @@ -18,6 +18,7 @@ from ctypes.util import find_library from functools import wraps from dataclasses import dataclass +from enum import IntEnum class CycloneDDSLoaderException(Exception): @@ -235,6 +236,26 @@ class InvalidSample: sample_info: SampleInfo +class stat_kind(IntEnum): + DDS_STAT_KIND_UINT32 = 0 + DDS_STAT_KIND_UINT64 = 1 + DDS_STAT_KIND_LENGTHTIME = 2 + + +class stat_value(ct.Union): + _fields_ = [ + ('u32', ct.c_uint32), + ('u64', ct.c_uint64), + ('lengthtime', ct.c_uint64) + ] + +class stat_keyvalue(ct.Structure): + _fields_ = [ + ('name', ct.c_char_p), + ('kind', ct.c_int), + ('u', stat_value) + ] + class dds_c_t: # noqa N801 entity = ct.c_int32 time = ct.c_int64 @@ -345,6 +366,26 @@ class sample_buffer(ct.Structure): # noqa N801 ('len', ct.c_size_t) ] + class statistics(ct.Structure): + _fields_ = [ + ('entity', ct.c_int32), + ('opaque', ct.c_uint64), + ('time', ct.c_int64), + ('count', ct.c_size_t), + ('kv', ct.c_void_p) + ] + + def stat_factory(n_kv: int) -> ct.Structure: + class vstatistics(ct.Structure): + _fields_ = [ + ('entity', ct.c_int32), + ('opaque', ct.c_uint64), + ('time', ct.c_int64), + ('count', ct.c_size_t), + ('kv', stat_keyvalue * n_kv) + ] + return vstatistics + import cyclonedds._clayer as _clayer diff --git a/cyclonedds/pub.py b/cyclonedds/pub.py index 17daca74..230ee279 100644 --- a/cyclonedds/pub.py +++ b/cyclonedds/pub.py @@ -10,18 +10,21 @@ * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause """ -from typing import Optional, Union, TYPE_CHECKING +from typing import Optional, Union, TYPE_CHECKING, List +import ctypes as ct +import uuid from .internal import c_call, dds_c_t from .core import Entity, DDSException, Listener from .domain import DomainParticipant from .topic import Topic from .qos import _CQos, Qos, LimitedScopeQos, PublisherQos, DataWriterQos +from .builtin import DcpsEndpoint from cyclonedds._clayer import ddspy_write, ddspy_write_ts, ddspy_dispose, ddspy_writedispose, ddspy_writedispose_ts, \ ddspy_dispose_handle, ddspy_dispose_handle_ts, ddspy_register_instance, ddspy_unregister_instance, \ ddspy_unregister_instance_handle, ddspy_unregister_instance_ts, ddspy_unregister_instance_handle_ts, \ - ddspy_lookup_instance, ddspy_dispose_ts + ddspy_lookup_instance, ddspy_dispose_ts, ddspy_get_matched_subscription_data if TYPE_CHECKING: @@ -139,6 +142,7 @@ def __init__(self, self._topic = topic self.data_type = topic.data_type self._keepalive_entities = [self.publisher, self.topic] + self._constructor = None @property def topic(self) -> 'cyclonedds.topic.Topic': @@ -223,6 +227,144 @@ def lookup_instance(self, sample): return None return ret + def get_matched_subscriptions(self) -> List[int]: + """Get instance handles of the data readers matching a writer. + + Raises + ------ + DDSException: When the number of matching readers < 0. + + Returns + ------- + List[int]: + A list of instance handles of the matching data readers. + """ + num_matched_sub = self._get_matched_subscriptions(self._ref, None, 0) + if num_matched_sub < 0: + raise DDSException(num_matched_sub, f"Occurred when getting the number of matched subscriptions of {repr(self)}") + if num_matched_sub == 0: + return [] + + matched_sub_list = (dds_c_t.instance_handle * int(num_matched_sub))() + matched_sub_list_pt = ct.cast(matched_sub_list, ct.POINTER(dds_c_t.instance_handle)) + + ret = self._get_matched_subscriptions(self._ref, matched_sub_list_pt, num_matched_sub) + if ret >= 0: + return [matched_sub_list[i] for i in range(ret)] + + raise DDSException(ret, f"Occurred when getting the matched subscriptions of {repr(self)}") + + matched_sub = property(get_matched_subscriptions) + + def _make_constructors(self): + if self._constructor is not None: + return + + def endpoint_constructor(keyhash, participant_keyhash, instance_handle, topic_name, type_name, qos): + return DcpsEndpoint( + key=uuid.UUID(bytes=keyhash), + participant_key=uuid.UUID(bytes=participant_keyhash), + participant_instance_handle=instance_handle, + topic_name=topic_name, + type_name=type_name, + qos=qos + ) + + def cqos_to_qos(pointer): + p = ct.cast(pointer, dds_c_t.qos_p) + return _CQos.cqos_to_qos(p) + + self._constructor = endpoint_constructor + self._cqos = cqos_to_qos + + def get_matched_subscription_data(self, handle) -> Optional['cyclonedds.builtin.DcpsEndpoint']: + """Get a description of a reader matched with the provided writer + + Parameters + ---------- + handle: Int + The instance handle of a reader. + + Returns + ------- + DcpsEndpoint: + The sample of the DcpsEndpoint built-in topic. + """ + self._make_constructors() + return ddspy_get_matched_subscription_data(self._ref, handle, self._constructor, self._cqos) + + def get_liveliness_lost_status(self): + """Get LIVELINESS_LOST status + + Raises + ------ + DDSException + + Returns + ------- + liveness_lost_status: + The class 'liveness_lost_status' value. + """ + status = dds_c_t.liveliness_lost_status() + ret = self._get_liveliness_lost_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the liveliness lost status for {repr(self)}") + + def get_offered_deadline_missed_status(self): + """Get OFFERED DEADLINE MISSED status + + Raises + ------ + DDSException + + Returns + ------- + offered_deadline_missed_status: + The class 'offered_deadline_missed_status' value. + """ + status = dds_c_t.offered_deadline_missed_status() + ret = self._get_offered_deadline_missed_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the offered deadline missed status for {repr(self)}") + + def get_offered_incompatible_qos_status(self): + """Get OFFERED INCOMPATIBLE QOS status + + Raises + ------ + DDSException + + Returns + ------- + offered_incompatible_qos_status: + The class 'offered_incompatible_qos_status' value. + """ + status = dds_c_t.offered_incompatible_qos_status() + ret = self._get_offered_incompatible_qos_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the offered incompatible qos status for {repr(self)}") + + def get_publication_matched_status(self): + """Get PUBLICATION MATCHED status + + Raises + ------ + DDSException + + Returns + ------- + publication_matched_status: + The class 'publication_matched_status' value. + """ + status = dds_c_t.publication_matched_status() + ret = self._get_publication_matched_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the publication matched status for {repr(self)}") + @c_call("dds_create_writer") def _create_writer(self, publisher: dds_c_t.entity, topic: dds_c_t.entity, qos: dds_c_t.qos_p, listener: dds_c_t.listener_p) -> dds_c_t.entity: @@ -231,3 +373,24 @@ def _create_writer(self, publisher: dds_c_t.entity, topic: dds_c_t.entity, qos: @c_call("dds_wait_for_acks") def _wait_for_acks(self, publisher: dds_c_t.entity, timeout: dds_c_t.duration) -> dds_c_t.returnv: pass + + @c_call("dds_get_matched_subscriptions") + def _get_matched_subscriptions(self, writer: dds_c_t.entity, handle: ct.POINTER(dds_c_t.instance_handle), + size: ct.c_size_t) -> dds_c_t.returnv: + pass + + @c_call("dds_get_liveliness_lost_status") + def _get_liveliness_lost_status(self, writer: dds_c_t.entity, status: ct.POINTER(dds_c_t.liveliness_lost_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_offered_deadline_missed_status") + def _get_offered_deadline_missed_status(self, writer: dds_c_t.entity, status: ct.POINTER(dds_c_t.offered_deadline_missed_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_offered_incompatible_qos_status") + def _get_offered_incompatible_qos_status(self, writer: dds_c_t.entity, status: ct.POINTER(dds_c_t.offered_incompatible_qos_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_publication_matched_status") + def _get_publication_matched_status(self, writer: dds_c_t.entity, status: ct.POINTER(dds_c_t.publication_matched_status)) -> dds_c_t.returnv: + pass \ No newline at end of file diff --git a/cyclonedds/sub.py b/cyclonedds/sub.py index 0756c803..a0bcd28c 100644 --- a/cyclonedds/sub.py +++ b/cyclonedds/sub.py @@ -10,9 +10,12 @@ * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause """ +import ctypes as ct import asyncio import concurrent.futures from typing import AsyncGenerator, List, Optional, Union, Generator, TYPE_CHECKING +import uuid +from dataclasses import dataclass from .core import Entity, Listener, DDSException, WaitSet, ReadCondition, SampleState, InstanceState, ViewState from .domain import DomainParticipant @@ -21,13 +24,23 @@ from .qos import _CQos, Qos, LimitedScopeQos, SubscriberQos, DataReaderQos from .util import duration -from cyclonedds._clayer import ddspy_read, ddspy_take, ddspy_read_handle, ddspy_take_handle, ddspy_lookup_instance +from cyclonedds._clayer import ddspy_read, ddspy_take, ddspy_read_handle, ddspy_take_handle, ddspy_lookup_instance, ddspy_get_matched_publication_data if TYPE_CHECKING: import cyclonedds +@dataclass +class DcpsEndpoint: + key: uuid.UUID + participant_key: uuid.UUID + participant_instance_handle: int + topic_name: str + type_name: str + qos: Qos + + class Subscriber(Entity): def __init__( self, @@ -140,6 +153,7 @@ def __init__( self._topic_ref = topic._ref self._next_condition = None self._keepalive_entities = [self.subscriber, topic] + self._constructor = None @property def topic(self) -> 'cyclonedds.topic.Topic': @@ -356,6 +370,180 @@ def lookup_instance(self, sample): return None return ret + def get_matched_publications(self) -> List[int]: + """Get instance handles of the data writers matching a reader. + + Raises + ------ + DDSException: When the number of matching writers < 0. + + Returns + ------- + List[int]: + A list of instance handles of the matching data writers. + """ + num_matched_pub = self._get_matched_publications(self._ref, None, 0) + if num_matched_pub < 0: + raise DDSException(num_matched_pub, f"Occurred when getting the number of matched publications of {repr(self)}") + if num_matched_pub == 0: + return [] + + matched_pub_list = (dds_c_t.instance_handle * int(num_matched_pub))() + matched_pub_list_pt = ct.cast(matched_pub_list, ct.POINTER(dds_c_t.instance_handle)) + + ret = self._get_matched_publications(self._ref, matched_pub_list_pt, num_matched_pub) + if ret >= 0: + return [matched_pub_list[i] for i in range(ret)] + + raise DDSException(ret, f"Occurred when getting the matched publications of {repr(self)}") + + matched_pub = property(get_matched_publications) + + def _make_constructors(self): + if self._constructor is not None: + return + + def endpoint_constructor(keyhash, participant_keyhash, instance_handle, topic_name, type_name, qos): + return DcpsEndpoint( + key=uuid.UUID(bytes=keyhash), + participant_key=uuid.UUID(bytes=participant_keyhash), + participant_instance_handle=instance_handle, + topic_name=topic_name, + type_name=type_name, + qos=qos + ) + + def cqos_to_qos(pointer): + p = ct.cast(pointer, dds_c_t.qos_p) + return _CQos.cqos_to_qos(p) + + self._constructor = endpoint_constructor + self._cqos = cqos_to_qos + + def get_matched_publication_data(self, handle) -> Optional['cyclonedds.builtin.DcpsEndpoint']: + """Get a description of a writer matched with the provided reader. + + Parameters + ---------- + handle: Int + The instance handle of a writer. + + Returns + ------- + DcpsEndpoint: + The sample of the DcpsEndpoint built-in topic. + """ + self._make_constructors() + return ddspy_get_matched_publication_data(self._ref, handle, self._constructor, self._cqos) + + def get_liveliness_changed_status(self): + """Get LIVELINESS_CHANGED status + + Raises + ------ + DDSException + + Returns + ------- + liveness_changed_status: + The class 'liveness_changed_status' value. + """ + status = dds_c_t.liveliness_changed_status() + ret = self._get_liveliness_changed_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the liveliness changed status for {repr(self)}") + + def get_requested_deadline_missed_status(self): + """Get REQUESTED DEALINE MISSED status + + Raises + ------ + DDSException + + Returns + ------- + requested_deadline_missed_status: + The class 'requested_deadline_missed_status' value. + """ + status = dds_c_t.requested_deadline_missed_status() + ret = self._get_requested_deadline_missed_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the requested deadline missed status for {repr(self)}") + + def get_requested_incompatible_qos_status(self): + """Get REQUESTED INCOMPATIBLE QOS status + + Raises + ------ + DDSException + + Returns + ------- + requested_incompatible_qos_status: + The class 'requested_incompatible_qos_status' value. + """ + status = dds_c_t.requested_incompatible_qos_status() + ret = self._get_requested_incompatible_qos_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the requested incompatible qos status for {repr(self)}") + + def get_sample_lost_status(self): + """Get SAMPLE LOST status + + Raises + ------ + DDSException + + Returns + ------- + sample_lost_status: + The class 'sample_lost_status' value. + """ + status = dds_c_t.sample_lost_status() + ret = self._get_sample_lost_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the sample lost status for {repr(self)}") + + def get_sample_rejected_status(self): + """Get SAMPLE REJECTED status + + Raises + ------ + DDSException + + Returns + ------- + sample_rejected_status: + The class 'sample_rejected_status' value. + """ + status = dds_c_t.sample_rejected_status() + ret = self._get_sample_rejected_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the sample rejected status for {repr(self)}") + + def get_subscription_matched_status(self): + """Get SUBSCRIPTION MATCHED status + + Raises + ------ + DDSException + + Returns + ------- + subscription_matched_status: + The class 'subscription_matched_status' value. + """ + status = dds_c_t.subscription_matched_status() + ret = self._get_subscription_matched_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the subscription matched status for {repr(self)}") + @c_call("dds_create_reader") def _create_reader(self, subscriber: dds_c_t.entity, topic: dds_c_t.entity, qos: dds_c_t.qos_p, listener: dds_c_t.listener_p) -> dds_c_t.entity: @@ -365,5 +553,33 @@ def _create_reader(self, subscriber: dds_c_t.entity, topic: dds_c_t.entity, qos: def _wait_for_historical_data(self, reader: dds_c_t.entity, max_wait: dds_c_t.duration) -> dds_c_t.returnv: pass + @c_call("dds_get_matched_publications") + def _get_matched_publications(self, reader: dds_c_t.entity, handle: ct.POINTER(dds_c_t.instance_handle), + size: ct.c_size_t) -> dds_c_t.returnv: + pass + + @c_call("dds_get_liveliness_changed_status") + def _get_liveliness_changed_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.liveliness_changed_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_requested_deadline_missed_status") + def _get_requested_deadline_missed_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.requested_deadline_missed_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_requested_incompatible_qos_status") + def _get_requested_incompatible_qos_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.requested_incompatible_qos_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_sample_lost_status") + def _get_sample_lost_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.sample_lost_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_sample_rejected_status") + def _get_sample_rejected_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.sample_rejected_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_subscription_matched_status") + def _get_subscription_matched_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.subscription_matched_status)) -> dds_c_t.returnv: + pass __all__ = ["Subscriber", "DataReader"] diff --git a/cyclonedds/topic.py b/cyclonedds/topic.py index d19453d3..851f3834 100644 --- a/cyclonedds/topic.py +++ b/cyclonedds/topic.py @@ -11,9 +11,9 @@ """ import ctypes as ct -from typing import Any, AnyStr, Optional, TYPE_CHECKING +from typing import Any, AnyStr, Callable, Optional, TYPE_CHECKING -from .internal import c_call, dds_c_t +from .internal import DDS, c_call, c_callable, dds_c_t from .core import Entity, DDSException, Listener from .qos import _CQos, Qos, LimitedScopeQos, TopicQos @@ -24,6 +24,16 @@ import cyclonedds +class Sample(ct.Structure): + _fields_ = [ + ('usample', ct.c_void_p), + ('usample_size', ct.c_size_t) + ] + + +_filter_fn = c_callable(ct.c_bool, [ct.POINTER(Sample), ct.c_void_p]) + + class Topic(Entity): """Representing a Topic""" @@ -88,6 +98,59 @@ def get_type_name(self, max_size=256) -> str: typename = property(get_type_name, doc="Get topic type name") + def set_topic_filter(self, callable: Callable[['cyclonedds.topic', Sample], bool]): + """Sets a filter and filter argument on a topic. + + Parameters + ---------- + callable : filter + The filter function used to filter topic samples. + topic: Topic + The topic to set the filter function. + Sample: Sample + The sample that needs to be checked whether to be filtered. + + Returns + ------- + bool + Whether this sample is filtered. + """ + if callable is None: + return self._set_topic_filter(self._ref, None, None) + + def call(csample, args): + return callable(self, self.data_type.deserialize( + ct.string_at(csample[0].usample, csample[0].usample_size))) + + self._topic_filter = _filter_fn(call) + self._set_topic_filter(self._ref, self._topic_filter, None) + + def set_c_topic_filter(self, c_callable): + self._c_topic_filter = c_callable + self._set_topic_filter(self._ref, self._c_topic_filter, None) + + def get_inconsistent_topic_status(self): + """Get INCONSISTENT_TOPIC status + + Raises + ------ + DDSException: + If any error code is returned by the DDS API it is converted into an exception. + + Returns + ------- + inconsistent_topic_status: + The class 'inconsistent_topic_status` value. + """ + status = dds_c_t.inconsistent_topic_status() + ret = self._get_inconsistent_topic_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the inconsistent topic status for {repr(self)}") + + + + @c_call("dds_get_name") def _get_name(self, topic: dds_c_t.entity, name: ct.c_char_p, size: ct.c_size_t) -> dds_c_t.returnv: pass @@ -95,3 +158,11 @@ def _get_name(self, topic: dds_c_t.entity, name: ct.c_char_p, size: ct.c_size_t) @c_call("dds_get_type_name") def _get_type_name(self, topic: dds_c_t.entity, name: ct.c_char_p, size: ct.c_size_t) -> dds_c_t.returnv: pass + + @c_call("dds_set_topic_filter_and_arg") + def _set_topic_filter(self, topic: dds_c_t.entity, callback: _filter_fn, args: ct.c_void_p) -> dds_c_t.returnv: + pass + + @c_call("dds_get_inconsistent_topic_status") + def _get_inconsistent_topic_status(self, topic: dds_c_t.entity, status: ct.POINTER(dds_c_t.inconsistent_topic_status)) -> dds_c_t.returnv: + pass diff --git a/tests/test_reader.py b/tests/test_reader.py index 172df615..70576568 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -1,10 +1,12 @@ import pytest +import random from cyclonedds.domain import Domain, DomainParticipant from cyclonedds.topic import Topic from cyclonedds.sub import Subscriber, DataReader from cyclonedds.pub import Publisher, DataWriter from cyclonedds.util import duration, isgoodentity +from cyclonedds.core import Qos, Policy from testtopics import Message @@ -145,3 +147,29 @@ def test_reader_keepalive_parents(): dw.write(msg) assert dr.read_next() == msg + + +def test_get_matched_publications(): + dp = DomainParticipant(0) + tp = Topic(dp, "Message", Message) + dr = DataReader(dp, tp) + + rand_dw = random.randint(0, 20) + dw = [] + for i in range(rand_dw): + dw.append(DataWriter(dp, tp)) + + matched = dr.get_matched_publications() + assert len(matched) == rand_dw + + +def test_get_matched_publication_data(): + dp = DomainParticipant(0) + tp = Topic(dp, "Message", Message) + dr = DataReader(dp, tp) + dw = DataWriter(dp, tp) + + matched_handles = dr.get_matched_publications() + for handle in matched_handles: + matched_data = dr.get_matched_publication_data(handle) + assert matched_data is not None diff --git a/tests/test_statistics.py b/tests/test_statistics.py new file mode 100644 index 00000000..ac455da9 --- /dev/null +++ b/tests/test_statistics.py @@ -0,0 +1,30 @@ +import pytest +import time + +from cyclonedds.core import Statistics +from cyclonedds.domain import DomainParticipant +from cyclonedds.topic import Topic +from cyclonedds.sub import Subscriber, DataReader +from cyclonedds.pub import Publisher, DataWriter + +from testtopics import Message + + +def test_create_statistics(): + dp = DomainParticipant(0) + tp = Topic(dp, "statistics", Message) + dw = DataWriter(dp, tp) + stat = Statistics(dw) + print(f"stat = {stat}") + assert stat.data + + +def test_refresh_statistics(): + dp = DomainParticipant(0) + tp = Topic(dp, "statistics", Message) + dw = DataWriter(dp, tp) + stat = Statistics(dw) + assert stat.data + time.sleep(0.5) + stat.refresh() + assert stat.time != 0 diff --git a/tests/test_topic.py b/tests/test_topic.py index b008cebc..79ed7107 100644 --- a/tests/test_topic.py +++ b/tests/test_topic.py @@ -4,8 +4,12 @@ from cyclonedds.domain import DomainParticipant from cyclonedds.topic import Topic from cyclonedds.util import isgoodentity +from cyclonedds.pub import DataWriter +from cyclonedds.sub import DataReader +from cyclonedds.qos import Policy, Qos +from dataclasses import dataclass -from testtopics import Message +from testtopics import Message, MessageAlt def test_create_topic(): @@ -21,8 +25,32 @@ def test_get_name(): assert tp.name == tp.get_name() == 'MessageTopic' + def test_get_type_name(): dp = DomainParticipant(0) tp = Topic(dp, 'MessageTopic', Message) assert tp.typename == tp.get_type_name() == 'Message' + + +def filter(topic: Topic, sample: Message) -> bool: + if "Filter" in sample.message: + return False + return True + + +def test_topic_filter(): + dp = DomainParticipant(0) + tp = Topic(dp, "MessageTopic", Message, qos=Qos(Policy.History.KeepLast(5))) + tp.set_topic_filter(filter) + dw = DataWriter(dp, tp) + dr = DataReader(dp, tp) + + dw.write(Message("Nice Message")) + dw.write(Message("Test Filtering")) + dw.write(Message("Hello")) + dw.write(Message("lower case filter")) + + data = str(dr.read(5)) + assert "Filter" not in data + assert "filter" and "Hello" and "Nice" in data diff --git a/tests/test_writer.py b/tests/test_writer.py index 902d74aa..f6fe35fe 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -1,10 +1,13 @@ import pytest +import random from cyclonedds.core import DDSException from cyclonedds.domain import DomainParticipant from cyclonedds.topic import Topic from cyclonedds.pub import Publisher, DataWriter from cyclonedds.util import duration, isgoodentity +from cyclonedds.sub import DataReader +from cyclonedds.core import Qos, Policy from testtopics import Message, MessageKeyed @@ -86,3 +89,30 @@ def test_writer_lookup(): assert handle1 > 0 and handle2 > 0 and handle1 != handle2 assert handle1 == dw.lookup_instance(keymsg1) assert handle2 == dw.lookup_instance(keymsg2) + + +def test_get_matched_subscriptions(): + dp = DomainParticipant(0) + tp = Topic(dp, "Message", Message) + dw = DataWriter(dp, tp) + + rand_dr = random.randint(0, 20) + dr = [] + for i in range(rand_dr): + dr.append(DataReader(dp, tp)) + + matched = dw.get_matched_subscriptions() + assert len(matched) == rand_dr + + +def test_get_matched_subscription_data(): + dp = DomainParticipant(0) + tp = Topic(dp, "Message", Message) + dr = DataReader(dp, tp) + dw = DataWriter(dp, tp) + + matched_handles = dw.get_matched_subscriptions() + for handle in matched_handles: + matched_data = dw.get_matched_subscription_data(handle) + print(f"matched data = {matched_data.key}") + assert matched_data is not None From 998bdb34ab5868af9feab73aa89697020e7f783c Mon Sep 17 00:00:00 2001 From: yixianlv Date: Tue, 7 Dec 2021 11:24:12 +0100 Subject: [PATCH 2/3] Add docs for statistics --- cyclonedds/core.py | 6 ++++++ docs/source/cyclonedds.core.rst | 3 +++ 2 files changed, 9 insertions(+) diff --git a/cyclonedds/core.py b/cyclonedds/core.py index e1f34fd9..4a80b388 100644 --- a/cyclonedds/core.py +++ b/cyclonedds/core.py @@ -1573,7 +1573,13 @@ class Statistics(DDS): Number of key-value pairs. data: dict Data. + + Examples + -------- + >>> Statistics(datawriter) + >>> Statistics(datawriter).refresh() """ + entity: Entity opaque: int time: datetime diff --git a/docs/source/cyclonedds.core.rst b/docs/source/cyclonedds.core.rst index e5fc12a8..4813025a 100644 --- a/docs/source/cyclonedds.core.rst +++ b/docs/source/cyclonedds.core.rst @@ -58,3 +58,6 @@ core :undoc-members: :show-inheritance: +.. autoclass:: cyclonedds.core.Statistics + :members: + \ No newline at end of file From 2fe8509bb34f7bd9c40404038ee26ccbf694a1b3 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 9 Dec 2024 14:07:57 +0100 Subject: [PATCH 3/3] Make get_matched tests use (likely) unique topics Signed-off-by: Erik Boasson --- tests/test_reader.py | 4 ++-- tests/test_writer.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_reader.py b/tests/test_reader.py index 68addac9..f3b40f42 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -187,7 +187,7 @@ def test_reader_wrong_usage_errors(): def test_get_matched_publications(): dp = DomainParticipant(0) - tp = Topic(dp, "Message", Message) + tp = Topic(dp, f"Message{random.randint(1000000,9999999)}", Message) dr = DataReader(dp, tp) rand_dw = random.randint(0, 20) @@ -201,7 +201,7 @@ def test_get_matched_publications(): def test_get_matched_publication_data(): dp = DomainParticipant(0) - tp = Topic(dp, "Message", Message) + tp = Topic(dp, f"Message{random.randint(1000000,9999999)}", Message) dr = DataReader(dp, tp) dw = DataWriter(dp, tp) diff --git a/tests/test_writer.py b/tests/test_writer.py index 00254b40..0ee2ae6c 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -93,7 +93,7 @@ def test_writer_lookup(): def test_get_matched_subscriptions(): dp = DomainParticipant(0) - tp = Topic(dp, "Message", Message) + tp = Topic(dp, f"Message{random.randint(1000000,9999999)}", Message) dw = DataWriter(dp, tp) rand_dr = random.randint(0, 20) @@ -107,7 +107,7 @@ def test_get_matched_subscriptions(): def test_get_matched_subscription_data(): dp = DomainParticipant(0) - tp = Topic(dp, "Message", Message) + tp = Topic(dp, f"Message{random.randint(1000000,9999999)}", Message) dr = DataReader(dp, tp) dw = DataWriter(dp, tp)