Skip to content

Commit

Permalink
feat: add JSON definition file parsing for #310
Browse files Browse the repository at this point in the history
  • Loading branch information
tsutterley committed Jul 15, 2024
1 parent 10bdc56 commit ab602f5
Show file tree
Hide file tree
Showing 10 changed files with 830 additions and 110 deletions.
198 changes: 193 additions & 5 deletions pyTMD/io/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
UPDATE HISTORY:
Updated 07/2024: added new FES2022 and FES2022_load to list of models
added JSON format for model definition files
Updated 05/2024: make subscriptable and allow item assignment
Updated 04/2024: append v-components of velocity only to netcdf format
Updated 11/2023: revert TPXO9-atlas currents changes to separate dicts
Expand Down Expand Up @@ -44,6 +45,7 @@
import re
import io
import copy
import json
import pathlib

class model:
Expand Down Expand Up @@ -1337,31 +1339,56 @@ def pathfinder(self, model_file: str | pathlib.Path | list):
# return the complete output path
return output_file

def from_file(self, definition_file: str | pathlib.Path | io.IOBase):
def from_file(self,
definition_file: str | pathlib.Path | io.IOBase,
format: str = 'ascii'
):
"""
Create a model object from an input definition file
Parameters
----------
definition_file: str, pathlib.Path or io.IOBase
model definition file for creating model object
format: str
format of the input definition file
- ``'ascii'`` for tab-delimited definition file
- ``'json'`` for JSON formatted definition file
"""
# variable with parameter definitions
parameters = {}
# Opening definition file and assigning file ID number
if isinstance(definition_file, io.IOBase):
fid = copy.copy(definition_file)
else:
definition_file = pathlib.Path(definition_file).expanduser()
fid = definition_file.open(mode="r", encoding='utf8')
# load and parse definition file type
if (format.lower() == 'ascii'):
self.from_ascii(fid)
elif (format.lower() == 'json'):
self.from_json(fid)
# close the definition file
fid.close()
# return the model object
return self

def from_ascii(self, fid: io.IOBase):
"""
Load and parse tab-delimited definition file
Parameters
----------
fid: io.IOBase
open definition file object
"""
# variable with parameter definitions
parameters = {}
# for each line in the file will extract the parameter (name and value)
for fileline in fid:
# Splitting the input line between parameter name and value
part = fileline.rstrip().split(maxsplit=1)
# filling the parameter definition variable
parameters[part[0]] = part[1]
# close the parameter file
fid.close()
# convert from dictionary to model variable
temp = self.from_dict(parameters)
# verify model name, format and type
Expand Down Expand Up @@ -1539,6 +1566,167 @@ def from_file(self, definition_file: str | pathlib.Path | io.IOBase):
# return the model parameters
return temp

def from_json(self, fid: io.IOBase):
"""
Load and parse JSON definition file
Parameters
----------
fid: io.IOBase
open definition file object
"""
# load JSON file
parameters = json.load(fid)
# convert from dictionary to model variable
temp = self.from_dict(parameters)
# verify model name, format and type
assert temp.name
assert temp.format in ('OTIS','ATLAS','TMD3','netcdf','GOT','FES')
assert temp.type
assert temp.model_file
# split model file into list if an ATLAS, GOT or FES file
# model files can be comma, tab or space delimited
# extract full path to tide model files
# extract full path to tide grid file
if temp.format in ('OTIS','ATLAS','TMD3'):
assert temp.grid_file
# check if grid file is relative
if (temp.directory is not None):
temp.grid_file = temp.directory.joinpath(temp.grid_file).resolve()
else:
temp.grid_file = pathlib.Path(temp.grid_file).expanduser()
# extract model files
if (temp.type == ['u','v']) and (temp.directory is not None):
# use glob strings to find files in directory
for key, glob_string in temp.model_file.items():
temp.model_file[key] = list(temp.directory.glob(glob_string))
# attempt to extract model directory
try:
temp.model_directory = temp.model_file['u'][0].parent
except (IndexError, AttributeError) as exc:
message = f'No model files found with {glob_string}'
raise FileNotFoundError(message) from exc
elif (temp.type == 'z') and (temp.directory is not None):
# use glob strings to find files in directory
glob_string = copy.copy(temp.model_file)

