Skip to content

Commit

Permalink
Improve interim results by include additional mapsets (#399)
Browse files Browse the repository at this point in the history
* Add possibility to configure pattern to save addional mapsets in the interim resutls

Co-authored-by: anikaweinmann <[email protected]>
  • Loading branch information
anikaweinmann and anikaweinmann authored Dec 14, 2022
1 parent be5299e commit 955175f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 0 deletions.
14 changes: 14 additions & 0 deletions job_resumption.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ The interim results will be deleted automatically if a job resource is successfu
- Startup actinia with above config in preferred way, e.g.
`cd ~/repos/actinia` + press F5

## Additional mapsets
For parallelization on different regions some GRASS GIS processes might create
additional mapsets and use the data from these mapsets in further calculations
without copying them to the temporary mapsets. To add the possibility to also
resumpt jobs where such addional mapsets are created in a previous step you can
configure additional mapsets which should be included in the interim results
by setting a pattern for the mapset name, e.g.:
```
[MISC]
save_interim_results = onError
save_interim_results_endpoints_cfg = /etc/default/actinia_interim_endpoints.csv
include_additional_mapset_pattern = test_tmp_*
```


## Job resumption examples
```
Expand Down
12 changes: 12 additions & 0 deletions src/actinia_core/core/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def __init__(self):
"AsyncEphemeralExportResource".lower(): "AsyncEphemeralExportResource",
"AsyncPersistentResource".lower(): "AsyncPersistentResource",
}
self.INCLUDE_ADDITIONAL_MAPSET_PATTERN = None

"""
LOGGING
Expand Down Expand Up @@ -649,6 +650,11 @@ def write(self, path=DEFAULT_CONFIG_PATH):
"INTERIM_SAVING_ENDPOINTS",
str(self.INTERIM_SAVING_ENDPOINTS),
)
config.set(
"MISC",
"INCLUDE_ADDITIONAL_MAPSET_PATTERN",
str(self.INCLUDE_ADDITIONAL_MAPSET_PATTERN),
)

config.add_section("LOGGING")
config.set("LOGGING", "LOG_INTERFACE", self.LOG_INTERFACE)
Expand Down Expand Up @@ -913,6 +919,12 @@ def read(self, path=DEFAULT_CONFIG_PATH):
self.INTERIM_SAVING_ENDPOINTS.update(
endpoints_dict
)
if config.has_option(
"MISC", "INCLUDE_ADDITIONAL_MAPSET_PATTERN"
):
self.INCLUDE_ADDITIONAL_MAPSET_PATTERN = config.get(
"MISC", "INCLUDE_ADDITIONAL_MAPSET_PATTERN"
)

if config.has_section("LOGGING"):
if config.has_option("LOGGING", "LOG_INTERFACE"):
Expand Down
60 changes: 60 additions & 0 deletions src/actinia_core/core/interim_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import os
import subprocess
import shutil
from fnmatch import filter
from .messages_logger import MessageLogger
from actinia_core.core.common.config import global_config, DEFAULT_CONFIG_PATH
from actinia_core.core.common.exceptions import RsyncError
Expand Down Expand Up @@ -83,6 +84,9 @@ def __init__(self, user_id, resource_id, iteration, endpoint):
self.iteration = iteration if iteration is not None else 1
self.old_pc_step = None
self.endpoint = endpoint
self.include_additional_mapset_pattern = (
global_config.INCLUDE_ADDITIONAL_MAPSET_PATTERN
)

def set_old_pc_step(self, old_pc_step):
"""Set method for the number of the successfully finished steps of
Expand Down Expand Up @@ -197,6 +201,29 @@ def _compare_sha512sums_of_folders(self, folder1, folder2):
else:
return False

def rsync_additional_mapsets(self, dest_path):
"""Using rsync to update additional mapsets from interim results to
temporary mapset
Args:
dest_path (str): Path to destination folder where the additional
mapset should be saved
"""

src_path = (
f"{self._get_interim_mapset_path(self.old_pc_step)}_add_mapsets"
)
if not os.path.isdir(src_path):
return

for mapset in os.listdir(src_path):
src = os.path.join(src_path, mapset)
dest = os.path.join(dest_path, mapset)
rsync_status = self.rsync_mapsets(src, dest)
if rsync_status != "success":
self.logger.info(
f"Syncing additional mapset <{mapset}> failed."
)

def rsync_mapsets(self, src, dest):
"""Using rsync to update the mapset folder.
Args:
Expand Down Expand Up @@ -278,6 +305,25 @@ def _get_interim_path(self):
self.resource_id,
)

def _get_included_additional_mapset_paths(
self, temp_mapset_path, progress_step
):
"""Returns lists with source paths of hte additional mapsets and
destination paths for them"""

if self.include_additional_mapset_pattern:
pattern = self.include_additional_mapset_pattern
tmp_path = os.path.dirname(temp_mapset_path)
dest_path = (
f"{self._get_interim_mapset_path(progress_step)}_add_mapsets"
)
mapsets = filter(os.listdir(tmp_path), pattern)
srcs = [os.path.join(tmp_path, mapset) for mapset in mapsets]
dests = [os.path.join(dest_path, mapset) for mapset in mapsets]
return srcs, dests
else:
return [], []

def _get_interim_mapset_path(self, progress_step):
"""Returns path where the interim mapset is saved"""
return os.path.join(
Expand Down Expand Up @@ -309,6 +355,12 @@ def save_interim_results(
self.logger.info("Saving interim results of step %d" % progress_step)
dest_mapset = self._get_interim_mapset_path(progress_step)
dest_tmpdir = self._get_interim_tmpdir_path(progress_step)
addm_src, addm_dest = self._get_included_additional_mapset_paths(
temp_mapset_path, progress_step
)

if temp_mapset_path is None:
return

if progress_step == 1 or force_copy is True:
# copy temp mapset for first step
Expand All @@ -318,6 +370,8 @@ def save_interim_results(
"Maspset %s and temp_file_path %s are copied"
% (temp_mapset_path, temp_file_path)
)
for m_src, m_dest in zip(addm_src, addm_dest):
shutil.copytree(m_src, m_dest)
else:
old_dest_mapset = self._get_interim_mapset_path(progress_step - 1)
old_dest_tmpdir = self._get_interim_tmpdir_path(progress_step - 1)
Expand All @@ -330,3 +384,9 @@ def save_interim_results(
self._saving_folder(
temp_file_path, dest_tmpdir, old_dest_tmpdir, progress_step
)
# saving additional mapsets
_, old_dests = self._get_included_additional_mapset_paths(
temp_mapset_path, progress_step - 1
)
for m_src, m_dest, old_dest in zip(addm_src, addm_dest, old_dests):
self._saving_folder(m_src, m_dest, old_dest, progress_step)
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,9 @@ def _create_temporary_mapset(
"Error while rsyncing of interim results to new temporare "
"mapset"
)
self.interim_result.rsync_additional_mapsets(
os.path.dirname(self.temp_mapset_path)
)
if interim_result_file_path:
self.message_logger.info(
"Rsync interim result file path to temporary GRASS DB"
Expand Down

0 comments on commit 955175f

Please sign in to comment.