Skip to content

Commit

Permalink
Add BitTiming/BitTimingFd support to KvaserBus (#1510)
Browse files Browse the repository at this point in the history
* add BitTiming parameter to KvaserBus

* implement tests for bittiming classes with kvaser

* set default number of samples to 1

* undo last change
  • Loading branch information
zariiii9003 authored Oct 16, 2023
1 parent 7b353ca commit 2d60900
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 20 deletions.
86 changes: 66 additions & 20 deletions can/interfaces/kvaser/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
import logging
import sys
import time
from typing import Optional, Union

from can import BusABC, CanProtocol, Message
from can.util import time_perfcounter_correlation
from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message
from can.exceptions import CanError, CanInitializationError, CanOperationError
from can.typechecking import CanFilters
from can.util import check_or_adjust_timing_clock, time_perfcounter_correlation

from ...exceptions import CanError, CanInitializationError, CanOperationError
from . import constants as canstat
from . import structures

Expand Down Expand Up @@ -199,6 +201,17 @@ def __check_bus_handle_validity(handle, function, arguments):
errcheck=__check_status_initialization,
)

canSetBusParamsC200 = __get_canlib_function(
"canSetBusParamsC200",
argtypes=[
c_canHandle,
ctypes.c_byte,
ctypes.c_byte,
],
restype=canstat.c_canStatus,
errcheck=__check_status_initialization,
)

canSetBusParamsFd = __get_canlib_function(
"canSetBusParamsFd",
argtypes=[
Expand Down Expand Up @@ -360,7 +373,13 @@ class KvaserBus(BusABC):
The CAN Bus implemented for the Kvaser interface.
"""

def __init__(self, channel, can_filters=None, **kwargs):
def __init__(
self,
channel: int,
can_filters: Optional[CanFilters] = None,
timing: Optional[Union[BitTiming, BitTimingFd]] = None,
**kwargs,
):
"""
:param int channel:
The Channel id to create this bus with.
Expand All @@ -370,6 +389,12 @@ def __init__(self, channel, can_filters=None, **kwargs):
Backend Configuration
:param timing:
An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd`
to specify the bit timing parameters for the Kvaser interface. If provided, it
takes precedence over the all other timing-related parameters.
Note that the `f_clock` property of the `timing` instance must be 16_000_000 (16MHz)
for standard CAN or 80_000_000 (80MHz) for CAN FD.
:param int bitrate:
Bitrate of channel in bit/s
:param bool accept_virtual:
Expand Down Expand Up @@ -427,7 +452,7 @@ def __init__(self, channel, can_filters=None, **kwargs):
exclusive = kwargs.get("exclusive", False)
override_exclusive = kwargs.get("override_exclusive", False)
accept_virtual = kwargs.get("accept_virtual", True)
fd = kwargs.get("fd", False)
fd = isinstance(timing, BitTimingFd) if timing else kwargs.get("fd", False)
data_bitrate = kwargs.get("data_bitrate", None)

try:
Expand Down Expand Up @@ -468,22 +493,43 @@ def __init__(self, channel, can_filters=None, **kwargs):
ctypes.byref(ctypes.c_long(TIMESTAMP_RESOLUTION)),
4,
)

if fd:
if "tseg1" not in kwargs and bitrate in BITRATE_FD:
# Use predefined bitrate for arbitration
bitrate = BITRATE_FD[bitrate]
if data_bitrate in BITRATE_FD:
# Use predefined bitrate for data
data_bitrate = BITRATE_FD[data_bitrate]
elif not data_bitrate:
# Use same bitrate for arbitration and data phase
data_bitrate = bitrate
canSetBusParamsFd(self._read_handle, data_bitrate, tseg1, tseg2, sjw)
if isinstance(timing, BitTimingFd):
timing = check_or_adjust_timing_clock(timing, [80_000_000])
canSetBusParams(
self._read_handle,
timing.nom_bitrate,
timing.nom_tseg1,
timing.nom_tseg2,
timing.nom_sjw,
1,
0,
)
canSetBusParamsFd(
self._read_handle,
timing.data_bitrate,
timing.data_tseg1,
timing.data_tseg2,
timing.data_sjw,
)
elif isinstance(timing, BitTiming):
timing = check_or_adjust_timing_clock(timing, [16_000_000])
canSetBusParamsC200(self._read_handle, timing.btr0, timing.btr1)
else:
if "tseg1" not in kwargs and bitrate in BITRATE_OBJS:
bitrate = BITRATE_OBJS[bitrate]
canSetBusParams(self._read_handle, bitrate, tseg1, tseg2, sjw, no_samp, 0)
if fd:
if "tseg1" not in kwargs and bitrate in BITRATE_FD:
# Use predefined bitrate for arbitration
bitrate = BITRATE_FD[bitrate]
if data_bitrate in BITRATE_FD:
# Use predefined bitrate for data
data_bitrate = BITRATE_FD[data_bitrate]
elif not data_bitrate:
# Use same bitrate for arbitration and data phase
data_bitrate = bitrate
canSetBusParamsFd(self._read_handle, data_bitrate, tseg1, tseg2, sjw)
else:
if "tseg1" not in kwargs and bitrate in BITRATE_OBJS:
bitrate = BITRATE_OBJS[bitrate]
canSetBusParams(self._read_handle, bitrate, tseg1, tseg2, sjw, no_samp, 0)

# By default, use local echo if single handle is used (see #160)
local_echo = single_handle or receive_own_messages
Expand Down
32 changes: 32 additions & 0 deletions test/test_kvaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def setUp(self):
canlib.canIoCtl = Mock(return_value=0)
canlib.canIoCtlInit = Mock(return_value=0)
canlib.kvReadTimer = Mock()
canlib.canSetBusParamsC200 = Mock()
canlib.canSetBusParams = Mock()
canlib.canSetBusParamsFd = Mock()
canlib.canBusOn = Mock()
Expand Down Expand Up @@ -179,6 +180,37 @@ def test_canfd_default_data_bitrate(self):
0, constants.canFD_BITRATE_500K_80P, 0, 0, 0
)

def test_can_timing(self):
canlib.canSetBusParams.reset_mock()
canlib.canSetBusParamsFd.reset_mock()
timing = can.BitTiming.from_bitrate_and_segments(
f_clock=16_000_000,
bitrate=125_000,
tseg1=13,
tseg2=2,
sjw=1,
)
can.Bus(channel=0, interface="kvaser", timing=timing)
canlib.canSetBusParamsC200.assert_called_once_with(0, timing.btr0, timing.btr1)

def test_canfd_timing(self):
canlib.canSetBusParams.reset_mock()
canlib.canSetBusParamsFd.reset_mock()
timing = can.BitTimingFd.from_bitrate_and_segments(
f_clock=80_000_000,
nom_bitrate=500_000,
nom_tseg1=68,
nom_tseg2=11,
nom_sjw=10,
data_bitrate=2_000_000,
data_tseg1=10,
data_tseg2=9,
data_sjw=8,
)
can.Bus(channel=0, interface="kvaser", timing=timing)
canlib.canSetBusParams.assert_called_once_with(0, 500_000, 68, 11, 10, 1, 0)
canlib.canSetBusParamsFd.assert_called_once_with(0, 2_000_000, 10, 9, 8)

def test_canfd_nondefault_data_bitrate(self):
canlib.canSetBusParams.reset_mock()
canlib.canSetBusParamsFd.reset_mock()
Expand Down

0 comments on commit 2d60900

Please sign in to comment.