Skip to content

Commit

Permalink
Relocated add_to_rmg_libraries from main into a new libraries util
Browse files Browse the repository at this point in the history
Extracted functions and added a race condition handler
  • Loading branch information
alongd committed Jun 15, 2024
1 parent 67ff5fa commit 55c2e3c
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 114 deletions.
128 changes: 14 additions & 114 deletions t3/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,15 @@
from arkane import Arkane
from rmgpy import settings as rmg_settings
from rmgpy.chemkin import load_chemkin_file
from rmgpy.data.kinetics import KineticsLibrary
from rmgpy.data.kinetics.library import LibraryReaction
from rmgpy.data.thermo import ThermoLibrary
from rmgpy.exceptions import (ChemicallySignificantEigenvaluesError,
ChemkinError,
ModifiedStrongCollisionError,
NetworkError,
)
from rmgpy.kinetics import Arrhenius, KineticsData
from rmgpy.reaction import Reaction
from rmgpy.rmg.pdep import PDepReaction
from rmgpy.species import Species
from rmgpy.thermo import NASAPolynomial, NASA, ThermoData, Wilhoit

from arc.common import (get_number_with_ordinal_indicator,
get_ordinal_indicator,
Expand All @@ -54,6 +50,7 @@
from t3.runners.rmg_runner import rmg_runner
from t3.schema import InputBase
from t3.simulate.factory import simulate_factory
from t3.utils.libraries import add_to_rmg_libraries
from t3.utils.writer import write_pdep_network_file, write_rmg_input_file

RMG_THERMO_LIB_BASE_PATH = os.path.join(rmg_settings['database.directory'], 'thermo', 'libraries')
Expand Down Expand Up @@ -382,11 +379,13 @@ def set_paths(self,
'ARC thermo lib': os.path.join(iteration_path, 'ARC', 'output', 'RMG libraries', 'thermo',
f"{self.qm['project'] if 'project' in self.qm else self.project}.py"),
'ARC kinetics lib': os.path.join(iteration_path, 'ARC', 'output', 'RMG libraries', 'kinetics'),
'RMG T3 thermo lib': os.path.join(project_directory, 'Libraries', f"{self.t3['options']['library_name']}.py"),
'RMG T3 kinetics lib': os.path.join(project_directory, 'Libraries', f"{self.t3['options']['library_name']}"),
'shared RMG T3 thermo lib': os.path.join(RMG_THERMO_LIB_BASE_PATH, f"{self.t3['options']['shared_library_name']}.py")
'T3 thermo lib': os.path.join(project_directory, 'Libraries', f"{self.t3['options']['library_name']}.py"),
'T3 kinetics lib': os.path.join(project_directory, 'Libraries', f"{self.t3['options']['library_name']}"),
'shared T3 thermo lib': os.path.join(self.t3['options']['external_library_path'] or RMG_THERMO_LIB_BASE_PATH,
f"{self.t3['options']['shared_library_name']}.py")
if self.t3['options']['shared_library_name'] is not None else None,
'shared RMG T3 kinetics lib': os.path.join(RMG_KINETICS_LIB_BASE_PATH, f"{self.t3['options']['shared_library_name']}")
'shared T3 kinetics lib': os.path.join(self.t3['options']['external_library_path'] or RMG_KINETICS_LIB_BASE_PATH,
f"{self.t3['options']['shared_library_name']}")
if self.t3['options']['shared_library_name'] is not None else None,
}

Expand Down Expand Up @@ -572,7 +571,10 @@ def process_arc_run(self):
)
if len(converged_spc_keys) or len(converged_rxn_keys):
# we calculated something, add to thermo/kinetic library
self.add_to_rmg_libraries()
add_to_rmg_libraries(library_name=self.t3['options']['library_name'],
shared_library_name=self.t3['options']['shared_library_name'],
paths=self.paths,
logger=self.logger)
# clear the calculated objects from self.qm:
self.qm['species'], self.qm['reactions'] = list(), list()
self.dump_species_and_reactions()
Expand All @@ -598,9 +600,9 @@ def run_rmg(self, restart_rmg: bool = False):
"""
self.logger.info(f'Running RMG (tolerance = {self.get_current_rmg_tol()}, iteration {self.iteration})...')

# Use the RMG T3 libraries only if they exist and not already in use.
# Use the T3 libraries only if they exist and not already in use.
for token in ['thermo', 'kinetics']:
t3_lib, shared_rmg_lib = self.paths[f'RMG T3 {token} lib'], self.paths[f'shared RMG T3 {token} lib']
t3_lib, shared_rmg_lib = self.paths[f'T3 {token} lib'], self.paths[f'shared T3 {token} lib']
if shared_rmg_lib is not None:
self.add_library_to_rmg_run(library_name=shared_rmg_lib, library_type=token)
self.add_library_to_rmg_run(library_name=t3_lib, library_type=token)
Expand Down Expand Up @@ -1346,108 +1348,6 @@ def add_reaction(self,
self.reactions[rxn_key]['reasons'].append(reason)
return None

def add_to_rmg_libraries(self):
"""
Creates RMG libraries in the RMG database repository if they don't already exist,
and appends with the respective entries from the libraries generated by ARC.
Todo:
Tests kinetics libraries.
"""
# 1. Thermo:
arc_thermo_lib_path = self.paths['ARC thermo lib']
rmg_t3_thermo_lib_path = self.paths['RMG T3 thermo lib']
local_context = {
'ThermoData': ThermoData,
'Wilhoit': Wilhoit,
'NASAPolynomial': NASAPolynomial,
'NASA': NASA,
}
if os.path.isfile(arc_thermo_lib_path) and os.path.isfile(rmg_t3_thermo_lib_path):
# This thermo library already exists in the RMG database: Load it, append new entries, and save.
rmg_thermo_lib, arc_thermo_lib = ThermoLibrary(), ThermoLibrary()
rmg_thermo_lib.load(path=rmg_t3_thermo_lib_path, local_context=local_context, global_context=dict())
arc_thermo_lib.load(path=arc_thermo_lib_path, local_context=local_context, global_context=dict())
arc_description = arc_thermo_lib.long_desc
description_to_append = '\n'
append = False
for line in arc_description.splitlines():
if 'Overall time since project initiation' in line:
append = False
if append:
description_to_append += line + '\n'
if 'Considered the following' in line:
append = True
rmg_thermo_lib.long_desc += description_to_append
for entry in arc_thermo_lib.entries.values():
entry_species = Species(molecule=[entry.item])
entry_species.generate_resonance_structures(keep_isomorphic=False, filter_structures=True)
for existing_entry in rmg_thermo_lib.entries.values():
if entry_species.is_isomorphic(existing_entry.item):
if entry.label != existing_entry.label:
self.logger.warning(f"Not adding species {entry.label} to the "
f"{self.t3['options']['library_name']} thermo library, "
f"the species seems to already exist under the label "
f"{existing_entry.label}.")
break
rmg_thermo_lib.entries[entry.label] = entry
rmg_thermo_lib.save(path=rmg_t3_thermo_lib_path)
else:
# This thermo library doesn't exist in the RMG database: Just copy the library generated by ARC.
if os.path.isfile(arc_thermo_lib_path):
if not os.path.isdir(os.path.dirname(rmg_t3_thermo_lib_path)):
os.makedirs(os.path.dirname(rmg_t3_thermo_lib_path))
shutil.copy(arc_thermo_lib_path, rmg_t3_thermo_lib_path)

# 2. Kinetics:
arc_kinetics_lib_path = self.paths['ARC kinetics lib']
rmg_t3_kinetics_lib_path = self.paths['RMG T3 kinetics lib']
local_context = {
'KineticsData': KineticsData,
'Arrhenius': Arrhenius,
}
if os.path.isdir(arc_kinetics_lib_path) and os.path.isdir(rmg_t3_kinetics_lib_path):
# This kinetics library already exists (in the RMG database or locally): Load it, append entries, and save.
rmg_kinetics_lib, arc_kinetics_lib = KineticsLibrary(), KineticsLibrary()
rmg_kinetics_lib.load(path=rmg_t3_kinetics_lib_path, local_context=local_context, global_context=dict())
arc_kinetics_lib.load(path=arc_kinetics_lib_path, local_context=local_context, global_context=dict())
arc_description = arc_kinetics_lib.long_desc
description_to_append = '\n'
append = False
for line in arc_description.splitlines():
if 'Overall time since project initiation' in line:
append = False
if append:
description_to_append += line + '\n'
if 'Considered the following' in line:
append = True
rmg_kinetics_lib.long_desc += description_to_append
for entry in arc_kinetics_lib.entries.values():
entry_reaction = Reaction(reactants=entry.item.reactants[:],
products=entry.item.products[:],
specific_collider=entry.item.specific_collider,
kinetics=entry.data,
duplicate=entry.item.duplicate,
reversible=entry.item.reversible,
allow_pdep_route=entry.item.allow_pdep_route,
elementary_high_p=entry.item.elementary_high_p,
)
for existing_entry in rmg_kinetics_lib.entries.values():
if entry_reaction.is_isomorphic(existing_entry.item):
self.logger.warning(f"Not adding reaction {entry.label} to the "
f"{self.t3['options']['library_name']} kinetics library, "
f"the reaction seems to already exist under the label "
f"{existing_entry.label}.")
break
rmg_kinetics_lib.entries[entry.label] = entry
rmg_kinetics_lib.save(path=rmg_t3_kinetics_lib_path)
else:
# This kinetics library doesn't exist in the RMG database: Just copy the library generated by ARC.
if os.path.isfile(arc_kinetics_lib_path):
if not os.path.isdir(os.path.dirname(rmg_t3_kinetics_lib_path)):
os.makedirs(rmg_t3_kinetics_lib_path)
shutil.copy(arc_kinetics_lib_path, rmg_t3_kinetics_lib_path)

def dump_species_and_reactions(self):
"""
Dump self.species and self.reactions in case T3 needs to be restarted.
Expand Down Expand Up @@ -1492,7 +1392,7 @@ def add_library_to_rmg_run(self,
library_type: str,
) -> None:
"""
Add a library to the RMG run.
Add a T3-generated library to the RMG run.
Args:
library_name (str): The library name.
Expand Down
189 changes: 189 additions & 0 deletions t3/utils/libraries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""
t3 utils libraries module
for working with RMG thermo and kinetics libraries
"""

