You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Thank you shared this impressive project, I used the version:2.3.7 umodbus code, when run this code, the error shows: TypeError:extra keyword arguments given. Follow as code:
`# modbus master driver for lidar
6 pieces lidar for system, address from 1 to 6
from right to left side, count closewize
system package
from machine import Pin
import logging
import gc
from InitConfig import config
import uasyncio as asyncio
class Frame(object):
def init(self,data = None):
""" Data struct: """
if data is None:
self.data = bytearray()
else:
self.data = bytearray(data)
def __len__(self):
return len(self.data)
class FrameBuffer(object):
def init(self, size):
"""
A FrameBuffer is used to buffer frames received from the Network
over the active Bus.The FrameBuffer is designed as a FIFO and is
useful to overcome the limited storing capability
"""
self._size = size
self._data = [[memoryview(bytearray(6))] for _ in range(size)]
self._index_write = 0
self._index_read = 0
self._null_sink = [0, 0, 0, 0]
self._count = 0
@micropython.native
def put(self, v):
self._data[self._index_write] = v
next_index = (self._index_write + 1) % self._size
if next_index != self._index_read:
self._index_write = self.next_index
self._count += 1
else:
self.clear()
if DEBUG:
raise Exception("frame buffer over flow")
@micropython.native
def get(self):
if self._index_read == self._index_write:
return None
lin_id, _, data = self._data[self._index_read]
self._index_read = (self._index_read +1) % self._size
self._count -= 1
return Frame(data) # Omit the data length
def any(self):
if self._index_read == self._index_write:
return False
return True
def clear(self):
self._index_write = 0
self._index_read = 0
async def test():
ctrl_Pin = '485_TR1'
m = Master(uart_id = 2, ctrlPin = '485_TR1')
while True:
for idx in range(7):
w = m.write(register_definitions, idx)
if w is not None:
log.info(f"Lidar{idx} return data is:{w}")
await asyncio.sleep_ms(30)
async def start():
asyncio.create_task(test())
while True:
await asyncio.sleep_ms(10)
app = None
async def main():
import time
t = time.localtime()
log.info(t)
gc.collect()
global app
await start()
try:
gc.collect()
asyncio.run(main())
except KeyboardInterrupt:
log.info('Interrupt')
finally:
asyncio.new_event_loop()
log.info("run again")
`
Reproduction steps
...
MicroPython version
V1.22
MicroPython board
pyboard
MicroPython Modbus version
# e.g. v2.3.3# use the following command to get the used versionimportosfromumodbusimportversionprint('MicroPython infos:', os.uname())
print('Used micropthon-modbus version:', version.__version__))
Relevant log output
>>> %Run -c $EDITOR_CONTENT
MPY: sync filesystems
MPY: soft reboot
Task exception wasn't retrievedfuture: <Task> coro= <generator object 'test' at 2000c850>Traceback (most recent call last): File "asyncio/core.py", line 1, in run_until_complete File "<stdin>", line 147, in test File "<stdin>", line 117, in __init__ File "umodbus/serial.py", line 1, in __init__TypeError: extra keyword arguments given
User code
# modbus master driver for lidar # 6 pieces lidar for system, address from 1 to 6# from right to left side, count closewize# system packagefrommachineimportPinimportloggingimportgcfromInitConfigimportconfigimportuasyncioasasynciolog=logging.MiniLog("RtuMaster", level=logging.DEBUG)
DEBUG=config.get('DEBUG', False)
MasterUartId=config.get("UART_LIDAR")
# self defined packagefromumodbus.serialimportSerialasRTUMasterfromumodbusimportversiongc.collect()
__version__='1.0'__author__='skylin'register_definitions= {
"HREGS":{ # read hold registers"slave_address":{
"1": 1,
"2": 2,
"3": 3,
"4": 4,
"5": 5,
"6": 6
},
"register_address": 0x94,
"length": 0x02,
},
}
classFrame(object):
def__init__(self,data=None):
""" Data struct: """ifdataisNone:
self.data=bytearray()
else:
self.data=bytearray(data)
def__len__(self):
returnlen(self.data)
classFrameBuffer(object):
def__init__(self, size):
""" A FrameBuffer is used to buffer frames received from the Network over the active Bus.The FrameBuffer is designed as a FIFO and is useful to overcome the limited storing capability """self._size=sizeself._data= [[memoryview(bytearray(6))] for_inrange(size)]
self._index_write=0self._index_read=0self._null_sink= [0, 0, 0, 0]
self._count=0@micropython.nativedefput(self, v):
self._data[self._index_write] =vnext_index= (self._index_write+1) %self._sizeifnext_index!=self._index_read:
self._index_write=self.next_indexself._count+=1else:
self.clear()
ifDEBUG:
raiseException("frame buffer over flow")
@micropython.nativedefget(self):
ifself._index_read==self._index_write:
returnNonelin_id, _, data=self._data[self._index_read]
self._index_read= (self._index_read+1) %self._sizeself._count-=1returnFrame(data) # Omit the data lengthdefany(self):
ifself._index_read==self._index_write:
returnFalsereturnTruedefclear(self):
self._index_write=0self._index_read=0classMaster(object):
def__init__(self, uart_id, baudrate=115200, ctrlPin=None, fbSize=128):
self.rtu_pins= (Pin('A2'), Pin('A3'))
self.uart_id=uart_idself.ctrlPin=ctrlPinself.baudrate=baudrateself.host=RTUMaster(
pins=self.rtu_pins,
uart_id=self.uart_id,
baudrate=self.baudrate,
ctrl_pin=self.ctrlPin
)
self.framebuf=FrameBuffer(fbSize)
defwrite(self, registerdefinitions, idx):
slave_addr=registerdefinitions['HREGS']['slave_address']['idx']
register_addr=registerdefinitions['HREGS']['register_address']
qty=registerdefinitions['HREGS']['length']
register_value=self.host.read_holding_registers(
slave_addr=slave_addr,
starting_addr=register_addr,
register_qty=qty,
signed=False
)
ifnotregister_value:
self.framebuf.put(register_value)
defread(self):
ifself.framebuf.any():
returnself.framebuf.get()
if__name__=='__main__':
asyncdeftest():
ctrl_Pin='485_TR1'm=Master(uart_id=2, ctrlPin='485_TR1')
whileTrue:
foridxinrange(7):
w=m.write(register_definitions, idx)
ifwisnotNone:
log.info(f"Lidar{idx} return data is:{w}")
awaitasyncio.sleep_ms(30)
asyncdefstart():
asyncio.create_task(test())
whileTrue:
awaitasyncio.sleep_ms(10)
app=Noneasyncdefmain():
importtimet=time.localtime()
log.info(t)
gc.collect()
globalappawaitstart()
try:
gc.collect()
asyncio.run(main())
exceptKeyboardInterrupt:
log.info('Interrupt')
finally:
asyncio.new_event_loop()
log.info("run again")
Additional informations
No response
The text was updated successfully, but these errors were encountered:
MPY: sync filesystems
MPY: soft reboot
Traceback (most recent call last):
File "<stdin>", line 13, in <module>
File "umodbus/serial.py", line 108, in __init__
TypeError: extra keyword arguments given
Description
Thank you shared this impressive project, I used the version:2.3.7 umodbus code, when run this code, the error shows: TypeError:extra keyword arguments given. Follow as code:
`# modbus master driver for lidar
6 pieces lidar for system, address from 1 to 6
from right to left side, count closewize
system package
from machine import Pin
import logging
import gc
from InitConfig import config
import uasyncio as asyncio
log = logging.MiniLog("RtuMaster", level = logging.DEBUG)
DEBUG = config.get('DEBUG', False)
MasterUartId = config.get("UART_LIDAR")
self defined package
from umodbus.serial import Serial as RTUMaster
from umodbus import version
gc.collect()
version = '1.0'
author = 'skylin'
register_definitions = {
"HREGS":{ # read hold registers
"slave_address":{
"1": 1,
"2": 2,
"3": 3,
"4": 4,
"5": 5,
"6": 6
},
"register_address": 0x94,
"length": 0x02,
},
}
class Frame(object):
def init(self,data = None):
""" Data struct: """
if data is None:
self.data = bytearray()
else:
self.data = bytearray(data)
class FrameBuffer(object):
def init(self, size):
"""
A FrameBuffer is used to buffer frames received from the Network
over the active Bus.The FrameBuffer is designed as a FIFO and is
useful to overcome the limited storing capability
"""
self._size = size
self._data = [[memoryview(bytearray(6))] for _ in range(size)]
self._index_write = 0
self._index_read = 0
self._null_sink = [0, 0, 0, 0]
self._count = 0
class Master(object):
def init(self, uart_id, baudrate = 115200, ctrlPin = None, fbSize = 128):
self.rtu_pins = (Pin('A2'), Pin('A3'))
self.uart_id = uart_id
self.ctrlPin = ctrlPin
self.baudrate = baudrate
if name == 'main':
`
Reproduction steps
...
MicroPython version
V1.22
MicroPython board
pyboard
MicroPython Modbus version
Relevant log output
User code
Additional informations
No response
The text was updated successfully, but these errors were encountered: