diff --git a/can/interfaces/kvaser/canlib.py b/can/interfaces/kvaser/canlib.py index 38949137d..0983e28dc 100644 --- a/can/interfaces/kvaser/canlib.py +++ b/can/interfaces/kvaser/canlib.py @@ -10,11 +10,13 @@ import logging import sys import time +from typing import Optional, Union -from can import BusABC, CanProtocol, Message -from can.util import time_perfcounter_correlation +from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message +from can.exceptions import CanError, CanInitializationError, CanOperationError +from can.typechecking import CanFilters +from can.util import check_or_adjust_timing_clock, time_perfcounter_correlation -from ...exceptions import CanError, CanInitializationError, CanOperationError from . import constants as canstat from . import structures @@ -199,6 +201,17 @@ def __check_bus_handle_validity(handle, function, arguments): errcheck=__check_status_initialization, ) + canSetBusParamsC200 = __get_canlib_function( + "canSetBusParamsC200", + argtypes=[ + c_canHandle, + ctypes.c_byte, + ctypes.c_byte, + ], + restype=canstat.c_canStatus, + errcheck=__check_status_initialization, + ) + canSetBusParamsFd = __get_canlib_function( "canSetBusParamsFd", argtypes=[ @@ -360,7 +373,13 @@ class KvaserBus(BusABC): The CAN Bus implemented for the Kvaser interface. """ - def __init__(self, channel, can_filters=None, **kwargs): + def __init__( + self, + channel: int, + can_filters: Optional[CanFilters] = None, + timing: Optional[Union[BitTiming, BitTimingFd]] = None, + **kwargs, + ): """ :param int channel: The Channel id to create this bus with. @@ -370,6 +389,12 @@ def __init__(self, channel, can_filters=None, **kwargs): Backend Configuration + :param timing: + An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd` + to specify the bit timing parameters for the Kvaser interface. If provided, it + takes precedence over the all other timing-related parameters. + Note that the `f_clock` property of the `timing` instance must be 16_000_000 (16MHz) + for standard CAN or 80_000_000 (80MHz) for CAN FD. :param int bitrate: Bitrate of channel in bit/s :param bool accept_virtual: @@ -427,7 +452,7 @@ def __init__(self, channel, can_filters=None, **kwargs): exclusive = kwargs.get("exclusive", False) override_exclusive = kwargs.get("override_exclusive", False) accept_virtual = kwargs.get("accept_virtual", True) - fd = kwargs.get("fd", False) + fd = isinstance(timing, BitTimingFd) if timing else kwargs.get("fd", False) data_bitrate = kwargs.get("data_bitrate", None) try: @@ -468,22 +493,43 @@ def __init__(self, channel, can_filters=None, **kwargs): ctypes.byref(ctypes.c_long(TIMESTAMP_RESOLUTION)), 4, ) - - if fd: - if "tseg1" not in kwargs and bitrate in BITRATE_FD: - # Use predefined bitrate for arbitration - bitrate = BITRATE_FD[bitrate] - if data_bitrate in BITRATE_FD: - # Use predefined bitrate for data - data_bitrate = BITRATE_FD[data_bitrate] - elif not data_bitrate: - # Use same bitrate for arbitration and data phase - data_bitrate = bitrate - canSetBusParamsFd(self._read_handle, data_bitrate, tseg1, tseg2, sjw) + if isinstance(timing, BitTimingFd): + timing = check_or_adjust_timing_clock(timing, [80_000_000]) + canSetBusParams( + self._read_handle, + timing.nom_bitrate, + timing.nom_tseg1, + timing.nom_tseg2, + timing.nom_sjw, + 1, + 0, + ) + canSetBusParamsFd( + self._read_handle, + timing.data_bitrate, + timing.data_tseg1, + timing.data_tseg2, + timing.data_sjw, + ) + elif isinstance(timing, BitTiming): + timing = check_or_adjust_timing_clock(timing, [16_000_000]) + canSetBusParamsC200(self._read_handle, timing.btr0, timing.btr1) else: - if "tseg1" not in kwargs and bitrate in BITRATE_OBJS: - bitrate = BITRATE_OBJS[bitrate] - canSetBusParams(self._read_handle, bitrate, tseg1, tseg2, sjw, no_samp, 0) + if fd: + if "tseg1" not in kwargs and bitrate in BITRATE_FD: + # Use predefined bitrate for arbitration + bitrate = BITRATE_FD[bitrate] + if data_bitrate in BITRATE_FD: + # Use predefined bitrate for data + data_bitrate = BITRATE_FD[data_bitrate] + elif not data_bitrate: + # Use same bitrate for arbitration and data phase + data_bitrate = bitrate + canSetBusParamsFd(self._read_handle, data_bitrate, tseg1, tseg2, sjw) + else: + if "tseg1" not in kwargs and bitrate in BITRATE_OBJS: + bitrate = BITRATE_OBJS[bitrate] + canSetBusParams(self._read_handle, bitrate, tseg1, tseg2, sjw, no_samp, 0) # By default, use local echo if single handle is used (see #160) local_echo = single_handle or receive_own_messages diff --git a/test/test_kvaser.py b/test/test_kvaser.py index 1254f2fc7..abaf7b38f 100644 --- a/test/test_kvaser.py +++ b/test/test_kvaser.py @@ -21,6 +21,7 @@ def setUp(self): canlib.canIoCtl = Mock(return_value=0) canlib.canIoCtlInit = Mock(return_value=0) canlib.kvReadTimer = Mock() + canlib.canSetBusParamsC200 = Mock() canlib.canSetBusParams = Mock() canlib.canSetBusParamsFd = Mock() canlib.canBusOn = Mock() @@ -179,6 +180,37 @@ def test_canfd_default_data_bitrate(self): 0, constants.canFD_BITRATE_500K_80P, 0, 0, 0 ) + def test_can_timing(self): + canlib.canSetBusParams.reset_mock() + canlib.canSetBusParamsFd.reset_mock() + timing = can.BitTiming.from_bitrate_and_segments( + f_clock=16_000_000, + bitrate=125_000, + tseg1=13, + tseg2=2, + sjw=1, + ) + can.Bus(channel=0, interface="kvaser", timing=timing) + canlib.canSetBusParamsC200.assert_called_once_with(0, timing.btr0, timing.btr1) + + def test_canfd_timing(self): + canlib.canSetBusParams.reset_mock() + canlib.canSetBusParamsFd.reset_mock() + timing = can.BitTimingFd.from_bitrate_and_segments( + f_clock=80_000_000, + nom_bitrate=500_000, + nom_tseg1=68, + nom_tseg2=11, + nom_sjw=10, + data_bitrate=2_000_000, + data_tseg1=10, + data_tseg2=9, + data_sjw=8, + ) + can.Bus(channel=0, interface="kvaser", timing=timing) + canlib.canSetBusParams.assert_called_once_with(0, 500_000, 68, 11, 10, 1, 0) + canlib.canSetBusParamsFd.assert_called_once_with(0, 2_000_000, 10, 9, 8) + def test_canfd_nondefault_data_bitrate(self): canlib.canSetBusParams.reset_mock() canlib.canSetBusParamsFd.reset_mock()