diff --git a/pyproject.toml b/pyproject.toml index 2053921..31020fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -256,7 +256,9 @@ useLibraryCodeForTypes = true [tool.pytest.ini_options] addopts = "--order-scope=module --cov-config=pyproject.toml" doctest_optionflags = "ELLIPSIS NORMALIZE_WHITESPACE IGNORE_EXCEPTION_DETAIL" -filterwarnings = [] +filterwarnings = [ + "ignore:Type google._upb._message.:DeprecationWarning" +] junit_family = "xunit2" junit_logging = "all" markers = [ @@ -338,7 +340,9 @@ order-by-type = false ] "tests/server/tekhsi_test_server.py" = [ # TODO: remove this entire ignore section - "N80" + "N80", + "PLW0602", + "SIM" ] [tool.semantic_release] diff --git a/tests/server/tekhsi_test_server.py b/tests/server/tekhsi_test_server.py index 182351b..ed2140b 100644 --- a/tests/server/tekhsi_test_server.py +++ b/tests/server/tekhsi_test_server.py @@ -1,3 +1,4 @@ +# pylint: disable=global-variable-not-assigned """This file provides a simple TekHSI streaming server implementation for testing. The primary usage for this is to allow unit testing to occur on GitHub. An alternative usage is as @@ -115,11 +116,14 @@ def __init__( # noqa: C901,PLR0912,PLR0915 resolution = None offset = 0 noise = 0.01 - # FUTURE # if self._type is WfmDataType.Int8: - # FUTURE # yincr = self.amplitude / 230 - # FUTURE # if self._type is WfmDataType.Int16: - # FUTURE # yincr = self.amplitude / 58000 - yincr = 1 if self._type is WfmDataType.Float else 1 + if self._type is WfmDataType.Int8: + yincr = self.amplitude / 230 + if self._type is WfmDataType.Int16: + yincr = self.amplitude / 58000 + if self._type is WfmDataType.Float: + yincr = 1 + else: + yincr = 1 self._yincr = yincr @@ -246,27 +250,31 @@ def GetWaveform(self, request, context): # noqa: ARG002 context : Any This contains information relevant to the current gRPC call. """ + global connect_server + global verbose if verbose: print(f"TekHSI_NormalizedDataServer.GetWaveform({request.sourcename})") try: - if connect_server.dataaccess_allowed and request.sourcename in connect_server.data: - wfm = connect_server.data[request.sourcename] - chunksize = request.chunksize - for cur in range(0, len(wfm._vertical_data), chunksize): - reply = tekhsi_pb2.NormalizedReply( - headerordata=tekhsi_pb2.NormalizedReply.DataOrHeaderAccess( - chunk=tekhsi_pb2.NormalizedReply.WaveformSampleChunk( - data=wfm._vertical_data[cur : cur + chunksize], - ), - ), - ) + if connect_server.dataaccess_allowed: + if request.sourcename in connect_server.data: + wfm = connect_server.data[request.sourcename] + chunksize = request.chunksize + for cur in range(0, len(wfm._vertical_data), chunksize): + reply = tekhsi_pb2.NormalizedReply( + headerordata=tekhsi_pb2.NormalizedReply.DataOrHeaderAccess( + chunk=tekhsi_pb2.NormalizedReply.WaveformSampleChunk( + data=wfm._vertical_data[cur : cur + chunksize] + ) + ) + ) + yield reply + reply = tekhsi_pb2.NormalizedReply() + reply.status = tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_SUCCESS") yield reply - reply = tekhsi_pb2.NormalizedReply() - reply.status = tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_SUCCESS") - yield reply - return + return except Exception as e: print(e) + return def GetHeader(self, request, context): # noqa: ARG002 """The message returns the header (equivalent to preamble when using SCPI commands). @@ -279,43 +287,47 @@ def GetHeader(self, request, context): # noqa: ARG002 context : Any This contains information relevant to the current gRPC call. - Returns: + Returns ------- NormalizedReply The return reply contains the status + the requested header information. """ + global connect_server + global verbose + global acq_id if verbose: print(f"TekHSI_NormalizedDataServer.GetHeader({request.sourcename})") try: - if connect_server.dataaccess_allowed and request.sourcename in connect_server.data: - wfm = connect_server.data[request.sourcename] - reply = tekhsi_pb2.NormalizedReply() - reply.headerordata.header.dataid = acq_id - reply.headerordata.header.hasdata = True - reply.headerordata.header.horizontalspacing = wfm.xincr - reply.headerordata.header.horizontalUnits = "S" - reply.headerordata.header.horizontalzeroindex = wfm.trigger_index - reply.headerordata.header.noofsamples = wfm.length - reply.headerordata.header.sourcename = request.sourcename - reply.headerordata.header.sourcewidth = 4 - - if isinstance(data, AnalogWaveform): # noqa: F821 - reply.headerordata.header.wfmtype = 3 - elif isinstance(data, IQWaveform): # noqa: F821 - reply.headerordata.header.wfmtype = 6 - elif isinstance(data, DigitalWaveform): # noqa: F821 - reply.headerordata.header.wfmtype = 4 - - reply.headerordata.header.pairtype = 1 - reply.headerordata.header.verticaloffset = 0 - reply.headerordata.header.verticalspacing = 1.0 - reply.headerordata.header.verticalunits = "V" - reply.status = tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_SUCCESS") - return reply + if connect_server.dataaccess_allowed: + if request.sourcename in connect_server.data: + wfm = connect_server.data[request.sourcename] + reply = tekhsi_pb2.NormalizedReply() + reply.headerordata.header.dataid = acq_id + reply.headerordata.header.hasdata = True + reply.headerordata.header.horizontalspacing = wfm.xincr + reply.headerordata.header.horizontalUnits = "S" + reply.headerordata.header.horizontalzeroindex = wfm.trigger_index + reply.headerordata.header.noofsamples = wfm.length + reply.headerordata.header.sourcename = request.sourcename + reply.headerordata.header.sourcewidth = 4 + + if isinstance(data, AnalogWaveform): # noqa: F821 + reply.headerordata.header.wfmtype = 3 + elif isinstance(data, IQWaveform): # noqa: F821 + reply.headerordata.header.wfmtype = 6 + elif isinstance(data, DigitalWaveform): # noqa: F821 + reply.headerordata.header.wfmtype = 4 + + reply.headerordata.header.pairtype = 1 + reply.headerordata.header.verticaloffset = 0 + reply.headerordata.header.verticalspacing = 1.0 + reply.headerordata.header.verticalunits = "V" + reply.status = tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_SUCCESS") + return reply except Exception as e: print(e) return tekhsi_pb2.NormalizedReply( - status=tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_FAILURE"), + status=tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_FAILURE") ) @@ -339,31 +351,34 @@ def GetWaveform(self, request, context): # noqa: ARG002 context : Any This contains information relevant to the current gRPC call. - Returns: + Returns ------- NativeReply The return reply contains the status. The yield reply, returns each data chunk. """ + global connect_server + global verbose if verbose: print(f"TekHSI_NativeDataServer.GetWaveform({request.sourcename})") try: - if connect_server.dataaccess_allowed and request.sourcename in connect_server.data: - wfm = connect_server.data[request.sourcename] - chunksize = request.chunksize - raw_bytes = wfm._raw_data.tobytes() - for cur in range(0, len(raw_bytes), chunksize): - reply = tekhsi_pb2.RawReply( - headerordata=tekhsi_pb2.RawReply.DataOrHeaderAccess( - chunk=tekhsi_pb2.RawReply.WaveformSampleByteChunk( - data=raw_bytes[cur : cur + chunksize], - ), - ), - ) - yield reply - reply = tekhsi_pb2.RawReply() - reply.status = tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_SUCCESS") - return reply + if connect_server.dataaccess_allowed: + if request.sourcename in connect_server.data: + wfm = connect_server.data[request.sourcename] + chunksize = request.chunksize + raw_bytes = wfm._raw_data.tobytes() + for cur in range(0, len(raw_bytes), chunksize): + reply = tekhsi_pb2.RawReply( + headerordata=tekhsi_pb2.RawReply.DataOrHeaderAccess( + chunk=tekhsi_pb2.RawReply.WaveformSampleByteChunk( + data=raw_bytes[cur : cur + chunksize] + ) + ) + ) + yield reply + reply = tekhsi_pb2.RawReply() + reply.status = tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_SUCCESS") + return reply except Exception as e: print(e) return tekhsi_pb2.RawReply(status=tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_FAILURE")) @@ -384,60 +399,64 @@ def GetHeader(self, request, context): # noqa: ARG002,PLR0912,PLR0915,C901 NativeReply The return reply contains the status + the requested header information. """ + global connect_server + global verbose + global acq_id if verbose: print(f"TekHSI_NativeDataServer.GetHeader({request.sourcename})") try: - if connect_server.dataaccess_allowed and request.sourcename in connect_server.data: - wfm = connect_server.data[request.sourcename] - reply = tekhsi_pb2.RawReply() - reply.headerordata.header.dataid = acq_id - reply.headerordata.header.hasdata = True - reply.headerordata.header.horizontalspacing = wfm.xincr - reply.headerordata.header.horizontalUnits = "S" - reply.headerordata.header.horizontalzeroindex = wfm.trigger_index - reply.headerordata.header.noofsamples = wfm.length - reply.headerordata.header.sourcename = request.sourcename - - if isinstance(data, AnalogWaveform): # noqa: F821 - if wfm.type == WfmDataType.Int8: - reply.headerordata.header.sourcewidth = 1 - reply.headerordata.header.wfmtype = 1 - elif wfm.type == WfmDataType.Int16: - reply.headerordata.header.sourcewidth = 2 - reply.headerordata.header.wfmtype = 2 - elif wfm.type == WfmDataType.Float: - reply.headerordata.header.sourcewidth = 4 - reply.headerordata.header.wfmtype = 3 - else: - reply.headerordata.header.sourcewidth = 1 - reply.headerordata.header.wfmtype = 1 - elif isinstance(data, IQWaveform): # noqa: F821 - if wfm.type == WfmDataType.Int8: - reply.headerordata.header.sourcewidth = 1 - reply.headerordata.header.wfmtype = 6 - elif wfm.type == WfmDataType.Int16: - reply.headerordata.header.sourcewidth = 2 - reply.headerordata.header.wfmtype = 7 - else: - reply.headerordata.header.sourcewidth = 1 - reply.headerordata.header.wfmtype = 6 - elif isinstance(data, DigitalWaveform): # noqa: F821 - if wfm.type == WfmDataType.Int8: - reply.headerordata.header.sourcewidth = 1 - reply.headerordata.header.wfmtype = 4 - elif wfm.type == WfmDataType.Int16: - reply.headerordata.header.sourcewidth = 2 - reply.headerordata.header.wfmtype = 5 - else: - reply.headerordata.header.sourcewidth = 1 - reply.headerordata.header.wfmtype = 4 - - reply.headerordata.header.pairtype = 1 - reply.headerordata.header.verticaloffset = 0 - reply.headerordata.header.verticalspacing = wfm.yincr - reply.headerordata.header.verticalunits = "V" - reply.status = tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_SUCCESS") - return reply + if connect_server.dataaccess_allowed: + if request.sourcename in connect_server.data: + wfm = connect_server.data[request.sourcename] + reply = tekhsi_pb2.RawReply() + reply.headerordata.header.dataid = acq_id + reply.headerordata.header.hasdata = True + reply.headerordata.header.horizontalspacing = wfm.xincr + reply.headerordata.header.horizontalUnits = "S" + reply.headerordata.header.horizontalzeroindex = wfm.trigger_index + reply.headerordata.header.noofsamples = wfm.length + reply.headerordata.header.sourcename = request.sourcename + + if isinstance(data, AnalogWaveform): # noqa: F821 + if wfm.type == WfmDataType.Int8: + reply.headerordata.header.sourcewidth = 1 + reply.headerordata.header.wfmtype = 1 + elif wfm.type == WfmDataType.Int16: + reply.headerordata.header.sourcewidth = 2 + reply.headerordata.header.wfmtype = 2 + elif wfm.type == WfmDataType.Float: + reply.headerordata.header.sourcewidth = 4 + reply.headerordata.header.wfmtype = 3 + else: + reply.headerordata.header.sourcewidth = 1 + reply.headerordata.header.wfmtype = 1 + elif isinstance(data, IQWaveform): # noqa: F821 + if wfm.type == WfmDataType.Int8: + reply.headerordata.header.sourcewidth = 1 + reply.headerordata.header.wfmtype = 6 + elif wfm.type == WfmDataType.Int16: + reply.headerordata.header.sourcewidth = 2 + reply.headerordata.header.wfmtype = 7 + else: + reply.headerordata.header.sourcewidth = 1 + reply.headerordata.header.wfmtype = 6 + elif isinstance(data, DigitalWaveform): # noqa: F821 + if wfm.type == WfmDataType.Int8: + reply.headerordata.header.sourcewidth = 1 + reply.headerordata.header.wfmtype = 4 + elif wfm.type == WfmDataType.Int16: + reply.headerordata.header.sourcewidth = 2 + reply.headerordata.header.wfmtype = 5 + else: + reply.headerordata.header.sourcewidth = 1 + reply.headerordata.header.wfmtype = 4 + + reply.headerordata.header.pairtype = 1 + reply.headerordata.header.verticaloffset = 0 + reply.headerordata.header.verticalspacing = wfm.yincr + reply.headerordata.header.verticalunits = "V" + reply.status = tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_SUCCESS") + return reply except Exception as e: print(e) return tekhsi_pb2.RawReply(status=tekhsi_pb2.WfmReplyStatus.Value("WFMREPLYSTATUS_FAILURE")) @@ -485,7 +504,7 @@ def Connect(self, request, context): if self._connections.get(request.name): context.set_code(grpc.StatusCode.ALREADY_EXISTS) return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_INUSE_FAILURE"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_INUSE_FAILURE") ) self._connections[request.name] = True context.set_code(grpc.StatusCode.OK) @@ -495,13 +514,13 @@ def Connect(self, request, context): if verbose: print(f'connection successful - "{request.name}"') return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_SUCCESS"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_SUCCESS") ) except Exception as e: if verbose: print(f'** Exception:{e}: Connect - "{request.name}"') return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED") ) def Disconnect(self, request, context): # noqa: C901 @@ -536,7 +555,7 @@ def Disconnect(self, request, context): # noqa: C901 if verbose: print(f'Disconnect Success - but used a bad name {request.name}"') return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED") ) except Exception as e: if self._new_data: @@ -550,7 +569,7 @@ def Disconnect(self, request, context): # noqa: C901 if verbose: print(f'** Exception: Disconnect - "{request.name} - {e}"') return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED") ) def RequestNewSequence(self, request, context): @@ -561,6 +580,7 @@ def RequestNewSequence(self, request, context): else: print(f'RequestNewSequence Failed - No Connection "{request.name}"') + global connect_server mutex.acquire() if verbose: print("mutex-acquired: RequestNewSequence") @@ -571,7 +591,7 @@ def RequestNewSequence(self, request, context): print("mutex-released: RequestNewSequence") context.set_code(grpc.StatusCode.OK) return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_SUCCESS"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_SUCCESS") ) except Exception as e: context.set_code(grpc.StatusCode.FAILED_PRECONDITION) @@ -579,7 +599,7 @@ def RequestNewSequence(self, request, context): print(e) print(f'** Exception as: RequestNewSequence - "{request.name}"') return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED") ) def RequestAvailableNames(self, request, context): @@ -600,7 +620,7 @@ def RequestAvailableNames(self, request, context): print(f'** Exception: RequestNewSequence - "{request.name}"') context.set_code(grpc.StatusCode.FAILED_PRECONDITION) return tekhsi_pb2.AvailableNamesReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED") ) def WaitForDataAccess(self, request, context): @@ -622,14 +642,14 @@ def WaitForDataAccess(self, request, context): self._dataaccess_allowed = True context.set_code(grpc.StatusCode.OK) return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_SUCCESS"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_SUCCESS") ) except Exception as e: if verbose: print(e) print(f'** Exception: WaitForDataAccess - "{request.name}"') return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED") ) def FinishedWithDataAccess(self, request, context): @@ -647,12 +667,12 @@ def FinishedWithDataAccess(self, request, context): print(f'FinishedWithDataAccess Bad Connection Name "{request.name}"') context.set_code(grpc.StatusCode.OK) return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_SUCCESS"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_SUCCESS") ) if verbose: print(f'FinishedWithDataAccess Failed "{request.name} - No WaitForDataPending"') return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED") ) except Exception as e: @@ -660,7 +680,7 @@ def FinishedWithDataAccess(self, request, context): print(e) print(f'** Exception: FinishedWithDataAccess - "{request.name}"') return tekhsi_pb2.ConnectReply( - status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED"), + status=tekhsi_pb2.ConnectStatus.Value("CONNECTSTATUS_UNSPECIFIED") ) @@ -672,6 +692,7 @@ def periodic_data_creation(): If you want to change the named sets of data returned you should modify 'make_new_data()' """ while True: + global connect_server global acq_id try: mutex.acquire() @@ -689,7 +710,7 @@ def make_new_data(): symbol names and associated data. This is called on initialization of the TekHSI_Connect service and periodically to create new data. The symbols defined here well be seen by the client. - Returns: + Returns ------- dict Returns a name/value dictionary defining a set of symbols and their associated waveforms.