Skip to content

Commit

Permalink
Merge branch 'develop' into feature/surface-heights-and-thermistor-de…
Browse files Browse the repository at this point in the history
…pths
  • Loading branch information
BaptisteVandecrux authored Aug 15, 2024
2 parents 2818bcb + cba8fd9 commit 885fae3
Show file tree
Hide file tree
Showing 17 changed files with 2,181 additions and 2,168 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
workflow_dispatch:

jobs:
build:
test:
name: unit_test
runs-on: ubuntu-latest
strategy:
Expand All @@ -19,6 +19,9 @@ jobs:
uses: actions/checkout@v3
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: Install eccodes
run : |
sudo apt-get install -y libeccodes-dev
- name: Install dependencies
shell: bash
run: |
Expand All @@ -30,4 +33,4 @@ jobs:
- name: Run unit tests
shell: bash
run: |
python3 -m unittest discover tests.e2e
python3 -m unittest discover tests
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
package_data={
"pypromice.tx": ["payload_formats.csv", "payload_types.csv"],
"pypromice.qc.percentiles": ["thresholds.csv"],
"pypromice.postprocess": ["station_configurations.toml", "positions_seed.csv"],
"pypromice.postprocess": ["positions_seed.csv"],
},
install_requires=['numpy~=1.23', 'pandas>=1.5.0', 'xarray>=2022.6.0', 'toml', 'scipy>=1.9.0', 'Bottleneck', 'netcdf4', 'pyDataverse==0.3.1', 'eccodes', 'scikit-learn>=1.1.0'],
# extras_require={'postprocess': ['eccodes','scikit-learn>=1.1.0']},
Expand All @@ -48,6 +48,8 @@
'make_metadata_csv = pypromice.postprocess.make_metadata_csv:main',
'get_watsontx = pypromice.tx.get_watsontx:get_watsontx',
'get_bufr = pypromice.postprocess.get_bufr:main',
'create_bufr_files = pypromice.postprocess.create_bufr_files:main',
'bufr_to_csv = pypromice.postprocess.bufr_to_csv:main',
'get_msg = pypromice.tx.get_msg:get_msg'
],
},
Expand Down
7 changes: 6 additions & 1 deletion src/pypromice/postprocess/bufr_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@

from pypromice.postprocess.bufr_utilities import read_bufr_file

if __name__ == "__main__":

def main():
parser = argparse.ArgumentParser("BUFR to CSV converter")
parser.add_argument("path", type=Path)
args = parser.parse_args()

print(read_bufr_file(args.path).to_csv())


if __name__ == "__main__":
main()
93 changes: 78 additions & 15 deletions src/pypromice/postprocess/bufr_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def round(value: float):

return round


