Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TypeError: extra keyword arguments given #82

Open
skylin008 opened this issue Nov 3, 2023 · 1 comment
Open

TypeError: extra keyword arguments given #82

skylin008 opened this issue Nov 3, 2023 · 1 comment

Comments

@skylin008
Copy link

Description

Thank you shared this impressive project, I used the version:2.3.7 umodbus code, when run this code, the error shows: TypeError:extra keyword arguments given. Follow as code:
`# modbus master driver for lidar

6 pieces lidar for system, address from 1 to 6

from right to left side, count closewize

system package

from machine import Pin
import logging
import gc
from InitConfig import config
import uasyncio as asyncio

log = logging.MiniLog("RtuMaster", level = logging.DEBUG)
DEBUG = config.get('DEBUG', False)

MasterUartId = config.get("UART_LIDAR")

self defined package

from umodbus.serial import Serial as RTUMaster
from umodbus import version

gc.collect()

version = '1.0'
author = 'skylin'

register_definitions = {
"HREGS":{ # read hold registers
"slave_address":{
"1": 1,
"2": 2,
"3": 3,
"4": 4,
"5": 5,
"6": 6
},
"register_address": 0x94,
"length": 0x02,
},
}

class Frame(object):
def init(self,data = None):
""" Data struct: """
if data is None:
self.data = bytearray()
else:
self.data = bytearray(data)

def __len__(self):
    return len(self.data)

class FrameBuffer(object):
def init(self, size):
"""
A FrameBuffer is used to buffer frames received from the Network
over the active Bus.The FrameBuffer is designed as a FIFO and is
useful to overcome the limited storing capability
"""
self._size = size
self._data = [[memoryview(bytearray(6))] for _ in range(size)]
self._index_write = 0
self._index_read = 0
self._null_sink = [0, 0, 0, 0]
self._count = 0

@micropython.native
def put(self, v):
    self._data[self._index_write] = v
    next_index = (self._index_write + 1) % self._size
    if next_index != self._index_read:
        self._index_write = self.next_index
        self._count += 1
    else:
        self.clear()
        if DEBUG:
            raise Exception("frame buffer over flow")

@micropython.native
def get(self):
    if self._index_read == self._index_write:
        return None

    lin_id, _, data = self._data[self._index_read]
    self._index_read = (self._index_read +1) % self._size
    self._count -= 1
    return Frame(data)  # Omit the data length


def any(self):
    if self._index_read == self._index_write:
        return False
    return True

def clear(self):
    self._index_write = 0
    self._index_read  = 0

class Master(object):
def init(self, uart_id, baudrate = 115200, ctrlPin = None, fbSize = 128):
self.rtu_pins = (Pin('A2'), Pin('A3'))
self.uart_id = uart_id
self.ctrlPin = ctrlPin
self.baudrate = baudrate

    self.host = RTUMaster(
        pins = self.rtu_pins,
        uart_id = self.uart_id,
        baudrate = self.baudrate,
        ctrl_pin = self.ctrlPin
        )

    self.framebuf = FrameBuffer(fbSize)

def write(self, registerdefinitions, idx):     
    slave_addr = registerdefinitions['HREGS']['slave_address']['idx']
    register_addr = registerdefinitions['HREGS']['register_address']
    qty = registerdefinitions['HREGS']['length']
     
    register_value = self.host.read_holding_registers(
                    slave_addr = slave_addr,
                    starting_addr = register_addr,
                    register_qty = qty,
                    signed = False
)   
    if not register_value:
        self.framebuf.put(register_value)

def read(self):
    if self.framebuf.any():
        return self.framebuf.get()

if name == 'main':

async def test():            
    ctrl_Pin = '485_TR1'
    m = Master(uart_id = 2, ctrlPin = '485_TR1')
    while True:
        for idx in range(7):
            w = m.write(register_definitions, idx)
                
            if w is not None:
                log.info(f"Lidar{idx} return data is:{w}")
            await asyncio.sleep_ms(30)  


async def start():
    asyncio.create_task(test())
    while True:
        await asyncio.sleep_ms(10)

app = None
async def main():
    import time
    t = time.localtime()
    log.info(t)
    gc.collect()
    global app
    
    await start()

try:
    gc.collect()
    asyncio.run(main())

except KeyboardInterrupt:
    log.info('Interrupt')

finally:
    asyncio.new_event_loop()
    log.info("run again")

`

Reproduction steps

...

MicroPython version

V1.22

MicroPython board

pyboard

MicroPython Modbus version

# e.g. v2.3.3
# use the following command to get the used version
import os
from umodbus import version
print('MicroPython infos:', os.uname())
print('Used micropthon-modbus version:', version.__version__))

Relevant log output

>>> %Run -c $EDITOR_CONTENT

MPY: sync filesystems
MPY: soft reboot
Task exception wasn't retrieved
future: <Task> coro= <generator object 'test' at 2000c850>
Traceback (most recent call last):
  File "asyncio/core.py", line 1, in run_until_complete
  File "<stdin>", line 147, in test
  File "<stdin>", line 117, in __init__
  File "umodbus/serial.py", line 1, in __init__