temp.model_file = list(temp.directory.glob(glob_string))
# attempt to extract model directory
try:
temp.model_directory = temp.model_file[0].parent
except (IndexError, AttributeError) as exc:
message = f'No model files found with {glob_string}'
raise FileNotFoundError(message) from exc
elif (temp.type == ['u','v']) and isinstance(temp.model_file, dict):
# resolve paths to model files for each direction
for key, model_file in temp.model_file.items():
temp.model_file[key] = [pathlib.Path(f).expanduser() for f in
model_file]
# copy directory dictionaries
temp.model_directory = temp.model_file['u'][0].parent
elif (temp.type == 'z') and isinstance(temp.model_file, list):
# resolve paths to model files
temp.model_file = [pathlib.Path(f).expanduser() for f in
temp.model_file]
temp.model_directory = temp.model_file[0].parent
else:
# fully defined single file case
temp.model_file = pathlib.Path(temp.model_file).expanduser()
temp.model_directory = temp.model_file.parent
elif temp.format in ('netcdf',):
assert temp.grid_file
# check if grid file is relative
if (temp.directory is not None):
temp.grid_file = temp.directory.joinpath(temp.grid_file).resolve()
else:
temp.grid_file = pathlib.Path(temp.grid_file).expanduser()
# extract model files
if (temp.type == ['u','v']) and (temp.directory is not None):
# use glob strings to find files in directory
for key, glob_string in temp.model_file.items():
temp.model_file[key] = list(temp.directory.glob(glob_string))
# attempt to extract model directory
try:
temp.model_directory = temp.model_file['u'][0].parent
except (IndexError, AttributeError) as exc:
message = f'No model files found with {glob_string}'
raise FileNotFoundError(message) from exc
elif (temp.type == 'z') and (temp.directory is not None):
# use glob strings to find files in directory
glob_string = copy.copy(temp.model_file)
temp.model_file = list(temp.directory.glob(glob_string))
# attempt to extract model directory
try:
temp.model_directory = temp.model_file[0].parent
except (IndexError, AttributeError) as exc:
message = f'No model files found with {glob_string}'
raise FileNotFoundError(message) from exc
elif (temp.type == ['u','v']):
# resolve paths to model files for each direction
for key, model_file in temp.model_file.items():
temp.model_file[key] = [pathlib.Path(f).expanduser() for f in
model_file]
# copy to directory dictionaries
temp.model_directory = temp.model_file['u'][0].parent
elif (temp.type == 'z'):
# resolve paths to model files
temp.model_file = [pathlib.Path(f).expanduser() for f in
temp.model_file]
temp.model_directory = temp.model_file[0].parent
elif temp.format in ('FES','GOT'):
# extract model files
if (temp.type == ['u','v']) and (temp.directory is not None):
# use glob strings to find files in directory
for key, glob_string in temp.model_file.items():
temp.model_file[key] = list(temp.directory.glob(glob_string))
# build model directory dictionaries
temp.model_directory = {}
for key, val in temp.model_file.items():
# attempt to extract model directory
try:
temp.model_directory[key] = val[0].parent
except (IndexError, AttributeError) as exc:
message = f'No model files found with {glob_string[key]}'
raise FileNotFoundError(message) from exc
elif (temp.type == 'z') and (temp.directory is not None):
# use glob strings to find files in directory
glob_string = copy.copy(temp.model_file)

