Skip to content

Commit

Permalink
Run file through autopep8.
Browse files Browse the repository at this point in the history
  • Loading branch information
omar-moreno committed Oct 17, 2023
1 parent 95799f0 commit 9202a3f
Showing 1 changed file with 108 additions and 78 deletions.
186 changes: 108 additions & 78 deletions python/pyrogue/protocols/epicsV4.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#-----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Title : PyRogue epics support
#-----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Description:
# Module containing epics support classes and routines
# TODO:
# Not clear on to force a read on get
#-----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
# -----------------------------------------------------------------------------

import pyrogue
import time
Expand All @@ -30,34 +30,35 @@

def EpicsConvStatus(varValue):
if varValue.status == "AlarmLoLo":
return 5 # epicsAlarmLoLo
return 5 # epicsAlarmLoLo
elif varValue.status == "AlarmLow":
return 6 # epicsAlarmLow
return 6 # epicsAlarmLow
elif varValue.status == "AlarmHiHi":
return 3 # epicsAlarmHiHi
return 3 # epicsAlarmHiHi
elif varValue.status == "AlarmHigh":
return 4 # epicsAlarmHigh
return 4 # epicsAlarmHigh
else:
return 0


def EpicsConvSeverity(varValue):
if varValue.severity == "AlarmMinor":
return 1 # epicsSevMinor
return 1 # epicsSevMinor
elif varValue.severity == "AlarmMajor":
return 2 # epicsSevMajor
return 2 # epicsSevMajor
else:
return 0


class EpicsPvHandler(p4p.server.thread.Handler):
def __init__(self, valType, var, log):
self._valType = valType
self._var = var
self._log = log
self._var = var
self._log = log

def put(self, pv, op):
if self._var.isVariable and (self._var.mode == 'RW' or self._var.mode == 'WO'):
if self._var.isVariable and (
self._var.mode == 'RW' or self._var.mode == 'WO'):
try:
if self._valType == 'enum':
self._var.setDisp(str(op.value()))
Expand Down Expand Up @@ -92,7 +93,8 @@ def rpc(self, pv, op):
if ret is None:
ret = 'None'

v = p4p.Value(p4p.Type([('value',self._valType)]), {'value':ret})
v = p4p.Value(
p4p.Type([('value', self._valType)]), {'value': ret})
op.done(value=(v))

except Exception as msg:
Expand All @@ -101,21 +103,21 @@ def rpc(self, pv, op):
else:
op.done(error='Rpc Not Supported On Variables')

def onFirstConnect(self, pv): # may be omitted
#print(f"PV First connect called pv={pv}")
def onFirstConnect(self, pv): # may be omitted
# print(f"PV First connect called pv={pv}")
pass

def onLastDisconnect(self, pv): # may be omitted
#print(f"PV Last Disconnect called pv={pv}")
def onLastDisconnect(self, pv): # may be omitted
# print(f"PV Last Disconnect called pv={pv}")
pass


class EpicsPvHolder(object):
def __init__(self,provider,name,var,log):
self._var = var
def __init__(self, provider, name, var, log):
self._var = var
self._name = name
self._log = log
self._pv = None
self._log = log
self._pv = None

if self._var.isCommand:
typeStr = self._var.retTypeStr
Expand All @@ -125,7 +127,7 @@ def __init__(self,provider,name,var,log):
# Convert valType
if var.nativeType is np.ndarray:
self._valType = 'ndarray'
#self._valType = 's'
# self._valType = 's'
elif self._var.disp == 'enum':
self._valType = 'enum'
elif typeStr is None or var.nativeType is list or var.nativeType is dict:
Expand All @@ -134,7 +136,7 @@ def __init__(self,provider,name,var,log):

# Unsigned
if 'UInt' in typeStr:
m = re.search('^UInt(\\d+)\\.*',typeStr)
m = re.search('^UInt(\\d+)\\.*', typeStr)

if m is not None and m.lastindex == 1:
bits = int(m[1])
Expand All @@ -154,7 +156,7 @@ def __init__(self,provider,name,var,log):

# Signed
elif 'int' in typeStr or 'Int' in typeStr:
m = re.search('^Int(\\d+)\\.*',typeStr)
m = re.search('^Int(\\d+)\\.*', typeStr)

