diff --git a/xcs/beamline.py b/xcs/beamline.py index ea7bbc8..3384511 100644 --- a/xcs/beamline.py +++ b/xcs/beamline.py @@ -20,6 +20,7 @@ with safe_load('Event Sequencer'): from pcdsdevices.sequencer import EventSequencer seq = EventSequencer('ECS:SYS0:4', name='seq_4') + seq2 = EventSequencer('ECS:SYS0:11', name='seq_11') with safe_load('LXE'): from pcdsdevices.lxe import LaserEnergyPositioner @@ -52,7 +53,7 @@ class LXE(LaserEnergyPositioner): lxe_opa_calib_file = '/reg/neh/operator/xcsopr/experiments/'+get_current_experiment('xcs')+'/wpcalib_opa' try: - lxe = LXE('XCS:LAS:MMN:04', calibration_file=lxe_opa_calib_file, name='lxe_opa') + lxe_opa = LXE('XCS:LAS:MMN:04', calibration_file=lxe_opa_calib_file, name='lxe_opa') except OSError: logger.error('Could not load file: %s', lxe_opa_calib_file) raise FileNotFoundError @@ -61,37 +62,58 @@ class LXE(LaserEnergyPositioner): from pcdsdevices.lxe import LaserEnergyPositioner, LaserTiming, LaserTimingCompensation from pcdsdevices.epics_motor import Newport - lens_h = Newport('XCS:LAS:MMN:05', name='lens_h') + + #las_wp = Newport('XCS:LAS:MMN:08', name='las_wp') + las_opa_wp = Newport('XCS:LAS:MMN:04', name='las_opa_wp') + lens_h = Newport('XCS:LAS:MMN:08', name='lens_h') lens_v = Newport('XCS:LAS:MMN:06', name='lens_v') lens_f = Newport('XCS:LAS:MMN:07', name='lens_f') pol_wp = Newport('XCS:USR:MMN:07', name='pol_wp') - #lxt_ttc = LaserTimingCompensation('', delay_prefix='XCS:LAS:MMN:01', laser_prefix='LAS:FS4', name='lxt_ttc') - #lxt_ttc.delay.n_bounces = 10 - - #lxt = lxt_ttc.laser - # It's okay to be a little unhappy, no need to whine about it # from ophyd.epics_motor import AlarmSeverity import logging # lxt_fast.tolerated_alarm = AlarmSeverity.MINOR logging.getLogger('pint').setLevel(logging.ERROR) -with safe_load('New lxt & lxt_ttc'): - from ophyd.device import Component as Cpt +#with safe_load('Old lxt & lxt_ttc'): +# from ophyd.device import Component as Cpt +# +# +# from pcdsdevices.epics_motor import Newport +# from pcdsdevices.lxe import LaserTiming +# from pcdsdevices.pseudopos import DelayMotor, SyncAxis, delay_class_factory +# +# DelayNewport = delay_class_factory(Newport) +# +# # Reconfigurable lxt_ttc +# # Any motor added in here will be moved in the group +# class LXTTTC(SyncAxis): +# lxt = Cpt(LaserTiming, 'LAS:FS4', name='lxt') +# txt = Cpt(DelayNewport, 'XCS:LAS:MMN:01', +# n_bounces=10, name='txt') +# +# tab_component_names = True +# scales = {'txt': -1} +# warn_deadband = 5e-14 +# fix_sync_keep_still = 'lxt' +# sync_limits = (-10e-6, 10e-6) +# +# lxt_ttc = LXTTTC('', name='lxt_ttc') +# lxt = lxt_ttc.lxt - from pcdsdevices.epics_motor import Newport +with safe_load('New lxt & lxt_ttc'): + from pcdsdevices.device import ObjectComponent as OCpt from pcdsdevices.lxe import LaserTiming - from pcdsdevices.pseudopos import DelayMotor, SyncAxis, delay_class_factory + from pcdsdevices.pseudopos import SyncAxis + from xcs.db import xcs_txt - DelayNewport = delay_class_factory(Newport) + lxt = LaserTiming('LAS:FS4', name='lxt') + xcs_txt.name = 'txt' - # Reconfigurable lxt_ttc - # Any motor added in here will be moved in the group class LXTTTC(SyncAxis): - lxt = Cpt(LaserTiming, 'LAS:FS4', name='lxt') - txt = Cpt(DelayNewport, 'XCS:LAS:MMN:01', - n_bounces=10, name='txt') + lxt = OCpt(lxt) + txt = OCpt(xcs_txt) tab_component_names = True scales = {'txt': -1} @@ -99,8 +121,9 @@ class LXTTTC(SyncAxis): fix_sync_keep_still = 'lxt' sync_limits = (-10e-6, 10e-6) + lxt_ttc = LXTTTC('', name='lxt_ttc') - lxt = lxt_ttc.lxt + with safe_load('Delay Scan'): from ophyd.device import Device, Component as Cpt @@ -114,11 +137,13 @@ class LXTTTC(SyncAxis): tt_ty = IMS('XCS:SB2:MMS:46',name='tt_ty') lib_y = IMS('XCS:USR:MMS:04',name='lib_y') det_y = IMS('XCS:USR:MMS:40',name='det_y') - lp = EpicsSignal('XCS:USR:ao1:7',name='lp') + + from xcs.devices import LaserShutter + lp = LaserShutter('XCS:USR:ao1:7', name='lp') def lp_close(): - lp.put('IN') + lp('IN') def lp_open(): - lp.put('OUT') + lp('OUT') #with safe_load('User Opal'): # from pcdsdevices.areadetector.detectors import PCDSDetector @@ -186,29 +211,12 @@ class RovingSpec(Device): # y_up_south_prefix='XCS:MON:MMS:28', in_pos=3.3, out_pos=13.18, # name='xcs_ccm') -with safe_load('User Dumb Motor'): - from pcdsdevices.epics_motor import IMS - top_slit_x = IMS('XCS:USR:MMS:33', name='top_slit_x') - top_slit_y = IMS('XCS:USR:MMS:34', name='top_slit_y') - bot_slit_x = IMS('XCS:USR:MMS:35', name='bot_slit_x') - #1z = IMS('XCS:USR:MMS:36', name='1z') - jet_z = IMS('XCS:USR:MMS:41', name='jet_z') - #mirror_y = IMS('XCS:USR:MMS:48', name='mirror_y') - #sam_x = IMS('XCS:USR:MMS:44', name='sam_x') - #sam_y = IMS('XCS:USR:MMS:43', name='sam_y') - #sam_th = IMS('XCS:USR:MMS:45', name='sam_th') - #huber_Rx = IMS('XCS:USR:MMS:32', name='huber_Rx') - #huber_x = IMS('XCS:USR:MMS:30', name='huber_x') - #huber_Ry = IMS('XCS:USR:MMS:29', name='huber_Ry') - #huber_y = IMS('XCS:USR:MMS:19', name='huber_y') - bot_slit_y = IMS('XCS:USR:MMS:37', name='bot_slit_y') - jet_y = IMS('XCS:USR:MMS:38', name='jet_y') - jet_x = IMS('XCS:USR:MMS:39', name='jet_x') - det_y = IMS('XCS:USR:MMS:40', name='det_y') - #jj2hg = IMS('XCS:USR:MMS:25', name='jj2hg') - #jj2ho = IMS('XCS:USR:MMS:26', name='jj2ho') - #jj2vg = IMS('XCS:USR:MMS:27', name='jj2vg') - #jj2vo = IMS('XCS:USR:MMS:28', name='jj2vo') +# +# this all thould go and we should start using the questionnaire. +# that's what it's goe. +# + + with safe_load('Timetool'): from pcdsdevices.timetool import TimetoolWithNav tt = TimetoolWithNav('XCS:SB2:TIMETOOL', name='xcs_timetool', prefix_det='XCS:GIGE:08') @@ -218,6 +226,15 @@ class RovingSpec(Device): from xcs.db import scan_pvs scan_pvs.enable() +with safe_load('XFLS Motors (Temporary)'): + from ophyd import Device, Component as Cpt + from pcdsdevices.epics_motor import IMS + from pcdsdevices.interface import BaseInterface + class XFLS(BaseInterface, Device): + x = Cpt(IMS, ':MMS:22', name='x') + y = Cpt(IMS, ':MMS:23', name='y') + z = Cpt(IMS, ':MMS:24', name='z') + crl2 = XFLS('XCS:SB2', name='xcs_xfls') with safe_load('Create Aliases'): #from xcs.db import at2l0 @@ -247,6 +264,9 @@ class RovingSpec(Device): from xcs.db import xcs_sb2_upstream_slits as s5 from xcs.db import xcs_sb2_downstream_slits as s6 + from xcs.db import at1l0 as fat1 + from xcs.db import at2l0 as fat2 + from xcs.db import xcs_attenuator as att from xcs.db import xcs_pulsepicker as pp from xcs.db import xcs_gon as gon @@ -255,9 +275,17 @@ class RovingSpec(Device): from xcs.db import xcs_lxt_fast as lxt_fast from xcs.db import xcs_ccm as ccm - ccmE = ccm.calc.energy + #from xcs.db import xcs_xfls as crl2 + from xcs.db import xcs_pfls as crl1 + + from xcs.db import xcs_samplestage + gon_sx = xcs_samplestage.x + gon_sy = xcs_samplestage.y + gon_sz = xcs_samplestage.z + + ccmE = ccm.energy ccmE.name = 'ccmE' - ccmE_vernier = ccm.calc.energy_with_vernier + ccmE_vernier = ccm.energy_with_vernier ccmE_vernier.name = 'ccmE_vernier' with safe_load('Pink/Mono Offset'): @@ -277,6 +305,11 @@ class RovingSpec(Device): lib_y: 'default' } +with safe_load('Syringe Pump'): + #from xcs.syringepump import SyringePump + from xcs.devices import SyringePump + syringepump=SyringePump('solvent_topup',"XCS:USR:ao1:0","XCS:USR:ao1:1") + with safe_load('import macros'): from xcs.macros import * @@ -291,3 +324,159 @@ class RovingSpec(Device): with safe_load('bluesky setup'): from bluesky.callbacks.mpl_plotting import initialize_qt_teleporter initialize_qt_teleporter() +with safe_load('ladm_det'): + xcsdet_y = IMS('XCS:LAM:MMS:07',name = 'xcsdet_y') + xcsdet_x = IMS('XCS:LAM:MMS:06',name = 'xcsdet_x') + +with safe_load('drift monitor'): + import numpy as np + import json + import sys + import time + import os + import socket + import logging + class drift(): + def drift_log(idata): + savefilename = "/cds/home/opr/xcsopr/experiments/xcsl2619/drift_log.txt" + currenttime = time.ctime() + out_f = open(savefilename,'a') + out_f.write(str(idata)+ "," + currenttime.split(" ")[3] +"\n") + out_f.close() + + def tt_rough_FB(ttamp_th = 0.02, ipm4_th = 500, tt_window = 0.05): + fbvalue = 0 # for drift record + while(1): + tenshots_tt = np.zeros([1,]) + dlen = 0 + while(dlen < 61): + #ttcomm = Popen("caget XPP:TIMETOOL:TTALL",shell = True, stdout=PIPE) + #ttdata = (ttcomm.communicate()[0]).decode() + ttall = EpicsSignal('XCS:TIMETOOL:TTALL') + ttdata = ttall.get() + + current_tt = ttdata[1,] + ttamp = ttdata[2,] + ipm4val = ttdata[3,] + ttfwhm = ttdata[5,] + #current_tt = float((ttdata.split(" "))[3]) + #ttamp = float((ttdata.split(" "))[4]) + #ipm2val = float((ttdata.split(" "))[5]) + #ttfwhm = float((ttdata.split(" "))[7]) + if(dlen%10 == 0): + print("tt_value",current_tt,"ttamp",ttamp,"ipm4",ipm4val) + if (ttamp > ttamp_th)and(ipm4val > ipm4_th)and(ttfwhm < 130)and(ttfwhm > 30)and(current_tt != tenshots_tt[-1,]):# for filtering the last one is for when DAQ is stopping + tenshots_tt = np.insert(tenshots_tt,dlen,current_tt) + dlen = np.shape(tenshots_tt)[0] + time.sleep(0.01) + tenshots_tt = np.delete(tenshots_tt,0) + ave_tt = np.mean(tenshots_tt) + print("Moving average of timetool value:", ave_tt) + + if np.abs(ave_tt) > tt_window: + ave_tt_second=-(ave_tt*1e-12) + lxt.mvr(ave_tt_second) + print("feedback %f ps"%ave_tt) + fbvalue = ave_tt + fbvalue + #drift_log(str(fbvalue)) + return + + + + def pid_control(kp,ki,kd,ave_data,faketime): + fd_value = kp*ave_data[0,] + ki*(np.sum(ave_data[:,]))+kd*((ave_data[1,]-ave_data[0,])/faketime) + return fd_value + def matlabPV_FB(feedbackvalue):#get and put timedelay signal + matPV = EpicsSignal('LAS:FS4:VIT:matlab:04') + org_matPV = matPV.get()#the matlab PV value before FB + fbvalns = feedbackvalue * 1e+9#feedback value in ns + fbinput = org_matPV + fbvalns#relative to absolute value + matPV.put(fbinput) + return + def get_ttall():#get timetool related signal + ttall = EpicsSignal('XCS:TIMETOOL:TTALL') + ttdata = ttall.get() + current_tt = ttdata[1,] + ttamp = ttdata[2,] + ipm4val = ttdata[3,] + ttfwhm = ttdata[5,] + return current_tt, ttamp, ipm4val, ttfwhm + def tt_recover(scanrange = 5e-12,stepsize = -0.5e-12,direction = "p",testshot = 240):#For tt_signal recover in 10 ps + #las.tt_y.umv(54.67)#LuAG to find tt signal + originaldelay = lxt() + if direction == "p": + print("Search tt signal from positive to negative") + lxt.mvr(scanrange) + time.sleep(0.5) + elif direction == "n": + lxt.mvr(-1*scanrange) + print("Search tt signal from negative to positive") + stepsize = -1 * stepsize + time.sleep(0.5) + j = 0 + while(abs(stepsize * j) < abs(scanrange * 2) ): + ttdata = np.zeros([testshot,]) + ii = 0 + for ii in range(testshot): + current_tt, ttamp, ipm2val, ttfwhm = drift.get_ttall()#get 240 shots to find timetool signal + if (ttamp > 0.03)and(ttfwhm < 130)and(ttfwhm > 70)and(ttamp<2): + ttdata[ii,] = ttamp + time.sleep(0.008) + print(ttdata) + if np.count_nonzero(ttdata[:,]) > 30:#1/4 shots have timetool signal + print("Found timetool signal and set current lxt to 0") + print(f"we will reset the current {lxt()} position to 0") + lxt.set_current_position(0) + #las.tt_y.umv(67.1777)#Switch to YAG + print("Please run las.tt_rough_FB()") + ttfb = input("Turn on feedback? yes(y) or No 'any other' ") + if ((ttfb == "yes") or (ttfb == "y")): + print("feedback on") + drift.tt_rough_FB(kp= 0.2,ki=0.1) + else: + print("No feedback yet") + return + else: + lxt.umvr(stepsize) + time.sleep(0.5) + print(f"searching timetool signal {lxt()}") + j = j + 1 + print("The script cannot find the timetool signal in this range. Try las.tt_find()") + + + return + + def tt_find(ini_delay = 10e-9):#tt signal find tool the ini_delay is now input argument + if lxt() != 0: + print('\033[1m'+ "Set current position to 0 to search" + '\033[0m') + return + elif lxt() ==0: + #las.tt_y.umv(54.67)#LuAG to find tt signal + delayinput = ini_delay#Search window + i = 0#iteration time + while(1):#20ns search until finding the correlation switched + print('\033[1m'+ "Can you see 'The positive correlation(p)' or 'The negative correlation(n)?' p/n or quit this script q"+'\033[0m') + bs = input()#input the current correlation + if i == 0:# for the initialization + prebs = bs + + if (i < 10)and(prebs == bs):#First search in 100 ns. 100 ns is too large. If cannot find in 10 iteration need to check the other side + if bs == "p": + delayinput = -1 * abs(delayinput) + lxt.mvr(delayinput) + i = i + 1 + print(f"Searching the negative correlation with 10ns. Number of iteration:{i}") + elif bs == "n":#find non-correlation + delayinput = abs(delayinput) + lxt.mvr(delayinput) + i = i + 1 + print(f"Searching the positive or no correlation with 10ns. Number of iteration:{i}") + elif bs == "q": + print("Quit") + return + else: + print('\033[1m'+"Can you see 'The positive correlation(p)'or'The negative correlation(n)?' p/n or quit this script q" + '\033[0m') + elif (prebs != bs): + print('\033[1m'+"Switch to binary search"+'\033[0m') + break + prebs = bs#the correlation change? diff --git a/xcs/devices.py b/xcs/devices.py new file mode 100644 index 0000000..afac60c --- /dev/null +++ b/xcs/devices.py @@ -0,0 +1,156 @@ +# Modified by tcouto 3/4/2014 for trigger mode "LE=Level Control" +from epics import PV +import pyca +#from psp import PV +from time import sleep +from ophyd import (Device, EpicsSignal, EpicsSignalRO, Component as Cpt, + FormattedComponent as FCpt) +from ophyd.signal import AttributeSignal + +import pcdsdevices.device_types +from pcdsdevices.inout import InOutPositioner +from subprocess import check_output +import time + +class LaserShutter(InOutPositioner): + """Controls shutter controlled by Analog Output""" + # EpicsSignals + voltage = Cpt(EpicsSignal, '') + state = FCpt(AttributeSignal, 'voltage_check') + # Constants + out_voltage = 4.5 + in_voltage = 0.0 + barrier_voltage = 1.4 + + @property + def voltage_check(self): + """Return the position we believe shutter based on the channel""" + if self.voltage.get() >= self.barrier_voltage: + return 'OUT' + else: + return 'IN' + + def _do_move(self, state): + """Override to just put to the channel""" + if state.name == 'IN': + self.voltage.put(self.in_voltage) + elif state.name == 'OUT': + self.voltage.put(self.out_voltage) + else: + raise ValueError("%s is in an invalid state", state) + + +#TTL I/O Operational Trigger +class SyringePump: + def __init__(self,name,base,ttl,vlow=0,vhigh=5): + self.name=name + self.base=PV(base) + self.ttl=PV(ttl) + self.vlow=vlow + self.vhigh=vhigh + self.delta=0.5 + self._is_initialized=False + def _initialize(self): + if not self._is_initialized: + self.base.put(self.vhigh) + self.ttl.put(self.vlow) + self._is_initialized=True + def _uninitialize(self): + if self._is_initialized: + self.base.put(self.vlow) + self.ttl.put(self.vlow) + self._is_initialized=False + def on(self): + if not self._is_initialized: + self._initialize() + self.ttl.put(self.vhigh) + sleep(1) + self.ttl.put(self.vlow) + sleep(1) + self.ttl.put(self.vhigh) + def off(self): + if not self._is_initialized: + self._initialize() + self.ttl.put(self.vhigh) + sleep(1) + self.ttl.put(self.vlow) + def status_string(self): + status_string='Syringepump is in unknown state' + if not self._is_initialized: + status_string='Syringepump is not initialized' + else: + if abs(self.ttl.get()-self.vhigh)