-
Notifications
You must be signed in to change notification settings - Fork 72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
构建dongle版的microbit插件 #17
Comments
@bilikyar 提到希望
我的建议是利用makecode构建新的ble 固件,非常简单。 提醒makecode默认的配置不会公开蓝牙,需要设置 ble服务细节: https://lancaster-university.github.io/microbit-docs/ble/profile/ |
这个库具体怎么用? 必须要配BLED 112这款USB Dongle吗? 我在华硕笔记本上运行这个代码报了下面的错误: RetryError[<Future at 0x2ee1130 state=finished raised BGAPIError>] |
@bilikyar 我目前是配合 BLED 112,对USB Dongle的要求,可以参考pygatt文档 |
最新的codelab-adapter内置了 测试插件: https://github.com/Scratch3Lab/codelab_adapter_extensions/blob/master/extension_dongle_scan_test.py |
对microbit scratch官方固件的读取已经完成: https://github.com/Scratch3Lab/codelab_adapter_extensions/blob/master/extension_dongle_read_test.py 对数据的解析 参考我之前的分析: https://blog.just4fun.site/scratch3-microbit-analysis.html |
@wangshub dongle插件应该是支持Makeblock写的固件是吗?我用此文章的方法做实验的时候遇到了问题,Microbit固件代码如下: 1.运行如下代码时,无法检测UART通道上写入的值 dongle.bled_connect('E1:4C:E0:01:43:18') #这是我Microbit的Mac地址
dongle.bled_subscribe('6e400002-b5a3-f393-e0a9-e50e24dcca9e', callback=read_cb) #订阅rx 2.运行如下代码时,Microbit没有任何变化 dongle.bled_write('6e400003-b5a3-f393-e0a9-e50e24dcca9e',0x0a) # 往tx写换行符 3.此固件我已经通过其他各类BLE软件测试过了,运行没问题 |
Hi,我的手上没有设备,暂时没办法测试,我观察到这个位置代码有些不一样,传参改成 List 试一下? dongle.bled_write('6e400003-b5a3-f393-e0a9-e50e24dcca9e', [0x0a]) # 往tx写换行符 |
@wangshub 恩,你帮到我了,最后确定了不是插件的问题,是我固件的问题: |
虽然这个插件能完美解决Scratch link的问题,可是scratch link的固件阉割掉了很多BLE的功能:
所以,我想研究codelab的固件,使得BLE 版 Microbit 更加强大 |
社区里开始有人写link的nodejs版本,应该对我们后面的BLE的研究有启发 |
我已经成功通过MackCode开启Microbit的任意的BLE服务(总过有七个服务,可是最多同时开3到4个,不然内存会不够)并通过dongle来对其读写订阅等,算是有了比较好的进展。现在就需要一个adapter插件和Scratch插件了。 |
经过一段时间的整理,我在dongle的基础上完成了一下内容,代码如下:
import time
import pygatt
import tenacity
import binascii
# pygatt api doc : https://github.com/peplin/pygatt/blob/master/pygatt/device.py
class Dongle(object):
def __init__(self):
self.adapter = pygatt.BGAPIBackend()
self.device = None
self.is_running = True
@tenacity.retry(stop=tenacity.stop_after_attempt(3))
def bled_start(self):
self.adapter.start()
def bled_stop(self):
self.adapter.stop()
self.is_running = False
def bled_scan(self):
devices = self.adapter.scan()
return devices
def bled_rssi(self):
return self.device.get_rssi()
@tenacity.retry(stop=tenacity.stop_after_attempt(3))
def bled_connect(self, mac_addr):
self.device = self.adapter.connect(
mac_addr, address_type=pygatt.BLEAddressType.random)
print('connect to {}'.format(mac_addr))
print("rssi : " + str(self.device.get_rssi()))
# print("chars : " + str(self.device.discover_characteristics()))
# for i in self.device.discover_characteristics():
# print(i)
time.sleep(1)
def bled_subscribe(self, uuid, callback=None, indication=False):
try:
self.device.subscribe(uuid, callback=callback, indication=indication)
except Exception as e:
print(e)
def bled_write(self, uuid, data,length = 1):
if isinstance(data,int):
data = data.to_bytes(length, byteorder='little')
print("写入:", bytearray(data))
self.device.char_write(uuid, bytearray(data))
def bled_read(self, uuid):
data = self.device.char_read(uuid)
return data
@staticmethod
def _ascii_to_hex(rawhex):
return binascii.unhexlify(rawhex)
if __name__ == '__main__':
def read_cb(handle, value):
a = value
b = [bytes([a[i]])+ bytes([a[i+1]]) for i in range(0,len(a),2)]
c = [int.from_bytes(i, byteorder='little', signed=True) for i in b]
print(c)
# bytes_str = binascii.hexlify(value)
# print(bytes_str)
dongle = Dongle()
try:
dongle.bled_start()
devices = dongle.bled_scan()
for dev in devices:
print(dev.get('address'), dev.get('rssi'), dev.get('name'))
target_mac_address = 'DA:19:8A:61:AD:95'
microbit_uuid = {
'button_A' : 'e95dda90-251d-470a-a062-fa1922dfa9a8', #按钮A数据
'button_B' : 'e95dda91-251d-470a-a062-fa1922dfa9a8', #按钮B数据
'accelerometer' : 'e95dca4b-251d-470a-a062-fa1922dfa9a8', #加速度数据
'Temperature' : 'e95d9250-251d-470a-a062-fa1922dfa9a8', #温度计数据
'magnetometer' : 'e95dfb11-251d-470a-a062-fa1922dfa9a8', #磁力计数据
'led_char' : 'e95d93ee-251d-470a-a062-fa1922dfa9a8', #led文字
'led_matrix' : 'e95d7b77-251d-470a-a062-fa1922dfa9a8', #led矩阵
'pin_date' : 'e95d8d00-251d-470a-a062-fa1922dfa9a8', # pin读写服务
'pinMode_config' : 'e95d5899-251d-470a-a062-fa1922dfa9a8', # pin引脚配置
'ADC_config' : 'e95db9fe-251d-470a-a062-fa1922dfa9a8', # pinAD配置
'PWM_config' : 'e95dd822-251d-470a-a062-fa1922dfa9a8', # PWM设置
'tx': '6e400003-b5a3-f393-e0a9-e50e24dcca9e', # tx设置
'rx': '6e400002-b5a3-f393-e0a9-e50e24dcca9e', # rx设置
}
dongle.bled_connect(target_mac_address) # 这是我Microbit的Mac地址
dongle.bled_write(microbit_uuid['led_matrix'], [17, 27, 27, 27, 27]) #矩阵图标的显示方式
dongle.bled_subscribe(microbit_uuid['accelerometer'], callback=read_cb,indication=False) # 订阅传感器数据
while True:
time.sleep(1)
finally:
dongle.bled_stop()
|
目前待解决的问题如下:
|
@wwj718 根据在 Scratch micro:bit extension与Scratch Link通信的细节 里分析的消息体的结构,我用dongle库编写了一下 adapter 的 Microbit ble 插件,由于库的限制无法呈现出 link 全部的功能,是一种单线程的实现方式。 还有个问题是,scratch 端主动断开之后,重新连接 Microbit,只能是通过重启插件的方式来连接。 '''
EIM: Everything Is Message
'''
import time
import threading
from codelab_adapter import settings
from codelab_adapter.core_extension import Extension
from codelab_adapter.utils import threaded
from codelab_adapter.dongle import Dongle
import time
import pygatt
import tenacity
import binascii
import base64
class BleMicrobit(Extension):
def __init__(self):
name = type(self).__name__ # class name
super().__init__(name)
self.TOPIC = "eim"
def read_cb(self, handle, value):
message = base64.b64encode(value)
self.publish({
"payload": {
"encoding": "base64",
"message": str(message)[2:-1] #处理字符
},
"method": "characteristicDidChange",
"topic": self.TOPIC,
})
@threaded
def message_monitor(self):
while self._running:
read_message = self.read()
topic = read_message.get("topic")
if topic == self.TOPIC:
self.logger.info("消息:{}".format(read_message))
method = read_message.get("method")
data = read_message.get("payload")
if method == 'discover':
try:
self.services = data.get('filters')[
0].get('services')[0]
except Exception as e:
self.publish({
"messageID": read_message.get("messageID"),
"error": str(e),
"topic": self.TOPIC,
})
else:
self.publish({
"messageID": read_message.get("messageID"),
"result": "",
"topic": self.TOPIC,
})
try:
self.logger.info("开始扫描BLE")
devices = self.dongle.bled_scan()
except Exception as e:
self.logger.info("BLE扫描错误:{}".format(e))
else:
for dev in devices:
try:
complete_list_16_bit_service_class_uuids = dev.get('packet_data').get(
'connectable_advertisement_packet').get('complete_list_16-bit_service_class_uuids')
except Exception as e:
pass
else:
if isinstance(complete_list_16_bit_service_class_uuids, bytearray):
if int.from_bytes(complete_list_16_bit_service_class_uuids, byteorder='little') == self.services:
self.publish({
"payload": {
"name": dev.get('name'),
"rssi": dev.get('rssi'),
"peripheralId": dev.get('address')
},
"method": "didDiscoverPeripheral",
"topic": self.TOPIC,
})
elif method == 'connect':
self.peripheralId = data.get("peripheralId")
try:
self.logger.info("开始连接")
self.dongle.bled_connect(self.peripheralId)
except Exception as e:
self.logger.info("连接错误:{}".format(str(e)))
self.publish({
"messageID": read_message.get("messageID"),
"error": str(e),
"topic": self.TOPIC
})
self.ble_connected = False
else:
self.publish({
"messageID": read_message.get("messageID"),
"result": "",
"topic": self.TOPIC
})
elif method == 'read':
characteristicId = data.get("characteristicId")
self.dongle.bled_subscribe(
characteristicId, callback=self.read_cb, indication=True) # 订阅传感器数据
self.publish(
{
"messageID": read_message.get("messageID"),
"result": {
"encoding": "base64",
"message": ""
},
"topic": self.TOPIC
}
)
elif method == 'write':
characteristicId = data.get("characteristicId")
text = data.get("message").encode('ascii')
self.logger.info("收到的消息为:{}".format(text))
try:
self.dongle.bled_write(
characteristicId, text)
except Exception as e:
self.logger.info("写消息错误:{}".format(e))
else:
self.publish(
{
"messageID": read_message.get("messageID"),
"topic": self.TOPIC,
"result": 6
}
)
def run(self):
self.dongle = Dongle()
try:
self.logger.info("启动BLE")
self.dongle.bled_start()
self.message_monitor()
while True:
time.sleep(1)
except Exception as e:
self.logger.info("BLE启动出错:{}".format(e))
finally:
self.dongle.bled_stop()
export = BleMicrobit |
@wwj718 是个很不错的工具, 尤其是用树莓派来做python教学的时候,通过BLE设备开展教学工作的好方法,最重要的一点不需要再添加额外的蓝牙适配器。 |
@wangshub 完成了dongle class,我这两天会发布一个包含dongle class新版本。
大家可以先用于本地测试
我之前已经完成了对microbit官方固件的分析:https://blog.just4fun.site/scratch3-microbit-analysis.html ,可以直接接入scratch microbit官方固件。read部分我测试已经没问题了。
The text was updated successfully, but these errors were encountered: