From 5dea9682bda2623bdd1beb470bd457b67b23caa3 Mon Sep 17 00:00:00 2001 From: Colin Gilgenbach Date: Fri, 5 Apr 2024 13:12:30 -0400 Subject: [PATCH] Initial support for parsing LMP files --- README.md | 4 +- atomlib/cell.py | 2 +- atomlib/io/__init__.py | 30 +- atomlib/io/lmp.py | 464 ++++++++++++++++++++++--- atomlib/io/test_lmp.py | 315 +++++++++++++++-- atomlib/io/util.py | 60 ++++ atomlib/mixins.py | 8 +- atomlib/util.py | 25 +- tests/baseline_files/AlN_out.lmp | 7 +- tests/baseline_files/AlN_roundtrip.lmp | 25 ++ tests/input_files/labeled.lmp | 32 ++ tests/test_io.py | 26 +- 12 files changed, 920 insertions(+), 78 deletions(-) create mode 100644 atomlib/io/util.py create mode 100644 tests/baseline_files/AlN_roundtrip.lmp create mode 100644 tests/input_files/labeled.lmp diff --git a/README.md b/README.md index 0462e60..aa0b127 100644 --- a/README.md +++ b/README.md @@ -28,9 +28,9 @@ This means you can write your own code to utilize advanced format features even | [Basic XYZ][xyz] | .xyz | :white_check_mark: | :white_check_mark: | | | [Ext. XYZ][xyz] | .exyz | :white_check_mark: | :white_check_mark: | Arbitrary properties not implemented | | [Special XYZ][xyz] | .sxyz | :x: | :x: | To be implemented | -| [LAMMPS Data][lmp] | .lmp | :x: | :white_check_mark: | | +| [LAMMPS Data][lmp] | .lmp | :white_check_mark: | :white_check_mark: | | | [Quantum Espresso][qe] | .qe | :x: | :white_check_mark: | pw.x format | -| [pyMultislicer][pyM] | .mslice | :white_check_mark: | :white_check_mark: | | +| [pyMultislicer][pyM] | .mslice | :white_check_mark: | :white_check_mark: | Currently XML format only | [atomsk]: https://atomsk.univ-lille.fr/ [ase]: https://wiki.fysik.dtu.dk/ase/ diff --git a/atomlib/cell.py b/atomlib/cell.py index ea85c5e..3b660fb 100644 --- a/atomlib/cell.py +++ b/atomlib/cell.py @@ -381,7 +381,7 @@ def with_cell(self: Cell, cell: Cell) -> Cell: def __init__(self, *, affine: t.Optional[AffineTransform3D] = None, ortho: t.Optional[LinearTransform3D] = None, cell_size: VecLike, cell_angle: t.Optional[VecLike] = None, - n_cells: t.Optional[VecLike] = None, pbc: t.Optional[VecLike]): + n_cells: t.Optional[VecLike] = None, pbc: t.Optional[VecLike] = None): object.__setattr__(self, '_affine', AffineTransform3D() if affine is None else affine) object.__setattr__(self, '_ortho', LinearTransform3D() if ortho is None else ortho) diff --git a/atomlib/io/__init__.py b/atomlib/io/__init__.py index 8f834a7..de636ac 100644 --- a/atomlib/io/__init__.py +++ b/atomlib/io/__init__.py @@ -13,7 +13,7 @@ from .xsf import XSF from .cfg import CFG from .mslice import write_mslice, read_mslice -from .lmp import write_lmp +from .lmp import LMP from .qe import write_qe from ..atoms import Atoms, HasAtoms @@ -73,9 +73,9 @@ def read_cif(f: t.Union[FileOrPath, CIF, CIFDataBlock], block: t.Union[int, str, for sym in cif.get_symmetry(): sym_atoms.append(atoms.transform(sym)) - s = '\n'.join(map(str, sym_atoms)) - print(f"sym_atoms:\n{s}") - print(f"atoms: {atoms!s}") + #s = '\n'.join(map(str, sym_atoms)) + #print(f"sym_atoms:\n{s}") + #print(f"atoms: {atoms!s}") if len(sym_atoms) > 0: atoms = Atoms.concat(sym_atoms)._wrap().deduplicate() @@ -176,11 +176,29 @@ def read_cfg(f: t.Union[FileOrPath, CFG]) -> AtomCell: def write_cfg(atoms: t.Union[HasAtoms, CFG], f: FileOrPath): + """Write a structure to an AtomEye CFG file.""" if not isinstance(atoms, CFG): atoms = CFG.from_atoms(atoms) atoms.write(f) +def read_lmp(f: t.Union[FileOrPath, LMP], type_map: t.Optional[t.Dict[int, t.Union[str, int]]] = None) -> AtomCell: + """Read a structure from a LAAMPS data file.""" + if isinstance(f, LMP): + lmp = f + else: + lmp = LMP.from_file(f) + + return lmp.get_atoms(type_map=type_map) + + +def write_lmp(atoms: t.Union[HasAtoms, LMP], f: FileOrPath): + """Write a structure to a LAAMPS data file.""" + if not isinstance(atoms, LMP): + atoms = LMP.from_atoms(atoms) + atoms.write(f) + + ReadFunc = t.Callable[[FileOrPath], HasAtoms] _READ_TABLE: t.Mapping[FileType, t.Optional[ReadFunc]] = { 'cif': read_cif, @@ -188,7 +206,7 @@ def write_cfg(atoms: t.Union[HasAtoms, CFG], f: FileOrPath): 'xsf': read_xsf, 'cfg': read_cfg, 'mslice': read_mslice, - 'lmp': None, + 'lmp': read_lmp, 'qe': None } @@ -294,7 +312,7 @@ def write(atoms: HasAtoms, path: FileOrPath, ty: t.Optional[FileType] = None): __all__ = [ - 'CIF', 'XYZ', 'XSF', 'CFG', + 'CIF', 'XYZ', 'XSF', 'CFG', 'LMP', 'read', 'read_cif', 'read_xyz', 'read_xsf', 'read_cfg', 'write', 'write_xyz', 'write_xsf', 'write_cfg', 'write_lmp', 'write_mslice', 'write_qe', ] diff --git a/atomlib/io/lmp.py b/atomlib/io/lmp.py index b73eb7d..68797e6 100644 --- a/atomlib/io/lmp.py +++ b/atomlib/io/lmp.py @@ -1,60 +1,446 @@ +from __future__ import annotations + +from dataclasses import dataclass +from io import TextIOBase +import re +import typing as t import numpy +import polars -from ..atoms import HasAtoms -from ..atomcell import HasAtomCell -from ..util import open_file, FileOrPath, localtime -from ..transform import AffineTransform3D +from ..atomcell import HasAtomCell, HasAtoms, Atoms, Cell, AtomCell +from ..elem import get_elem, get_sym +from ..util import open_file, FileOrPath, localtime, checked_left_join, CheckedJoinError +from ..transform import AffineTransform3D, LinearTransform3D -def write_lmp(atoms: HasAtoms, f: FileOrPath): - if isinstance(atoms, HasAtomCell): - # we're basically converting everything to the ortho frame, but including the affine shift +@dataclass +class LMP: + comment: t.Optional[str] + headers: t.Dict[str, t.Any] + sections: t.Tuple[LMPSection, ...] + + def get_cell(self) -> Cell: + dims = numpy.array([ + self.headers.get(f"{c}lo {c}hi", (-0.5, 0.5)) + for c in "xyz" + ]) + origin = dims[:, 0] + tilts = self.headers.get("xy xz yz", (0., 0., 0.)) + + ortho = numpy.diag(dims[:, 1] - dims[:, 0]) + (ortho[0, 1], ortho[0, 2], ortho[1, 2]) = tilts + + return Cell.from_ortho(LinearTransform3D(ortho).translate(origin)) + + def get_atoms(self, type_map: t.Optional[t.Dict[int, t.Union[str, int]]] = None) -> AtomCell: + if type_map is not None: + try: + type_map_df = polars.DataFrame({ + 'type': polars.Series(type_map.keys(), dtype=polars.Int32), + 'elem': polars.Series(list(map(get_elem, type_map.values())), dtype=polars.UInt8), + 'symbol': polars.Series([get_sym(v) if isinstance(v, int) else v for v in type_map.values()], dtype=polars.Utf8), + }) + except ValueError as e: + raise ValueError("Invalid type map") from e + else: + type_map_df = None + + cell = self.get_cell() + + def _apply_type_labels(df: polars.DataFrame, section_name: str, d: t.Optional[t.Dict[str, int]]) -> polars.DataFrame: + if d is not None: + df = df.with_columns(polars.col('type').replace(d, default=polars.col('type').cast(polars.Int32, strict=False), return_dtype=polars.Int32)) + if df['type'].is_null().any(): + raise ValueError(f"While parsing section {section_name}: Unknown atom label or invalid atom type") + try: + return df.with_columns(polars.col('type').cast(polars.Int32)) + except polars.ComputeError: + raise ValueError(f"While parsing section {section_name}: Invalid atom type(s)") + + atoms: t.Optional[polars.DataFrame] = None + labels: t.Optional[polars.DataFrame] = None + labels_dict: t.Optional[t.Dict[str, int]] = None + masses: t.Optional[polars.DataFrame] = None + velocities = None - # transform affine shift into ortho frame - origin = atoms.get_transform('ortho', 'local').to_linear().round_near_zero() \ - .transform(atoms.get_cell().affine.translation()) + for section in self.sections: + if section.name == 'Atoms': + if section.style not in (None, 'atomic'): + # TODO support other styles + raise ValueError(f"Only 'atomic' atom_style is supported, instead got '{section.style}'") - # get the orthogonalization transform only, without affine - ortho = atoms.get_transform('ortho', 'cell_box').to_linear().round_near_zero().inner + atoms = _parse_whitespace_separated(section.body, { + 'i': polars.Int64, 'type': polars.Utf8, + 'x': polars.Float64, 'y': polars.Float64, 'z': polars.Float64, + }).select('i', 'type', polars.concat_list('x', 'y', 'z').alias('coords').list.to_array(3)) + atoms = _apply_type_labels(atoms, 'Atoms', labels_dict) + elif section.name == 'Atom Type Labels': + labels = _parse_whitespace_separated(section.body, {'type': polars.Int32, 'symbol': polars.Utf8}) + labels_dict = {ty: sym for (sym, ty) in labels.iter_rows()} + elif section.name == 'Masses': + masses = _parse_whitespace_separated(section.body, {'type': polars.Utf8, 'mass': polars.Float64}) + masses = _apply_type_labels(masses, 'Masses', labels_dict) + elif section.name == 'Velocities': + velocities = _parse_whitespace_separated(section.body, { + 'i': polars.Int64, 'x': polars.Float64, 'y': polars.Float64, 'z': polars.Float64 + }).select('i', polars.concat_list('x', 'y', 'z').alias('velocity').list.to_array(3)) - # get atoms in ortho frame, and then add the affine shift - frame = atoms.get_atoms('ortho').transform_atoms(AffineTransform3D.translate(origin)) \ - .round_near_zero().with_type() - else: - bbox = atoms.bbox_atoms() - ortho = numpy.diag(bbox.size) - origin = bbox.min + # now all 'type's should be in Int32 - frame = atoms.get_atoms('local').with_type() + if atoms is None: + if self.headers['atoms'] > 0: + raise ValueError("Missing required section 'Atoms'") + return AtomCell(Atoms.empty(), cell=cell, frame='local') - types = frame.unique(subset='type') - types = types.with_mass().sort('type') + # next we need to assign element symbols + # first, if type_map is specified, use that: + #if type_map_elem is not None and type_map_sym is not None: + if type_map_df is not None: + try: + atoms = checked_left_join(atoms, type_map_df, on='type') + except CheckedJoinError as e: + raise ValueError(f"Missing type_map specification for atom type(s): {', '.join(map(repr, e.missing_keys))}") + elif labels is not None: + try: + labels = labels.with_columns(get_elem(labels['symbol'])) + except ValueError as e: + raise ValueError("Failed to auto-detect elements from type labels. Please pass 'type_map' explicitly") from e + try: + atoms = checked_left_join(atoms, labels, 'type') + except CheckedJoinError as e: + raise ValueError(f"Missing labels for atom type(s): {', '.join(map(repr, e.missing_keys))}") + # otherwise we have no way + else: + raise ValueError("Failed to auto-detect elements from type labels. Please pass 'type_map' explicitly") - with open_file(f, 'w') as f: - def p(s: object): - print(s, file=f) + if velocities is not None: + # join velocities + try: + # TODO use join_asof here? + atoms = checked_left_join(atoms, velocities, 'i') + except CheckedJoinError as e: + raise ValueError(f"Missing velocities for {len(e.missing_keys)}/{len(atoms)} atoms") + + if masses is not None: + # join masses + try: + atoms = checked_left_join(atoms, masses, 'type') + except CheckedJoinError as e: + raise ValueError(f"Missing masses for atom type(s): {', '.join(map(repr, e.missing_keys))}") + + return AtomCell(atoms, cell=cell, frame='local') + + @staticmethod + def from_atoms(atoms: HasAtoms) -> LMP: + if isinstance(atoms, HasAtomCell): + # we're basically converting everything to the ortho frame, but including the affine shift + + # transform affine shift into ortho frame + origin = atoms.get_transform('ortho', 'local').to_linear().round_near_zero() \ + .transform(atoms.get_cell().affine.translation()) + + # get the orthogonalization transform only, without affine + ortho = atoms.get_transform('ortho', 'cell_box').to_linear().round_near_zero().inner + + # get atoms in ortho frame, and then add the affine shift + frame = atoms.get_atoms('ortho').transform_atoms(AffineTransform3D.translate(origin)) \ + .round_near_zero().with_type() + else: + bbox = atoms.bbox_atoms() + ortho = numpy.diag(bbox.size) + origin = bbox.min + + frame = atoms.get_atoms('local').with_type() + + types = frame.unique(subset='type') + types = types.with_mass().sort('type') now = localtime() - p(f"# Generated by atomlib on {now.isoformat(' ', 'seconds')}\n") + comment = f"# Generated by atomlib on {now.isoformat(' ', 'seconds')}" + + headers = {} + sections = [] - p(f" {len(frame):8} atoms") - p(f" {len(types):8} atom types\n") + headers['atoms'] = len(frame) + headers['atom types'] = len(types) for (s, low, diff) in zip(('x', 'y', 'z'), origin, ortho.diagonal()): - p(f" {low:16.7f} {low + diff:14.7f} {s}lo {s}hi") + headers[f"{s}lo {s}hi"] = (low, low + diff) - p(f"\n {ortho[0, 1]:16.7f} {ortho[0, 2]:14.7f} {ortho[1, 2]:14.7f} xy xz yz") + headers['xy xz yz'] = (ortho[0, 1], ortho[0, 2], ortho[1, 2]) - p(f"\nMasses\n") - for (ty, sym, mass) in types.select(('type', 'symbol', 'mass')).rows(): - p(f" {ty:8} {mass:14.7f} # {sym}") + body = [ + f" {ty:8} {sym:>4}\n" + for (ty, sym) in types.select('type', 'symbol').rows() + ] + sections.append(LMPSection("Atom Type Labels", tuple(body))) - p(f"\nAtoms # atomic\n") - for (i, (ty, (x, y, z))) in enumerate(frame.select(('type', 'coords')).rows()): - p(f" {i+1:8} {ty:4} {x:14.7f} {y:14.7f} {z:14.7f}") + if 'mass' in types: + body = [ + f" {ty:8} {mass:14.7f} # {sym}\n" + for (ty, sym, mass) in types.select(('type', 'symbol', 'mass')).rows() + ] + sections.append(LMPSection("Masses", tuple(body))) + + body = [ + f" {i+1:8} {ty:4} {x:14.7f} {y:14.7f} {z:14.7f}\n" + for (i, (ty, (x, y, z))) in enumerate(frame.select(('type', 'coords')).rows()) + ] + sections.append(LMPSection("Atoms", tuple(body), 'atomic')) if (velocities := frame.velocities()) is not None: - p(f"\nVelocities\n") - for (i, (v_x, v_y, v_z)) in enumerate(velocities): - p(f" {i+1:8} {v_x:14.7f} {v_y:14.7f} {v_z:14.7f}") + body = [ + f" {i+1:8} {v_x:14.7f} {v_y:14.7f} {v_z:14.7f}\n" + for (i, (v_x, v_y, v_z)) in enumerate(velocities) + ] + sections.append(LMPSection("Velocities", tuple(body))) + + return LMP(comment, headers, tuple(sections)) + + @staticmethod + def from_file(file: FileOrPath) -> LMP: + with open_file(file, 'r') as f: + return LMPReader(f).parse() + + def write(self, file: FileOrPath): + with open_file(file, 'w') as f: + print((self.comment or "") + '\n', file=f) + + # print headers + for (name, val) in self.headers.items(): + val = _HEADER_FMT.get(name, lambda s: f"{s:8}")(val) + print(f" {val} {name}", file=f) + + # print sections + for section in self.sections: + l = section.name + if section.style is not None: + l += f' # {section.style}' + print(f"\n{l}\n", file=f) + + f.writelines(section.body) + + +@dataclass +class LMPSection: + name: str + body: t.Tuple[str, ...] + style: t.Optional[str] = None + + +class LMPReader: + def __init__(self, f: TextIOBase): + self.line = 0 + self._file: TextIOBase = f + self._buf: t.Optional[str] = None + + def _split_comment(self, line: str) -> t.Tuple[str, t.Optional[str]]: + split = _COMMENT_RE.split(line, maxsplit=1) + return (split[0], split[1] if len(split) > 1 else None) + + def parse(self) -> LMP: + # parse comment + comment = self.next_line(skip_blank=False) + if comment is None: + raise ValueError("Unexpected EOF (file is blank)") + if comment.isspace(): + comment = None + else: + comment = comment[:-1] + + headers = self.parse_headers() + sections = self.parse_sections(headers) + + return LMP(comment, headers, sections) + + def parse_headers(self) -> t.Dict[str, t.Any]: + headers: t.Dict[str, t.Any] = {} + while True: + line = self.peek_line() + if line is None: + break + body = self._split_comment(line)[0] + + if (match := _HEADER_KW_RE.search(body)) is None: + # probably a body + break + self.next_line() + + name = match[0] + value = body[:match.start(0)].strip() + + try: + if name in _HEADER_PARSE: + value = _HEADER_PARSE[name](value) + except Exception as e: + raise ValueError(f"While parsing header '{name}' at line {self.line}: Failed to parse value '{value}") from e + + #print(f"header {name} => {value} (type {type(value)})") + headers[name] = value + + return headers + + def parse_sections(self, headers: t.Dict[str, t.Any]) -> t.Tuple[LMPSection, ...]: + first = True + + sections: t.List[LMPSection] = [] + + while True: + line = self.next_line() + if line is None: + break + name, comment = self._split_comment(line) + name = name.strip() + + try: + n_lines_header = _SECTION_KWS[name] + except KeyError: + if first: + raise ValueError(f"While parsing line {self.line}: Unknown header or section keyword '{line}'") from None + else: + raise ValueError(f"While parsing line {self.line}: Unknown section keyword '{line}'") from None + + try: + if n_lines_header is None: + # special case for PairIJ Coeffs: + n = int(headers['atom types']) + n_lines = (n * (n + 1)) // 2 + else: + n_lines = int(headers[n_lines_header]) + except KeyError: + raise ValueError(f"While parsing body section '{name}' at line {self.line}: " + f"Missing required header '{n_lines_header or 'atom types'}'") from None + + style = comment if name in _SECTION_STYLE_KWS else None + if style is not None: + style = style.strip() + + #print(f"section '{name}' @ {self.line}, {n_lines} lines, style {style}") + + lines = self.collect_lines(n_lines) + if lines is None: + raise ValueError(f"While parsing body section '{name}' starting at line {self.line}: " + f"Unexpected EOF before {n_lines} lines were read") + + sections.append(LMPSection( + name, tuple(lines), style, + )) + first = False + + return tuple(sections) + + def _try_fill_buf(self, skip_blank: bool = True) -> t.Optional[str]: + if self._buf is None: + try: + # skip blank lines + while True: + self._buf = next(self._file) + self.line += 1 + if not (skip_blank and self._buf.isspace()): + break + except StopIteration: + pass + return self._buf + + def peek_line(self, skip_blank: bool = True) -> t.Optional[str]: + return self._try_fill_buf(skip_blank) + + def next_line(self, skip_blank: bool = True) -> t.Optional[str]: + line = self._try_fill_buf(skip_blank) + self._buf = None + return line + + def collect_lines(self, n: int) -> t.Optional[t.List[str]]: + assert self._buf is None + lines = [] + try: + for _ in range(n): + while True: + l = next(self._file) + if not l.isspace(): + lines.append(l) + break + except StopIteration: + return None + self.line += n + return lines + + +def _parse_whitespace_separated(rows: t.Iterable[str], schema: t.Dict[str, t.Type[polars.DataType]]) -> polars.DataFrame: + # TODO more general version of this across io types + return polars.DataFrame([ + row.split()[:len(schema)] + for row in rows + ], schema=schema, orient='row') + + +def write_lmp(atoms: HasAtoms, f: FileOrPath): + LMP.from_atoms(atoms).write(f) + return + + +def _parse_seq(f: t.Callable[[str], t.Any], n: int) -> t.Callable[[str], t.Tuple[t.Any, ...]]: + def inner(s: str) -> t.Tuple[t.Any, ...]: + vals = s.strip().split() + if len(vals) != n: + raise ValueError(f"Expected {n} values, instead got {len(vals)}") + return tuple(map(f, vals)) + + return inner + + +_parse_2float = _parse_seq(float, 2) +_parse_3float = _parse_seq(float, 3) +_fmt_2float = lambda vals: f"{vals[0]:16.7f} {vals[1]:14.7f}" +_fmt_3float = lambda vals: f"{vals[0]:16.7f} {vals[1]:14.7f} {vals[2]:14.7f}" + + +_HEADER_KWS = { + "xlo xhi", "ylo yhi", "zlo zhi", "xy xz yz", + "atoms", "bonds", "angles", "dihedrals", "impropers", + "atom types", "bond types", "angle types", "dihedral types", "improper types", + "extra bond per atom", "extra angle per atom", "extra dihedral per atom", + "extra improper per atom", "extra special per atom", + "ellipsoids", "lines", + "triangles", "bodies", +} + +_HEADER_PARSE: t.Dict[str, t.Callable[[str], t.Any]] = { + 'atoms': int, 'bonds': int, 'angles': int, 'dihedrals': int, 'impropers': int, + 'ellipsoids': int, 'lines': int, 'triangles': int, 'bodies': int, + 'atom types': int, 'bond types': int, 'angle types': int, 'dihedral types': int, 'improper types': int, + 'extra bond per atom': int, 'extra angle per atom': int, 'extra dihedral per atom': int, + 'extra improper per atom': int, 'extra special per atom': int, + 'xlo xhi': _parse_2float, 'ylo yhi': _parse_2float, 'zlo zhi': _parse_2float, + 'xy xz yz': _parse_3float, +} + +_HEADER_FMT: t.Dict[str, t.Callable[[t.Any], str]] = { + 'xlo xhi': _fmt_2float, 'ylo yhi': _fmt_2float, 'zlo zhi': _fmt_2float, + 'xy xz yz': _fmt_3float, +} + +_SECTION_KWS: t.Dict[str, t.Optional[str]] = { + # atom property sections + "Atoms": "atoms", "Velocities": "atoms", "Masses": "atom types", + "Ellipsoids": "ellipsoids", "Lines": "lines", "Triangles": "triangles", "Bodies": "bodies", + # molecular topology sections + "Bonds": "bonds", "Angles": "angles", "Dihedrals": "dihedrals", "Impropers": "impropers", + # type label maps + "Atom Type Labels": "atom types", "Bond Type Labels": "bond types", "Angle Type Labels": "angle types", + "Dihedral Type Labels": "dihedral types", "Improper Type Labels": "improper types", + # force field sections + "Pair Coeffs": "atom types", "PairIJ Coeffs": None, "Bond Coeffs": "bond types", "Angle Coeffs": "angle types", + "Dihedral Coeffs": "dihedral types", "Improper Coeffs": "improper types", + # class 2 force field sections + "BondBond Coeffs": "angle types", "BondAngle Coeffs": "angle types", "MiddleBondTorision Coeffs": "dihedral types", + "EndBondTorsion Coeffs": "dihedral types", "AngleTorsion Coeffs": "dihedral types", "AngleAngleTorsion Coeffs": "dihedral types", + "BondBond13 Coeffs": "dihedral types", "AngleAngle Coeffs": "improper types", +} +_SECTION_STYLE_KWS: t.Set[str] = { + "Atoms", "Pair Coeffs", "PairIJ Coeffs", "Bond Coeffs", "Angle Coeffs", "Dihedral Coeffs", "Improper Coeffs", +} + +_HEADER_KW_RE = re.compile("{}$".format('|'.join(map(re.escape, _HEADER_KWS)))) +_COMMENT_RE = re.compile(r"\s#") \ No newline at end of file diff --git a/atomlib/io/test_lmp.py b/atomlib/io/test_lmp.py index 6694ea9..34d8af9 100644 --- a/atomlib/io/test_lmp.py +++ b/atomlib/io/test_lmp.py @@ -1,12 +1,16 @@ +import re from io import StringIO -from . import write_lmp -from .. import AtomCell, Atoms +import pytest +from .lmp import LMP, LMPSection +from .. import AtomCell, Atoms, Cell +from ..transform import AffineTransform3D -def test_write_lmp(): - cell = AtomCell.from_unit_cell(Atoms({ + +def test_lmp_from_atoms(): + atoms = AtomCell.from_unit_cell(Atoms({ 'x': [0., 5., 8.], 'y': [2., 5., 9.], 'z': [2., 5., 9.], @@ -16,36 +20,297 @@ def test_write_lmp(): 'elem': [5, 1, 18], }), cell_size=[4., 6., 10.]) - f = StringIO() + lmp = LMP.from_atoms(atoms) + + assert lmp.comment is not None + assert lmp.comment.startswith("# Generated by atomlib") + + assert lmp.headers == { + 'atoms': 3, + 'atom types': 3, + 'xlo xhi': (0., 4.), + 'ylo yhi': (0., 6.), + 'zlo zhi': (0., 10.), + 'xy xz yz': (0., 0., 0.), + } + + assert lmp.sections == ( + LMPSection('Atom Type Labels', ( +" 1 H\n", +" 2 B\n", +" 3 Ar\n", + )), + LMPSection('Masses', ( +" 1 1.0080000 # H\n", +" 2 10.8100004 # B\n", +" 3 39.9500008 # Ar\n", + )), + LMPSection('Atoms', ( +" 1 2 0.0000000 2.0000000 2.0000000\n", +" 2 1 5.0000000 5.0000000 5.0000000\n", +" 3 3 8.0000000 9.0000000 9.0000000\n", + ), style='atomic'), + LMPSection('Velocities', ( +" 1 1.0000000 2.0000000 2.0000000\n", +" 2 -1.0000000 4.0000000 2.0000000\n", +" 3 2.0000000 -2.0000000 -1.0000000\n", + )) + ) + + +def test_write_lmp(): + lmp = LMP( + "my comment", + { + 'atoms': 3, + 'impropers': 1, + }, + ( + LMPSection('Atoms', ( +" 1 1 1 0 0\n", +" 2 1 0 1 0\n", +" 3 1 0 0 1\n", + ), 'atomic'), + LMPSection('Impropers', ( +"2 20 0.0548311\n", + )) + ) + ) + + buf = StringIO() + lmp.write(buf) + + assert buf.getvalue() == """my comment - write_lmp(cell, f) - s = f.getvalue() - assert s.startswith("# Generated by atomlib") - assert s.split('\n', 1)[1] == """ 3 atoms - 3 atom types + 1 impropers + +Atoms # atomic + + 1 1 1 0 0 + 2 1 0 1 0 + 3 1 0 0 1 + +Impropers + +2 20 0.0548311 +""" + + +def test_lmp_get_atoms_typemap(): + f = StringIO(""" +3 atoms +3 atom types + +Atoms + 1 1 1.0 0.0 0.0 + 2 2 0.0 1.0 0.0 + 3 3 0.0 0.0 1.0 +""") + lmp = LMP.from_file(f) + atoms = lmp.get_atoms({1: 'Al+', 2: 7, 3: 8}) + + atoms.assert_equal(AtomCell({ + 'i': [1, 2, 3], + 'type': [1, 2, 3], + 'x': [1., 0., 0.], + 'y': [0., 1., 0.], + 'z': [0., 0., 1.], + 'elem': [13, 7, 8], + 'symbol': ['Al+', 'N', 'O'], + }, Cell(affine=AffineTransform3D().translate(-0.5, -0.5, -0.5), cell_size=[1., 1., 1.]), frame='local')) + + +def test_lmp_get_atoms_inferred(): + f = StringIO(""" +3 atoms +3 atom types + +Atom Type Labels + 1 Al+ + 2 N + 3 O + +Atoms + 1 Al+ 1.0 0.0 0.0 + 2 N 0.0 1.0 0.0 + 3 3 0.0 0.0 1.0 +""") + lmp = LMP.from_file(f) + atoms = lmp.get_atoms() + + atoms.assert_equal(AtomCell({ + 'i': [1, 2, 3], + 'type': [1, 2, 3], + 'x': [1., 0., 0.], + 'y': [0., 1., 0.], + 'z': [0., 0., 1.], + 'elem': [13, 7, 8], + 'symbol': ['Al+', 'N', 'O'], + }, Cell(affine=AffineTransform3D().translate(-0.5, -0.5, -0.5), cell_size=[1., 1., 1.]), frame='local')) + + +def test_lmp_get_atoms_missing_type_map(): + f = StringIO("""This should fail because we can't infer elements from atom types +3 atoms +1 atom types + +Atoms + + 1 1 0.0 0.0 0.0 + 2 1 0.0 0.0 0.0 + 3 1 0.0 0.0 0.0 +""") + + lmp = LMP.from_file(f) + + with pytest.raises(ValueError, match=re.escape( + "Failed to auto-detect elements from type labels. Please pass 'type_map' explicitly" + )): + lmp.get_atoms() + + +def test_lmp_get_atoms_invalid_type_map(): + f = StringIO(""" +3 atoms +1 atom types + +Atoms - 0.0000000 4.0000000 xlo xhi - 0.0000000 6.0000000 ylo yhi - 0.0000000 10.0000000 zlo zhi + 1 1 0.0 0.0 0.0 + 2 1 0.0 0.0 0.0 + 3 1 0.0 0.0 0.0 +""") - 0.0000000 0.0000000 0.0000000 xy xz yz + lmp = LMP.from_file(f) -Masses + with pytest.raises(ValueError, match=re.escape( + "Invalid type map" + )): + lmp.get_atoms({1: "El"}) - 1 1.0080000 # H - 2 10.8100004 # B - 3 39.9500008 # Ar + +def test_lmp_get_atoms_missing_type_in_map(): + f = StringIO(""" +4 atoms +4 atom types + +Atoms + + 1 1 0.0 0.0 0.0 + 2 2 0.0 0.0 0.0 + 3 3 0.0 0.0 0.0 + 4 4 0.0 0.0 0.0 +""") + + lmp = LMP.from_file(f) + + with pytest.raises(ValueError, match=re.escape( + "Missing type_map specification for atom type(s): 3, 4" + )): + print(lmp.get_atoms({1: "Al", 2: "N"})) + + + +def test_lmp_get_atoms_unreadable_labels(): + f = StringIO(""" +4 atoms +2 atom types + +Atom Type Labels + +1 Au +2 El Atoms # atomic - 1 2 0.0000000 2.0000000 2.0000000 - 2 1 5.0000000 5.0000000 5.0000000 - 3 3 8.0000000 9.0000000 9.0000000 + 1 1 0.0 0.0 0.0 + 2 1 0.0 0.0 0.0 + 3 2 0.0 0.0 0.0 + 4 2 0.0 0.0 0.0 +""") + + lmp = LMP.from_file(f) + + with pytest.raises(ValueError, match=re.escape( + "Failed to auto-detect elements from type labels. Please pass 'type_map' explicitly" + )): + print(lmp.get_atoms()) + + +def test_lmp_get_atoms_missing_label(): + f = StringIO(""" +4 atoms +3 atom types + +Atom Type Labels + 1 Al+ + 2 N + 3 O + +Atoms + 1 Al+ 1.0 0.0 0.0 + 2 Fail 0.0 1.0 0.0 + 3 3 0.0 0.0 1.0 + 4 2 0.0 0.0 1.0 +""") + + lmp = LMP.from_file(f) + + with pytest.raises(ValueError, match=re.escape( + 'While parsing section Atoms: Unknown atom label or invalid atom type' + )): + print(lmp.get_atoms()) + + +def test_lmp_get_atoms_wrong_style(): + f = StringIO(""" +4 atoms +2 atom types + +Atoms # charge + + 1 1 1.0 0.0 0.0 0.0 + 2 1 1.0 0.0 0.0 0.0 + 3 2 -1.0 0.0 0.0 0.0 + 4 2 -1.0 0.0 0.0 0.0 +""") + + lmp = LMP.from_file(f) + + with pytest.raises(ValueError, match=re.escape( + "Only 'atomic' atom_style is supported, instead got 'charge'" + )): + print(lmp.get_atoms({1: "Na", 2: "Cl"})) + + +def test_lmp_failed_join_velocity(): + f = StringIO(""" +4 atoms +2 atom types + +Atom Type Labels + 1 Al + 2 N + +Atoms # atomic + + 1 1 1.0 0.0 0.0 0.0 + 2 1 1.0 0.0 0.0 0.0 + 3 2 -1.0 0.0 0.0 0.0 + 4 2 -1.0 0.0 0.0 0.0 Velocities - 1 1.0000000 2.0000000 2.0000000 - 2 -1.0000000 4.0000000 2.0000000 - 3 2.0000000 -2.0000000 -1.0000000 -""" + 2 0.0 0.0 0.0 + 3 0.0 0.0 0.0 + 5 0.0 0.0 0.0 + 6 0.0 0.0 0.0 +""") + + lmp = LMP.from_file(f) + + with pytest.raises(ValueError, match=re.escape( + "Missing velocities for 2/4 atoms" + )): + print(lmp.get_atoms()) \ No newline at end of file diff --git a/atomlib/io/util.py b/atomlib/io/util.py new file mode 100644 index 0000000..d12147c --- /dev/null +++ b/atomlib/io/util.py @@ -0,0 +1,60 @@ + +from itertools import islice +from io import TextIOBase +import typing as t + +import polars + + +def parse_file_whitespace_separated( + f: TextIOBase, schema: t.Dict[str, t.Type[polars.DataType]], *, + n_rows: t.Optional[int] = None, start_row: int = 0, + allow_extra_cols: bool = False, + allow_initial_whitespace: bool = True, + allow_comment: bool = True +) -> polars.DataFrame: + start_pos = f.tell() + + def make_row_gen() -> t.Iterator[str]: + f.seek(start_pos) + for line in f if n_rows is None else islice(f, n_rows): + yield line + + return _parse_rows_whitespace_separated( + make_row_gen, schema, start_row=start_row, + allow_extra_cols=allow_extra_cols, + allow_initial_whitespace=allow_initial_whitespace, + allow_comment=allow_comment + ) + + +def _parse_rows_whitespace_separated( + make_row_gen: t.Callable[[], t.Iterator[str]], + schema: t.Dict[str, t.Type[polars.DataType]], *, + start_row: int = 0, + allow_extra_cols: bool = False, + allow_initial_whitespace: bool = True, + allow_comment: bool = True +) -> polars.DataFrame: + + regex = "".join(( + "^", + r"\s*" if allow_initial_whitespace else "", + r"\s+".join(rf"(?<{col}>\S+)" for col in schema.keys()), + '' if allow_extra_cols else (r'\s*#' if allow_comment else r'\s*$') + )) + + df = polars.LazyFrame({'s': make_row_gen()}, schema={'s': polars.Utf8}).select(polars.col('s').str.extract_groups(regex)).unnest('s').collect() + + if len(df.lazy().filter(polars.col(df.columns[0]).is_null()).first().collect()): + # failed to match + first_failed_row = df.lazy().with_row_index('i').filter(polars.col(df.columns[0]).is_null()).first().collect()['i'][0] + row = next(islice(make_row_gen(), first_failed_row, None)) + if allow_comment: + row = row.split('#', maxsplit=1)[0] + raise ValueError(f"At line {start_row + first_failed_row}: Failed to parse line '{row}'. Expected {len(schema)} columns.") + + # TODO better error handling here + df = df.select(polars.col(col).cast(dtype) for (col, dtype) in schema.items()) + + return df \ No newline at end of file diff --git a/atomlib/mixins.py b/atomlib/mixins.py index 9597dfd..f1e8d68 100644 --- a/atomlib/mixins.py +++ b/atomlib/mixins.py @@ -11,7 +11,7 @@ from .atoms import HasAtomsT from .atomcell import HasAtomCell as _HasAtomCell - from .io import CIF, CIFDataBlock, XYZ, XYZFormat, XSF, CFG, FileType, FileOrPath + from .io import CIF, CIFDataBlock, XYZ, XYZFormat, XSF, CFG, LMP, FileType, FileOrPath from .io.mslice import MSliceFile, BinaryFileOrPath else: @@ -92,6 +92,12 @@ def read_cfg(cls: t.Type[HasAtomsT], f: t.Union[FileOrPath, CFG]) -> HasAtomsT: from .io import read_cfg return _cast_atoms(read_cfg(f), cls) + @classmethod + def read_lmp(cls: t.Type[HasAtomsT], f: t.Union[FileOrPath, LMP], type_map: t.Optional[t.Dict[int, t.Union[str, int]]] = None) -> HasAtomsT: + """Read a structure from a LAAMPS data file.""" + from .io import read_lmp + return _cast_atoms(read_lmp(f, type_map=type_map), cls) + def write_cif(self, f: FileOrPath): from .io import write_cif write_cif(self, f) diff --git a/atomlib/util.py b/atomlib/util.py index 4a99862..2771721 100644 --- a/atomlib/util.py +++ b/atomlib/util.py @@ -10,6 +10,7 @@ import numpy from numpy.typing import NDArray +import polars from .types import ParamSpec, Concatenate @@ -145,7 +146,29 @@ def proc_seed(seed: t.Optional[object], entropy: object) -> t.Optional[NDArray[n return numpy.frombuffer(state.digest(), dtype=numpy.uint32) +class CheckedJoinError(Exception): + def __init__(self, missing_keys: t.Sequence[t.Any]): + self.missing_keys: t.Tuple[t.Any, ...] = tuple(missing_keys) + super().__init__() + + def __str__(self) -> str: + return f"Missing match for key(s): '{', '.join(map(repr, self.missing_keys))}'" + + +def checked_left_join(lhs: polars.DataFrame, rhs: polars.DataFrame, on: t.Optional[str] = None, *, + left_on: t.Optional[str] = None, right_on: t.Optional[str] = None) -> polars.DataFrame: + df = lhs.join(rhs, how='inner', on=on, left_on=left_on, right_on=right_on, validate='m:1') + + if len(df) < len(lhs): + missing_rows = lhs.join(rhs, how='anti', on=on, left_on=left_on, right_on=right_on) + col = t.cast(str, left_on or on) + missing = missing_rows.select(polars.col(col).unique()).to_series() + raise CheckedJoinError(tuple(missing)) + + return df + + __all__ = [ 'open_file', 'open_file_binary', 'FileOrPath', 'BinaryFileOrPath', - 'opt_classmethod', 'localtime', 'map_some', 'proc_seed', + 'opt_classmethod', 'localtime', 'map_some', 'proc_seed', 'checked_left_join', ] diff --git a/tests/baseline_files/AlN_out.lmp b/tests/baseline_files/AlN_out.lmp index 365db8d..4793897 100644 --- a/tests/baseline_files/AlN_out.lmp +++ b/tests/baseline_files/AlN_out.lmp @@ -2,13 +2,16 @@ 4 atoms 2 atom types - 2.0000000 5.1300000 xlo xhi 0.0000000 2.7106595 ylo yhi -5.0000000 0.0200000 zlo zhi - -1.5650000 0.0000000 0.0000000 xy xz yz +Atom Type Labels + + 10 Al + 20 N + Masses 10 5.0000000 # Al diff --git a/tests/baseline_files/AlN_roundtrip.lmp b/tests/baseline_files/AlN_roundtrip.lmp new file mode 100644 index 0000000..9d8d980 --- /dev/null +++ b/tests/baseline_files/AlN_roundtrip.lmp @@ -0,0 +1,25 @@ +# Generated by atomlib on 2024-04-02 13:01:30-04:00 + + 4 atoms + 2 atom types + 2.0000000 5.1300000 xlo xhi + 0.0000000 2.7106595 ylo yhi + -5.0000000 0.0200000 zlo zhi + -1.5650000 0.0000000 0.0000000 xy xz yz + +Atom Type Labels + + 10 Al + 20 N + +Masses + + 10 5.0000000 # Al + 20 10.0000000 # N + +Atoms # charge + + 1 10 0.0 3.5650000 0.9031460 -2.4951000 + 2 10 0.0 2.0000000 1.8062910 0.0133770 + 3 20 0.0 3.5650000 0.9031460 -0.5815030 + 4 20 0.0 2.0000000 1.8062910 -3.0899800 diff --git a/tests/input_files/labeled.lmp b/tests/input_files/labeled.lmp new file mode 100644 index 0000000..973e61e --- /dev/null +++ b/tests/input_files/labeled.lmp @@ -0,0 +1,32 @@ +# Generated by atomlib on 2024-04-02 13:01:30-04:00 + + 4 atoms + 2 atom types + 0.0000000 3.1300000 xlo xhi + 0.0000000 2.7106595 ylo yhi + 0.0000000 5.0200000 zlo zhi + -1.5650000 0.0000000 0.0000000 xy xz yz + +Atom Type Labels + + 10 Al1 + 20 N2 + +Masses + + 10 5.0000000 + 20 10.0000000 + +Atoms # atomic + + 1 10 0.0 0.0 0.0 + 3 20 0.0 0.0 1.0 + 4 20 1.0 0.0 1.0 + 2 10 0.0 1.0 0.0 + +Velocities + +4 1.0 0.0 1.0 +2 0.0 1.0 0.0 +1 0.0 0.0 0.0 +3 0.0 0.0 1.0 \ No newline at end of file diff --git a/tests/test_io.py b/tests/test_io.py index e369ab0..ffbcc65 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -233,4 +233,28 @@ def test_lmp_write_aln(s: StringIO, aln): .transform(AffineTransform3D.translate([2., 0., -5.]) .rotate([0.125, 0.582, -1.20], 29. * numpy.pi/180.)) - cell.write_lmp(s) \ No newline at end of file + cell.write_lmp(s) + + +@check_equals_file('AlN_roundtrip.lmp', skip_lines=1) +def test_lmp_roundtrip(s: StringIO): + path = OUTPUT_PATH / 'AlN_roundtrip.lmp' + lmp = LMP.from_file(path) + lmp.write(s) + + +@check_parse_structure('labeled.lmp') +def test_lmp_labeled(aln_ortho): + return AtomCell.from_ortho(Atoms({ + 'elem': [7, 13, 13, 7], + 'type': [20, 10, 10, 20], + 'symbol': ['N2', 'Al1', 'Al1', 'N2'], + 'mass': [10., 5., 5., 10.], + 'x': [1., 0., 0., 0.], + 'y': [0., 1., 0., 0.], + 'z': [1., 0., 0., 1.], + 'v_x': [1., 0., 0., 0.], + 'v_y': [0., 1., 0., 0.], + 'v_z': [1., 0., 0., 1.], + 'i': [4, 2, 1, 3], + }), aln_ortho, frame='local') \ No newline at end of file