Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Penify] Full Repo Documentation #48

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pymammotion/luba/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ def __init__(self):
self.on_status_change: Optional[Callable[[StatusType], None]] = None

def _set_rapid_state(self, state: RapidState):
"""Set the rapid state and trigger callbacks if certain state attributes
have changed.

Args:
state (RapidState): The new RapidState object to set.
"""

old_state = self._rapid_state
self._rapid_state = state
if old_state:
Expand All @@ -43,6 +50,13 @@ def _set_rapid_state(self, state: RapidState):
)

def _set_status(self, status: StatusType):
"""Set the status of the object and trigger the on_status_change callback
if available.

Args:
status (StatusType): The status to be set for the object.
"""

self._status = status
if self.on_status_change:
self.on_status_change(status)
Expand Down
13 changes: 13 additions & 0 deletions pymammotion/utility/constant/device_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,19 @@ class WorkMode:


def device_mode(value):
"""Return the mode corresponding to the given value.

This function takes a value as input and returns the corresponding mode
based on a predefined mapping.

Args:
value (int): An integer representing the mode value.

Returns:
str: The mode corresponding to the input value. Returns "Invalid mode" if no
match is found.
"""

modes = {
0: "MODE_NOT_ACTIVE",
1: "MODE_ONLINE",
Expand Down
42 changes: 42 additions & 0 deletions pymammotion/utility/datatype_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ class DatatypeConverter:

@staticmethod
def init_encode_map():
"""Initialize the encode map for DatatypeConverter if it is not already
initialized.

This function creates a mapping of indices to characters for encoding
purposes. The mapping includes uppercase letters, lowercase letters,
digits, and two special characters.

Returns:
list: A list representing the encode map.
"""

if DatatypeConverter.encode_map is None:
cArr = [0] * 64
for i in range(26):
Expand Down Expand Up @@ -33,6 +44,21 @@ def encode(i):

@staticmethod
def _printBase64Binary(bArr, i=0, i2=None):
"""Convert a binary array to a Base64 encoded string.

This function takes a binary array and converts it to a Base64 encoded
string.

Args:
bArr (list): A list of binary values to be converted.
i (int): Starting index in the binary array (default is 0).
i2 (int): Ending index in the binary array (default is None, which means end of
the array).

Returns:
str: The Base64 encoded string representing the binary array.
"""

if i2 is None:
i2 = len(bArr)
cArr = [""] * (((i2 + 2) // 3) * 4)
Expand All @@ -41,6 +67,22 @@ def _printBase64Binary(bArr, i=0, i2=None):

@staticmethod
def _printBase64Binary_core(bArr, i, i2, cArr, i3):
"""Perform base64 encoding on a binary array.

This function encodes the binary array `bArr` into base64 format and
stores the result in the character array `cArr`.

Args:
bArr (list): The input binary array to be encoded.
i (int): Starting index in the binary array.
i2 (int): Length of the binary array.
cArr (list): The character array to store the base64 encoded result.
i3 (int): Starting index in the character array.

Returns:
int: The index in the character array after encoding.
"""

DatatypeConverter.init_encode_map() # Ensure encode_map is initialized
while i2 >= 3:
cArr[i3] = DatatypeConverter.encode(bArr[i] >> 2)
Expand Down
114 changes: 114 additions & 0 deletions pymammotion/utility/device_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ def set_value(self, value):

@staticmethod
def valueof(value):
"""Return the corresponding DeviceType based on the input value.

Args:
value (int): An integer representing a specific DeviceType.

Returns:
DeviceType: The corresponding DeviceType based on the input value.
"""

if value == 0:
return DeviceType.RTK
elif value == 1:
Expand All @@ -43,6 +52,17 @@ def valueof(value):

@staticmethod
def value_of_str(device_name, product_key=""):
"""Determine the type of device based on the provided device name and
product key.

Args:
device_name (str): The name of the device.
product_key (str?): The product key associated with the device. Defaults to "".

Returns:
DeviceType: The type of device based on the provided information.
"""

if not device_name and not product_key:
return DeviceType.UNKNOWN

Expand Down Expand Up @@ -73,6 +93,20 @@ def value_of_str(device_name, product_key=""):

@staticmethod
def has_4g(device_name, product_key=""):
"""Check if the device supports 4G connectivity.

This function determines if the device specified by 'device_name'
supports 4G connectivity. If 'product_key' is provided, it is used to
further identify the device type.

Args:
device_name (str): The name of the device.
product_key (str?): The product key of the device. Defaults to "".

Returns:
bool: True if the device supports 4G connectivity, False otherwise.
"""

if not product_key:
device_type = DeviceType.value_of_str(device_name)
else:
Expand All @@ -82,6 +116,20 @@ def has_4g(device_name, product_key=""):

@staticmethod
def is_luba1(device_name, product_key=""):
"""Check if the given device is of type LUBA.

This function determines if the device with the given name and optional
product key is of type LUBA by comparing its device type value with the
LUBA device type value.

Args:
device_name (str): The name of the device.
product_key (str?): The product key of the device. Defaults to "".

Returns:
bool: True if the device is of type LUBA, False otherwise.
"""

if not product_key:
device_type = DeviceType.value_of_str(device_name)
else:
Expand All @@ -91,6 +139,17 @@ def is_luba1(device_name, product_key=""):

@staticmethod
def is_luba_2(device_name, product_key=""):
"""Check if the device type is LUBA 2 or higher based on the device name
and optional product key.

Args:
device_name (str): The name of the device.
product_key (str?): The product key associated with the device. Defaults to "".

Returns:
bool: True if the device type is LUBA 2 or higher, False otherwise.
"""

if not product_key:
device_type = DeviceType.value_of_str(device_name)
else:
Expand All @@ -100,13 +159,39 @@ def is_luba_2(device_name, product_key=""):

@staticmethod
def is_yuka(device_name):
"""Check if the given device name corresponds to a Yuka device.

This function takes a device name as input and checks if it corresponds
to a Yuka device.

Args:
device_name (str): The name of the device to be checked.

Returns:
bool: True if the device is a Yuka device, False otherwise.
"""

return (
DeviceType.value_of_str(device_name).get_value()
== DeviceType.LUBA_YUKA.get_value()
)

@staticmethod
def is_rtk(device_name, product_key=""):
"""Check if the device type is within the range of RTK devices.

This function determines if the device type identified by `device_name`
and optional `product_key` falls within the range of RTK (Real-Time
Kinematic) devices.

Args:
device_name (str): The name of the device.
product_key (str?): The product key associated with the device. Defaults to "".

Returns:
bool: True if the device type is within the RTK range, False otherwise.
"""

if not product_key:
device_type = DeviceType.value_of_str(device_name)
else:
Expand All @@ -120,12 +205,32 @@ def is_rtk(device_name, product_key=""):

@staticmethod
def contain_rtk_product_key(product_key):
"""Check if the given product key is contained in a predefined list of RTK
product keys.

Args:
product_key (str): The product key to be checked.

Returns:
bool: True if the product key is in the predefined list, False otherwise.
"""

if not product_key:
return False
return product_key in ["a1qXkZ5P39W", "a1Nc68bGZzX"]

@staticmethod
def contain_luba_product_key(product_key):
"""Check if the given product key is in the list of valid Luba product
keys.

Args:
product_key (str): The product key to be checked.

Returns:
bool: True if the product key is in the list of valid keys, False otherwise.
"""

if not product_key:
return False
return product_key in [
Expand All @@ -144,6 +249,15 @@ def contain_luba_product_key(product_key):

@staticmethod
def contain_luba_2_product_key(product_key):
"""Check if the given product key is contained in a predefined list.

Args:
product_key (str): The product key to be checked.

Returns:
bool: True if the product key is in the predefined list, False otherwise.
"""

if not product_key:
return False
return product_key in ["a1iMygIwxFC", "a1LLmy1zc0j", "a1LLmy1zc0j"]
Expand Down
62 changes: 62 additions & 0 deletions pymammotion/utility/periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,24 @@ def __init__(self, func, time):
self._task = None

def start(self):
"""Start the task if it is not already started.

If the task is not already started, it sets the 'is_started' flag to
True and starts a task to call the '_run' function periodically using
asyncio.
"""

if not self.is_started:
self.is_started = True
# Start task to call func periodically:
self._task = asyncio.ensure_future(self._run())

async def stop(self):
"""Stop the task if it is currently running.

If the task is running, it will be cancelled and awaited until it stops.
"""

if self.is_started:
self.is_started = False
# Stop task and await it stopped:
Expand All @@ -24,14 +36,64 @@ async def stop(self):
await self._task

async def _run(self):
"""Asynchronously runs the provided function at regular intervals.

This method runs the provided function asynchronously at regular
intervals specified by the 'time' attribute of the object.
"""

while True:
await asyncio.sleep(self.time)
await self.func()


def periodic(period):
"""Decorator to schedule a function to run periodically at a specified
interval.

Args:
period (int): The time interval in seconds at which the function should run
periodically.

Returns:
function: A decorator function that schedules the decorated function to run
periodically.
Usage:
async def my_periodic_function():
# Code to be executed periodically

Note:
The decorated function will be scheduled to run in an infinite loop with
the specified time interval.
"""

def scheduler(fcn):
"""Schedule the execution of a given asynchronous function at regular
intervals.

This function takes an asynchronous function as input and schedules its
execution at regular intervals.

Args:
fcn (function): An asynchronous function to be scheduled.

Returns:
function: A wrapper function that schedules the execution of the input function.
"""

async def wrapper(*args, **kwargs):
"""Execute the given function periodically using asyncio tasks.

This function continuously creates an asyncio task to execute the
provided function with the given arguments and keyword arguments. It
then waits for the specified period of time before creating the next
task.

Args:
*args: Variable length argument list to be passed to the function.
**kwargs: Arbitrary keyword arguments to be passed to the function.
"""

while True:
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(period)
Expand Down
Loading
Loading