if m is not None and m.lastindex == 1:
bits = int(m[1])
Expand Down Expand Up @@ -191,42 +193,66 @@ def __init__(self,provider,name,var,log):
if varVal.valueDisp is None:
varVal.valueDisp = ''

self._log.info("Adding {} with type {} init={}".format(self._name,self._valType,varVal.valueDisp))
self._log.info(
"Adding {} with type {} init={}".format(
self._name,
self._valType,
varVal.valueDisp))
try:
if self._valType == 'ndarray':
# If a 1D array is encountered, use a NTScalar. Note, if an
# If a 1D array is encountered, use a NTScalar. Note, if an
# NTScalar is used, the values will be automatically converted
# to doubles.
if varVal.value.ndim == 1:
if varVal.value.ndim == 1:
nt = p4p.nt.NTScalar('ad')
else:
else:
nt = p4p.nt.NTNDArray()
iv = varVal.value
elif self._valType == 'enum':
nt = p4p.nt.NTEnum(display=False, control=False, valueAlarm=False)
nt = p4p.nt.NTEnum(
display=False,
control=False,
valueAlarm=False)
enum = list(self._var.enum.values())
iv = {'choices':enum, 'index':enum.index(varVal.valueDisp)}
iv = {'choices': enum, 'index': enum.index(varVal.valueDisp)}
elif self._valType == 's':
nt = p4p.nt.NTScalar(self._valType, display=False, control=False, valueAlarm=False)
nt = p4p.nt.NTScalar(
self._valType,
display=False,
control=False,
valueAlarm=False)
iv = nt.wrap(varVal.valueDisp)
else:
nt = p4p.nt.NTScalar(self._valType, display=True, control=True, valueAlarm=True)
#print(f"Setting value {varVal.value} to {self._name}")
nt = p4p.nt.NTScalar(
self._valType,
display=True,
control=True,
valueAlarm=True)
# print(f"Setting value {varVal.value} to {self._name}")
iv = nt.wrap(varVal.value)
except Exception as e:
raise Exception("Failed to add {} with type {} ndtype={} init={}. Error={}".format(self._name,self._valType,self._var.ndType,varVal.valueDisp,e))
raise Exception(
"Failed to add {} with type {} ndtype={} init={}. Error={}".format(
self._name,
self._valType,
self._var.ndType,
varVal.valueDisp,
e))

# Setup variable
try:
self._pv = p4p.server.thread.SharedPV(queue=None,
handler=EpicsPvHandler(self._valType,self._var,self._log),
initial=iv,
nt=nt,
options={})
self._pv = p4p.server.thread.SharedPV(queue=None, handler=EpicsPvHandler(
self._valType, self._var, self._log), initial=iv, nt=nt, options={})
except Exception as e:
raise Exception("Failed to start {} with type {} ndtype={} init={}. Error={}".format(self._name,self._valType,self._var.ndType,varVal.valueDisp,e))

provider.add(self._name,self._pv)
raise Exception(
"Failed to start {} with type {} ndtype={} init={}. Error={}".format(
self._name,
self._valType,
self._var.ndType,
varVal.valueDisp,
e))

provider.add(self._name, self._pv)
self._var.addListener(self._varUpdated)

# Update fields in numeric types
Expand All @@ -237,52 +263,54 @@ def __init__(self,provider,name,var,log):
curr.raw.alarm.severity = EpicsConvSeverity(varVal)
curr.raw.display.description = self._var.description

if self._var.units is not None:
curr.raw.display.units = self._var.units
if self._var.maximum is not None:
if self._var.units is not None:
curr.raw.display.units = self._var.units
if self._var.maximum is not None:
curr.raw.display.limitHigh = self._var.maximum
if self._var.minimum is not None:
curr.raw.display.limitLow = self._var.minimum
if self._var.minimum is not None:
curr.raw.display.limitLow = self._var.minimum

