Skip to content

Commit

Permalink
fix: typing errors with new mypy config
Browse files Browse the repository at this point in the history
  • Loading branch information
OdoctorG committed Jul 5, 2024
1 parent 58b3f3c commit 610e94c
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 16 deletions.
6 changes: 3 additions & 3 deletions python/cloud-demo/lib/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import binascii
import queue
from threading import Thread
from typing import Any, Callable
from typing import Any, Callable, Sequence

import remotivelabs.broker.sync as br
from typing_extensions import Self
Expand Down Expand Up @@ -49,7 +49,7 @@ def list_signal_names(self) -> list[str]:
signal_names.append(sinfo.id.name)
return signal_names

def subscribe(self, signals: list[br.network_api_pb2.Signal], on_frame: Callable[..., None], changed_values_only: bool = True) -> Any:
def subscribe(self, signals: list[str], on_frame: Callable[..., None], changed_values_only: bool = True) -> Any:
client_id = br.common_pb2.ClientId(id="cloud_demo")

signals_to_subscribe_on = map(lambda signal: self.signal_creator.signal(signal, "custom_can"), signals)
Expand All @@ -73,7 +73,7 @@ def subscribe(self, signals: list[br.network_api_pb2.Signal], on_frame: Callable
def connect(cls, url: str, api_key: str | None = None, access_token: str | None = None) -> Self:
return Broker(url, api_key, access_token) # type: ignore

def __each_signal(self, signals: br.network_api_pb2.Signals, callback: Callable[..., Any]) -> None:
def __each_signal(self, signals: Sequence[br.network_api_pb2.Signal], callback: Callable[..., Any]) -> None:
callback(map(lambda s: {"timestamp_nanos": s.timestamp, "name": s.id.name, "value": self.__get_value(s)}, signals))

@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions python/playback-record/playback.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import signal as sig
import time
from threading import Event
from typing import Any, Optional
from typing import Any, Optional, Sequence

import grpc
import remotivelabs.broker.sync as br
Expand Down Expand Up @@ -105,7 +105,7 @@ def ecu_b_subscribe_(stub: br.network_api_pb2_grpc.NetworkServiceStub) -> None:
print(err)


def read_on_timer(stub: br.network_api_pb2_grpc.NetworkServiceStub, signals: br.network_api_pb2.Signals, pause: int) -> None:
def read_on_timer(stub: br.network_api_pb2_grpc.NetworkServiceStub, signals: Sequence[br.common_pb2.SignalId], pause: int) -> None:
"""Simple reading with timer, logs on purpose tabbed with double space
Parameters
Expand Down
8 changes: 4 additions & 4 deletions python/reflector-ecu/reflector.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def ecu_a(stub: br.network_api_pb2_grpc.NetworkServiceStub, signal_creator: br.S
increasing_counter = (increasing_counter + 1) % 4


def read_on_timer(stub: br.network_api_pb2_grpc.NetworkServiceStub, signals: br.network_api_pb2.Signals, pause: int) -> None:
def read_on_timer(stub: br.network_api_pb2_grpc.NetworkServiceStub, signals: Sequence[br.common_pb2.SignalId], pause: int) -> None:
"""Simple reading with timer
Parameters
Expand Down Expand Up @@ -130,7 +130,7 @@ def get_value_pair(signal: br.network_api_pb2.Signal) -> Tuple[str, Any]:
def act_on_signal(
client_id: br.common_pb2.ClientId,
stub: br.network_api_pb2_grpc.NetworkServiceStub,
sub_signals: br.common_pb2.SignalId,
sub_signals: Sequence[br.common_pb2.SignalId],
on_change: bool,
fun: Callable[[Any], None],
on_subcribed: Callable[[Any], None] | None = None,
Expand Down Expand Up @@ -196,7 +196,7 @@ def double_and_publish(
network_stub: br.network_api_pb2_grpc.NetworkServiceStub,
client_id: br.common_pb2.ClientId,
trigger: Any,
signals: br.network_api_pb2.Signals,
signals: Sequence[br.network_api_pb2.Signal],
signal_creator: br.SignalCreator,
) -> None:
if signal_creator is None:
Expand Down Expand Up @@ -262,7 +262,7 @@ def modify_signals_publish_frame(
network_stub: br.network_api_pb2_grpc.NetworkServiceStub,
client_id: br.common_pb2.ClientId,
destination_namespace_name: str,
signals: br.network_api_pb2.Signals,
signals: Sequence[br.network_api_pb2.Signal],
) -> None:
"""Modifiy recieved signals and publish them."""

Expand Down
2 changes: 1 addition & 1 deletion python/restbus/restbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple

import google.protobuf.internal.containers # type: ignore
import google.protobuf.internal.containers
import remotivelabs.broker.sync as br
from grpc import Channel
from typing_extensions import TypeAlias
Expand Down
6 changes: 3 additions & 3 deletions python/subscribe-to-scripted-signal/subscribe_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import queue
import time
from threading import Thread
from typing import Any, Callable, Generator, Optional, Tuple
from typing import Any, Callable, Generator, Optional, Sequence, Tuple

import remotivelabs.broker.sync as br

Expand All @@ -15,7 +15,7 @@ def subscribe(
client_id: br.common_pb2.ClientId,
network_stub: br.network_api_pb2_grpc.NetworkServiceStub,
script: bytes,
on_subscribe: Callable[[br.network_api_pb2.Signals], None],
on_subscribe: Callable[[Sequence[br.network_api_pb2.Signal]], None],
on_change: bool = False,
) -> Tuple[Any, Thread]:
# pylint: disable=R0913
Expand Down Expand Up @@ -54,7 +54,7 @@ def _get_value_str(signal: br.network_api_pb2.Signal) -> str:
return "empty"


def printer(signals: br.network_api_pb2.Signals) -> None:
def printer(signals: Sequence[br.network_api_pb2.Signal]) -> None:
for signal in signals:
print(f"{signal.id.name} {signal.id.namespace.name} {_get_value_str(signal)}")

Expand Down
6 changes: 3 additions & 3 deletions python/subscribe-to-scripted-signal/subscribe_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sys
import time
from threading import Thread
from typing import Any, Callable, Generator, Optional, Tuple
from typing import Any, Callable, Generator, Optional, Sequence, Tuple

import remotivelabs.broker.sync as br

Expand All @@ -16,7 +16,7 @@ def subscribe(
client_id: br.common_pb2.ClientId,
network_stub: br.network_api_pb2_grpc.NetworkServiceStub,
script: bytes,
on_subscribe: Callable[[br.network_api_pb2.Signals], None],
on_subscribe: Callable[[Sequence[br.network_api_pb2.Signal]], None],
on_change: bool = False,
) -> Tuple[Any, Thread]:
# pylint: disable=R0913
Expand Down Expand Up @@ -55,7 +55,7 @@ def _get_value_str(signal: br.network_api_pb2.Signal) -> str:
return "empty"


def printer(signals: br.network_api_pb2.Signals) -> None:
def printer(signals: Sequence[br.network_api_pb2.Signal]) -> None:
for signal in signals:
print(f"{signal.id.name} {signal.id.namespace.name} {_get_value_str(signal)}")

Expand Down

0 comments on commit 610e94c

Please sign in to comment.