Skip to content

Commit

Permalink
Only serialize key when C function asks for a key
Browse files Browse the repository at this point in the history
For some operations, the DDS API requires a key instead of a sample ("dispose" is one of
those), and does this by invoking the "ddsi_serdata_from_sample" operation with kind set
to "SDK_KEY".

The Python binding is a bit unusual in that it serializes the data before calling into C
because that allows using Python code to serialize the data without having to call back
into Python code from C or being forced to interpret Python objects from C.  A mismatch is
possible, and this causes failures if the serialized form of the key is not a prefix of
the serialized form of the full sample.  For example:

    @DataClass
    class S(idl.IdlStruct, typename="S"):
        v: int
        k: int
        key("k")

    ih0 = dw.register_instance(S(0, 1))

Will misinterpret the bytes and treat the key value as 0 instead of as 1.

It can be dealt with in two ways:

* One is to continue always serializing a full sample in Python, fixing it up in the
  serdata implementation if a serialized key is requested.  This is small change in the
  Python "clayer";

* The other is to serialize a key when a key is expected. This is somewhat fragile, but
  the list of operations is small and the API rather stable.

This commit implements the second option, and mostly because it means the serializer will
not touch the non-key fields.

Signed-off-by: Erik Boasson <[email protected]>
  • Loading branch information
eboasson committed Dec 9, 2024
1 parent 2b5231f commit 9d3fde8
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 56 deletions.
5 changes: 4 additions & 1 deletion cyclonedds/idl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from .types import ValidUnionHolder
from ._main import IdlMeta, IdlUnionMeta, IdlBitmaskMeta, IdlEnumMeta
from ._support import Buffer, Endianness
from ._support import Buffer, Endianness, SerializeKind


_TIS = TypeVar('_TIS', bound='IdlStruct')
Expand All @@ -28,6 +28,9 @@ class IdlStruct(metaclass=IdlMeta):
def serialize(self, buffer: Optional[Buffer] = None, endianness: Optional[Endianness] = None, use_version_2: Optional[bool] = None) -> bytes:
return self.__idl__.serialize(self, buffer=buffer, endianness=endianness, use_version_2=use_version_2)

def serialize_key(self, endianness: Optional[Endianness] = None, use_version_2: Optional[bool] = None) -> bytes:
return self.__idl__.serialize(self, endianness=endianness, use_version_2=use_version_2, serialize_kind=SerializeKind.KeyDefinitionOrder)

@classmethod
def deserialize(cls: Type[_TIS], data: bytes, has_header: bool = True, use_version_2: Optional[bool] = None) -> _TIS:
return cls.__idl__.deserialize(data, has_header=has_header, use_version_2=use_version_2)
Expand Down
69 changes: 19 additions & 50 deletions cyclonedds/idl/_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def populate(self):
# impossible for another thread to observe a partially populated self
self._populated = True

def serialize(self, object, use_version_2: bool = None, buffer=None, endianness=None) -> bytes:
def serialize(self, object, use_version_2: bool = None, buffer=None, endianness : Endianness = None, serialize_kind: SerializeKind = SerializeKind.DataSample, prepend_header: bool = True) -> bytes:
if not self._populated:
self.populate()

