diff --git a/polyply/src/builder.py b/polyply/src/builder.py new file mode 100644 index 00000000..2394c78b --- /dev/null +++ b/polyply/src/builder.py @@ -0,0 +1,211 @@ +""" +Base class for all polyply builders. +""" +from .meta_molecule import _find_starting_node +from .conditionals import CONDITIONALS + +class MaxIterationError(Exception): + """ + Raised when the maximum number of iterations + is reached in a building attempt. + """ + +class Builder(): + """ + A builder generates coordiantes for a single meta_molecule at the + one bead per residue resolution. It defines a protocol to generate + new positions and automatically checks conditionals that define + extra build options such as geometrical constraints. + """ + self.conditions = CONDITIONALS + + def __init__(self, + nonbond_matrix, + maxiter): + """ + Paramters + ---------- + nonbond_matrix: :class:`polyply.src.nonbond_engine.NonBondMatrix` + maxiter: int + number of tries to build a coordiante before returning + + Attributes + ---------- + mol_idx: index of the molecule in the topology list of molecules + molecule: molecule: :class:`polyply.Molecule` + meta_molecule for which the point should be added + """ + self.nonbond_matrix = nonbond_matrix + self.maxiter = maxiter + + # convience attributes + self.mol_idx = None + self.molecule = None + self.step_count = 0 + self.path = None + + @staticmethod + def _check_conditionals(new_point, molecule, node): + """ + Checks that the new point fullfills every conditon + as dictated by the conditional functions. + """ + for conditional in self.conditions: + if not conditional(new_point, molecule, node): + return False + return True + + def remove_positions(self, mol_idx, nodes): + """ + Remove positions of nodes from the nonbond-matrix by + molecule index. + + Parameters + ---------- + mol_idx: int + the index of the molecule in the system + nodes: list[abc.hashable] + list of nodes from which to remove the positions + """ + self.nonbond_matrix.remove_positions(mol_idx, nodes) + + def add_position(self, new_point, node, start=False): + """ + If conditionals are fullfilled for the node then + add the position to the nonbond_matrix. + + Parameters + ---------- + new_point: numpy.ndarray + the coordinates of the point to add + nodes: list[abc.hashable] + list of nodes from which to remove the positions + start: bool + if True triggers rebuilding of the positions tree + + Returns + ------- + bool + is True if the position could be added + """ + if self.step_count > self.maxiter: + raise MaxIterationError() + + if self._check_conditionals(new_point, self.molecule, node): + self.nonbond_matrix.add_positions(new_point, + self.mol_idx, + node, + start=start) + self.step_count = 0 + return True + else: + self.step_count += 1 + return False + + def add_positions(self, new_points, nodes): + """ + Add positions of multiple nodes if each node full-fills the conditionals. + If not then none of the points is added and the False is returned. + + Parameters + ---------- + new_point: numpy.ndarray + the coordinates of the point to add + nodes: list[abc.hashable] + list of nodes from which to remove the positions + + Returns + ------- + bool + is True if the position could be added + """ + if self.step_count > self.maxiter: + raise MaxIterationError() + + for node, point in zip(nodes, new_points): + if not self._check_conditionals(point, self.molecule, node): + self.step_count += 1 + return False + + for node, point in zip(nodes, new_points): + self.nonbond_matrix.add_positions(point, + self.mol_idx, + node, + start=False) + self.step_count = 0 + return True + + def build_protocol(self): + """ + Must be defined in subclass of builder call update + positions on every new_point added. In addition + this function must return a bool indicating if the + position building has worked or failed. + """ + return True + + def execute_build_protocol(self): + """ + This wraps the build protocol method and handles maximum + iteration counts. + """ + try: + status = self.build_protocol() + except MaxIterationError: + return False + return status + + def _prepare_build(self, starting_point, starting_node): + """ + Called before the building stage. This function finds + the starting node and adds the first coordinate. It also + resets all counters associated with a build cycle (i.e. a + call to the run_molecule method). + """ + if starting_node: + first_node = _find_starting_node(self.molecule) + else: + first_node = starting_node + + self.molecule.root = first_node + + if "position" not in self.molecule.nodes[first_node]: + prepare_status = self.add_position(starting_point, first_node, start=True) + else: + prepare_status = True + + self.path = list(self.molecule.search_tree.edges) + self.step_count = 0 + return prepare_status + + def run_molecule(self, molecule, mol_idx, starting_point, starting_node=None): + """ + Overwrites run_molecule of the Processor class and + adds the alternative starting_point argument. + + Parameters + ---------- + molecule: :class:`polyply.MetaMolecule` + meta_molecule for which the point should be added + mol_idx: int + index of the molecule in topology list + sarting_point: numpy.ndarray + the coordinates of the point to add + starting_node: abc.hashable + first node to start building at; if default None + first node without defined coordinates is taken + + Returns + ------- + bool + if the building process has completed successfully + """ + # update some class variables for convience + self.molecule = molecule + self.mol_idx = mol_idx + is_prepared = self._prepare_build(starting_point, starting_node) + if not is_prepared: + return False + + build_status = self.execute_build_protocol() + return build_status diff --git a/polyply/src/conditionals.py b/polyply/src/conditionals.py new file mode 100644 index 00000000..2e994863 --- /dev/null +++ b/polyply/src/conditionals.py @@ -0,0 +1,230 @@ +# Copyright 2022 University of Groningen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import inspect +import random +import numpy as np +from numpy.linalg import norm +import networkx as nx +from .processor import Processor +from .linalg_functions import norm_sphere +from .linalg_functions import _vector_angle_degrees +from .graph_utils import neighborhood +from .meta_molecule import _find_starting_node +""" +Conditionals for all polyply builders. +""" +# list of all condition functions for builders +CONDITIONALS = [] + +def condition_wrapper(condition): + """ + Wraps a condition function and registers it in the CONDITIONALS + list, which is picked up by the builder classes. + """ + args_list = inspect.getfullargspec(condition).args + name = condition.__name__ + if args_list != ['nobond_matrix', 'molecule', 'mol_idx', 'current_node', 'current_position']: + msg = "Condition function with name {name} does not have the correct signature." + raise SignatureError(msg) + + CONDITIONALS.append(condition) + +def in_cylinder(point, parameters): + """ + Assert if a point is within a cylinder or outside a + cylinder as defined in paramters. Note the cylinder + is z-aligned always. + + Parameters: + ----------- + point: np.ndarray + reference point + parameters: abc.iteratable + + Returns: + -------- + bool + """ + in_out = parameters[0] + diff = parameters[1] - point + radius = norm(diff[:2]) + half_heigth = diff[2] + if in_out == "in" and radius < parameters[2] and np.abs(half_heigth) < parameters[3]: + return True + elif in_out == "out" and (radius > parameters[2] or half_heigth > np.abs(parameters[3])): + return True + else: + return False + +def in_rectangle(point, parameters): + """ + Assert if a point is within a rectangle or outside a + rectangle as defined in paramters: + + Parameters: + ----------- + point: np.ndarray + reference point + parameters: abc.iteratable + + Returns: + -------- + bool + """ + in_out = parameters[0] + diff = parameters[1] - point + check = [np.abs(dim) < max_dim for dim, + max_dim in zip(diff, parameters[2:])] + if in_out == "in" and not all(check): + return False + elif in_out == "out" and all(check): + return False + else: + return True + + +def in_sphere(point, parameters): + """ + Assert if a point is within a sphere or outside a + sphere as defined in paramters: + + Parameters: + ----------- + point: np.ndarray + reference point + parameters: abc.iteratable + + Returns: + -------- + bool + """ + in_out = parameters[0] + r = norm(parameters[1] - point) + if in_out == 'in' and r > parameters[2]: + return False + elif in_out == 'out' and r < parameters[2]: + return False + else: + return True + + +# methods of geometrical restraints known to polyply +RESTRAINT_METHODS = {"cylinder": in_cylinder, + "rectangle": in_rectangle, + "sphere": in_sphere} + +@condition_wrapper() +def fulfill_geometrical_constraints(nobond_matrix, molecule, mol_idx, current_node, current_position): + """ + Assert if a point fulfills a geometrical constraint + as defined by the "restraint" key in a dictionary. + If there is no key "restraint" the function returns true. + + Parameters: + ----------- + point: np.ndarray + reference point + + node_dict: :class:ditc + + Returns: + -------- + bool + """ + node_dict = molecule.nodes[current_node] + if "restraints" not in node_dict: + return True + + for restraint in node_dict['restraints']: + restr_type = restraint[-1] + if not RESTRAINT_METHODS[restr_type](point, restraint): + return False + + return True + + +@condition_wrapper() +def not_is_overlap(nobond_matrix, molecule, mol_idx, current_node, current_position): + neighbours = neighborhood(molecule, current_node, molecule.nrexcl) + force_vect = nonbond_matrix.compute_force_point(current_position, + mol_idx, + node, + neighbours, + potential="LJ") + return norm(force_vect) < self.max_force + + +@condition_wrapper() +def checks_milestones(nonbond_matrix, molecule, mol_idx, current_node, current_position): + + if 'distance_restraints' in molecule.nodes[current_node]: + for restraint in molecule.nodes[current_node]['distance_restraints']: + ref_node, upper_bound, lower_bound = restraint + ref_pos = nonbond_matrix.get_point(mol_idx, ref_node) + current_distance = nonbond_matrix.pbc_min_dist(current_position, ref_pos) + if current_distance > upper_bound: + return False + + if current_distance < lower_bound: + return False + + return True + +@condition_wrapper() +def is_restricted(nonbond_matrix, molecule, mol_idx, current_node, current_position): + """ + The function checks, if the step `old_point` to + `point` is in a direction as defined by a plane + with normal and angle as set in the `node_dict` + options with keyword rw_options. It is true + if the direction vector point-old_point is + pointing some max angle from the normal of the plane + in the same direction as an angle specifies. + To check we compute the signed angle of the vector + `point`-`old_point` has with the plane defined + by a normal and compare to the reference angle. + The angle needs to have the same sign as angle + and not be smaller in magnitude. + + Parameters: + ----------- + point: np.ndarray + old_point: np.ndarray + node_dict: dict["rw_options"][[np.ndarray, float]] + dict with key rw_options containing a list of lists + specifying a normal and an angle + + Returns: + ------- + bool + """ + node_dict = molecule.nodes[current_node] + if not "rw_options" in node_dict: + return True + + # find the previous node + prev_node = molecule.predecessors(current_node) + prev_position = molecule.nodes[prev_node]["position"] + + normal, ref_angle = node_dict["rw_options"][0] + # check condition 1 + sign = np.sign(np.dot(normal, current_position - prev_position)) + if sign != np.sign(ref_angle): + return False + + # check condition 2 + angle = _vector_angle_degrees(normal, current_position - prev_position) + if angle > np.abs(ref_angle): + return False + return True diff --git a/polyply/src/tool_wrapper.py b/polyply/src/tool_wrapper.py new file mode 100644 index 00000000..ffb04272 --- /dev/null +++ b/polyply/src/tool_wrapper.py @@ -0,0 +1,70 @@ +from .processor import Processor + +class ToolWrapper(): + + def __init__(self, executable, tool_args, *args, **kwargs): + self.executable = executable + self.tool_args = tool_args + self._tool_name = None + + @property + def tool_name(self): + pass + + def _execute(self, **kwargs): + """ + The actual running of the additional tool. + """ + command = [self.executable] + for arg, value in self.tool_args.items(): + command.append("-" + "arg") + command.append(value) + + output = subprocess.run(command) + return output.stdout.decode('utf-8'), output.stderr.decode('utf-8') + + def _post_process(self): + """ + This function needs to be defined at subclass level, and + defines how the coordinates are mapped to the topology. + """ + pass + + def _perpare(self): + """ + This function can be defined if anything special has to be + prepared for the tool like initial coordinates or topology + files. + """ + pass + + def run_system(self, topology): + """ + Called to run the construction based on the tool. + """ + topology = self._prepare(topology) + self._execute(topology) + topology = self._post_process(topolgoy) + return topology + +class FlatMembraneBuilder(Builder, ToolWrapper): + + def __init__(self, executable, *args, **kwargs): + tool_args = {"str": "input.str", + "Bondlength": 0.2, + "LLIP": "Martini3.LIN", + "defout": "system", + "function": "analytical_shape", + } + super().__init__(executable, tool_args, *args, **kwargs) + + @property + def tool_name(self): + return "TS2CG" + + def _prepare(self): + with open("input.str", w) as _file: + _file.write ... + + +