temp.model_file = list(temp.directory.glob(glob_string))
# attempt to extract model directory
try:
temp.model_directory = temp.model_file[0].parent
except (IndexError, AttributeError) as exc:
message = f'No model files found with {glob_string}'
elif (temp.type == ['u','v']):
# resolve paths to model files for each direction
for key, model_file in temp.model_file.items():
temp.model_file[key] = [pathlib.Path(f).expanduser() for f in
model_file]
# build model directory dictionaries
temp.model_directory = {}
for key, val in temp.model_file.items():
temp.model_directory[key] = val[0].parent
elif (temp.type == 'z'):
# resolve paths to model files
temp.model_file = [pathlib.Path(f).expanduser() for f in
temp.model_file]
temp.model_directory = temp.model_file[0].parent
# verify that projection attribute exists for projected models
if temp.format in ('OTIS','ATLAS','TMD3'):
assert temp.projection
# convert scale from string to float
if temp.format in ('netcdf','GOT','FES'):
assert temp.scale
# assert that FES model has a version
# get model constituents from constituent files
if temp.format in ('FES',):
assert temp.version
if (temp.constituents is None):
temp.parse_constituents()
# return the model parameters
return temp

def parse_constituents(self) -> list:
"""
Parses tide model files for a list of model constituents
Expand Down
77 changes: 77 additions & 0 deletions test/def_to_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""
def_to_json.py (07/2024)
Converts a definition file to a json file
"""
import re
import json
import pathlib
import argparse

def read_definition_file(definition_file):
parameters = {}
fid = open(definition_file, 'r')
for fileline in fid:
# Splitting the input line between parameter name and value
part = fileline.rstrip().split(maxsplit=1)
# filling the parameter definition variable
parameters[part[0]] = part[1]
fid.close()
return parameters

# PURPOSE: create argument parser
def arguments():
parser = argparse.ArgumentParser(
description="""Converts a definition file to a json file"
""",
fromfile_prefix_chars="@"
)
# command line parameters
parser.add_argument('infile',
type=pathlib.Path, nargs='+',
help='Definition file to convert')
parser.add_argument('--verbose', '-v',
action='store_true',
help='Verbose output')
parser.add_argument('--cleanup', '-c',
action='store_true',
help='Remove original definition files')
return parser

def main():
# Read the system arguments listed after the program
parser = arguments()
args,_ = parser.parse_known_args()
# iterate over each input file
for definition_file in args.infile:
print(f'{definition_file} -->') if args.verbose else None
# Reading each definition file
parameters = read_definition_file(definition_file)
if re.search(r';', parameters['model_file']):
# split model into list of files for each direction
model_file_u, model_file_v = parameters['model_file'].split(';')
parameters['model_file'] = dict(
u=re.split(r'[\s\,]+', model_file_u),
v=re.split(r'[\s\,]+', model_file_v)
)
elif re.search(r',', parameters['model_file']):
# split model into list of files
parameters['model_file'] = re.split(r'[\s\,]+', parameters['model_file'])
if 'constituents' in parameters and re.search(r',', parameters['constituents']):
parameters['constituents'] = re.split(r'[\s\,]+', parameters['constituents'])
if 'type' in parameters and re.search(r',', parameters['type']):
parameters['type'] = re.split(r'[\s\,]+', parameters['type'])
if 'compressed' in parameters:
parameters['compressed'] = eval(parameters['compressed'])
if 'scale' in parameters:
parameters['scale'] = float(parameters['scale'])
# Writing the parameters to a json file
json_file = definition_file.with_suffix('.json')
print(f'\t{json_file}') if args.verbose else None
with open(json_file, 'w') as fid:
json.dump(parameters, fid, indent=4)
# Removing the definition file
if args.cleanup:
definition_file.unlink()

if __name__ == '__main__':
main()
10 changes: 10 additions & 0 deletions test/model_CATS2008.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"format": "OTIS",
"name": "CATS2008",
"model_file": "CATS2008/hf.CATS2008.out",
"grid_file": "CATS2008/grid_CATS2008",
"projection": "CATS2008",
"type": "z",
"variable": "tide_ocean",
"reference": "https://doi.org/10.15784/601235"
}
11 changes: 11 additions & 0 deletions test/model_FES2012.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"format": "FES",
"name": "FES2012",
"model_file": "fes2012/*_FES2012_SLEV.nc.gz",
"type": "z",
"version": "FES2012",
"variable": "tide_ocean",
"scale": 0.01,
"compressed": true,
"reference": "https://www.aviso.altimetry.fr/en/data/products/auxiliary-products/global-tide-fes.html"
}
Loading

0 comments on commit ab602f5

Please sign in to comment.