Expand All @@ -148,27 +148,27 @@ def serialize(self, object, use_version_2: bool = None, buffer=None, endianness=
ibuffer.set_endianness(endianness or Endianness.native())
ibuffer._align_max = 4 if use_version_2 else 8

if ibuffer.endianness == Endianness.Big:
ibuffer.write('b', 1, 0)
ibuffer.write('b', 1, 0 | (self.xcdrv2_head if use_version_2 else 0))
ibuffer.write('b', 1, 0)
ibuffer.write('b', 1, 0)
else:
ibuffer.write('b', 1, 0)
ibuffer.write('b', 1, 1 | (self.xcdrv2_head if use_version_2 else 0))
ibuffer.write('b', 1, 0)
ibuffer.write('b', 1, 0)

ibuffer.set_align_offset(4)
if prepend_header:
if ibuffer.endianness == Endianness.Big:
ibuffer.write('b', 1, 0)
ibuffer.write('b', 1, 0 | (self.xcdrv2_head if use_version_2 else 0))
ibuffer.write('b', 1, 0)
ibuffer.write('b', 1, 0)
else:
ibuffer.write('b', 1, 0)
ibuffer.write('b', 1, 1 | (self.xcdrv2_head if use_version_2 else 0))
ibuffer.write('b', 1, 0)
ibuffer.write('b', 1, 0)
ibuffer.set_align_offset(4)

if use_version_2:
self.v2_machine.serialize(ibuffer, object)
self.v2_machine.serialize(ibuffer, object, serialize_kind)
else:
self.v0_machine.serialize(ibuffer, object)
self.v0_machine.serialize(ibuffer, object, serialize_kind)

return ibuffer.asbytes()

def _deserialize(self, data, has_header=True, use_version_2: bool = None, deserialize_kind: DeserializeKind = None) -> object:
def deserialize(self, data, has_header=True, use_version_2: bool = None, deserialize_kind: DeserializeKind = DeserializeKind.DataSample) -> object:
if not self._populated:
self.populate()

Expand Down Expand Up @@ -209,45 +209,14 @@ def _deserialize(self, data, has_header=True, use_version_2: bool = None, deseri

return machine.deserialize(buffer, deserialize_kind=deserialize_kind)

def deserialize(self, data, has_header=True, use_version_2: bool = None) -> object:
return self._deserialize(data, has_header, use_version_2, DeserializeKind.DataSample)

def deserialize_key(self, data, has_header=True, use_version_2: bool = None) -> object:
return self._deserialize(data, has_header, use_version_2, DeserializeKind.KeySample)

def _serialize_key(self, object, use_version_2: bool = None, endianness: Endianness = Endianness.Little, serialize_kind: SerializeKind = SerializeKind.KeyNormalized) -> bytes:
if not self._populated:
self.populate()

if self.version_support.SupportsBasic & self.version_support:
use_version_2 = False if use_version_2 is None else use_version_2
else:
# version 0 not supported
if use_version_2 is not None and not use_version_2:
raise Exception("Cannot encode this type with version 0, contains xcdrv2-type structures")
use_version_2 = True

if self.keyless:
return b''

self.buffer.seek(0)
self.buffer.zero_out()
self.buffer.set_align_offset(0)
self.buffer.set_endianness(endianness)
self.buffer._align_max = 4 if use_version_2 else 8

if use_version_2:
self.v2_machine.serialize(self.buffer, object, serialize_kind)
else:
self.v0_machine.serialize(self.buffer, object, serialize_kind)

return self.buffer.asbytes()
return self.deserialize(data, has_header, use_version_2, DeserializeKind.KeySample)

def serialize_key(self, object, use_version_2: bool = None, endianness: Endianness = Endianness.Little) -> bytes:
return self._serialize_key(object, use_version_2, endianness, SerializeKind.KeyDefinitionOrder)
return self.serialize(object, use_version_2, None, endianness, SerializeKind.KeyDefinitionOrder, False)

def serialize_key_normalized(self, object, use_version_2: bool = None, endianness: Endianness = Endianness.Little) -> bytes:
return self._serialize_key(object, use_version_2, endianness, SerializeKind.KeyNormalized)
return self.serialize(object, use_version_2, None, endianness, SerializeKind.KeyNormalized, False)

def cdr_key_machine(self, skip: bool = False, use_version_2: bool = None):
if self.re_entrancy_protection:
Expand Down
9 changes: 5 additions & 4 deletions cyclonedds/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def dispose(self, sample: _T, timestamp: Optional[int] = None):
timestamp
The sample's source_timestamp (in nanoseconds since the UNIX Epoch)
"""
ser = sample.serialize(use_version_2=self._use_version_2)
ser = sample.serialize_key(use_version_2=self._use_version_2)
ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0')

if timestamp is not None:
Expand Down Expand Up @@ -258,8 +258,9 @@ def dispose_instance_handle(self, handle: int, timestamp: Optional[int] = None):
raise DDSException(ret, f"Occurred while disposing in {repr(self)}")

def register_instance(self, sample: _T) -> int:
ser = sample.serialize(use_version_2=self._use_version_2)
ser = sample.serialize_key(use_version_2=self._use_version_2)
ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0')
print(f"ser={ser}")

ret = ddspy_register_instance(self._ref, ser)
if ret < 0:
Expand All @@ -275,7 +276,7 @@ def unregister_instance(self, sample: _T, timestamp: Optional[int] = None):
timestamp
The timestamp used at registration (in nanoseconds since the UNIX Epoch)
"""
ser = sample.serialize(use_version_2=self._use_version_2)
ser = sample.serialize_key(use_version_2=self._use_version_2)
ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0')

if timestamp is not None:
Expand Down Expand Up @@ -326,7 +327,7 @@ def lookup_instance(self, sample: _T) -> Optional[int]:
"""
This operation takes a sample and returns an instance handle to be used for subsequent operations.
"""
ser = sample.serialize(use_version_2=self._use_version_2)
ser = sample.serialize_key(use_version_2=self._use_version_2)
ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0')

ret = ddspy_lookup_instance(self._ref, ser)
Expand Down
2 changes: 1 addition & 1 deletion cyclonedds/sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def wait_for_historical_data(self, timeout: int) -> bool:
raise DDSException(ret, f"Occured while waiting for historical data in {repr(self)}")

def lookup_instance(self, sample: _T) -> Optional[int]:
ret = ddspy_lookup_instance(self._ref, sample.serialize())
ret = ddspy_lookup_instance(self._ref, sample.serialize_key())
if ret < 0:
raise DDSException(ret, f"Occurred while lookup up instance from {repr(self)}")
if ret == 0:
Expand Down

0 comments on commit 9d3fde8

Please sign in to comment.