if self._var.lowWarning is not None:
curr.raw.valueAlarm.lowWarningLimit = self._var.lowWarning
if self._var.lowAlarm is not None:
curr.raw.valueAlarm.lowAlarmLimit = self._var.lowAlarm
if self._var.lowWarning is not None:
curr.raw.valueAlarm.lowWarningLimit = self._var.lowWarning
if self._var.lowAlarm is not None:
curr.raw.valueAlarm.lowAlarmLimit = self._var.lowAlarm
if self._var.highWarning is not None:
curr.raw.valueAlarm.highWarningLimit = self._var.highWarning
if self._var.highAlarm is not None:
curr.raw.valueAlarm.highAlarmLimit = self._var.highAlarm
curr.raw.valueAlarm.highWarningLimit = self._var.highWarning
if self._var.highAlarm is not None:
curr.raw.valueAlarm.highAlarmLimit = self._var.highAlarm

# Precision ?
self._pv.post(curr)

def _varUpdated(self,path,value):
def _varUpdated(self, path, value):
if self._valType == 'enum' or self._valType == 's':
self._pv.post(value.valueDisp)
elif self._valType == 'ndarray':
self._pv.post(value.value)
else:
curr = self._pv.current()
curr.raw.value = value.value
curr.raw.alarm.status = EpicsConvStatus(value)
curr.raw.value = value.value
curr.raw.alarm.status = EpicsConvStatus(value)
curr.raw.alarm.severity = EpicsConvSeverity(value)
curr.raw['timeStamp.secondsPastEpoch'], curr.raw['timeStamp.nanoseconds'] = divmod(float(time.time_ns()), 1.0e9)
curr.raw['timeStamp.secondsPastEpoch'], curr.raw['timeStamp.nanoseconds'] = divmod(
float(time.time_ns()), 1.0e9)
self._pv.post(curr)


class EpicsPvServer(object):
"""
Class to contain an epics PV server
"""
def __init__(self,*,base,root,incGroups,excGroups,pvMap=None):
self._root = root
self._base = base
self._log = pyrogue.logInit(cls=self)
self._server = None

def __init__(self, *, base, root, incGroups, excGroups, pvMap=None):
self._root = root
self._base = base
self._log = pyrogue.logInit(cls=self)
self._server = None
self._incGroups = incGroups
self._excGroups = excGroups
self._pvMap = pvMap
self._started = False
self._pvMap = pvMap
self._started = False

self._provider = p4p.server.StaticProvider(__name__)

Expand All @@ -301,7 +329,8 @@ def _start(self):
self._list = {}

if not self._root.running:
raise Exception("Epics can not be setup on a tree which is not started")
raise Exception(
"Epics can not be setup on a tree which is not started")

# Figure out mapping mode
if self._pvMap is None:
Expand All @@ -315,35 +344,36 @@ def _start(self):
eName = None

if doAll:
if v.filterByGroup(self._incGroups,self._excGroups):
eName = self._base + ':' + v.path.replace('.',':')
if v.filterByGroup(self._incGroups, self._excGroups):
eName = self._base + ':' + v.path.replace('.', ':')
self._pvMap[v.path] = eName
elif v.path in self._pvMap:
eName = self._pvMap[v.path]

if eName is not None:
pvh = EpicsPvHolder(self._provider,eName,v,self._log)
pvh = EpicsPvHolder(self._provider, eName, v, self._log)
self._list[v] = pvh

# Check for missing map entries
if len(self._pvMap) != len(self._list):
for k,v in self._pvMap.items():
for k, v in self._pvMap.items():
if k not in self._list:
self._log.error(f"Failed to find {k} from P4P mapping in Rogue tree!")
self._log.error(
f"Failed to find {k} from P4P mapping in Rogue tree!")

self._server = p4p.server.Server(providers=[self._provider])

def list(self):
return self._pvMap

def dump(self,fname=None):
def dump(self, fname=None):
if fname is not None:
try:
with open(fname,'w') as f:
for k,v in self._pvMap.items():
print("{} -> {}".format(v,k),file=f)
with open(fname, 'w') as f:
for k, v in self._pvMap.items():
print("{} -> {}".format(v, k), file=f)
except Exception:
raise Exception("Failed to dump epics map to {}".format(fname))
else:
for k,v in self._pvMap.items():
print("{} -> {}".format(v,k))
for k, v in self._pvMap.items():
print("{} -> {}".format(v, k))

0 comments on commit 9202a3f

Please sign in to comment.