-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
26 changed files
with
1,608 additions
and
57 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
from subprocess import check_output | ||
|
||
import logging | ||
import json | ||
import sys | ||
import time | ||
import os | ||
|
||
import numpy as np | ||
import elog | ||
from hutch_python.utils import safe_load | ||
from ophyd import EpicsSignalRO | ||
from ophyd import EpicsSignal | ||
from bluesky import RunEngine | ||
from bluesky.plans import scan | ||
from bluesky.plans import list_scan | ||
from ophyd import Component as Cpt | ||
from ophyd import Device | ||
from pcdsdevices.interface import BaseInterface | ||
from pcdsdevices.areadetector import plugins | ||
from xcs.db import daq | ||
from xcs.db import camviewer | ||
from xcs.db import RE | ||
from xcs.db import xcs_ccm as ccm | ||
from xcs.delay_scan import delay_scan | ||
from xcs.db import lxt_fast, lxt_fast_enc | ||
from pcdsdevices.device_types import Newport, IMS | ||
from pcdsdevices.evr import Trigger | ||
#from macros import * | ||
import time | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class User(): | ||
def __init__(self): | ||
|
||
with safe_load ('VH_motors'): | ||
self.vh_y = IMS('XCS:USR:MMS:15', name='vh_y') | ||
self.vh_x = IMS('XCS:USR:MMS:16', name='vh_x') | ||
self.vh_rot = IMS('XCS:USR:MMS:39', name='vh_rot') | ||
|
||
with safe_load ('LED'): | ||
self.led = Trigger('XCS:R42:EVR:01:TRIG9', name='led_delay') | ||
self.led_uS = MicroToNano() | ||
|
||
# with safe_load('Laser Phase Motor'): | ||
# from ophyd.epics_motor import EpicsMotor | ||
# self.las_phase = EpicsMotor('LAS:FS4:MMS:PH', name='las_phase') | ||
|
||
def set_current_position(self, motor, value): | ||
motor.set_use_switch.put(1) | ||
motor.set_use_switch.wait_for_connection() | ||
motor.user_setpoint.put(value, force=True) | ||
motor.user_setpoint.wait_for_connection() | ||
motor.set_use_switch.put(0) | ||
|
||
def lxt_fast_set_absolute_zero(self): | ||
currentpos = lxt_fast() | ||
currentenc = lxt_fast_enc.get() | ||
#elog.post('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos)) | ||
print('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos)) | ||
lxt_fast.set_current_position(0) | ||
lxt_fast_enc.set_zero() | ||
return | ||
|
||
def takeRun(self, nEvents, record=True, use_l3t=False): | ||
daq.configure(events=120, record=record, use_l3t=use_l3t) | ||
daq.begin(events=nEvents) | ||
daq.wait() | ||
daq.end_run() | ||
|
||
def pvascan(self, motor, start, end, nsteps, nEvents, record=None): | ||
currPos = motor.get() | ||
daq.configure(nEvents, record=record, controls=[motor]) | ||
RE(scan([daq], motor, start, end, nsteps)) | ||
motor.put(currPos) | ||
|
||
def pvdscan(self, motor, start, end, nsteps, nEvents, record=None): | ||
daq.configure(nEvents, record=record, controls=[motor]) | ||
currPos = motor.get() | ||
RE(scan([daq], motor, currPos + start, currPos + end, nsteps)) | ||
motor.put(currPos) | ||
|
||
def ascan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False): | ||
self.cleanup_RE() | ||
currPos = motor.wm() | ||
daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t) | ||
try: | ||
RE(scan([daq], motor, start, end, nsteps)) | ||
except Exception: | ||
logger.debug('RE Exit', exc_info=True) | ||
finally: | ||
self.cleanup_RE() | ||
motor.mv(currPos) | ||
|
||
def listscan(self, motor, posList, nEvents, record=True, use_l3t=False): | ||
self.cleanup_RE() | ||
currPos = motor.wm() | ||
daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t) | ||
try: | ||
RE(list_scan([daq], motor, posList)) | ||
except Exception: | ||
logger.debug('RE Exit', exc_info=True) | ||
finally: | ||
self.cleanup_RE() | ||
motor.mv(currPos) | ||
|
||
def dscan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False): | ||
self.cleanup_RE() | ||
daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t) | ||
currPos = motor.wm() | ||
try: | ||
RE(scan([daq], motor, currPos+start, currPos+end, nsteps)) | ||
except Exception: | ||
logger.debug('RE Exit', exc_info=True) | ||
finally: | ||
self.cleanup_RE() | ||
motor.mv(currPos) | ||
|
||
def a2scan(self, m1, a1, b1, m2, a2, b2, nsteps, nEvents, record=True, use_l3t=False): | ||
self.cleanup_RE() | ||
daq.configure(nEvents, record=record, controls=[m1, m2], use_l3t=use_l3t) | ||
try: | ||
RE(scan([daq], m1, a1, b1, m2, a2, b2, nsteps)) | ||
except Exception: | ||
logger.debug('RE Exit', exc_info=True) | ||
finally: | ||
self.cleanup_RE() | ||
|
||
def a3scan(self, m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps, nEvents, record=True): | ||
self.cleanup_RE() | ||
daq.configure(nEvents, record=record, controls=[m1, m2, m3]) | ||
try: | ||
RE(scan([daq], m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps)) | ||
except Exception: | ||
logger.debug('RE Exit', exc_info=True) | ||
finally: | ||
self.cleanup_RE() | ||
|
||
def delay_scan(self, start, end, sweep_time, record=True, use_l3t=False, | ||
duration=None): | ||
"""Delay scan with the daq.""" | ||
self.cleanup_RE() | ||
daq.configure(events=None, duration=None, record=record, | ||
use_l3t=use_l3t, controls=[lxt_fast]) | ||
try: | ||
RE(delay_scan(daq, lxt_fast, [start, end], sweep_time, | ||
duration=duration)) | ||
except Exception: | ||
logger.debug('RE Exit', exc_info=True) | ||
finally: | ||
self.cleanup_RE() | ||
|
||
def empty_delay_scan(self, start, end, sweep_time, record=True, | ||
use_l3t=False, duration=None): | ||
"""Delay scan without the daq.""" | ||
self.cleanup_RE() | ||
#daq.configure(events=None, duration=None, record=record, | ||
# use_l3t=use_l3t, controls=[lxt_fast]) | ||
try: | ||
RE(delay_scan(None, lxt_fast, [start, end], sweep_time, | ||
duration=duration)) | ||
except Exception: | ||
logger.debug('RE Exit', exc_info=True) | ||
finally: | ||
self.cleanup_RE() | ||
|
||
def cleanup_RE(self): | ||
if not RE.state.is_idle: | ||
print('Cleaning up RunEngine') | ||
print('Stopping previous run') | ||
try: | ||
RE.stop() | ||
except Exception: | ||
pass | ||
|
||
|
||
class MicroToNano(): | ||
def __init__(self): | ||
self._offset_nano = 0 | ||
|
||
def setOffset_nano(self, offset): | ||
self._offset_nano = offset | ||
|
||
def setOffset_micro(self, offset): | ||
self._offset_nano = offset * 1000 | ||
|
||
def getOffset_nano(self): | ||
return self._offset_nano | ||
|
||
def __call__(self, micro): | ||
return (micro * 1000) + self._offset_nano | ||
|
||
|
||
#LXE is virtual motor easy to have linear power and talks to laser wave plate (newport) Friday | ||
#Saturday LXE_OPA | ||
|
Oops, something went wrong.