Skip to content

Commit

Permalink
Add batch processing function (#78)
Browse files Browse the repository at this point in the history
* Add batch processing function

* Forward batch command from ensembles and ensemblesets

* Batch processing after init on ensembles

* Demonstrate that batch works with apply
  • Loading branch information
berland authored Dec 20, 2019
1 parent 085916c commit e880a05
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 19 deletions.
50 changes: 43 additions & 7 deletions src/fmu/ensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class ScratchEnsemble(object):
off to gain more fined tuned control.
manifest: dict or filename to use for manifest. If filename,
it must be a yaml-file that will be parsed to a single dict.
batch (list): List of functions (load_*) that
should be run at time of initialization for each realization.
Each element is a length 1 dictionary with the function name to run as
the key, and each key's value should be the function arguments as a dict.
"""

def __init__(
Expand All @@ -85,6 +90,7 @@ def __init__(
runpathfilter=None,
autodiscovery=True,
manifest=None,
batch=None,
):
self._name = ensemble_name # ensemble name
self._realizations = {} # dict of ScratchRealization objects,
Expand Down Expand Up @@ -130,13 +136,13 @@ def __init__(
# Search and locate minimal set of files
# representing the realizations.
count = self.add_realizations(
paths, realidxregexp, autodiscovery=autodiscovery
paths, realidxregexp, autodiscovery=autodiscovery, batch=batch
)

if isinstance(runpathfile, str) and runpathfile:
count = self.add_from_runpathfile(runpathfile, runpathfilter)
count = self.add_from_runpathfile(runpathfile, runpathfilter, batch=batch)
if isinstance(runpathfile, pd.DataFrame) and not runpathfile.empty:
count = self.add_from_runpathfile(runpathfile, runpathfilter)
count = self.add_from_runpathfile(runpathfile, runpathfilter, batch=batch)

if manifest:
# The _manifest variable is set using a property decorator
Expand Down Expand Up @@ -206,7 +212,9 @@ def _shortcut2path(keys, shortpath):
# calling function handle further errors.
return shortpath

def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
def add_realizations(
self, paths, realidxregexp=None, autodiscovery=True, batch=None
):
"""Utility function to add realizations to the ensemble.
Realizations are identified by their integer index.
Expand All @@ -221,6 +229,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
to file system. Absolute or relative paths.
autodiscovery (boolean): whether files can be attempted
auto-discovered
batch (list): Batch commands sent to each realization.
Returns:
count (int): Number of realizations successfully added.
Expand All @@ -237,7 +246,10 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
count = 0
for realdir in globbedpaths:
realization = ScratchRealization(
realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
)
if realization.index is None:
logger.critical(
Expand All @@ -253,7 +265,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
logger.info("add_realizations() found %d realizations", len(self._realizations))
return count

def add_from_runpathfile(self, runpath, runpathfilter=None):
def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None):
"""Add realizations from a runpath file typically
coming from ERT.
Expand All @@ -270,6 +282,7 @@ def add_from_runpathfile(self, runpath, runpathfilter=None):
a Pandas DataFrame parsed from a runpath file
runpathfilter (str). A filter which each filepath has to match
in order to be included. Default None which means not filter
batch (list): Batch commands to be sent to each realization.
Returns:
int: Number of successfully added realizations.
Expand Down Expand Up @@ -300,7 +313,10 @@ def add_from_runpathfile(self, runpath, runpathfilter=None):
continue
logger.info("Adding realization from %s", row["runpath"])
realization = ScratchRealization(
row["runpath"], index=int(row["index"]), autodiscovery=False
row["runpath"],
index=int(row["index"]),
autodiscovery=False,
batch=batch,
)
# Use the ECLBASE from the runpath file to
# ensure we recognize the correct UNSMRY file
Expand Down Expand Up @@ -859,6 +875,26 @@ def drop(self, localpath, **kwargs):
except ValueError:
pass # Allow localpath to be missing in some realizations

def process_batch(self, batch=None):
"""Process a list of functions to run/apply
This is equivalent to calling each function individually
but this enables more efficient concurrency. It is meant
to be used for functions that modifies the realization
object, not for functions that returns a dataframe already.
Args:
batch (list): Each list element is a dictionary with one key,
being a function names, value pr key is a dict with keyword
arguments to be supplied to each function.
Returns:
ScratchEnsemble: This ensemble object (self), for it
to be picked up by ProcessPoolExecutor and pickling.
"""
for realization in self._realizations.values():
realization.process_batch(batch)
return self

def apply(self, callback, **kwargs):
"""Callback functionalty, apply a function to every realization
Expand Down
51 changes: 42 additions & 9 deletions src/fmu/ensemble/ensembleset.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ class EnsembleSet(object):
autodiscovery: boolean, sent to initializing Realization objects,
instructing them on whether certain files should be
auto-discovered.
batch (list): List of functions (load_*) that
should be run at time of initialization for each realization.
Each element is a length 1 dictionary with the function name to run as
the key, and each key's value should be the function arguments as a dict.
"""

def __init__(
Expand All @@ -67,6 +70,7 @@ def __init__(
iterregexp=None,
batchregexp=None,
autodiscovery=True,
batch=None,
):
self._name = name
self._ensembles = {} # Dictionary indexed by each ensemble's name.
Expand Down Expand Up @@ -101,21 +105,25 @@ def __init__(
return

if ensembles and isinstance(ensembles, list):
if batch:
logger.warning(
"Batch commands not procesed when loading finished ensembles"
)
for ensemble in ensembles:
if isinstance(ensemble, (ScratchEnsemble, VirtualEnsemble)):
self._ensembles[ensemble.name] = ensemble
else:
logger.warning("Supplied object was not an ensemble")
if not self._ensembles:
logger.warning("No ensembles added to EnsembleSet")

if frompath:
self.add_ensembles_frompath(
frompath,
realidxregexp,
iterregexp,
batchregexp,
autodiscovery=autodiscovery,
batch=batch,
)
if not self._ensembles:
logger.warning("No ensembles added to EnsembleSet")
Expand All @@ -124,7 +132,7 @@ def __init__(
if not os.path.exists(runpathfile):
logger.error("Could not open runpath file %s", runpathfile)
raise IOError
self.add_ensembles_fromrunpath(runpathfile)
self.add_ensembles_fromrunpath(runpathfile, batch=batch)
if not self._ensembles:
logger.warning("No ensembles added to EnsembleSet")

Expand Down Expand Up @@ -171,6 +179,7 @@ def add_ensembles_frompath(
iterregexp=None,
batchregexp=None,
autodiscovery=True,
batch=None,
):
"""Convenience function for adding multiple ensembles.
Expand All @@ -189,7 +198,10 @@ def add_ensembles_frompath(
autodiscovery: boolean, sent to initializing Realization objects,
instructing them on whether certain files should be
auto-discovered.
batch (list): List of functions (load_*) that
should be run at time of initialization for each realization.
Each element is a length 1 dictionary with the function name to run as
the key, and each key's value should be the function arguments as a dict.
"""
# Try to catch the most common use case and make that easy:
if isinstance(paths, str):
Expand Down Expand Up @@ -244,7 +256,7 @@ def add_ensembles_frompath(
for path in globbedpaths:
real = None
iterr = None # 'iter' is a builtin..
batch = None
batchname = None
for path_comp in reversed(path.split(os.path.sep)):
realmatch = re.match(realidxregexp, path_comp)
if realmatch:
Expand All @@ -258,9 +270,9 @@ def add_ensembles_frompath(
for path_comp in reversed(path.split(os.path.sep)):
batchmatch = re.match(batchregexp, path_comp)
if batchmatch:
batch = str(itermatch.group(1))
batchname = str(itermatch.group(1))
break
df_row = {"path": path, "real": real, "iter": iterr, "batch": batch}
df_row = {"path": path, "real": real, "iter": iterr, "batch": batchname}
paths_df = paths_df.append(df_row, ignore_index=True)
paths_df.fillna(value="Unknown", inplace=True)
# Initialize ensemble objects for each iter found:
Expand All @@ -282,10 +294,11 @@ def add_ensembles_frompath(
pathsforiter,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
)
self._ensembles[ens.name] = ens

def add_ensembles_fromrunpath(self, runpathfile):
def add_ensembles_fromrunpath(self, runpathfile, batch=None):
"""Add one or many ensembles from an ERT runpath file.
autodiscovery is not an argument, it is by default set to False
Expand All @@ -305,7 +318,10 @@ def add_ensembles_fromrunpath(self, runpathfile):
# Make a runpath slice, and initialize from that:
ens_runpath = runpath_df[runpath_df["iter"] == iterr]
ens = ScratchEnsemble(
"iter-" + str(iterr), runpathfile=ens_runpath, autodiscovery=False
"iter-" + str(iterr),
runpathfile=ens_runpath,
autodiscovery=False,
batch=batch,
)
self._ensembles[ens.name] = ens

Expand Down Expand Up @@ -431,6 +447,23 @@ def drop(self, localpath, **kwargs):
except ValueError:
pass # Allow localpath to be missing in some ensembles.

def process_batch(self, batch=None):
"""Process a list of functions to run/apply
This is equivalent to calling each function individually
but this enables more efficient concurrency. It is meant
to be used for functions that modifies the realization
object, not for functions that returns a dataframe already.
Args:
batch (list): Each list element is a dictionary with one key,
being a function names, value pr key is a dict with keyword
arguments to be supplied to each function.
"""
for ensemble in self._ensembles.values():
if isinstance(ensemble, ScratchEnsemble):
ensemble.process_batch(batch)

def apply(self, callback, **kwargs):
"""Callback functionalty, apply a function to every realization
Expand Down
55 changes: 54 additions & 1 deletion src/fmu/ensemble/realization.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,15 @@ class ScratchRealization(object):
override anything else.
autodiscovery (boolean): whether the realization should try to
auto-discover certain data (UNSMRY files in standard location)
batch (list): List of functions (load_*) that
should be run at time of initialization. Each element is a
length 1 dictionary with the function name to run as the key,
and each key's value should be the function arguments as a dict.
"""

def __init__(self, path, realidxregexp=None, index=None, autodiscovery=True):
def __init__(
self, path, realidxregexp=None, index=None, autodiscovery=True, batch=None
):
self._origpath = os.path.abspath(path)
self.index = None
self._autodiscovery = autodiscovery
Expand Down Expand Up @@ -165,8 +171,55 @@ def __init__(self, path, realidxregexp=None, index=None, autodiscovery=True):
if os.path.exists(os.path.join(abspath, "parameters.txt")):
self.load_txt("parameters.txt")

if batch:
self.process_batch(batch)

logger.info("Initialized %s", abspath)

def process_batch(self, batch):
"""Process a list of functions to run/apply
This is equivalent to calling each function individually
but this enables more efficient concurrency. It is meant
to be used for functions that modifies the realization
object, not for functions that returns a dataframe already.
Args:
batch (list): Each list element is a dictionary with one key,
being a function names, value pr key is a dict with keyword
arguments to be supplied to each function.
Returns:
ScratchRealization: This realization object (self), for it
to be picked up by ProcessPoolExecutor and pickling.
"""
assert isinstance(batch, list)
allowed_functions = [
"apply",
"find_files",
"load_smry",
"load_txt",
"load_file",
"load_csv",
"load_status",
"load_scalar",
]
for cmd in batch:
assert isinstance(cmd, dict)
assert len(cmd) == 1
fn_name = list(cmd.keys())[0]
logger.info(
"Batch processing (#%d): %s with args %s",
self.index,
fn_name,
str(cmd[fn_name]),
)
if fn_name not in allowed_functions:
logger.warning("process_batch skips illegal function: %s", fn_name)
continue
assert isinstance(cmd[fn_name], dict)
getattr(self, fn_name)(**cmd[fn_name])
return self

def runpath(self):
"""Return the runpath ("root") of the realization
Expand Down
38 changes: 37 additions & 1 deletion tests/test_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,43 @@ def test_reek001(tmp="TMP"):
assert len(reekensemble.keys()) == keycount - 1


def test_batch():
    """Test batch processing at time of object initialization"""
    # Resolve the test directory; fall back to cwd so this code can be
    # pasted into interactive sessions where __file__ is undefined.
    if "__file__" in globals():
        testdir = os.path.dirname(os.path.abspath(__file__))
    else:
        testdir = os.path.abspath(".")

    ensemblepath = testdir + "/data/testensemble-reek001/" + "realization-*/iter-0"
    batch_cmds = [
        {"load_scalar": {"localpath": "npv.txt"}},
        {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}},
        {"load_smry": {"column_keys": "*", "time_index": "daily"}},
    ]

    # Batch commands supplied at construction time:
    ens_initbatch = ScratchEnsemble("reektest", ensemblepath, batch=batch_cmds)

    # Also possible to batch process afterwards:
    ens_postbatch = ScratchEnsemble("reektest", ensemblepath)
    ens_postbatch.process_batch(batch=batch_cmds)

    # Both routes must have loaded identical data:
    for ens in (ens_initbatch, ens_postbatch):
        assert len(ens.get_df("npv.txt")) == 5
        assert len(ens.get_df("unsmry--daily")["FOPR"]) == 5490
        assert len(ens.get_df("unsmry--yearly")["FOPT"]) == 25


def test_emptyens():
"""Check that we can initialize an empty ensemble"""
ens = ScratchEnsemble("emptyens")
Expand Down Expand Up @@ -251,7 +288,6 @@ def test_reek001_scalars():
assert isinstance(npv, pd.DataFrame)
assert "REAL" in npv
assert "npv.txt" in npv # filename is the column name
print(npv)
assert len(npv) == 5
assert npv.dtypes["REAL"] == int
assert npv.dtypes["npv.txt"] == object
Expand Down
Loading

0 comments on commit e880a05

Please sign in to comment.