import datetime
import os
import shutil
import time

from typing import Dict, TYPE_CHECKING

from rmgpy.data.kinetics import KineticsLibrary
from rmgpy.data.thermo import ThermoLibrary
from rmgpy.kinetics import Arrhenius, KineticsData
from rmgpy.reaction import Reaction
from rmgpy.thermo import NASAPolynomial, NASA, ThermoData, Wilhoit
from rmgpy.species import Species

if TYPE_CHECKING:
from t3.logger import Logger


def add_to_rmg_libraries(library_name: str,
shared_library_name: str,
paths: Dict[str, str],
logger: 'Logger',
):
"""
Creates RMG libraries in the RMG database repository if they don't already exist,
and appends with the respective entries from the libraries generated by ARC.
Args:
library_name (str): The name of the RMG library.
shared_library_name (str): The name of an RMG database library shared between T3 projects.
paths (Dict[str, str]): T3's dictionary of paths.
logger (Logger): Instance of T3's Logger class.
"""
for token in ['thermo', 'kinetics']:
arc_lib_path, t3_lib_path, shared_lib_path = \
paths[f'ARC {token} lib'], paths[f'T3 {token} lib'], paths[f'shared T3 {token} lib']
if token == 'thermo':
local_context = {
'ThermoData': ThermoData,
'Wilhoit': Wilhoit,
'NASAPolynomial': NASAPolynomial,
'NASA': NASA,
}
else:
local_context = {
'KineticsData': KineticsData,
'Arrhenius': Arrhenius,
}
for to_lib_path, lib_name, race in zip([shared_lib_path, t3_lib_path],
[library_name, shared_library_name],
[True, False]):
if os.path.isfile(arc_lib_path) and to_lib_path is not None and os.path.isfile(to_lib_path):
append_to_rmg_library(library_name=lib_name,
from_lib_path=arc_lib_path,
to_lib_path=to_lib_path,
local_context=local_context,
lib_type=token,
logger=logger,
race=race,
)
else:
# The destination library (T3's or the shared) doesn't exist. Just copy the library generated by ARC.
if os.path.isfile(arc_lib_path) and to_lib_path is not None:
if not os.path.isdir(os.path.dirname(to_lib_path)):
os.makedirs(os.path.dirname(to_lib_path))
shutil.copy(arc_lib_path, to_lib_path)


