Skip to content

Commit

Permalink
Merge pull request #281 from eboasson/matched-pub-sub-etc
Browse files Browse the repository at this point in the history
Add set_topic_filter & matched_pub, sub & status getters & statistics
  • Loading branch information
eboasson authored Jan 7, 2025
2 parents 53004e7 + 2fe8509 commit f8b2b1d
Show file tree
Hide file tree
Showing 13 changed files with 941 additions and 196 deletions.
210 changes: 144 additions & 66 deletions clayer/pysertype.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ static inline const ddspy_serdata_t *cserdata (const ddsi_serdata_t *this)

static void typeid_ser (dds_ostream_t *os, const dds_typeid_t *type_id)
{
dds_stream_write (os, &cdrstream_allocator, (char *)type_id, DDS_XTypes_TypeIdentifier_desc.m_ops);
if (!dds_stream_write (os, &cdrstream_allocator, (char *)type_id, DDS_XTypes_TypeIdentifier_desc.m_ops))
abort (); // internally generated data, so should never fail
}

#ifdef DDS_HAS_TYPE_DISCOVERY
Expand All @@ -102,7 +103,8 @@ static void typeid_deser (dds_istream_t *is, dds_typeid_t **type_id)

static void typeobj_ser (dds_ostream_t *os, const dds_typeobj_t *type_obj)
{
dds_stream_write (os, &cdrstream_allocator, (char *)type_obj, DDS_XTypes_TypeObject_desc.m_ops);
if (!dds_stream_write (os, &cdrstream_allocator, (char *)type_obj, DDS_XTypes_TypeObject_desc.m_ops))
abort (); // internally generated data, so should never fail
}

#endif /* DDS_HAS_TYPE_DISCOVERY */
Expand Down Expand Up @@ -1413,6 +1415,88 @@ static PyObject *ddspy_take_participant (PyObject *self, PyObject *args)
return ddspy_readtake_participant (self, args, dds_take);
}

static PyObject *ddspy_construct_endpoint (struct dds_builtintopic_endpoint *endpoint, PyObject *sampleinfo, PyObject *endpoint_constructor, PyObject *cqos_to_qos)
{
PyObject *type_id_bytes = NULL;

dds_ostream_t type_obj_stream;
const dds_typeinfo_t *type_info = NULL;

// Fetch the type id
dds_builtintopic_get_endpoint_type_info (endpoint, &type_info);

// convert to cdr bytes
if (type_info != NULL)
{
dds_ostream_init (&type_obj_stream, &cdrstream_allocator, 0, DDSI_RTPS_CDR_ENC_VERSION_2);
const dds_typeid_t *type_id = ddsi_typeinfo_complete_typeid (type_info);
typeid_ser (&type_obj_stream, type_id);
type_id_bytes = Py_BuildValue ("y#", type_obj_stream.m_buffer, type_obj_stream.m_index);
dds_ostream_fini (&type_obj_stream, &cdrstream_allocator);
}
else
{
type_id_bytes = Py_None;
Py_INCREF (type_id_bytes);
}

PyObject *qos_p, *qos;
if (endpoint->qos != NULL)
{
qos_p = PyLong_FromVoidPtr (endpoint->qos);
if (PyErr_Occurred ())
{
Py_DECREF (type_id_bytes);
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "VoidPtr errored.");
return NULL;
}
qos = PyObject_CallFunction (cqos_to_qos, "O", qos_p);
if (PyErr_Occurred ())
{
Py_DECREF (type_id_bytes);
Py_DECREF (qos_p);
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "Callfunc cqos errored.");
return NULL;
}
}
else
{
Py_INCREF (Py_None);
Py_INCREF (Py_None);
qos_p = Py_None;
qos = Py_None;
}

PyObject *item = PyObject_CallFunction (
endpoint_constructor, "y#y#Ks#s#OOO",
endpoint->key.v, (Py_ssize_t) 16,
endpoint->participant_key.v, (Py_ssize_t) 16,
endpoint->participant_instance_handle,
endpoint->topic_name,
endpoint->topic_name == NULL ? 0 : strlen(endpoint->topic_name),
endpoint->type_name,
endpoint->type_name == NULL ? 0 : strlen(endpoint->type_name),
qos,
sampleinfo,
type_id_bytes);
if (PyErr_Occurred ())
{
Py_DECREF (type_id_bytes);
Py_DECREF (qos_p);
Py_DECREF (qos);
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "Callfunc endpoint constructor errored.");
return NULL;
}

Py_DECREF (type_id_bytes);
Py_DECREF (qos_p);
Py_DECREF (qos);
return item;
}

