Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/update operation syntax #56

Merged
merged 14 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions notebooks/signac_301_Aggregation_Tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"\n",
"Aggregation allows a **signac-flow** operation to act on multiple jobs, rather than one job at a time.\n",
"\n",
"An aggregate is defined as a subset of the jobs in a **signac** project. Aggregates are generated when the `@flow.aggregator` decorator is applied to an operation.\n",
"An aggregate is defined as a subset of the jobs in a **signac** project. Aggregates are generated when a `flow.aggregator` object is provided to the `FlowProject.operation` decorator.\n",
"\n",
"Please refer to the [documentation](https://docs.signac.io/en/latest/aggregation.html) for detailed instructions on how to use aggregation."
]
Expand Down Expand Up @@ -189,8 +189,7 @@
" pass\n",
"\n",
"\n",
"@aggregator()\n",
"@AggregationProject.operation\n",
"@AggregationProject.operation(aggregator=aggregator=())\n",
"@AggregationProject.post(lambda *jobs: project.doc.get(\"average_temperature\", False))\n",
"def compute_average_temperature(*jobs):\n",
" \"\"\"Compute the average temperature using the state point parameter,\n",
Expand All @@ -201,8 +200,7 @@
" project.document[\"average_temperature\"] = float(average_temp)\n",
"\n",
"\n",
"@aggregator(sort_by=\"day\")\n",
"@AggregationProject.operation\n",
"@AggregationProject.operation(aggregator=aggregator(sort_by=\"day\"))\n",
"@AggregationProject.pre.after(compute_average_temperature)\n",
"def plot_daily_temperature(*jobs):\n",
" \"\"\"Graph of daily temperature for the year.\"\"\"\n",
Expand All @@ -224,8 +222,8 @@
" plt.show()\n",
"\n",
"\n",
"@aggregator(sort_by=\"day\", select=lambda job: job.sp[\"day\"] % 7 == 0)\n",
"@AggregationProject.operation\n",
"\n",
"@AggregationProject.operation(aggregator=aggregator(sort_by=\"day\", select=lambda job: job.sp[\"day\"] % 7 == 0))\n",
"def plot_weekly_temperature(*jobs):\n",
" \"\"\"Graph the temperature for only one day of each week.\"\"\"\n",
" print(\"Generating plot of weekly temperature.\")\n",
Expand Down
17 changes: 9 additions & 8 deletions projects/flow.2D-random-walk/src/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,9 @@ def plot_mean_squared_displacement(job):


@agg_analyze_and_plot
@std_aggregator
@RandomWalkProject.pre(all_simulated)
@RandomWalkProject.post.true("msd_analyzed")
@RandomWalkProject.operation
@RandomWalkProject.operation(aggregator=std_aggregator)
def compute_mean_squared_displacement(*jobs):
"""Compute and store the mean squared displacement for all std."""
msd = np.zeros(jobs[0].doc.run_steps + 1)
Expand All @@ -142,12 +141,15 @@ def compute_mean_squared_displacement(*jobs):

# Since this uses a separate aggragator to restrict aggregates to the first 5 replicas,
# we cannot assign this operation to either agg_plot or agg_analyze_and_plot
@flow.aggregator.groupby(
"standard_deviation", select=lambda job: job.sp.replica <= 4, sort_by="replica"
)


@RandomWalkProject.pre(all_simulated)
@RandomWalkProject.post.true("plotted_walks")
@RandomWalkProject.operation
@RandomWalkProject.operation(
aggregator=flow.aggregator.groupby(
"standard_deviation", select=lambda job: job.sp.replica <= 4, sort_by="replica"
)
)
def plot_walks(*jobs):
"""Plot the first 5 replicas random walks for each standard_deviation."""
fig, ax = plt.subplots()
Expand All @@ -168,10 +170,9 @@ def plot_walks(*jobs):

