Skip to content

Commit

Permalink
First draft of a ProcessorPipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
pckroon committed May 7, 2024
1 parent e9bf1c9 commit ff636ed
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 150 deletions.
188 changes: 94 additions & 94 deletions bin/martinize2
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import vermouth
import vermouth.forcefield
from vermouth.file_writer import deferred_open, DeferredFileWriter
from vermouth import DATA_PATH
from vermouth.processors.processor import ProcessorPipeline
from vermouth.dssp import dssp
from vermouth.dssp.dssp import (
AnnotateDSSP,
AnnotateMartiniSecondaryStructures,
AnnotateResidues,
)
from vermouth.rcsu import ComputeStructuralGoBias, VirtualSiteCreator
from vermouth.log_helpers import (
StyleAdapter,
BipolarFormatter,
Expand All @@ -50,7 +52,6 @@ from vermouth.map_input import (
combine_mappings,
)
from vermouth.rcsu.contact_map import read_go_map
from vermouth.rcsu.go_pipeline import GoPipeline
from vermouth.citation_parser import citation_formatter
from vermouth.gmx.topology import write_gmx_topology

Expand Down Expand Up @@ -84,30 +85,8 @@ LOGGER = StyleAdapter(LOGGER)
VERSION = "martinize with vermouth {}".format(vermouth.__version__)


def read_system(path, ignore_resnames=(), ignh=None, modelidx=None):
"""
Read a system from a PDB or GRO file.
This function guesses the file type based on the file extension.
The resulting system does not have a force field and may not have edges.
"""
system = vermouth.System()
file_extension = path.suffix.upper()[1:] # We do not keep the dot
if file_extension in ["PDB", "ENT"]:
vermouth.PDBInput(
str(path), exclude=ignore_resnames, ignh=ignh, modelidx=modelidx
).run_system(system)
elif file_extension in ["GRO"]:
vermouth.GROInput(str(path), exclude=ignore_resnames, ignh=ignh).run_system(
system
)
else:
raise ValueError('Unknown file extension "{}".'.format(file_extension))
return system


def pdb_to_universal(
pipeline,
system,
delete_unknown=False,
force_field=None,
Expand All @@ -133,62 +112,64 @@ def pdb_to_universal(
canonicalized.force_field = force_field

LOGGER.info("Guessing the bonds.", type="step")
vermouth.MakeBonds(
allow_name=bonds_from_name, allow_dist=bonds_from_dist, fudge=bonds_fudge
).run_system(canonicalized)
vermouth.MergeNucleicStrands().run_system(canonicalized)
pipeline.add(
vermouth.MakeBonds(
allow_name=bonds_from_name, allow_dist=bonds_from_dist, fudge=bonds_fudge
)
)
pipeline.add(vermouth.MergeNucleicStrands())
if write_graph is not None:
vermouth.pdb.write_pdb(
canonicalized, str(write_graph), omit_charges=True, defer_writing=False
canonicalized, write_graph, omit_charges=True, defer_writing=False
)

LOGGER.debug("Annotating required mutations and modifications.", type="step")
vermouth.AnnotateMutMod(modifications, mutations).run_system(canonicalized)
pipeline.add(vermouth.AnnotateMutMod(modifications, mutations))
LOGGER.info("Repairing the graph.", type="step")
vermouth.RepairGraph(delete_unknown=delete_unknown, include_graph=False).run_system(
canonicalized
)
pipeline.add(vermouth.RepairGraph(delete_unknown=delete_unknown, include_graph=False))
if write_repair is not None:
vermouth.pdb.write_pdb(
canonicalized,
str(write_repair),
write_repair,
omit_charges=True,
nan_missing_pos=True,
defer_writing=False,
)
LOGGER.info("Dealing with modifications.", type="step")
vermouth.CanonicalizeModifications().run_system(canonicalized)
pipeline.add(vermouth.CanonicalizeModifications())
if write_canon is not None:
vermouth.pdb.write_pdb(
canonicalized,
str(write_canon),
write_canon,
omit_charges=True,
nan_missing_pos=True,
defer_writing=False,
)
vermouth.AttachMass(attribute="mass").run_system(canonicalized)
pipeline.add(vermouth.AttachMass(attribute="mass"))
return canonicalized


def martinize(system, mappings, to_ff, delete_unknown=False):
def martinize(pipeline, system, mappings, to_ff, delete_unknown=False):
"""
Convert a system from one force field to an other at lower resolution.
"""
LOGGER.info("Creating the graph at the target resolution.", type="step")
vermouth.DoMapping(
mappings=mappings,
to_ff=to_ff,
delete_unknown=delete_unknown,
attribute_keep=("cgsecstruct", "chain"),
attribute_must=("resname",),
attribute_stash=("resid",),
).run_system(system)
pipeline.add(
vermouth.DoMapping(
mappings=mappings,
to_ff=to_ff,
delete_unknown=delete_unknown,
attribute_keep=("cgsecstruct", "chain"),
attribute_must=("resname",),
attribute_stash=("resid",),
)
)
LOGGER.info("Averaging the coordinates.", type="step")
vermouth.DoAverageBead(ignore_missing_graphs=True).run_system(system)
pipeline.add(vermouth.DoAverageBead(ignore_missing_graphs=True))
LOGGER.info("Applying the links.", type="step")
vermouth.DoLinks().run_system(system)
pipeline.add(vermouth.DoLinks())
LOGGER.info("Placing the charge dummies.", type="step")
vermouth.LocateChargeDummies().run_system(system)
pipeline.add(vermouth.LocateChargeDummies())
return system


Expand Down Expand Up @@ -851,17 +832,20 @@ def entry():
if not any("nter" in resspec for resspec in resspecs):
args.modifications.append(["nter", "N-ter"])

# Reading the input structure.
# So far, we assume we only go from atomistic to martini. We want the
# input structure to be a clean universal system.
# For now at least, we silently delete molecules with unknown blocks.
system = read_system(
args.inpath,
ignore_resnames=ignore_res,
ignh=args.ignore_h,
modelidx=args.modelidx,
)
system = vermouth.System()
pipeline = ProcessorPipeline(name='martinize2')
to_universal = ProcessorPipeline(name='to universal')

file_extension = args.inpath.suffix.upper()[1:] # We do not keep the dot
if file_extension in ["PDB", "ENT"]:
to_universal.add(vermouth.PDBInput(args.inpath, exclude=ignore_res, ignh=args.ignore_h, modelidx=args.modelidx))
elif file_extension in ["GRO"]:
to_universal.add(vermouth.GROInput(args.inpath, exclude=ignore_res, ignh=args.ignore_h))
else:
raise ValueError('Unknown file extension "{}".'.format(file_extension))

system = pdb_to_universal(
to_universal,
system,
delete_unknown=True,
force_field=known_force_fields[from_ff],
Expand All @@ -874,24 +858,30 @@ def entry():
write_repair=args.write_repair,
write_canon=args.write_canon,
)
pipeline.add(to_universal)

LOGGER.info("Read input.", type="step")
for molecule in system.molecules:
LOGGER.debug("Read molecule {}.", molecule, type="step")

martinize_pipeline = ProcessorPipeline(name='martinize')

target_ff = known_force_fields[args.to_ff]

if args.dssp:
if not isinstance(args.dssp, str):
args.dssp = None
AnnotateDSSP(executable=args.dssp, savedir='.').run_system(system)
AnnotateMartiniSecondaryStructures().run_system(system)
martinize_pipeline.add(AnnotateDSSP(executable=args.dssp, savedir="."))
martinize_pipeline.add(AnnotateMartiniSecondaryStructures())
elif args.ss is not None:
AnnotateResidues(
attribute="secstruct",
sequence=args.ss,
molecule_selector=selectors.is_protein,
).run_system(system)
AnnotateMartiniSecondaryStructures().run_system(system)
martinize_pipeline.add(
AnnotateResidues(
attribute="secstruct",
sequence=args.ss,
molecule_selector=selectors.is_protein,
)
)
martinize_pipeline.add(AnnotateMartiniSecondaryStructures())
elif args.collagen:
if not target_ff.has_feature("collagen"):
LOGGER.warning(
Expand All @@ -900,27 +890,29 @@ def entry():
target_ff.name,
type="missing-feature",
)
AnnotateResidues(
attribute="cgsecstruct",
sequence="F",
molecule_selector=selectors.is_protein,
).run_system(system)
martinize_pipeline.add(
AnnotateResidues(
attribute="cgsecstruct",
sequence="F",
molecule_selector=selectors.is_protein,
)
)
if args.extdih and not target_ff.has_feature("extdih"):
LOGGER.warning(
'The force field "{}" does not define dihedral '
"angles for extended regions of proteins (-extdih).",
target_ff.name,
type="missing-feature",
)
vermouth.SetMoleculeMeta(extdih=args.extdih).run_system(system)
martinize_pipeline.add(vermouth.SetMoleculeMeta(extdih=args.extdih))
if args.scfix and not target_ff.has_feature("scfix"):
LOGGER.warning(
'The force field "{}" does not define angle and '
"torsion for the side chain corrections (-scfix).",
target_ff.name,
type="missing-feature",
)
vermouth.SetMoleculeMeta(scfix=args.scfix).run_system(system)
martinize_pipeline.add(vermouth.SetMoleculeMeta(scfix=args.scfix))

ss_sequence = list(
itertools.chain(
Expand All @@ -933,18 +925,20 @@ def entry():
)

if args.cystein_bridge == "none":
vermouth.RemoveCysteinBridgeEdges().run_system(system)
martinize_pipeline.add(vermouth.RemoveCysteinBridgeEdges())
elif args.cystein_bridge != "auto":
vermouth.AddCysteinBridgesThreshold(args.cystein_bridge).run_system(system)
martinize_pipeline.add(vermouth.AddCysteinBridgesThreshold(args.cystein_bridge))

# Run martinize on the system.
system = martinize(
martinize_pipeline,
system,
mappings=known_mappings,
to_ff=known_force_fields[args.to_ff],
delete_unknown=True,
)

pipeline.add(martinize_pipeline)
postprocessing = ProcessorPipeline(name='post-processing')
# Apply position restraints if required.
if args.posres != "none":
LOGGER.info("Applying position restraints.", type="step")
Expand All @@ -953,21 +947,24 @@ def entry():
"backbone": selectors.select_backbone,
}
node_selector = node_selectors[args.posres]
vermouth.ApplyPosres(node_selector, args.posres_fc).run_system(system)
postprocessing.add(vermouth.ApplyPosres(node_selector, args.posres_fc))

# Generate the Go model if required
if args.go:
LOGGER.info("Reading Go model contact map.", type="step")
go_map = read_go_map(args.go)
LOGGER.info("Generating the Go model.", type="step")
GoPipeline.run_system(system,
moltype=args.govs_moltype,
contact_map=go_map,
cutoff_short=args.go_low,
cutoff_long=args.go_up,
go_eps=args.go_eps,
res_dist=args.go_res_dist,)

go_pipeline = ProcessorPipeline(name='GoPipeline')
go_pipeline.add(vermouth.MergeAllMolecules())
go_pipeline.add(vermouth.SetMoleculeMeta(moltype=args.govs_moltype))
go_pipeline.add(VirtualSiteCreator())
go_pipeline.add(ComputeStructuralGoBias(moltype=args.govs_moltype,
contact_map=go_map,
cutoff_short=args.go_low,
cutoff_long=args.go_up,
go_eps=args.go_eps,
res_dist=args.go_res_dist))
postprocessing.add(go_pipeline)
defines = ("GO_VIRT",)
itp_paths = {"atomtypes": "go_atomtypes.itp",
"nonbond_params": "go_nbparams.itp"}
Expand All @@ -980,16 +977,16 @@ def entry():
if "all" not in args.merge_chains:
input_chain_sets = [i.split(",") for i in args.merge_chains]
for chain_set in input_chain_sets:
vermouth.MergeChains(chains=chain_set, all_chains=False).run_system(system)
postprocessing.add(vermouth.MergeChains(chains=chain_set, all_chains=False))
#if "all" is in the list and is the only argument
elif "all" in args.merge_chains and len(args.merge_chains) == 1:
vermouth.MergeChains(chains=[], all_chains=True).run_system(system)
postprocessing.add(vermouth.MergeChains(chains=[], all_chains=True))
#otherwise there are multiple arguments and we need to raise an ArgumentError
else:
raise argparse.ArgumentError(chain_merging,
message=("Multiple conflicting merging arguments given. "
"Either specify -merge all or -merge A,B,C (+)."))
vermouth.NameMolType(deduplicate=not args.keep_duplicate_itp).run_system(system)
postprocessing.add(vermouth.NameMolType(deduplicate=not args.keep_duplicate_itp))
defines = ()

# Apply a rubber band elastic network is required.
Expand All @@ -998,7 +995,7 @@ def entry():
if args.rb_unit == "molecule":
domain_criterion = vermouth.processors.apply_rubber_band.always_true
elif args.rb_unit == "all":
vermouth.MergeAllMolecules().run_system(system)
postprocessing.add(vermouth.MergeAllMolecules())
domain_criterion = vermouth.processors.apply_rubber_band.always_true
elif args.rb_unit == "chain":
domain_criterion = vermouth.processors.apply_rubber_band.same_chain
Expand Down Expand Up @@ -1041,7 +1038,7 @@ def entry():
domain_criterion=domain_criterion,
res_min_dist=args.res_min_dist,
)
rubber_band_processor.run_system(system)
postprocessing.add(rubber_band_processor)

# If required add some water bias
if args.water_bias:
Expand All @@ -1068,7 +1065,7 @@ def entry():
# model, thus we skip the sorting here altogether.
if not args.go:
LOGGER.info("Sorting atomids", type="step")
vermouth.SortMoleculeAtoms().run_system(system)
postprocessing.add(vermouth.SortMoleculeAtoms())

LOGGER.info("Writing output.", type="step")
for molecule in system.molecules:
Expand Down Expand Up @@ -1099,6 +1096,9 @@ def entry():
"was used for the full system:",
"".join(ss_sequence),
]
pipeline.add(postprocessing)
print(pipeline)
pipeline.run_system(system)

if args.top_path is not None:
write_gmx_topology(system,
Expand All @@ -1109,7 +1109,7 @@ def entry():
header=header)

# Write a PDB file.
vermouth.pdb.write_pdb(system, str(args.outpath), omit_charges=True)
vermouth.pdb.write_pdb(system, args.outpath, omit_charges=True)

# TODO: allow ignoring warnings per class/amount (i.e. ignore 2
# inconsistent-data warnings)
Expand All @@ -1126,7 +1126,7 @@ def entry():
sys.exit(2)
else:
DeferredFileWriter().write()
vermouth.Quoter().run_system(system)
vermouth.Quoter().run_system(None)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit ff636ed

Please sign in to comment.