def append_to_rmg_library(library_name: str,
from_lib_path: str,
to_lib_path: str,
local_context: dict,
lib_type: str,
logger: 'Logger',
race: bool = False,
):
"""
Append the entries from the ARC-generated library to an RMG library.
Args:
library_name (str): The name of the RMG library.
from_lib_path (str): The path to the ARC-generated library.
to_lib_path (str): The path to the RMG library to append to.
local_context (dict): The local context to use when loading the libraries.
lib_type (str): The type of the library (either 'thermo' or 'kinetics').
logger (Logger): Instance of T3's Logger class.
race (bool, optional): Whether to take measures to avoid a race condition when appending to the library.
"""
race_path = os.path.join(os.path.dirname(to_lib_path), f'{library_name}.race')
if race:
race_free = check_race_condition(race_path)
if not race_free:
logger.error(f'Could not write to library {to_lib_path} due to a race condition.\n'
f'Check whether it is safe to delete the {race_path} file to continue.')
return
from_lib, to_lib = (ThermoLibrary(), ThermoLibrary()) if lib_type == 'thermo' \
else (KineticsLibrary(), KineticsLibrary())
to_lib.load(path=to_lib_path, local_context=local_context, global_context=dict())
from_lib.load(path=from_lib_path, local_context=local_context, global_context=dict())
from_description = from_lib.long_desc
description_to_append = '\n'
append = False
for line in from_description.splitlines():
if 'Overall time since project initiation' in line:
append = False
if append:
description_to_append += line + '\n'
if 'Considered the following' in line:
append = True
to_lib.long_desc += description_to_append
for entry in from_lib.entries.values():
skip_entry = False
if lib_type == 'thermo':
entry_species = Species(molecule=[entry.item])
entry_species.generate_resonance_structures(keep_isomorphic=False, filter_structures=True)
for existing_entry in to_lib.entries.values():
if entry_species.is_isomorphic(existing_entry.item):
if entry.label != existing_entry.label:
logger.warning(f"Not adding species {entry.label} to the {library_name} thermo library, "
f"the species seems to already exist under the label {existing_entry.label}.")
skip_entry = True
break
elif lib_type == 'kinetics':
entry_reaction = Reaction(reactants=entry.item.reactants[:],
products=entry.item.products[:],
specific_collider=entry.item.specific_collider,
kinetics=entry.data,
duplicate=entry.item.duplicate,
reversible=entry.item.reversible,
allow_pdep_route=entry.item.allow_pdep_route,
elementary_high_p=entry.item.elementary_high_p,
)
for existing_entry in to_lib.entries.values():
if entry_reaction.is_isomorphic(existing_entry.item):
logger.warning(f"Not adding reaction {entry.label} to the {library_name} kinetics library, "
f"the reaction seems to already exist under the label {existing_entry.label}.")
skip_entry = True
break
if not skip_entry:
to_lib.entries[entry.label] = entry
to_lib.save(path=to_lib_path)
lift_race_condition(race_path)


def check_race_condition(race_path: str,
) -> bool:
"""
Check for a race condition and avoid one by creating a race holder file.
Args:
race_path (str): The path to the race file to check.
Returns:
bool: Whether there is no race condition and T3 may continue (True) or an unavoidable race exists (False).
"""
counter = 0
while os.path.isfile(race_path):
with open(race_path, 'r') as f:
content = f.read()
if content:
creation_date = content.split(' ')[-1]
creation_datetime = datetime.datetime.strptime(creation_date, "%H%M%S_%b%d_%Y")
time_delta = datetime.datetime.now() - creation_datetime
if time_delta.total_seconds() > 1000:
lift_race_condition(race_path)
return True
counter += 1
time.sleep(10)
if counter > 1000:
return False
with open(race_path, 'w') as f:
f.write(f'Race created at {datetime.datetime.now().strftime("%H%M%S_%b%d_%Y")}')
return True


def lift_race_condition(race_path: str) -> None:
"""
Lift the race condition by deleting the race holder file.
Args:
race_path (str): The path to the race file to check.
"""
if os.path.isfile(race_path):
os.remove(race_path)

0 comments on commit 55c2e3c

Please sign in to comment.