diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 1f9d9b92..75a9439b 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -74,6 +74,11 @@ class ScratchEnsemble(object): off to gain more fined tuned control. manifest: dict or filename to use for manifest. If filename, it must be a yaml-file that will be parsed to a single dict. + batch (dict): List of functions (load_*) that + should be run at time of initialization for each realization. + Each element is a length 1 dictionary with the function name to run as + the key and each keys value should be the function arguments as a dict. + """ def __init__( @@ -85,6 +90,7 @@ def __init__( runpathfilter=None, autodiscovery=True, manifest=None, + batch=None, ): self._name = ensemble_name # ensemble name self._realizations = {} # dict of ScratchRealization objects, @@ -130,13 +136,13 @@ def __init__( # Search and locate minimal set of files # representing the realizations. count = self.add_realizations( - paths, realidxregexp, autodiscovery=autodiscovery + paths, realidxregexp, autodiscovery=autodiscovery, batch=batch ) if isinstance(runpathfile, str) and runpathfile: - count = self.add_from_runpathfile(runpathfile, runpathfilter) + count = self.add_from_runpathfile(runpathfile, runpathfilter, batch=batch) if isinstance(runpathfile, pd.DataFrame) and not runpathfile.empty: - count = self.add_from_runpathfile(runpathfile, runpathfilter) + count = self.add_from_runpathfile(runpathfile, runpathfilter, batch=batch) if manifest: # The _manifest variable is set using a property decorator @@ -206,7 +212,9 @@ def _shortcut2path(keys, shortpath): # calling function handle further errors. return shortpath - def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): + def add_realizations( + self, paths, realidxregexp=None, autodiscovery=True, batch=None + ): """Utility function to add realizations to the ensemble. 
Realizations are identified by their integer index. @@ -221,6 +229,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): to file system. Absolute or relative paths. autodiscovery (boolean): whether files can be attempted auto-discovered + batch (list): Batch commands sent to each realization. Returns: count (int): Number of realizations successfully added. @@ -237,7 +246,10 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): count = 0 for realdir in globbedpaths: realization = ScratchRealization( - realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery + realdir, + realidxregexp=realidxregexp, + autodiscovery=autodiscovery, + batch=batch, ) if realization.index is None: logger.critical( @@ -253,7 +265,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True): logger.info("add_realizations() found %d realizations", len(self._realizations)) return count - def add_from_runpathfile(self, runpath, runpathfilter=None): + def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None): """Add realizations from a runpath file typically coming from ERT. @@ -270,6 +282,7 @@ def add_from_runpathfile(self, runpath, runpathfilter=None): a Pandas DataFrame parsed from a runpath file runpathfilter (str). A filter which each filepath has to match in order to be included. Default None which means not filter + batch (list): Batch commands to be sent to each realization. Returns: int: Number of successfully added realizations. 
@@ -300,7 +313,10 @@ def add_from_runpathfile(self, runpath, runpathfilter=None): continue logger.info("Adding realization from %s", row["runpath"]) realization = ScratchRealization( - row["runpath"], index=int(row["index"]), autodiscovery=False + row["runpath"], + index=int(row["index"]), + autodiscovery=False, + batch=batch, ) # Use the ECLBASE from the runpath file to # ensure we recognize the correct UNSMRY file @@ -859,6 +875,26 @@ def drop(self, localpath, **kwargs): except ValueError: pass # Allow localpath to be missing in some realizations + def process_batch(self, batch=None): + """Process a list of functions to run/apply + + This is equivalent to calling each function individually + but this enables more efficient concurrency. It is meant + to be used for functions that modifies the realization + object, not for functions that returns a dataframe already. + + Args: + batch (list): Each list element is a dictionary with one key, + being a function names, value pr key is a dict with keyword + arguments to be supplied to each function. + Returns: + ScratchEnsemble: This ensemble object (self), for it + to be picked up by ProcessPoolExecutor and pickling. + """ + for realization in self._realizations.values(): + realization.process_batch(batch) + return self + def apply(self, callback, **kwargs): """Callback functionalty, apply a function to every realization diff --git a/src/fmu/ensemble/ensembleset.py b/src/fmu/ensemble/ensembleset.py index e99af1d6..8a2de982 100644 --- a/src/fmu/ensemble/ensembleset.py +++ b/src/fmu/ensemble/ensembleset.py @@ -54,7 +54,10 @@ class EnsembleSet(object): autodiscovery: boolean, sent to initializing Realization objects, instructing them on whether certain files should be auto-discovered. - + batch (dict): List of functions (load_*) that + should be run at time of initialization for each realization. 
+ Each element is a length 1 dictionary with the function name to run as + the key and each key's value should be the function arguments as a dict. """ def __init__( @@ -67,6 +70,7 @@ def __init__( iterregexp=None, batchregexp=None, autodiscovery=True, + batch=None, ): self._name = name self._ensembles = {} # Dictionary indexed by each ensemble's name. @@ -101,6 +105,10 @@ def __init__( return if ensembles and isinstance(ensembles, list): + if batch: + logger.warning( + "Batch commands not processed when loading finished ensembles" + ) for ensemble in ensembles: if isinstance(ensemble, (ScratchEnsemble, VirtualEnsemble)): self._ensembles[ensemble.name] = ensemble @@ -108,7 +116,6 @@ def __init__( logger.warning("Supplied object was not an ensemble") if not self._ensembles: logger.warning("No ensembles added to EnsembleSet") - if frompath: self.add_ensembles_frompath( frompath, @@ -116,6 +123,7 @@ def __init__( iterregexp, batchregexp, autodiscovery=autodiscovery, + batch=batch, ) if not self._ensembles: logger.warning("No ensembles added to EnsembleSet") @@ -124,7 +132,7 @@ def __init__( if not os.path.exists(runpathfile): logger.error("Could not open runpath file %s", runpathfile) raise IOError - self.add_ensembles_fromrunpath(runpathfile, batch=batch) + self.add_ensembles_fromrunpath(runpathfile, batch=batch) if not self._ensembles: logger.warning("No ensembles added to EnsembleSet") @@ -171,6 +179,7 @@ def add_ensembles_frompath( iterregexp=None, batchregexp=None, autodiscovery=True, + batch=None, ): """Convenience function for adding multiple ensembles. @@ -189,7 +198,10 @@ def add_ensembles_frompath( autodiscovery: boolean, sent to initializing Realization objects, instructing them on whether certain files should be auto-discovered. - + batch (dict): List of functions (load_*) that + should be run at time of initialization for each realization. 
+ Each element is a length 1 dictionary with the function name to run as + the key and each keys value should be the function arguments as a dict. """ # Try to catch the most common use case and make that easy: if isinstance(paths, str): @@ -244,7 +256,7 @@ def add_ensembles_frompath( for path in globbedpaths: real = None iterr = None # 'iter' is a builtin.. - batch = None + batchname = None for path_comp in reversed(path.split(os.path.sep)): realmatch = re.match(realidxregexp, path_comp) if realmatch: @@ -258,9 +270,9 @@ def add_ensembles_frompath( for path_comp in reversed(path.split(os.path.sep)): batchmatch = re.match(batchregexp, path_comp) if batchmatch: - batch = str(itermatch.group(1)) + batchname = str(itermatch.group(1)) break - df_row = {"path": path, "real": real, "iter": iterr, "batch": batch} + df_row = {"path": path, "real": real, "iter": iterr, "batch": batchname} paths_df = paths_df.append(df_row, ignore_index=True) paths_df.fillna(value="Unknown", inplace=True) # Initialize ensemble objects for each iter found: @@ -282,10 +294,11 @@ def add_ensembles_frompath( pathsforiter, realidxregexp=realidxregexp, autodiscovery=autodiscovery, + batch=batch, ) self._ensembles[ens.name] = ens - def add_ensembles_fromrunpath(self, runpathfile): + def add_ensembles_fromrunpath(self, runpathfile, batch=None): """Add one or many ensembles from an ERT runpath file. autodiscovery is not an argument, it is by default set to False @@ -305,7 +318,10 @@ def add_ensembles_fromrunpath(self, runpathfile): # Make a runpath slice, and initialize from that: ens_runpath = runpath_df[runpath_df["iter"] == iterr] ens = ScratchEnsemble( - "iter-" + str(iterr), runpathfile=ens_runpath, autodiscovery=False + "iter-" + str(iterr), + runpathfile=ens_runpath, + autodiscovery=False, + batch=batch, ) self._ensembles[ens.name] = ens @@ -431,6 +447,23 @@ def drop(self, localpath, **kwargs): except ValueError: pass # Allow localpath to be missing in some ensembles. 
+ def process_batch(self, batch=None): + """Process a list of functions to run/apply + + This is equivalent to calling each function individually + but this enables more efficient concurrency. It is meant + to be used for functions that modify the realization + object, not for functions that return a dataframe already. + + Args: + batch (list): Each list element is a dictionary with one key, + being a function name; the value per key is a dict with keyword + arguments to be supplied to each function. + """ + for ensemble in self._ensembles.values(): + if isinstance(ensemble, ScratchEnsemble): + ensemble.process_batch(batch) + def apply(self, callback, **kwargs): """Callback functionalty, apply a function to every realization diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index 4d2e4a31..aaf56aa7 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -84,9 +84,15 @@ class ScratchRealization(object): override anything else. autodiscovery (boolean): whether the realization should try to auto-discover certain data (UNSMRY files in standard location) + batch (dict): List of functions (load_*) that + should be run at time of initialization. Each element is a + length 1 dictionary with the function name to run as the key + and each key's value should be the function arguments as a dict. 
""" - def __init__(self, path, realidxregexp=None, index=None, autodiscovery=True): + def __init__( + self, path, realidxregexp=None, index=None, autodiscovery=True, batch=None + ): self._origpath = os.path.abspath(path) self.index = None self._autodiscovery = autodiscovery @@ -165,8 +171,55 @@ def __init__(self, path, realidxregexp=None, index=None, autodiscovery=True): if os.path.exists(os.path.join(abspath, "parameters.txt")): self.load_txt("parameters.txt") + if batch: + self.process_batch(batch) + logger.info("Initialized %s", abspath) + def process_batch(self, batch): + """Process a list of functions to run/apply + + This is equivalent to calling each function individually + but this enables more efficient concurrency. It is meant + to be used for functions that modifies the realization + object, not for functions that returns a dataframe already. + + Args: + batch (list): Each list element is a dictionary with one key, + being a function names, value pr key is a dict with keyword + arguments to be supplied to each function. + Returns: + ScratchRealization: This realization object (self), for it + to be picked up by ProcessPoolExecutor and pickling. 
+ """ + assert isinstance(batch, list) + allowed_functions = [ + "apply", + "find_files", + "load_smry", + "load_txt", + "load_file", + "load_csv", + "load_status", + "load_scalar", + ] + for cmd in batch: + assert isinstance(cmd, dict) + assert len(cmd) == 1 + fn_name = list(cmd.keys())[0] + logger.info( + "Batch processing (#%d): %s with args %s", + self.index, + fn_name, + str(cmd[fn_name]), + ) + if fn_name not in allowed_functions: + logger.warning("process_batch skips illegal function: %s", fn_name) + continue + assert isinstance(cmd[fn_name], dict) + getattr(self, fn_name)(**cmd[fn_name]) + return self + def runpath(self): """Return the runpath ("root") of the realization diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py index 01baf9ef..ee82c8c9 100644 --- a/tests/test_ensemble.py +++ b/tests/test_ensemble.py @@ -179,6 +179,43 @@ def test_reek001(tmp="TMP"): assert len(reekensemble.keys()) == keycount - 1 +def test_batch(): + """Test batch processing at time of object initialization""" + if "__file__" in globals(): + # Easen up copying test code into interactive sessions + testdir = os.path.dirname(os.path.abspath(__file__)) + else: + testdir = os.path.abspath(".") + + ens = ScratchEnsemble( + "reektest", + testdir + "/data/testensemble-reek001/" + "realization-*/iter-0", + batch=[ + {"load_scalar": {"localpath": "npv.txt"}}, + {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}}, + {"load_smry": {"column_keys": "*", "time_index": "daily"}}, + ], + ) + assert len(ens.get_df("npv.txt")) == 5 + assert len(ens.get_df("unsmry--daily")["FOPR"]) == 5490 + assert len(ens.get_df("unsmry--yearly")["FOPT"]) == 25 + + # Also possible to batch process afterwards: + ens = ScratchEnsemble( + "reektest", testdir + "/data/testensemble-reek001/" + "realization-*/iter-0" + ) + ens.process_batch( + batch=[ + {"load_scalar": {"localpath": "npv.txt"}}, + {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}}, + {"load_smry": {"column_keys": "*", 
"time_index": "daily"}}, + ], + ) + assert len(ens.get_df("npv.txt")) == 5 + assert len(ens.get_df("unsmry--daily")["FOPR"]) == 5490 + assert len(ens.get_df("unsmry--yearly")["FOPT"]) == 25 + + def test_emptyens(): """Check that we can initialize an empty ensemble""" ens = ScratchEnsemble("emptyens") @@ -251,7 +288,6 @@ def test_reek001_scalars(): assert isinstance(npv, pd.DataFrame) assert "REAL" in npv assert "npv.txt" in npv # filename is the column name - print(npv) assert len(npv) == 5 assert npv.dtypes["REAL"] == int assert npv.dtypes["npv.txt"] == object diff --git a/tests/test_ensembleset.py b/tests/test_ensembleset.py index 122c52b2..1cbad066 100644 --- a/tests/test_ensembleset.py +++ b/tests/test_ensembleset.py @@ -223,6 +223,33 @@ def rms_vol2df(kwargs): assert isinstance(ensset4["iter-0"], ScratchEnsemble) assert isinstance(ensset4["iter-1"], ScratchEnsemble) + # Try the batch command feature: + ensset5 = EnsembleSet( + "reek001", + frompath=ensdir, + batch=[ + {"load_scalar": {"localpath": "npv.txt"}}, + {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}}, + {"load_smry": {"column_keys": "*", "time_index": "daily"}}, + ], + ) + assert len(ensset5.get_df("npv.txt")) == 10 + assert len(ensset5.get_df("unsmry--yearly")) == 50 + assert len(ensset5.get_df("unsmry--daily")) == 10980 + + # Try batch processing after initialization: + ensset6 = EnsembleSet("reek001", frompath=ensdir) + ensset6.process_batch( + batch=[ + {"load_scalar": {"localpath": "npv.txt"}}, + {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}}, + {"load_smry": {"column_keys": "*", "time_index": "daily"}}, + ], + ) + assert len(ensset6.get_df("npv.txt")) == 10 + assert len(ensset6.get_df("unsmry--yearly")) == 50 + assert len(ensset6.get_df("unsmry--daily")) == 10980 + # Delete the symlink and leftover from apply-testing when we are done. 
for real_dir in glob.glob(ensdir + "/realization-*"): if not SKIP_FMU_TOOLS: diff --git a/tests/test_realization.py b/tests/test_realization.py index 91935634..baf3d815 100644 --- a/tests/test_realization.py +++ b/tests/test_realization.py @@ -27,7 +27,7 @@ SKIP_FMU_TOOLS = True fmux = etc.Interaction() -logger = fmux.basiclogger(__name__, level="WARNING") +logger = fmux.basiclogger(__name__, level="INFO") if not fmux.testsetup(): raise SystemExit() @@ -160,6 +160,29 @@ def test_single_realization(): real.load_csv("bogus.csv") +def test_batch(): + """Test batch processing at time of object initialization""" + if "__file__" in globals(): + # Easen up copying test code into interactive sessions + testdir = os.path.dirname(os.path.abspath(__file__)) + else: + testdir = os.path.abspath(".") + + realdir = os.path.join(testdir, "data/testensemble-reek001", "realization-0/iter-0") + real = ensemble.ScratchRealization( + realdir, + batch=[ + {"load_scalar": {"localpath": "npv.txt"}}, + {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}}, + {"load_smry": {"column_keys": "*", "time_index": "daily"}}, + {"illegal-ignoreme": {}}, + ], + ) + assert real.get_df("npv.txt") == 3444 + assert len(real.get_df("unsmry--daily")["FOPR"]) > 2 + assert len(real.get_df("unsmry--yearly")["FOPT"]) > 2 + + def test_volumetric_rates(): """Test computation of volumetric rates from cumulative vectors""" @@ -802,6 +825,21 @@ def rms_vol2df(kwargs): ) assert real.get_df("geogrid--oil")["STOIIP_OIL"].sum() > 0 + # Run rms_vol2df in batch when initializing: + real = ensemble.ScratchRealization( + realdir, + batch=[ + { + "apply": { + "callback": rms_vol2df, + "filename": "share/results/volumes/" + "geogrid_vol_oil_1.txt", + "localpath": "share/results/volumes/geogrid--oil.csv", + } + } + ], + ) + assert real.get_df("geogrid--oil")["STOIIP_OIL"].sum() > 0 + def test_drop(): """Test the drop functionality, where can delete