# Enforce precision
# Note the sensor accuracies listed here:
# https://essd.copernicus.org/articles/13/3819/2021/#section8
Expand All @@ -64,28 +65,82 @@ class BUFRVariables:
* heightOfSensorAboveLocalGroundOrDeckOfMarinePlatformWSPD: Corresponds to "#7#heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform" which is height if anemometer relative to ground or deck of marine platform.
"""
wmo_id: str

# Station type: "mobile" or "land"
# ===============================
# Fixed land station schema: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/307080
# Mobile station schema: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/307090

station_type: str

# WMO station identifier
# Land stations: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/301090
# Mobile stations: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/301092
# ======================================================================================================
wmo_id: str
timestamp: datetime.datetime
relativeHumidity: float = attrs.field(converter=round_converter(0))
airTemperature: float = attrs.field(converter=round_converter(1))
pressure: float = attrs.field(converter=round_converter(1))
windDirection: float = attrs.field(converter=round_converter(0))
windSpeed: float = attrs.field(converter=round_converter(1))
latitude: float = attrs.field(converter=round_converter(6))
longitude: float = attrs.field(converter=round_converter(6))

# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/005001
# Scale: 5, unit: degrees
# TODO: Test if eccodes does the rounding as well. The rounding is was 6 which is larger that the scale.
latitude: float = attrs.field(converter=round_converter(5))
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/006001
# Scale: 5, unit: degrees
longitude: float = attrs.field(converter=round_converter(5))

# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007030
# Scale: 1, unit: m
heightOfStationGroundAboveMeanSeaLevel: float = attrs.field(
converter=round_converter(2)
converter=round_converter(1)
)
#
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007031
# Scale: 1, unit: m
heightOfBarometerAboveMeanSeaLevel: float = attrs.field(
converter=round_converter(2),
converter=round_converter(1),
)

# Pressure information
# ====================
# Definition table: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/302031
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007004
# Scale: -1, unit: Pa
pressure: float = attrs.field(converter=round_converter(-1))
# There are two other pressure variables in the template: 302001 and 010062.

# Basic synoptic "instantaneous" data
# ===================================
# Definition table: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/302035
# This section only include the temperature and humidity data (302032).
# Precipitation and cloud data are currently ignored.
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007032
# Scale: 2, unit: m
# This is the first appearance of this variable id.
heightOfSensorAboveLocalGroundOrDeckOfMarinePlatformTempRH: float = attrs.field(
converter=round_converter(4),
converter=round_converter(2),
)
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/012101
# Scale: 2, unit: K
airTemperature: float = attrs.field(converter=round_converter(2))
# There is also a Dewpoint temperature in this template: 012103 which is currently unused.
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/012103
# Scale: 0, unit: %
relativeHumidity: float = attrs.field(converter=round_converter(0))

# Basic synoptic "period" data
# ============================
# Definition table: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/302043
# Wind data: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/302042
# Wind direction: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/011001
# Scale: 0, unit: degrees
windDirection: float = attrs.field(converter=round_converter(0))
# Wind speed: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/011002
# Scale: 1, unit: m/s
windSpeed: float = attrs.field(converter=round_converter(1))
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007032
# Scale: 2, unit: m
# This is the 7th appearance of this variable id.
heightOfSensorAboveLocalGroundOrDeckOfMarinePlatformWSPD: float = attrs.field(
converter=round_converter(4)
converter=round_converter(2)
)

def as_series(self) -> pd.Series:
Expand Down Expand Up @@ -129,6 +184,7 @@ def __eq__(self, other: "BUFRVariables"):

BUFR_TEMPLATES = {
"mobile": {
# Template definition: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/307090
"unexpandedDescriptors": (307090), # message template, "synopMobil"
"edition": 4, # latest edition
"masterTableNumber": 0,
Expand All @@ -144,6 +200,7 @@ def __eq__(self, other: "BUFRVariables"):
"compressedData": 0,
},
"land": {
# Template definition: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/307080
"unexpandedDescriptors": (307080), # message template, "synopLand"
"edition": 4, # latest edition
"masterTableNumber": 0,
Expand Down Expand Up @@ -246,6 +303,11 @@ def set_station(ibufr, station_type: str, wmo_id: str):
elif station_type == "land":
# StationNumber for land stations are integeres
wmo_id_int = int(wmo_id)
if wmo_id_int >= 1024:
raise ValueError(
f"Invalid WMO ID {wmo_id}. Land station number must be less than 1024."
"See https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/001002"
)
station_config = dict(stationNumber=wmo_id_int)
else:
raise Exception(f"Unsupported station station type {station_type}")
Expand Down Expand Up @@ -485,5 +547,6 @@ def read_bufr_file(path: PathLike) -> pd.DataFrame:
message_vars = read_bufr_message(fp)
if message_vars is None:
break
lines.append(message_vars)
return pd.DataFrame(lines).rename_axis("message_index")
lines.append(message_vars.as_series())
data_frame = pd.DataFrame(lines).set_index("wmo_id")
return data_frame
178 changes: 178 additions & 0 deletions src/pypromice/postprocess/create_bufr_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import logging
from pathlib import Path
from typing import Sequence, List

import pandas as pd
from pypromice.station_configuration import load_station_configuration_mapping

from pypromice.postprocess.get_bufr import (
get_bufr,
DEFAULT_LIN_REG_TIME_LIMIT,
DEFAULT_POSITION_SEED_PATH,
)

main_logger = logging.getLogger(__name__)


def create_bufr_files(
input_files: Sequence[Path],
station_configuration_root: Path,
period_start: str,
period_end: str,
output_root: Path,
override: bool,
break_on_error: bool = False,
output_filename_suffix: str = "geus_",
):
"""
Generate hourly bufr files from the for all input files
Parameters
----------
input_files
Paths to csv l3 hourly data files
station_configuration_root
Root directory containing station configuration toml files
period_start
Datetime string for period start. Eg '2024-01-01T00:00' or '20240101
period_end
Datetime string for period end
output_root
Output dir for both bufr files for individual stations and compiled. Organized in two sub directories.
override
If False: Skip a period if the compiled output file exists.
break_on_error
If True: Stop processing if an error occurs
output_filename_suffix
Suffix for the compiled output file
"""
periods = pd.date_range(period_start, period_end, freq="H")
output_individual_root = output_root / "individual"
output_compiled_root = output_root / "compiled"
output_individual_root.mkdir(parents=True, exist_ok=True)
output_compiled_root.mkdir(parents=True, exist_ok=True)

station_configuration_mapping = load_station_configuration_mapping(
station_configuration_root,
skip_unexpected_fields=True,
)

for period in periods:
period: pd.Timestamp
date_str = period.strftime("%Y%m%dT%H%M")
main_logger.info(f"Processing {date_str}")
output_dir_path = output_individual_root / f"{date_str}"
output_file_path = (
output_compiled_root / f"{output_filename_suffix}{date_str}.bufr"
)

main_logger.info(f"{period}, {date_str}")
if override or not output_file_path.exists():
get_bufr(
bufr_out=output_dir_path,
input_files=input_files,
store_positions=False,
positions_filepath=None,
linear_regression_time_limit=DEFAULT_LIN_REG_TIME_LIMIT,
timestamps_pickle_filepath=None,
target_timestamp=period,
station_configuration_mapping=station_configuration_mapping,
positions_seed_path=DEFAULT_POSITION_SEED_PATH,
break_on_error=break_on_error,
)

with output_file_path.open("wb") as fp_dst:
for src_path in output_dir_path.glob("*.bufr"):
with src_path.open("rb") as fp_src:
fp_dst.write(fp_src.read())
else:
main_logger.info(f"Output file exists. Skipping {output_file_path}")


# %%


def main():
import argparse
import glob
import sys

logger_format_string = "%(asctime)s; %(levelname)s; %(name)s; %(message)s"
logging.basicConfig(
level=logging.ERROR,
stream=sys.stdout,
format=logger_format_string,
)

main_handler = logging.StreamHandler(sys.stdout)
main_handler.setLevel(logging.INFO)
formatter = logging.Formatter(logger_format_string)
main_handler.setFormatter(formatter)
main_logger.addHandler(main_handler)
main_logger.setLevel(logging.INFO)

parser = argparse.ArgumentParser("Create BUFR files from L3 tx .csv files.")
parser.add_argument(
"--input_files",
"--l3-filepath",
"-i",
type=Path,
nargs="+",
required=True,
help="Path to L3 tx .csv files. Can be direct paths or glob patterns",
)
parser.add_argument(
"--period_start",
"-s",
required=True,
help="Datetime string for period start. Eg '2024-01-01T00:00' or '20240101",
)
parser.add_argument(
"--period_end", "-e", required=True, help="Datetime string for period end"
)
parser.add_argument(
"--output_root",
"-o",
required=True,
type=Path,
help="Output dir for both bufr files for individual stations and compiled. Organized in two sub directories.",
)
parser.add_argument(
"--station_configuration_root",
"-c",
required=True,
type=Path,
help="Root directory containing station configuration toml files",
)
parser.add_argument(
"--override",
"-f",
default=False,
action="store_true",
help="Recreate and overide existing output files",
)
args = parser.parse_args()

# Interpret all input file paths as glob patterns if they don't exist
input_files: List[Path] = list()
for path in args.input_files:
if path.exists():
input_files.append(path)
else:
# The input path might be a glob pattern
input_files += map(Path, glob.glob(path.as_posix()))

main_logger.info(f"Processing {len(input_files)} input files")
create_bufr_files(
input_files=input_files,
period_start=args.period_start,
period_end=args.period_end,
output_root=args.output_root,
override=args.override,
station_configuration_root=args.station_configuration_root,
)


if __name__ == "__main__":
main()
Loading

0 comments on commit 885fae3

Please sign in to comment.