This Library uses the asyncio
framework to allow the asynchronous execution of functions.
Because most methods of the gravitrax_bridge.Bridge
class are asyncio coroutines they need to be started inside an asynchronous function itself.
This is achieved by putting the keyword async
in font of the main function and running it with asyncio.run()
afterwards.
import asyncio
async def main():
# Add your Code here
if __name__ == "__main__":
asyncio.run(main())
All async methods from gravitraxconnect need to be prefixed with the keyword await
.
More information about asyncio can be found in the asyncio documentation.
A basic program that simply connects to a bridge looks as follows:
import asyncio
from gravitraxconnect import gravitrax_bridge as gb
bridge = gb.Bridge()
async def main():
await bridge.connect()
if __name__ == "__main__":
asyncio.run(main())
The gravitrax_constants module can be imported to provide access to frequently used data like Statuscodes, Stonetypes and Color Values. Although optional, it's still recommended to use it as it future proofs your programs against potential changes and improves readability.
- How to use the Gravitrax Connect Library
- Table of Content
- Connecting and disconnecting from a Bridge
- Sending Signals
- Bridge Mode
- Notifications
- Getting Information about the Bridge
- Logging
- Additional Utility methods
Argument | Description | Default value |
---|---|---|
name_or_addr | The identifier(name or MAC-Address) for the Bridge. | gravitrax_constants.BRIDGE_NAME |
by_name | Specifies whether name_or_addr is a name(True) or MAC-Address(False) | True |
timeout | Maximum allowed Time in seconds for the scan or the connect process | 25 |
dc_callback | Callback function that is executed when the bridge is disconnected. | None |
try_reconnect | Specifies if there should be an attempt to reconnect after the connection is lost | False |
restart_notifications | Specifies if the Notifications should be enabled after a reconnect | True |
Return Value | Type | Description |
---|---|---|
True | boolean | Successfully connected to the Bridge |
False | boolean | Failed to connect to a bridge |
The connect() method is used to find a Bridge and connect to it. The default behavior is to look for a Bluetooth device with
the name "GravitraxConnect"(can be found in the gravitrax_constants file). When multiple bridges are available,
the connection will be established with the first one found. It's is possible to change
the name by adjusting the name_or_addr
argument. This doesn't really make sense as of now because every Bridge uses the name
"GravitraxConnect".
It's also possible to connect to a specific Bridge by its MAC-Address. In order to do that the by_name
argument needs to be set to False
and name_or_addr
must contain the MAC-Address in this format "FF:FF:FF:FF:FF:FF".
Both connect methods return a boolean value representing whether the connection attempt was successful.
# Connect by MAC-Address
if not await b.connect(name_or_addr="FF:FF:FF:FF:FF:FF",by_name=False, timeout=10):
# Connect by Name
if not await b.connect():
sys.exit("Could not find a bridge to connect to")
print("Connected to {} ({})".format(b.get_name(), b.get_address()))
Return Value | Type | Description |
---|---|---|
True | boolean | Active connection to bridge |
False | boolean | Not connected to the bridge |
Returns True if there is an active connection to the bridge.
while await b.is_connected():
print("Still connected")
Argument | Description | Default value |
---|---|---|
timeout | Maximum allowed time before the method returns false. The bridge may still disconnect | 20 |
dc_callback_on_timeout | If set to True the disconnect callback function is called if the disconnect takes longer than specified in the timeout parameter | False |
_user | Used to enable/disable the tracking if a disconnect was made intentional by the user or not. | True |
Return Value | Type | Description |
---|---|---|
True | boolean | Disconnect was successful |
False | boolean | Disconnect failed or timed out |
The connection to a bridge can be closed by calling disconnect()
.
If the bridge is not disconnected after the timeout period, the method returns False. It's also possible to call the disconnect callback in this case by setting dc_callback_on_timeout
to True.
The connection will be closed eventually even if a timeout occurs.
await b.disconnect(timeout=15, dc_callback_on_timeout=True)
It's possible to specify a custom callback function that is executed when the Bridge is disconnected. The callback has to accept the following Arguments:
- bridge: the Object of the Bridge that was disconnected
- **kwargs:
Argument | Description | typical values |
---|---|---|
user_disconnected | Indicates if the disconnect was intentional(i.e. disconnect() was called ) or not. | True/False |
by_timeout | If the disconnect callback was called because of a timeout(of disconnect() ) or because the bridge actually disconnected | True/False |
The callback function can be passed to connect().
Alternatively, the member variable 'dc_callback' can be accessed directly.
If the try_reconnect
member variable is set to True
the callback will try to reestablish an interrupted connection.
After a successful reconnect the notifications are reenabled by default. This can be changed by setting restart_notifications
to False
def disconnect_callback(bridge: gb.Bridge, **kwargs):
if kwargs.get('by_timeout'):
print(f"({bridge.get_address()}) Disconnect timed out")
elif kwargs.get('user_disconnected'):
print(f"({bridge.get_address()}) User Disconnected from Bridge!")
else:
print(f"({bridge.get_address()}) Connection to Bridge was interrupted")
def main():
# Passing the callback as a Argument
await b.connect(dc_callback=disconnect_callback, try_reconnect = True, restart_notifications = False)
# Or set directly to the variable
b.dc_callback = disconnect_callback
The Library implements multiple ways to send data to the bridge. send_signal() can be used to send signals to any receiver Stones (Lever, Starter, Switch, etc.). To send arbitrary data, the send_bytes method can be used. send_periodic() can be used to send multiple signals in a fixed time interval.
Argument | Description | Default value |
---|---|---|
status | Specifies which Blocks should react to the signal | |
color_channel | The color value of the signal | |
resends | How often a Signal is transmitted | 12 |
resend_gap | How long the added delay(in seconds) after every transmission(including resends) is | 0 |
uuid | The uuid of the Bridge to send the Signals. Should not be changed unless future updates to the Bridge change the uuid for writes | Default value specified in gravitrax_constants file |
header | The header of the send signal. Should not be changed unless there are changes to the communications protocol in the future | Default value specified in gravitrax_constants file |
stone | Specifies the Stonetype of the send signal. | Defaults to the value of the Bridge Stone(5) |
random_id | Boolean specifying if the the message_id value should be chosen randomly or incremented with each signal | False |
error_event | A asyncio.Event that is set when a error happens during the send operation | None |
The method send_signal() is used to send a Signal that is understood by Receiver Stones. The Arguments resend and resend_gap are used to prevent package loss at the cost of a higher delay between transmissions.
The argument error_event
can be used to specify a asyncio.Event
that is set when an error happens during the send process. An example how this can be used can be found in the send_periodic() Method.
if await b.send_signal(gv.STATUS_ALL, gv.COLOR_RED, stone=gv.STONE_REMOTE):
print("Send red Signal")
Argument | Description | Default value |
---|---|---|
status | Specifies which Stones should react to the signal | |
color_channel | The color value of the signal | |
count | The amount of signals to send | |
gap | The time in seconds between every transmission | 0 |
resends | How often a Signal is transmitted | 12 |
resend_gap | How long the added delay(in seconds) after every transmission(including resends) is | 0 |
stone | Specifies the Stonetype of the send signal. | Defaults to the value of the Bridge Stone(5) |
stop_on_failure | Specifies if the function should stop if a send fails | True |
Sends multiple Signals with a fixed time in between.
await b.send_periodic(gv.STATUS_ALL, gv.COLOR_RED, count=20, gap=2, stone=gv.STONE_REMOTE):
print("Sending 20 red Signals")
Argument | Description | Default value |
---|---|---|
data | The bytes to send to the bridge | |
resends | How often a Signal is transmitted | 1 |
resend_gap | How long the added delay(in seconds) after every transmission(including resends) is | 0 |
uuid | The uuid of the Bridge to send the Signals. Should not be changed unless future updates to the Bridge change the uuid for writes | Default value specified in gravitrax_constants file |
error_event | A asyncio.Event that is set when a error happens during the send operation | None |
The send_bytes() method can be used to send arbitrary data to the bridge. Doing this usually doesn't make sense because the Bridge only reacts to specific Signals.
In order to send a correct Signal send_signal() is used. The argument error_event
can be used to specify a asyncio.Event
which is set when an error occurs.
This is used by send_periodic() to stop if an error happens.
if await b.send_bytes(bytes([1,2,3])):
print("Send 1,2,3")
By using start_bridge_mode() a signal is sent that puts all receiving Stones in a mode where they only accept signals with the Stonetype set to Bridge. To return to normal operation, stop_bridge_mode() is used.
Send a signal that puts all receiving Stones into Bridge Mode, where all signals not from a Bridge are ignored.
if await b.start_bridge_mode():
print("Change to Bridge mode Requested")
Send a signal that puts all receiving stones into the normal operation Mode.
if await b.stop_bridge_mode():
print("Change to Normal mode Requested")
Argument | Description | Default value |
---|---|---|
callback | A callback function that is executed when a Notification is received | |
uuid | The uuid for Notifications. Should not be changed unless future updates to the Bridge change the uuid for writes | Default value specified in gravitrax_constants file |
Return Value | Type | Description |
---|---|---|
True | boolean | Enabling Notifications was successful |
False | boolean | Enabling Notifications failed |
Enables Notifications for the Bridge. A Callback function that is executed when a notification is received needs to be specified.
async def notification_callback(bridge: gb.Bridge, **signal):
# add your code here
if await b.notification_enable(notification_callback):
print("Notifications enabled")
The Arguments for the callback need to be like in the example above. Relevant information about the signal can be accessed from the keyword arguments.
async def notification_callback(bridge: gb.Bridge, **signal):
status = signal.get('Status')
stone = signal.get('Stone')
color = signal.get('Color')
if await b.notification_enable(notification_callback):
print("Notifications enabled")
The following keyword arguments are passed to the callback and can be accessed like above:
Argument | Description | typical values |
---|---|---|
Header | The header value. Always set to 19 | 19 |
Stone | The Type of Stone the Message was send from. | 1-11 |
Status | Used to transmit additional Information | 0-255 |
Reserved | Currently not used. Usually a random value | 0-255 |
Id | Id to differentiate consecutive Signals | 0-255 |
Checksum | the checksum of the Signal | 0-255 |
Color | The Color Channel of the Signal | 1,2,3 |
Data | The raw notification data | bytearray containing the Data |
A simple program that prints out useful information about a received Notification looks as follows:
async def notification_callback(bridge: gb.Bridge,**signal):
if signal.get("Header") :
print(f"New Notification: Stone={signal.get('Stone')}, Color={signal.get('Color')}")
else:
print(f"Received Data: {signal.get('Data')}")
async def main():
b = gb.Bridge()
await b.connect()
await b.notification_enable(notification_callback)
await asyncio.sleep(15.0)
await b.notification_disable()
if __name__ == "__main__":
asyncio.run(main())
Argument | Description | Default value |
---|---|---|
uuid | The uuid for Notifications. Should not be changed unless future updates to the Bridge change the uuid for writes | Default value specified in gravitrax_constants file |
Return Value | Type | Description |
---|---|---|
True | boolean | Disabling Notifications was successful |
False | boolean | Disabling Notifications failed |
Disables Notifications for the bridge
await b.notification_disable()
print("Notifications disabled")
The Library implements two ways to get the current battery level of the bridge. request_battery() returns an float value from 2 to 3.1 that represents the approximate Voltage of the batteries. These values can be interpreted as follows:
value | Voltage | Battery String |
---|---|---|
2 | <2V | Battery is empty. |
2.5 | 2V - 2,5V | Battery level is low |
2.9 | 2,5V - 3V | Battery level is medium |
3 | 3V | Battery level is high |
3.1 | 3V+ | Battery is full |
If request_battery_string() is used, the Strings in the above table are returned instead of voltage values.
Return Value | Type | Description |
---|---|---|
2-3.1 | float | The approx. Battery Voltage |
Returns the approximate battery Voltage
print(f"Battery level: {await b.request_battery()} V")
Return Value | Type | Description |
---|---|---|
Battery String | str | String for the current battery level |
Returns the approximate Battery level in form of a descriptive String
print(await b.request_battery_string())
Return Value | Type | Description |
---|---|---|
(int,int) | tuple | Tuple containing Firmware and Hardware version |
The Software and Hardware Version of a connected bridge can be read by using the request_bridge_info() method. It returns a Tuple containing 2 int values. The first value is the current Firmware Version, the second one the Hardware Version. When request_bridge_info() is called the Member Variables firmware and hardware are set to the received values. After successfully connecting to a Bridge request_bridge_infos() is called automatically so it's usually easier to use the member variables instead of request_bridge_infos().
info = await b.request_bridge_info()
print(f"Firmware: {info[0]} \n Hardware: {info[1]}")
# Output:
# Firmware: 16
# Hardware: 1
print(f"Firmware: {b.firmware} \n Hardware: {b.hardware}")
# Output:
# Firmware: 16
# Hardware: 1
Return Value | Type | Description |
---|---|---|
address String | str | The address of the bridge in the format FF:FF:FF:FF:FF:FF |
Returns the MAC-Address of the currently or last connected Bridge.
print(await b.get_address())
# Output: FF:FF:FF:FF:FF:FF
Return Value | Type | Description |
---|---|---|
name String | str | The name of the bridge |
Returns the Device Name of the currently or last connected Bridge.
print(await b.get_name())
# Output: GravitraxConnect
Prints out the GATT services of the bridge.
await b.print_services()
This library uses a logger from the logging
module to display status messages.
The logger is disabled by default, but can be enabled like so:
async def main():
gravitrax_bridge.logger.disabled = False
b = gravitrax_bridge.Bridge()
await b.connect()
# Output:
# (FF:FF:FF:FF:FF:FF) Connected to bridge (MTU:512)
if __name__ == "__main__":
asyncio.run(main())
To print your own status messages with this logger, the log_print()
function can be used.
The log_set_level()
function can be used to set the level of the logger without importing the logging module in your script.
Argument | Description | Default value |
---|---|---|
*text | Arguments to be printed | |
bridge | bridge object. When passed the address of the bridge is added to the printed message | None |
level | logging level for the message | 'INFO' |
Prints out the Arguments using the formatting of the log messages used in the library.
gravitrax_bridge.log_print(f"A simple log print")
# Output: 15:42:12.833 - INFO: A simple log print
gravitrax_bridge.log_print(f"Passing ", 3, " Arguments")
# Output: 15:42:12.833 - INFO: Passing 3 Arguments
gravitrax_bridge.log_print(f"Connected!",bridge=b)
# Output: 15:42:16.082 - INFO: (FF:FF:FF:FF:FF:FF) Connected to Bridge!
gravitrax_bridge.log_print(f"Connected!",bridge=b, level='DEBUG')
#Output: 15:42:16.082 - DEBUG: (FF:FF:FF:FF:FF:FF) Connected to Bridge!
Argument | Description | Default value |
---|---|---|
level | logging level to be used | 'INFO' |
Sets the logging level of the logger to the one specified.
gravitrax_bridge.log_set_level('NOTSET') # All Messages are printed
gravitrax_bridge.log_set_level('DEBUG') # DEBUG, INFO, WARNING, ERROR and CRITICAL Messages are printed
gravitrax_bridge.log_set_level('INFO') # INFO, WARNING, ERROR and CRITICAL Messages are printed
gravitrax_bridge.log_set_level('WARNING') # WARNING, ERROR and CRITICAL Messages are printed
gravitrax_bridge.log_set_level('ERROR') # ERROR and CRITICAL Messages are printed
gravitrax_bridge.log_set_level('CRITICAL')# CRITICAL Messages are printed
Argument | Description | Default value |
---|---|---|
name | name to filter the scan results with. All other names are discarded | gravitrax_constants.BRIDGE_NAME |
timeout | The duration of the scan | 10 |
do_print | If set to True the discovered device is printed as soon as it is discovered | False |
stop_on_hit | If set to True the scan is stopped as soon as one device is found | False |
Return Value | Type | Description |
---|---|---|
[String,String,..] | List | A List containing the MAC-Addresses of all discovered devices with the specified name |
Scan for Bluetooth devices with the specified name. By default the standard name for Gravitrax Power Bridges is used. A List containing all MAC-Addresses of the found Bluetooth devices is returned.
Argument | Description | Default value |
---|---|---|
data | the data bytes to calculate the checksum from | |
for_received | Specifies whether the calculation should be done for Signals received(True) from or send(False) to the bridge | False |
Return Value | Type | Description |
---|---|---|
int | int | the calculated checksum for the passed signal data |
Returns the checksum for a 7-Byte Signal. The calculation for send and received Signals is slightly different. By default the calculation is
for a send signal. To calculate the checksum for a received Signal for_received
needs to be set to True.
print(b.calc_checksum(bytes([1,1,1,1,1,0,3])))
# output:
# 5
print(b.calc_checksum(bytes([1,1,1,1,1,0,3]),for_received=True))
# output:
# 7
Argument | Description | Default value |
---|---|---|
data | the data bytes to calculate the checksum from | |
for_received | Specifies whether the calculation should be done for Signals received(True) from or send(False) to the bridge | False |
Return Value | Type | Description |
---|---|---|
bytes([byte,byte,byte,byte,byte,byte,byte]) | bytes | the data bytes passed as argument with the calculated checksum |
Returns the bytes for a signal with the content of the checksum field substituted for the correct checksum.
print(b.add_checksum(bytes([1,1,1,1,1,0,1])))
# output:
# b'\x01\x01\x01\x01\x01\x05\x01'