@agg_analyze_and_plot
@agg_plot
@std_aggregator
@RandomWalkProject.pre(all_simulated)
@RandomWalkProject.post.true("plotted_histogram")
@RandomWalkProject.operation
@RandomWalkProject.operation(aggregator=std_aggregator)
def plot_histogram(*jobs):
"""Create a 2D histogram of the final positions of random walks per std."""
final_positions = np.array(
Expand Down
9 changes: 5 additions & 4 deletions projects/flow.aggregation-mpi/project.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from flow import FlowProject, aggregator, directives
from flow import FlowProject, aggregator


class Project(FlowProject):
Expand All @@ -22,9 +22,10 @@ def mpi_task(job, comm):
print(f"In the mpi_task function, {rank=} of {size=} has {data=}.")


@Project.operation
@directives(nranks=lambda *jobs: RANKS_PER_JOB * len(jobs))
@aggregator.groupsof(num=JOBS_PER_AGGREGATE)
@Project.operation(
directives={"nranks": lambda *jobs: RANKS_PER_JOB * len(jobs)},
aggregator=aggregator.groupsof(num=JOBS_PER_AGGREGATE),
)
def do_mpi_task(*jobs):
from mpi4py import MPI

Expand Down
6 changes: 2 additions & 4 deletions projects/flow.aggregation-plotting/src/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ def get_pressure(crystal_name, density):
return pressure


@aggregator.groupby("crystal", sort_by="density")
@Project.operation
@Project.operation(aggregator=aggregator.groupby("crystal", sort_by="density"))
def plot_pressure_by_crystal(*jobs):
"""Plot the pressure as a function of density for each group."""
pressures = {}
Expand All @@ -40,8 +39,7 @@ def plot_pressure_by_crystal(*jobs):
plt.close()


@aggregator(sort_by="density")
@Project.operation
@Project.operation(aggregator=aggregator(sort_by="density"))
def plot_pressure_all(*jobs):
"""Plot pressure for all data on the same axes."""
crystal_pressures = {"fcc": [], "bcc": []}
Expand Down
71 changes: 24 additions & 47 deletions projects/flow.gmx-lysozyme-in-water/project.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import flow
import pexpect # Used to automate interaction with GROMACS interface.
import signac
from flow import FlowProject

gmx_exec = "gmx" # or use gmx_mpi if available
mpi_exec = "mpirun"

"""Define file level constants."""

Expand Down Expand Up @@ -96,16 +94,10 @@ def _grompp_str(op_name, gro_name, checkpoint_file=None):
return workspace_command(cmd)


def _mdrun_str(op_name, np=1, nt=None, verbose=False):
def _mdrun_str(op_name, nt=None, verbose=False):
"""Helper function, returns mdrun command string for operation."""
num_threads = 1 if nt is None else nt
num_nodes = np // num_threads
cmd = (
"OMP_NUM_THREADS={num_threads} {mpi_exec} -n {np} {gmx} "
"mdrun -ntomp {num_threads} {verbose} -deffnm {op}"
).format(
np=num_nodes,
mpi_exec=mpi_exec,
cmd = ("{gmx} mdrun -ntomp {num_threads} {verbose} -deffnm {op}").format(
b-butler marked this conversation as resolved.
Show resolved Hide resolved
gmx=gmx_exec,
num_threads=num_threads,
op=op_name,
Expand All @@ -115,9 +107,8 @@ def _mdrun_str(op_name, np=1, nt=None, verbose=False):


# First three steps are simple configuration
@MyProject.operation
@MyProject.post.isfile(gro_file)
@flow.cmd
@MyProject.operation(cmd=True)
def pdb2gmx(job):
return workspace_command(
"{gmx} pdb2gmx -f {pdb_file} -o {gro_file} -water {water_model} "
Expand All @@ -131,10 +122,9 @@ def pdb2gmx(job):
)


@MyProject.operation
@MyProject.pre.after(pdb2gmx)
@MyProject.post.isfile(boxed_file)
@flow.cmd
@MyProject.operation(cmd=True)
def editconf(job):
return workspace_command(
"{gmx} editconf -f {gro_file} -o {boxed_file} -c -d {edge_spacing} "
Expand All @@ -148,10 +138,9 @@ def editconf(job):
)


@MyProject.operation
@MyProject.pre.isfile(boxed_file)
@MyProject.post.isfile(solvated_file)
@flow.cmd
@MyProject.operation(cmd=True)
def solvate(job):
return workspace_command(
"{gmx} solvate -cp {boxed_file} -cs {solvent_configuration} "
Expand All @@ -176,17 +165,16 @@ def solvate(job):
# automate responding to requested std input


@MyProject.operation
@MyProject.pre.isfile(solvated_file)
@MyProject.post.isfile(ionization_config)
@flow.cmd
@MyProject.operation(cmd=True)
def grompp_add_ions(job):
return _grompp_str("ions", solvated_file)
return _grompp_str("ions", solvated_file).format(job)


@MyProject.operation
@MyProject.pre.after(grompp_add_ions)
@MyProject.post(prepared_for_simulation)
@MyProject.operation
def ionize(job):
"""Exploit the pexpect module to run."""
with job:
Expand All @@ -210,73 +198,62 @@ def ionize(job):


# Minimization
@MyProject.operation
@MyProject.pre(prepared_for_simulation)
@MyProject.post.isfile(em_op + ".tpr")
@flow.cmd
@MyProject.operation(cmd=True)
def grompp_minim(job):
return _grompp_str("minim", ionized_file)
return _grompp_str("minim", ionized_file).format(job)


@MyProject.operation
@MyProject.pre.after(grompp_minim)
@MyProject.post.isfile(em_file)
@flow.cmd
@MyProject.operation(cmd=True)
def minim(job):
return _mdrun_str("minim")
return _mdrun_str("minim").format(job)


# Equilibration: NVT then NPT
@MyProject.operation
@MyProject.pre.after(minim)
@MyProject.post.isfile(nvt_op + ".tpr")
@flow.cmd
@MyProject.operation(cmd=True)
def grompp_nvt(job):
return _grompp_str("nvt", em_file)
return _grompp_str("nvt", em_file).format(job)


@MyProject.operation
@MyProject.pre.after(grompp_nvt)
@MyProject.post.isfile(nvt_file)
@flow.cmd
@flow.directives(np=16)
@MyProject.operation(cmd=True, directives={"np": 16})
def nvt(job):
return _mdrun_str("nvt")
return _mdrun_str("nvt").format(job)


@MyProject.operation
@MyProject.pre.after(nvt)
@MyProject.post.isfile(npt_op + ".tpr")
@flow.cmd
@MyProject.operation(cmd=True)
def grompp_npt(job):
return _grompp_str("npt", nvt_file)
return _grompp_str("npt", nvt_file).format(job)


@MyProject.operation
@MyProject.pre.isfile(npt_op + ".tpr")
@MyProject.post.isfile(npt_file)
@flow.cmd
@flow.directives(np=16)
@MyProject.operation(cmd=True, directives={"np": 16})
def npt(job):
return _mdrun_str("npt")
return _mdrun_str("npt").format(job)


# Final run
@MyProject.operation
@MyProject.pre.isfile(npt_file)
@MyProject.post.isfile(production_op + ".tpr")
@flow.cmd
@MyProject.operation(cmd=True)
def grompp_md(job):
return _grompp_str("md", npt_file)
return _grompp_str("md", npt_file).format(job)


@MyProject.operation
@MyProject.pre.after(grompp_md)
@MyProject.post(finished)
@flow.cmd
@flow.directives(np=16)
@MyProject.operation(cmd=True, directives={"np": 16})
def md(job):
return _mdrun_str("md")
return _mdrun_str("md").format(job)


if __name__ == "__main__":
Expand Down
18 changes: 7 additions & 11 deletions projects/flow.gmx-mtools/src/project.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Define the project's workflow logic."""
import flow
import mbuild as mb
import signac
from flow import FlowProject
Expand All @@ -23,7 +22,7 @@ def _mdrun_str(op_name):

def gromacs_command(name, gro, sys):
"""Simplify GROMACS operations"""
return "cd {{job.ws}} ; {} && {}".format(
return "{} && {}".format(
_grompp_str(project_root_directory, name, gro, sys),
_mdrun_str(name),
)
Expand Down Expand Up @@ -67,28 +66,25 @@ def initialize(job):
system.save("init.top", forcefield_name="oplsaa", overwrite=True)


@MyProject.operation
@MyProject.pre(initialized)
@MyProject.post(minimized)
@flow.cmd
@MyProject.operation(cmd=True, with_job=True)
def em(job):
return gromacs_command(name="em", gro="init", sys="init")
return gromacs_command(name="em", gro="init", sys="init").format(job)


@MyProject.operation
@MyProject.pre(minimized)
@MyProject.post(equilibrated)
@flow.cmd
@MyProject.operation(cmd=True, with_job=True)
def equil(job):
return gromacs_command(name="equil", gro="em", sys="init")
return gromacs_command(name="equil", gro="em", sys="init").format(job)


@MyProject.operation
@MyProject.pre(equilibrated)
@MyProject.post(sampled)
@flow.cmd
@MyProject.operation(cmd=True, with_job=True)
def sample(job):
return gromacs_command(name="sample", gro="equil", sys="init")
return gromacs_command(name="sample", gro="equil", sys="init").format(job)


if __name__ == "__main__":
Expand Down