Commit b9ac842

Merge pull request #340 from lcls-users/BUG/js_slurm_account

Pass SLURM Account Information to `JobScheduler`

fredericpoitevin authored Nov 7, 2023
2 parents 7bdd308 + 31c740e

Showing 8 changed files with 45 additions and 20 deletions.
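All eight files follow one pattern: each wrapper class or launcher function gains a `slurm_account` parameter (defaulting to "lcls"), stores or forwards it, and ultimately hands it to `JobScheduler` as `account=`. `JobScheduler` itself is not part of this diff; purely as a hedged mental model, an `account` option on a SLURM scheduler typically surfaces as an `#SBATCH --account=` line in the generated batch script, roughly like this sketch (class name and attributes invented):

    # Hypothetical sketch only -- not btx's actual JobScheduler, whose code is
    # not part of this diff. It illustrates where an account= kwarg usually
    # lands: an "#SBATCH --account=..." line in the generated submission script.
    class SketchScheduler:
        def __init__(self, exe_path, ncores=1, jobname="job", queue="milano",
                     time="1:00:00", account="lcls"):
            self.exe_path = exe_path
            self.ncores = ncores
            self.jobname = jobname
            self.queue = queue
            self.time = time
            self.account = account

        def write_header(self):
            # Emit the SBATCH preamble; --account is the field this PR threads through.
            with open(self.exe_path, "w") as f:
                f.write("#!/bin/bash\n")
                f.write(f"#SBATCH --partition={self.queue}\n")
                f.write(f"#SBATCH --job-name={self.jobname}\n")
                f.write(f"#SBATCH --ntasks={self.ncores}\n")
                f.write(f"#SBATCH --time={self.time}\n")
                f.write(f"#SBATCH --account={self.account}\n")
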
17 changes: 12 additions & 5 deletions btx/diagnostics/geoptimizer.py
@@ -18,8 +18,10 @@ class Geoptimizer:
     """
     Class for refining the geometry.
     """
-    def __init__(self, queue, task_dir, scan_dir, runs, input_geom, dx_scan, dy_scan, dz_scan):
-
+    def __init__(self, queue, task_dir, scan_dir, runs, input_geom, dx_scan, dy_scan, dz_scan,
+                 slurm_account="lcls"
+                 ):
+        self.slurm_account = slurm_account
         self.queue = queue          # queue to submit jobs to, str
         self.task_dir = task_dir    # path to indexing directory, str
         self.scan_dir = scan_dir    # path to scan directory, str
@@ -115,7 +117,8 @@ def launch_indexing(self, exp, det_type, params, cell_file):
             idxr = Indexer(exp=exp, run=run, det_type=det_type, tag=params.tag, tag_cxi=params.get('tag_cxi'), taskdir=self.task_dir,
                            geom=gfile, cell=cell_file, int_rad=params.int_radius, methods=params.methods, tolerance=params.tolerance, no_revalidate=params.no_revalidate,
                            multi=params.multi, profile=params.profile, queue=self.queue, ncores=params.get('ncores') if params.get('ncores') is not None else 64,
-                           time=params.get('time') if params.get('time') is not None else '1:00:00')
+                           time=params.get('time') if params.get('time') is not None else '1:00:00',
+                           slurm_account=self.slurm_account)
             idxr.tmp_exe = jobfile
             idxr.stream = stream
             idxr.launch(addl_command=f"echo {jobname} | tee -a {statusfile}\n",
@@ -153,7 +156,9 @@ def launch_stream_wrangling(self, params):
                                    ncores=params.get('ncores') if params.get('ncores') is not None else 16,
                                    cell_out=os.path.join(celldir, f"g{num}.cell"),
                                    cell_ref=params.get('ref_cell'),
-                                   addl_command=f"echo {jobname} | tee -a {statusfile}\n")
+                                   addl_command=f"echo {jobname} | tee -a {statusfile}\n",
+                                   slurm_account=self.slurm_account
+                                   )
             jobnames.append(jobname)
             time.sleep(self.frequency)
 
@@ -189,7 +194,9 @@ def launch_merging(self, params):
             cellfile = os.path.join(self.scan_dir, f"cell/g{num}.cell")
 
             stream_to_mtz = StreamtoMtz(instream, params.symmetry, mergedir, cellfile, queue=self.queue, tmp_exe=jobfile,
-                                        ncores=params.get('ncores') if params.get('ncores') is not None else 16)
+                                        ncores=params.get('ncores') if params.get('ncores') is not None else 16,
+                                        slurm_account=self.slurm_account
+                                        )
             stream_to_mtz.cmd_partialator(iterations=params.iterations, model=params.model,
                                           min_res=params.get('min_res'), push_res=params.get('push_res'))
             stream_to_mtz.cmd_compare_hkl(foms=['CCstar','Rsplit'], nshells=1, highres=params.get('highres'))
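
For orientation, a hedged construction example against the new signature; every path and scan value below is invented:

    # Hypothetical usage; paths, runs, and scan grids are all invented.
    from btx.diagnostics.geoptimizer import Geoptimizer

    geopt = Geoptimizer(queue="milano",
                        task_dir="/path/to/index",
                        scan_dir="/path/to/scan",
                        runs=[10, 11, 12],
                        input_geom="/path/to/input.geom",
                        dx_scan=[-0.1, 0.0, 0.1],
                        dy_scan=[-0.1, 0.0, 0.1],
                        dz_scan=[-1.0, 0.0, 1.0],
                        slurm_account="lcls")  # forwarded to every job it submits
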
5 changes: 3 additions & 2 deletions btx/interfaces/imtz.py
@@ -141,7 +141,7 @@ def f2mtz_command(outmtz):
 
     return command
 
-def run_dimple(mtz, pdb, outdir, queue='ffbh3q', ncores=16, anomalous=False):
+def run_dimple(mtz, pdb, outdir, queue='milano', ncores=16, anomalous=False, slurm_account="lcls"):
     """
     Run dimple to solve the structure:
     http://ccp4.github.io/dimple/.
@@ -166,7 +166,8 @@ def run_dimple(mtz, pdb, outdir, queue='ffbh3q', ncores=16, anomalous=False):
                       logdir=outdir,
                       ncores=ncores,
                       jobname=f'dimple',
-                      queue=queue)
+                      queue=queue,
+                      account=slurm_account)
     js.write_header()
     js.write_main(command + "\n", dependencies=['ccp4'])
     js.clean_up()
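
A hedged call example against the updated signature; the file paths are invented:

    # Hypothetical call; paths are invented. slurm_account is handed to
    # JobScheduler as account=, per the hunk above.
    from btx.interfaces.imtz import run_dimple

    run_dimple("merged.mtz", "reference.pdb", "/path/to/dimple_out",
               queue="milano", ncores=16, anomalous=False,
               slurm_account="lcls")
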
7 changes: 5 additions & 2 deletions btx/interfaces/istream.py
@@ -590,7 +590,9 @@ def cluster_cell_params(cell, out_clusters, out_cell, in_cell=None, eps=5, min_s
     return clustering.labels_
 
 def launch_stream_analysis(in_stream, out_stream, fig_dir, tmp_exe, queue, ncores,
-                           cell_only=False, cell_out=None, cell_ref=None, addl_command=None):
+                           cell_only=False, cell_out=None, cell_ref=None, addl_command=None,
+                           slurm_account="lcls"):
+
     """
     Launch stream analysis task using iScheduler.
@@ -631,7 +633,8 @@ def launch_stream_analysis(in_stream, out_stream, fig_dir, tmp_exe, queue, ncore
     if cell_ref is not None:
         command += f" --cell_ref={cell_ref}"
 
-    js = JobScheduler(tmp_exe, ncores=ncores, jobname=f'stream_analysis', queue=queue)
+    js = JobScheduler(tmp_exe, ncores=ncores, jobname=f'stream_analysis', queue=queue,
+                      account=slurm_account)
     js.write_header()
     js.write_main(f"{command}\n")
     js.write_main(f"cat {in_stream} > {out_stream}\n")
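
Similarly, a hedged call example for the updated launcher (stream and output paths invented):

    # Hypothetical call; stream and output paths are invented.
    from btx.interfaces.istream import launch_stream_analysis

    launch_stream_analysis("runs.stream", "combined.stream",
                           "/path/to/figs", "/path/to/stream_analysis.sh",
                           queue="milano", ncores=6,
                           slurm_account="lcls")  # becomes account= on the JobScheduler
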
7 changes: 5 additions & 2 deletions btx/processing/indexer.py
@@ -17,7 +17,7 @@ class Indexer:
 
     def __init__(self, exp, run, det_type, tag, taskdir, geom, cell=None, int_rad='4,5,6', methods='mosflm',
                  tolerance='5,5,5,1.5', tag_cxi=None, no_revalidate=True, multi=True, profile=True,
-                 ncores=64, queue='milano', time='1:00:00', *, mpi_init = False):
+                 ncores=64, queue='milano', time='1:00:00', *, mpi_init = False, slurm_account="lcls"):
 
         if mpi_init:
             from mpi4py import MPI
@@ -50,6 +50,7 @@ def __init__(self, exp, run, det_type, tag, taskdir, geom, cell=None, int_rad='4
         self.ncores = ncores               # int, number of cores to parallelize indexing across
         self.queue = queue                 # str, submission queue
         self.time = time                   # str, time limit
+        self.slurm_account = slurm_account
         self._retrieve_paths()
 
     def _retrieve_paths(self):
@@ -97,7 +98,9 @@ def launch(self, addl_command=None, dont_report=False):
         if addl_command is not None:
             command += f"\n{addl_command}"
 
-        js = JobScheduler(self.tmp_exe, ncores=self.ncores, jobname=f'idx_r{self.run:04}', queue=self.queue, time=self.time)
+        js = JobScheduler(self.tmp_exe, ncores=self.ncores,
+                          jobname=f'idx_r{self.run:04}', queue=self.queue,
+                          time=self.time, account=self.slurm_account)
         js.write_header()
         js.write_main(command, dependencies=['crystfel'] + self.methods.split(','))
         js.clean_up()
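
A hedged construction example; the experiment, run, and detector values are invented, and omitting `slurm_account` falls back to the "lcls" default:

    # Hypothetical usage; experiment/detector names and paths are invented.
    from btx.processing.indexer import Indexer

    idxr = Indexer(exp="mfxp100321", run=5, det_type="epix10k2M",
                   tag="demo", taskdir="/path/to/index", geom="/path/to/r0005.geom",
                   queue="milano", ncores=64, time="1:00:00",
                   slurm_account="lcls")  # stored, then passed as account= in launch()
    idxr.launch()
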
8 changes: 5 additions & 3 deletions btx/processing/merge.py
@@ -19,14 +19,15 @@ class StreamtoMtz:
     input stream file.
     """
 
-    def __init__(self, input_stream, symmetry, taskdir, cell, ncores=16, queue='ffbh3q', tmp_exe=None, mtz_dir=None, anomalous=False,
-                 mpi_init=False):
+    def __init__(self, input_stream, symmetry, taskdir, cell, ncores=16, queue='milano', tmp_exe=None, mtz_dir=None, anomalous=False,
+                 mpi_init=False, slurm_account="lcls"):
         self.stream = input_stream         # file of unmerged reflections, str
         self.symmetry = symmetry           # point group symmetry, str
         self.taskdir = taskdir             # path for storing output, str
         self.cell = cell                   # pdb or CrystFEL cell file, str
         self.ncores = ncores               # int, number of cores for merging
         self.queue = queue                 # cluster to submit job to
+        self.slurm_account = slurm_account
         self.mtz_dir = mtz_dir             # directory to which to transfer mtz
         self.anomalous = anomalous         # whether to separate Bijovet pairs
         self._set_up(tmp_exe)
@@ -61,7 +62,8 @@ def _set_up(self, tmp_exe):
         # make path to executable
         if tmp_exe is None:
            tmp_exe = os.path.join(self.taskdir ,f'merge.sh')
-        self.js = JobScheduler(tmp_exe, ncores=self.ncores, jobname=f'merge', queue=self.queue)
+        self.js = JobScheduler(tmp_exe, ncores=self.ncores, jobname=f'merge',
+                               queue=self.queue, account=self.slurm_account)
         self.js.write_header()
 
         # retrieve paths
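
And a hedged construction example for the merger (file and directory names invented); note the queue default also moves from 'ffbh3q' to 'milano' here:

    # Hypothetical usage; stream, cell, and directory names are invented.
    from btx.processing.merge import StreamtoMtz

    stream_to_mtz = StreamtoMtz("demo.stream", "4/mmm", "/path/to/merge", "demo.cell",
                                ncores=16, queue="milano",
                                slurm_account="lcls")  # reaches JobScheduler in _set_up()
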
1 change: 1 addition & 0 deletions config/config.yaml
@@ -1,5 +1,6 @@
 setup:
   queue: 'milano'
+  account: ''
   root_dir: ''
   exp: ''
   run: 5
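
As a hedged illustration, a filled-in `setup` block might read (values invented; the account string depends on the facility's SLURM configuration):

    setup:
      queue: 'milano'
      account: 'lcls'
      root_dir: '/path/to/scratch/btx/'
      exp: 'mfxp100321'
      run: 5
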
2 changes: 2 additions & 0 deletions scripts/setup_btx.sh
@@ -116,6 +116,8 @@ mkdir -p ${EXP_DIR}/scratch/btx/launchpad
 cp ${BTX_DIR}/config/config.yaml ${EXP_DIR}/scratch/btx/yamls
 sed -i "s|root_dir: ''|root_dir: \'${EXP_DIR}/scratch/btx/\'|g" ${EXP_DIR}/scratch/btx/yamls/config.yaml
 sed -i "s|exp: ''|exp: \'${EXP}\'|g" ${EXP_DIR}/scratch/btx/yamls/config.yaml
+sed -i "s|queue: ''|queue: \'${QUEUE}\'|g" ${EXP_DIR}/scratch/btx/yamls/config.yaml
+sed -i "s|account: ''|account: \'${ACCOUNT}\'|g" ${EXP_DIR}/scratch/btx/yamls/config.yaml
 
 chmod -R o+r ${EXP_DIR}/scratch/btx
 chmod -R o+w ${EXP_DIR}/scratch/btx
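
The script's argument handling is outside this diff; a hedged guess at how the new sed lines get their inputs, assuming `QUEUE` and `ACCOUNT` are supplied via the environment:

    # Hypothetical invocation; whether setup_btx.sh reads QUEUE/ACCOUNT from the
    # environment or from CLI flags is not shown in this diff.
    export QUEUE='milano'
    export ACCOUNT='lcls'
    ./scripts/setup_btx.sh
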
18 changes: 12 additions & 6 deletions scripts/tasks.py
@@ -99,7 +99,8 @@ def run_analysis(config):
     js = JobScheduler(os.path.join(".", f'ra_{setup.run:04}.sh'),
                       queue=setup.queue,
                       ncores=task.ncores,
-                      jobname=f'ra_{setup.run:04}')
+                      jobname=f'ra_{setup.run:04}',
+                      account=setup.account)
     js.write_header()
     js.write_main(f"{command}\n", dependencies=['psana'])
     js.clean_up()
@@ -194,7 +195,7 @@ def index(config):
     indexer_obj = Indexer(exp=config.setup.exp, run=config.setup.run, det_type=config.setup.det_type, tag=task.tag, tag_cxi=task.get('tag_cxi'), taskdir=taskdir,
                           geom=geom_file, cell=task.get('cell'), int_rad=task.int_radius, methods=task.methods, tolerance=task.tolerance, no_revalidate=task.no_revalidate,
                           multi=task.multi, profile=task.profile, queue=setup.get('queue'), ncores=task.get('ncores') if task.get('ncores') is not None else 64,
-                          time=task.get('time') if task.get('time') is not None else '1:00:00', mpi_init = False)
+                          time=task.get('time') if task.get('time') is not None else '1:00:00', mpi_init = False, slurm_account=setup.account)
     logger.debug(f'Generating indexing executable for run {setup.run} of {setup.exp}...')
     indexer_obj.launch()
     logger.info(f'Indexing launched!')
@@ -267,7 +268,8 @@ def stream_analysis(config):
                            ncores=task.get('ncores') if task.get('ncores') is not None else 6,
                            cell_only=task.get('cell_only') if task.get('cell_only') is not None else False,
                            cell_out=os.path.join(setup.root_dir, 'cell', f'{task.tag}.cell'),
-                           cell_ref=task.get('ref_cell'))
+                           cell_ref=task.get('ref_cell'),
+                           slurm_account=setup.account)
     logger.info(f'Stream analysis launched')
 
 def determine_cell(config):
@@ -306,7 +308,8 @@ def merge(config):
     stream_to_mtz = StreamtoMtz(input_stream, task.symmetry, taskdir, cellfile, queue=setup.get('queue'),
                                 ncores=task.get('ncores') if task.get('ncores') is not None else 16,
                                 mtz_dir=os.path.join(setup.root_dir, "solve", f"{task.tag}"),
-                                anomalous=task.get('anomalous') if task.get('anomalous') is not None else False)
+                                anomalous=task.get('anomalous') if task.get('anomalous') is not None else False,
+                                slurm_account=setup.account)
     stream_to_mtz.cmd_partialator(iterations=task.iterations, model=task.model,
                                   min_res=task.get('min_res'), push_res=task.get('push_res'), max_adu=task.get('max_adu'))
     for ns in [1, task.nshells]:
@@ -328,7 +331,8 @@ def solve(config):
                taskdir,
                queue=setup.get('queue'),
                ncores=task.get('ncores') if task.get('ncores') is not None else 16,
-               anomalous=task.get('anomalous') if task.get('anomalous') is not None else False)
+               anomalous=task.get('anomalous') if task.get('anomalous') is not None else False,
+               slurm_account=setup.account)
     logger.info(f'Dimple launched!')
 
 def refine_geometry(config, task=None):
@@ -362,7 +366,9 @@ def refine_geometry(config, task=None):
                         geom_file,
                         task.dx,
                         task.dy,
-                        task.dz)
+                        task.dz,
+                        slurm_account=setup.account
+                        )
     geopt.launch_indexing(setup.exp, setup.det_type, config.index, cell_file)
     geopt.launch_stream_wrangling(config.stream_analysis)
     geopt.launch_merging(config.merge)
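
Tying it together, a hedged sketch of where the `setup.account` these tasks forward would originate, assuming a plain YAML load of the config above (btx's actual config loader is not shown in this diff):

    # Hypothetical config access; btx's real loader may wrap this differently.
    import yaml

    with open("config.yaml") as f:
        cfg = yaml.safe_load(f)

    account = cfg["setup"]["account"]  # '' until setup_btx.sh substitutes a value
    print(f"jobs will be submitted with --account={account or '<unset>'}")
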