static PyObject *ddspy_readtake_endpoint (PyObject *self, PyObject *args, dds_return_t (*readtake) (dds_entity_t, void **, dds_sample_info_t *, size_t, uint32_t))
{
uint32_t Nu32;
Expand Down Expand Up @@ -1442,29 +1526,6 @@ static PyObject *ddspy_readtake_endpoint (PyObject *self, PyObject *args, dds_re
PyObject *list = PyList_New (sts);
for (uint32_t i = 0; i < (uint32_t)sts; ++i)
{
PyObject *type_id_bytes = NULL;

dds_ostream_t type_obj_stream;
const dds_typeinfo_t *type_info = NULL;

// Fetch the type id
dds_builtintopic_get_endpoint_type_info (rcontainer[i], &type_info);

// convert to cdr bytes
if (type_info != NULL)
{
dds_ostream_init (&type_obj_stream, &cdrstream_allocator, 0, DDSI_RTPS_CDR_ENC_VERSION_2);
const dds_typeid_t *type_id = ddsi_typeinfo_complete_typeid (type_info);
typeid_ser (&type_obj_stream, type_id);
type_id_bytes = Py_BuildValue ("y#", type_obj_stream.m_buffer, type_obj_stream.m_index);
dds_ostream_fini (&type_obj_stream, &cdrstream_allocator);
}
else
{
type_id_bytes = Py_None;
Py_INCREF (type_id_bytes);
}

PyObject *sampleinfo = get_sampleinfo_pyobject (&info[i]);
if (PyErr_Occurred ())
{
Expand All @@ -1473,54 +1534,17 @@ static PyObject *ddspy_readtake_endpoint (PyObject *self, PyObject *args, dds_re
return NULL;
}

PyObject *qos_p, *qos;
if (rcontainer[i]->qos != NULL)
{
qos_p = PyLong_FromVoidPtr (rcontainer[i]->qos);
if (PyErr_Occurred ())
{
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "VoidPtr errored.");
return NULL;
}
qos = PyObject_CallFunction (cqos_to_qos, "O", qos_p);
if (PyErr_Occurred ())
{
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "Callfunc cqos errored.");
return NULL;
}
}
else
{
Py_INCREF (Py_None);
Py_INCREF (Py_None);
qos_p = Py_None;
qos = Py_None;
}

PyObject *item = PyObject_CallFunction (
endpoint_constructor, "y#y#Ks#s#OOO",
rcontainer[i]->key.v, (Py_ssize_t) 16,
rcontainer[i]->participant_key.v, (Py_ssize_t) 16,
rcontainer[i]->participant_instance_handle,
rcontainer[i]->topic_name,
rcontainer[i]->topic_name == NULL ? 0 : strlen(rcontainer[i]->topic_name),
rcontainer[i]->type_name,
rcontainer[i]->type_name == NULL ? 0 : strlen(rcontainer[i]->type_name),
qos,
sampleinfo,
type_id_bytes);
PyObject *item = ddspy_construct_endpoint (rcontainer[i], sampleinfo, endpoint_constructor, cqos_to_qos);
if (PyErr_Occurred ())
{
Py_DECREF (sampleinfo);
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "Callfunc endpoint constructor errored.");
return NULL;
}
PyList_SetItem (list, i, item); // steals ref
Py_DECREF (sampleinfo);
Py_DECREF (qos_p);
Py_DECREF (qos);

PyList_SetItem (list, i, item); // steals ref
}

dds_return_loan (reader, (void **)rcontainer, sts);
Expand Down Expand Up @@ -1719,6 +1743,58 @@ static PyObject *ddspy_get_typeobj (PyObject *self, PyObject *args)

#endif

static PyObject *
ddspy_get_matched_subscription_data(PyObject *self, PyObject *args)
{
dds_entity_t writer;
dds_instance_handle_t handle;
dds_builtintopic_endpoint_t* endpoint = NULL;

PyObject* endpoint_constructor;
PyObject* cqos_to_qos;
(void)self;

if (!PyArg_ParseTuple(args, "iKOO", &writer, &handle, &endpoint_constructor, &cqos_to_qos))
return NULL;

endpoint = dds_get_matched_subscription_data(writer, handle);
if (endpoint == NULL) {
Py_INCREF(Py_None);
return Py_None;
}

PyObject *item = ddspy_construct_endpoint (endpoint, Py_None, endpoint_constructor, cqos_to_qos);
dds_builtintopic_free_endpoint(endpoint);
return item;
}


static PyObject *
ddspy_get_matched_publication_data(PyObject *self, PyObject *args)
{
dds_entity_t reader;
dds_instance_handle_t handle;
dds_builtintopic_endpoint_t* endpoint = NULL;

PyObject* endpoint_constructor;
PyObject* cqos_to_qos;
(void)self;

if (!PyArg_ParseTuple(args, "iKOO", &reader, &handle, &endpoint_constructor, &cqos_to_qos))
return NULL;

endpoint = dds_get_matched_publication_data(reader, handle);
if (endpoint == NULL) {
Py_INCREF(Py_None);
return Py_None;
}

PyObject *item = ddspy_construct_endpoint (endpoint, Py_None, endpoint_constructor, cqos_to_qos);
dds_builtintopic_free_endpoint(endpoint);
return item;
}


char ddspy_docs[] = "DDSPY module";

PyMethodDef ddspy_funcs[] = {
Expand Down Expand Up @@ -1753,6 +1829,8 @@ PyMethodDef ddspy_funcs[] = {
#ifdef DDS_HAS_TYPE_DISCOVERY
{ "ddspy_get_typeobj", (PyCFunction)ddspy_get_typeobj, METH_VARARGS, ddspy_docs },
#endif
{ "ddspy_get_matched_subscription_data", (PyCFunction)ddspy_get_matched_subscription_data, METH_VARARGS, ddspy_docs },
{ "ddspy_get_matched_publication_data", (PyCFunction)ddspy_get_matched_publication_data, METH_VARARGS, ddspy_docs },
{ NULL }
};

Expand Down
Loading

0 comments on commit f8b2b1d

Please sign in to comment.