TypeError: extra keyword arguments given

User code

# modbus master driver for lidar 
# 6 pieces lidar for system, address from 1 to 6
# from right to left side, count closewize

# system package
from machine import Pin
import logging
import gc
from InitConfig import config
import uasyncio as asyncio

log = logging.MiniLog("RtuMaster", level = logging.DEBUG)
DEBUG = config.get('DEBUG', False)

MasterUartId = config.get("UART_LIDAR")

# self defined package
from umodbus.serial import Serial as RTUMaster
from umodbus import version

gc.collect()

__version__ = '1.0'
__author__ = 'skylin'


register_definitions = {
    "HREGS":{   # read hold registers
        "slave_address":{
            "1": 1,
            "2": 2,
            "3": 3,
            "4": 4,
            "5": 5,
            "6": 6
        },
        "register_address": 0x94,
        "length": 0x02,
    },
}


class Frame(object):
    def __init__(self,data = None):
        """ Data struct: """        
        if data is None:
            self.data = bytearray()
        else:            
            self.data = bytearray(data)

    def __len__(self):
        return len(self.data)
    

class FrameBuffer(object):
    def __init__(self, size):
        """
        A FrameBuffer is used to buffer frames received from the Network
        over the active Bus.The FrameBuffer is designed as a FIFO and is 
        useful to overcome the limited storing capability
        """
        self._size = size
        self._data = [[memoryview(bytearray(6))] for _ in range(size)]
        self._index_write = 0
        self._index_read = 0
        self._null_sink = [0, 0, 0, 0]
        self._count = 0

    @micropython.native
    def put(self, v):
        self._data[self._index_write] = v
        next_index = (self._index_write + 1) % self._size
        if next_index != self._index_read:
            self._index_write = self.next_index
            self._count += 1
        else:
            self.clear()
            if DEBUG:
                raise Exception("frame buffer over flow")

    @micropython.native
    def get(self):
        if self._index_read == self._index_write:
            return None

        lin_id, _, data = self._data[self._index_read]
        self._index_read = (self._index_read +1) % self._size
        self._count -= 1
        return Frame(data)  # Omit the data length


    def any(self):
        if self._index_read == self._index_write:
            return False
        return True

    def clear(self):
        self._index_write = 0
        self._index_read  = 0

    


class Master(object):
    def __init__(self, uart_id, baudrate = 115200, ctrlPin = None, fbSize = 128):
        self.rtu_pins = (Pin('A2'), Pin('A3'))
        self.uart_id = uart_id
        self.ctrlPin = ctrlPin
        self.baudrate = baudrate

        

        self.host = RTUMaster(
            pins = self.rtu_pins,
            uart_id = self.uart_id,
            baudrate = self.baudrate,
            ctrl_pin = self.ctrlPin
            )

        self.framebuf = FrameBuffer(fbSize)

    def write(self, registerdefinitions, idx):     
        slave_addr = registerdefinitions['HREGS']['slave_address']['idx']
        register_addr = registerdefinitions['HREGS']['register_address']
        qty = registerdefinitions['HREGS']['length']
         
        register_value = self.host.read_holding_registers(
                        slave_addr = slave_addr,
                        starting_addr = register_addr,
                        register_qty = qty,
                        signed = False
    )   
        if not register_value:
            self.framebuf.put(register_value)

    def read(self):
        if self.framebuf.any():
            return self.framebuf.get()




if __name__ == '__main__':
    
    async def test():            
        ctrl_Pin = '485_TR1'
        m = Master(uart_id = 2, ctrlPin = '485_TR1')
        while True:
            for idx in range(7):
                w = m.write(register_definitions, idx)
                    
                if w is not None:
                    log.info(f"Lidar{idx} return data is:{w}")
                await asyncio.sleep_ms(30)  

    
    async def start():
        asyncio.create_task(test())
        while True:
            await asyncio.sleep_ms(10)

    app = None
    async def main():
        import time
        t = time.localtime()
        log.info(t)
        gc.collect()
        global app
        
        await start()

    try:
        gc.collect()
        asyncio.run(main())

    except KeyboardInterrupt:
        log.info('Interrupt')

    finally:
        asyncio.new_event_loop()
        log.info("run again")

Additional informations

No response

@volodink
Copy link

volodink commented Nov 10, 2023

Same thing, can't find what cause the problem nor fix it so far.

My code for Pyboard v1.0:

from pyb import Pin
Y1 = pyb.Pin(pyb.Pin.board.Y1, pyb.Pin.OUT)
Y2 = pyb.Pin(pyb.Pin.board.Y2, pyb.Pin.IN)

# self defined package
from umodbus.serial import Serial as RTUMaster


rtu_pins = (Pin('Y1'), Pin('Y2'))
host = RTUMaster(
            pins = rtu_pins,
            uart_id = 6,
            baudrate = 9600,
)

Error message:

MPY: sync filesystems
MPY: soft reboot
Traceback (most recent call last):
  File "<stdin>", line 13, in <module>
  File "umodbus/serial.py", line 108, in __init__
TypeError: extra keyword arguments given

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants