Skip to content

Commit

Permalink
Merge pull request #1023 from hardbyte/refactor-util-py
Browse files Browse the repository at this point in the history
Slightly refactor util.py
  • Loading branch information
mergify[bot] authored Apr 21, 2021
2 parents 53b9a5a + f4eabd7 commit 4048765
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 50 deletions.
2 changes: 1 addition & 1 deletion can/typechecking.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
StringPathLike = typing.Union[str, "os.PathLike[str]"]
AcceptedIOType = typing.Optional[typing.Union[FileLike, StringPathLike]]

BusConfig = typing.NewType("BusConfig", dict)
BusConfig = typing.NewType("BusConfig", typing.Dict[str, typing.Any])

AutoDetectedConfig = mypy_extensions.TypedDict(
"AutoDetectedConfig", {"interface": str, "channel": Channel}
Expand Down
89 changes: 43 additions & 46 deletions can/util.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""
Utilities and configuration file parsing.
"""

import functools
import warnings
from typing import Dict, Optional, Union
from typing import Any, Callable, cast, Dict, Iterable, Tuple, Optional, Union
from time import time, perf_counter, get_clock_info

from can import typechecking

import json
import os
import os.path
Expand All @@ -18,6 +16,7 @@

import can
from can.interfaces import VALID_INTERFACES
from can import typechecking

log = logging.getLogger("can.util")

Expand Down Expand Up @@ -89,9 +88,8 @@ def load_environment_config(context: Optional[str] = None) -> Dict[str, str]:
"bitrate": "CAN_BITRATE",
}

context_suffix = "_{}".format(context) if context else ""

can_config_key = "CAN_CONFIG" + context_suffix
context_suffix = f"_{context}" if context else ""
can_config_key = f"CAN_CONFIG{context_suffix}"
config: Dict[str, str] = json.loads(os.environ.get(can_config_key, "{}"))

for key, val in mapper.items():
Expand All @@ -104,7 +102,7 @@ def load_environment_config(context: Optional[str] = None) -> Dict[str, str]:

def load_config(
path: Optional[typechecking.AcceptedIOType] = None,
config=None,
config: Optional[Dict[str, Any]] = None,
context: Optional[str] = None,
) -> typechecking.BusConfig:
"""
Expand Down Expand Up @@ -158,16 +156,19 @@ def load_config(
config = {}

# use the given dict for default values
config_sources = [
given_config,
can.rc,
lambda _context: load_environment_config( # pylint: disable=unnecessary-lambda
_context
),
lambda _context: load_environment_config(),
lambda _context: load_file_config(path, _context),
lambda _context: load_file_config(path),
]
config_sources = cast(
Iterable[Union[Dict[str, Any], Callable[[Any], Dict[str, Any]]]],
[
given_config,
can.rc,
lambda _context: load_environment_config( # pylint: disable=unnecessary-lambda
_context
),
lambda _context: load_environment_config(),
lambda _context: load_file_config(path, _context),
lambda _context: load_file_config(path),
],
)

# Slightly complex here to only search for the file config if required
for cfg in config_sources:
Expand All @@ -189,9 +190,7 @@ def load_config(
config[key] = None

if config["interface"] not in VALID_INTERFACES:
raise NotImplementedError(
"Invalid CAN Bus Type - {}".format(config["interface"])
)
raise NotImplementedError(f'Invalid CAN Bus Type "{config["interface"]}"')

if "bitrate" in config:
config["bitrate"] = int(config["bitrate"])
Expand All @@ -216,30 +215,33 @@ def load_config(
timing_conf[key] = int(config[key], base=0)
del config[key]
if timing_conf:
timing_conf["bitrate"] = config.get("bitrate")
timing_conf["bitrate"] = config["bitrate"]
config["timing"] = can.BitTiming(**timing_conf)

can.log.debug("can config: {}".format(config))
return config
can.log.debug("can config: %s", config)

return cast(typechecking.BusConfig, config)


def set_logging_level(level_name: str) -> None:
"""Set the logging level for the `"can"` logger.
def set_logging_level(level_name: Optional[str] = None):
"""Set the logging level for the "can" logger.
Expects one of: 'critical', 'error', 'warning', 'info', 'debug', 'subdebug'
:param level_name: One of: `'critical'`, `'error'`, `'warning'`, `'info'`,
`'debug'`, `'subdebug'`, or the value `None` (=default). Defaults to `'debug'`.
"""
can_logger = logging.getLogger("can")

try:
can_logger.setLevel(getattr(logging, level_name.upper())) # type: ignore
can_logger.setLevel(getattr(logging, level_name.upper()))
except AttributeError:
can_logger.setLevel(logging.DEBUG)
log.debug("Logging set to {}".format(level_name))
log.debug("Logging set to %s", level_name)


def len2dlc(length: int) -> int:
"""Calculate the DLC from data length.
:param int length: Length in number of bytes (0-64)
:param length: Length in number of bytes (0-64)
:returns: DLC (0-15)
"""
Expand All @@ -261,20 +263,17 @@ def dlc2len(dlc: int) -> int:
return CAN_FD_DLC[dlc] if dlc <= 15 else 64


def channel2int(channel: Optional[Union[typechecking.Channel]]) -> Optional[int]:
def channel2int(channel: Optional[typechecking.Channel]) -> Optional[int]:
"""Try to convert the channel to an integer.
:param channel:
Channel string (e.g. can0, CAN1) or integer
Channel string (e.g. `"can0"`, `"CAN1"`) or an integer
:returns: Channel integer or `None` if unsuccessful
:returns: Channel integer or ``None`` if unsuccessful
"""
if channel is None:
return None
if isinstance(channel, int):
return channel
# String and byte objects have a lower() method
if hasattr(channel, "lower"):
if isinstance(channel, str):
match = re.match(r".*(\d+)$", channel)
if match:
return int(match.group(1))
Expand All @@ -299,35 +298,33 @@ def library_function(new_arg):
def deco(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
rename_kwargs(f.__name__, kwargs, aliases)
_rename_kwargs(f.__name__, kwargs, aliases)
return f(*args, **kwargs)

return wrapper

return deco


def rename_kwargs(func_name, kwargs, aliases):
def _rename_kwargs(
func_name: str, kwargs: Dict[str, str], aliases: Dict[str, str]
) -> None:
"""Helper function for `deprecated_args_alias`"""
for alias, new in aliases.items():
if alias in kwargs:
value = kwargs.pop(alias)
if new is not None:
warnings.warn(
"{} is deprecated; use {}".format(alias, new), DeprecationWarning
)
warnings.warn(f"{alias} is deprecated; use {new}", DeprecationWarning)
if new in kwargs:
raise TypeError(
"{} received both {} (deprecated) and {}".format(
func_name, alias, new
)
f"{func_name} received both {alias} (deprecated) and {new}"
)
kwargs[new] = value
else:
warnings.warn("{} is deprecated".format(alias), DeprecationWarning)


def time_perfcounter_correlation():
def time_perfcounter_correlation() -> Tuple[float, float]:
"""Get the `perf_counter` value nearest to when time.time() is updated
Computed if the default timer used by `time.time` on this platform has a resolution
Expand Down
6 changes: 3 additions & 3 deletions test/test_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
import warnings

from can.util import rename_kwargs
from can.util import _rename_kwargs


class RenameKwargsTest(unittest.TestCase):
Expand All @@ -11,7 +11,7 @@ def _test(self, kwargs, aliases):

# Test that we do get the DeprecationWarning when called with deprecated kwargs
with self.assertWarnsRegex(DeprecationWarning, "is deprecated"):
rename_kwargs("unit_test", kwargs, aliases)
_rename_kwargs("unit_test", kwargs, aliases)

# Test that the aliases contains the deprecated values and
# the obsolete kwargs have been removed
Expand All @@ -23,7 +23,7 @@ def _test(self, kwargs, aliases):
# Cause all warnings to always be triggered.
warnings.simplefilter("error", DeprecationWarning)
try:
rename_kwargs("unit_test", kwargs, aliases)
_rename_kwargs("unit_test", kwargs, aliases)
finally:
warnings.resetwarnings()

Expand Down

0 comments on commit 4048765

Please sign in to comment.