This repository has been archived by the owner on Sep 11, 2023. It is now read-only.

Convert NWP grib files to Zarr intermediate #357

Merged · 56 commits · Nov 16, 2021
Changes from 1 commit
Commits (56)
10582cf
sketching out convert_NWP_grib_to_zarr.py
JackKelly Nov 9, 2021
c834c95
Experimenting with ways of loading NWPs in a notebook
JackKelly Nov 9, 2021
57d05b2
slight tweak to docstring
JackKelly Nov 9, 2021
19e6d6a
very early draft of NWP to zarr script. Auto generated from ipython …
JackKelly Nov 10, 2021
3964b40
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 10, 2021
7d1ff79
removing convert_NWP_grib_to_zarr.py
JackKelly Nov 11, 2021
b4cdbef
merge
JackKelly Nov 11, 2021
1442913
renaming convert_NWP_grib_to_zarr.py
JackKelly Nov 11, 2021
c4595f2
fix bug
JackKelly Nov 11, 2021
c96b67c
setting convert_NWP_grib_to_zarr.py to be executable
JackKelly Nov 11, 2021
54e99fe
fix bug
JackKelly Nov 11, 2021
2837bfc
Big speedup of load_grib_file by going back to using idx files. Ramd…
JackKelly Nov 11, 2021
62344ac
Speed up reshape_1d_to_2d from 7 seconds to 0.5 seconds by pre-loading
JackKelly Nov 11, 2021
6457d72
New script. Should be a lot faster. Still single threaded.
JackKelly Nov 11, 2021
ea96e35
Chunk and compression and float16
JackKelly Nov 11, 2021
aae62ef
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 11, 2021
8f29d07
remove break
JackKelly Nov 11, 2021
6495807
merge
JackKelly Nov 11, 2021
cadca3d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 11, 2021
d91929b
fix import of numcodecs
JackKelly Nov 11, 2021
59a4b25
merge
JackKelly Nov 11, 2021
bbe516f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 11, 2021
52eeed5
multiple processes for converting NWPs to Zarr.
JackKelly Nov 12, 2021
df15b5a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 12, 2021
7a1938c
Restart processing from where it last left off
JackKelly Nov 12, 2021
ea3a439
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 12, 2021
3497245
attempt to fix problem of using too much mem
JackKelly Nov 12, 2021
3491081
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 12, 2021
8d7a61f
Try letting each process save to zarr, using a chain of locks to guar…
JackKelly Nov 12, 2021
c24c7d5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 12, 2021
6f501c2
use map not imap
JackKelly Nov 12, 2021
e151772
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 12, 2021
8472071
revert. Again
JackKelly Nov 12, 2021
5dab321
revert
JackKelly Nov 12, 2021
0234aca
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 12, 2021
9971a0e
back to single thread for now
JackKelly Nov 12, 2021
ba9540b
merge
JackKelly Nov 12, 2021
9c30ba6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 12, 2021
76a4bcc
Start tidying up script. Revert back to using multiprocessing.Pool
JackKelly Nov 13, 2021
9dfd1da
Use Semaphore to attempt to prevent reader processes from getting ahe…
JackKelly Nov 13, 2021
52c836a
ignore ecCodes warning
JackKelly Nov 13, 2021
0403a06
filter eccodes logger warning
JackKelly Nov 13, 2021
df3b63e
print timings for passing data through multiprocessing.Queue. Answer…
JackKelly Nov 13, 2021
711b611
Try using ThreadPoolExecutor. Returns objects immediately. But over…
JackKelly Nov 13, 2021
d3de7cb
Finally got idea working with chain of locks. Seems like best so far
JackKelly Nov 13, 2021
a5eec95
search across all directories again
JackKelly Nov 13, 2021
cba7741
Write detailed description of how the code works
JackKelly Nov 13, 2021
793f96a
Tidy up code a lot. Use click.
JackKelly Nov 13, 2021
6aab090
Code is now quite tidy. Need to add some docstrings.
JackKelly Nov 13, 2021
d30e86c
convert_NWP_grib_to_zarr.py is finally done, I think
JackKelly Nov 13, 2021
d528994
remove old ipython notebook
JackKelly Nov 13, 2021
80420f4
Simplify logger formatting
JackKelly Nov 13, 2021
9b2ca71
Updating documentation
JackKelly Nov 15, 2021
d1f9353
Tweak comments and logging.
JackKelly Nov 15, 2021
b84487f
Change formatting of dataset.init_time in logging
JackKelly Nov 15, 2021
a0033b9
Make note of broken reject-regexs #389
JackKelly Nov 15, 2021
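Several commits above (8d7a61f, d3de7cb) mention a "chain of locks" used to guarantee that parallel worker processes append to the Zarr store in order. A minimal sketch of that pattern, with hypothetical names — the `queue.put(i)` stands in for the real `append_to_zarr()` call:

```python
import multiprocessing

def worker(i: int, my_lock, next_lock, queue) -> None:
    # ...do the slow grib loading and processing in parallel here...
    my_lock.acquire()    # block until the previous task has finished writing
    queue.put(i)         # stands in for append_to_zarr(dataset, zarr_path)
    next_lock.release()  # pass the baton to the next task

if __name__ == "__main__":
    n_tasks = 4
    locks = [multiprocessing.Lock() for _ in range(n_tasks + 1)]
    for lock in locks[1:]:
        lock.acquire()  # initially, only task 0 is allowed to write
    queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=worker, args=(i, locks[i], locks[i + 1], queue))
        for i in range(n_tasks)
    ]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print([queue.get() for _ in range(n_tasks)])  # always [0, 1, 2, 3]
```

Each worker loads gribs in parallel, but the write step is serialised: task i can only write once task i-1 has released its lock, so appends always land in init_time order.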
[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
pre-commit-ci[bot] committed Nov 11, 2021
commit bbe516f1d92fd5fcfa1c214c68c61037e63efecd
158 changes: 85 additions & 73 deletions scripts/convert_NWP_grib_to_zarr.py
@@ -4,33 +4,33 @@
# In[110]:


+import datetime
+import re
from pathlib import Path
+from typing import Union

import cfgrib
import matplotlib.pyplot as plt
+import numcodecs
import numpy as np
import pandas as pd
-import re
-import datetime
import xarray as xr
-from typing import Union
-import numcodecs


# This notebook is research for [GitHub issue #344: Convert NWP grib files to Zarr intermediate](https://github.com/openclimatefix/nowcasting_dataset/issues/344).
#
# Useful links:
#
# * [Met Office's data docs](http://cedadocs.ceda.ac.uk/1334/1/uk_model_data_sheet_lores1.pdf)
#
# Known differences between the old Zarr (UKV__2018-01_to_2019-12__chunks__variable10__init_time1__step1__x548__y704__.zarr) and the new zarr:
#
# * Images in the old zarr were top-to-bottom. Images in the new Zarr follow the ordering in the grib files: bottom-to-top.
# * The x and y coordinates are different by 1km each.
# * The new Zarr is float16?
# * The new Zarr has a few more variables.
#
# Done:
#
# * Merge Wholesale1 and 2 (2 includes dswrf, lcc, mcc, and hcc)
# * Remove dimensions we don't care about (e.g. select temperature at 1 meter, not 0 meters)
# * Reshape images from 1D to 2D.
@@ -43,10 +43,10 @@
# * Write to leonardo's SSD
# * Do we need to combine all the DataArrays into a single DataArray (with "variable" being a dimension?). The upside is that then a single Zarr chunk can include multiple variables. The downside is that we lose the metadata (but that's not a huge problem, maybe?)
# * Convert to float16?
#
# Some outstanding questions / Todo items
#
#
# * Experiment with compression
# * Load some data to make sure float16 is fine!
# * Separately log "bad files" to a CSV file?
@@ -55,8 +55,8 @@
# * Parallelise
# * Convert to script
# * Use click to set source and target directories
#
#

# In[65]:

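One of the outstanding items above is to check that float16 is precise enough. A quick sanity check (a sketch, not part of the PR) is to round-trip representative values and inspect the relative error:

```python
import numpy as np

# Representative magnitudes: pressure in hPa, temperature in K, irradiance in W/m^2.
x = np.array([1013.25, 287.15, 650.0], dtype=np.float32)
roundtrip = x.astype(np.float16).astype(np.float32)
print(np.abs(roundtrip - x) / x)  # ~1e-4 relative error: float16 keeps ~3 significant figures
```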
@@ -70,7 +70,7 @@
DY_METERS = DX_METERS = 2_000
NORTH = 1223_000
SOUTH = -185_000
WEST = -239_000
EAST = 857_000

# Note that the UKV NWPs y is top-to-bottom
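The cell that actually builds the coordinate arrays is cut off by this hunk. Given the constants above, `NORTHING` and `EASTING` (used later by `reshape_1d_to_2d`) are presumably built roughly like this sketch; the exact code, and whether `NORTHING` ascends (bottom-to-top, matching the grib ordering noted earlier) or descends, is an assumption:

```python
import numpy as np

DY_METERS = DX_METERS = 2_000
NORTH, SOUTH, WEST, EAST = 1223_000, -185_000, -239_000, 857_000

NORTHING = np.arange(start=SOUTH, stop=NORTH, step=DY_METERS)  # 704 rows, south-to-north (assumed)
EASTING = np.arange(start=WEST, stop=EAST, step=DX_METERS)     # 548 columns, west-to-east
NUM_ROWS, NUM_COLS = len(NORTHING), len(EASTING)               # matches y704, x548 in the old Zarr's name
```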
@@ -86,7 +86,9 @@

# NWP_PATH = Path("/home/jack/Data/NWP/01")
# NWP_PATH = Path("/media/jack/128GB_USB/NWPs") # Must not include trailing slash!!!
-NWP_PATH = Path("/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/UK_Met_Office/UKV/native")
+NWP_PATH = Path(
+"/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/UK_Met_Office/UKV/native"
+)


# In[4]:
@@ -118,11 +120,11 @@

def grib_filename_to_datetime(full_filename: Path) -> datetime.datetime:
"""Parse the grib filename and return the datetime encoded in the filename.

Returns a datetime.
For example, if the filename is '202101010000_u1096_ng_umqv_Wholesale1.grib',
then the returned datetime will be datetime(year=2021, month=1, day=1, hour=0, minute=0).

Raises RuntimeError if the filename does not match the expected regex pattern.
"""
# Get the base_filename, which will be of the form '202101010000_u1096_ng_umqv_Wholesale1.grib'
@@ -139,18 +141,20 @@ def grib_filename_to_datetime(full_filename: Path) -> datetime.datetime:
# . matches any character.
# $ matches the end of the string.
regex_pattern_string = (
-'^' # Match the beginning of the string.
-'(?P<year>\d{4})' # Match the year.
-'(?P<month>\d{2})' # Match the month.
-'(?P<day>\d{2})' # Match the day.
-'(?P<hour>\d{2})' # Match the hour.
-'(?P<minute>\d{2})' # Match the minute.
-'_u1096_ng_umqv_Wholesale\d\.grib$' # Match the end of the string (escape the fullstop).
+"^" # Match the beginning of the string.
+"(?P<year>\d{4})" # Match the year.
+"(?P<month>\d{2})" # Match the month.
+"(?P<day>\d{2})" # Match the day.
+"(?P<hour>\d{2})" # Match the hour.
+"(?P<minute>\d{2})" # Match the minute.
+"_u1096_ng_umqv_Wholesale\d\.grib$" # Match the end of the string (escape the fullstop).
)
regex_pattern = re.compile(regex_pattern_string)
regex_match = regex_pattern.match(base_filename)
if regex_match is None:
-raise RuntimeError(f"Filename '{full_filename}' does not conform to expected regex pattern '{regex_pattern_string}'!")
+raise RuntimeError(
+f"Filename '{full_filename}' does not conform to expected regex pattern '{regex_pattern_string}'!"
+)

# Convert strings to ints:
regex_groups = {key: int(value) for key, value in regex_match.groupdict().items()}
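The named groups map directly onto `datetime.datetime` keyword arguments. A usage sketch of the function above:

```python
import datetime
from pathlib import Path

dt = grib_filename_to_datetime(Path("202101010000_u1096_ng_umqv_Wholesale1.grib"))
assert dt == datetime.datetime(year=2021, month=1, day=1, hour=0, minute=0)
```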
@@ -163,32 +167,32 @@ def grib_filename_to_datetime(full_filename: Path) -> datetime.datetime:

def decode_and_group_grib_filenames(filenames: list[Path]) -> pd.Series:
"""Returns a pd.Series where the index is the datetime of the NWP init time.

And the values are the full_filename of each grib file.
"""
n_filenames = len(filenames)
-nwp_init_datetimes = np.full(shape=n_filenames, fill_value=np.NaN, dtype='datetime64[ns]')
+nwp_init_datetimes = np.full(shape=n_filenames, fill_value=np.NaN, dtype="datetime64[ns]")
for i, filename in enumerate(filenames):
nwp_init_datetimes[i] = grib_filename_to_datetime(filename)

# Swap index and values
map_datetime_to_filename = pd.Series(filenames, index=nwp_init_datetimes)

return map_datetime_to_filename.sort_index()
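A usage sketch: the Wholesale1 and Wholesale2 files for one model run share an init time, so they land under the same index entry (filenames here are hypothetical):

```python
import datetime
from pathlib import Path

filenames = [
    Path("202101010000_u1096_ng_umqv_Wholesale1.grib"),
    Path("202101010000_u1096_ng_umqv_Wholesale2.grib"),
]
series = decode_and_group_grib_filenames(filenames)
print(series[datetime.datetime(2021, 1, 1)])  # both Wholesale paths for this init time
```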


# In[23]:


-def load_grib_file(full_filename: Union[Path, str], verbose: bool=False) -> xr.Dataset:
+def load_grib_file(full_filename: Union[Path, str], verbose: bool = False) -> xr.Dataset:
"""Merges and loads all contiguous xr.Datasets from the grib file.

Removes unnecessary variables. Picks heightAboveGround=1meter for temperature.

Returns an xr.Dataset which has been loaded from disk. Loading from disk at this point
takes about 2 seconds for a 250 MB grib file, but speeds up reshape_1d_to_2d
from about 7 seconds to 0.5 seconds :)

Args:
full_filename: The full filename (including the path) of a single grib file.
verbose: If True then print out some useful debugging information.
@@ -207,35 +211,43 @@ def load_grib_file(full_filename: Union[Path, str], verbose: bool=False) -> xr.Dataset:

if verbose:
print("\nDataset", i, "before processing:\n", ds, "\n")

# For temperature, we want the temperature at 1 meter above ground,
# not at 0 meters above ground. In the early NWPs (definitely in the 2016-03-22 NWPs),
# heightAboveGround only has 1 entry ("1" meter above ground) and `heightAboveGround` isn't set as a dimension for `t`.
# In later NWPs, 'heightAboveGround' has 2 values (0 and 1) and is a dimension for `t`.
-if 't' in ds and 'heightAboveGround' in ds['t'].dims:
+if "t" in ds and "heightAboveGround" in ds["t"].dims:
ds = ds.sel(heightAboveGround=1)

# Delete unnecessary variables.
vars_to_delete = [
-'unknown', 'valid_time', 'heightAboveGround', 'heightAboveGroundLayer',
-'atmosphere', 'cloudBase', 'surface', 'meanSea', 'level']
+"unknown",
+"valid_time",
+"heightAboveGround",
+"heightAboveGroundLayer",
+"atmosphere",
+"cloudBase",
+"surface",
+"meanSea",
+"level",
+]
for var_name in vars_to_delete:
try:
del ds[var_name]
except KeyError as e:
if verbose:
-print('var name not in dataset:', e)
+print("var name not in dataset:", e)
else:
if verbose:
-print('Deleted', var_name)
+print("Deleted", var_name)

if verbose:
print("\nDataset", i, "after processing:\n", ds, "\n")
print("**************************************************")

datasets_from_grib[i] = ds
del ds

merged_ds = xr.merge(datasets_from_grib)
del datasets_from_grib # Save memory
print("Loading...")
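This hunk starts mid-function, so the code that builds `datasets_from_grib` is not shown. In cfgrib, the usual way to get the list of contiguous datasets from a multi-message grib is `cfgrib.open_datasets`, so the elided setup is presumably something like this sketch (an assumption, not the PR's exact code):

```python
import cfgrib

# One xr.Dataset per group of compatible grib messages:
datasets_from_grib = cfgrib.open_datasets(full_filename)
```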
@@ -247,26 +259,27 @@ def load_grib_file(full_filename: Union[Path, str], verbose: bool=False) -> xr.Dataset:

def reshape_1d_to_2d(dataset: xr.Dataset) -> xr.Dataset:
"""Convert 1D into 2D array.

For each `step`, the pixel values in the grib files represent a 2D image. But, in the grib,
the values are in a flat 1D array (indexed by the `values` dimension).
The ordering of the pixels in the grib are left to right, bottom to top.

We reshape every data variable at once using this trick.
"""
# Adapted from https://stackoverflow.com/a/62667154

# Don't reshape yet. Instead just create new coordinates,
# which give the `x` and `y` position of each position in the `values` dimension:
dataset = dataset.assign_coords(
{
-'x': ('values', np.tile(EASTING, reps=NUM_ROWS)),
-'y': ('values', np.repeat(NORTHING, repeats=NUM_COLS))
-})
+"x": ("values", np.tile(EASTING, reps=NUM_ROWS)),
+"y": ("values", np.repeat(NORTHING, repeats=NUM_COLS)),
+}
+)

# Now set "values" to be a MultiIndex, indexed by `y` and `x`:
dataset = dataset.set_index(values=("y", "x"))

# Now unstack. This gets rid of the `values` dimension and indexes
# the data variables using `y` and `x`.
return dataset.unstack("values")
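The same assign_coords / set_index / unstack trick on a toy 2x3 grid, as a self-contained sketch independent of the NWP data:

```python
import numpy as np
import xarray as xr

ds = xr.Dataset({"t": ("values", np.arange(6.0))})  # flat: left-to-right, then row by row
ds = ds.assign_coords(
    x=("values", np.tile([0, 1, 2], reps=2)),
    y=("values", np.repeat([10, 20], repeats=3)),
)
ds = ds.set_index(values=("y", "x")).unstack("values")
print(ds["t"].values)  # [[0. 1. 2.], [3. 4. 5.]] -- dims are now (y, x)
```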
@@ -288,7 +301,7 @@ def append_to_zarr(dataset: xr.Dataset, zarr_path: Union[str, Path]):
zarr_path = Path(zarr_path)
if zarr_path.exists():
to_zarr_kwargs = dict(
-append_dim = "init_time",
+append_dim="init_time",
)
else:
to_zarr_kwargs = dict(
@@ -298,28 +311,27 @@ def append_to_zarr(dataset: xr.Dataset, zarr_path: Union[str, Path]):
# https://github.com/pydata/xarray/issues/5969 and
# http://xarray.pydata.org/en/stable/user-guide/io.html#time-units
encoding={
-'init_time': {
-'units': 'nanoseconds since 1970-01-01'
-},
+"init_time": {"units": "nanoseconds since 1970-01-01"},
"UKV": {
-"compressor": numcodecs.Blosc(cname="zstd", clevel=5),
-}
+"compressor": numcodecs.Blosc(cname="zstd", clevel=5),
+},
},
)

dataset = (
-dataset
-.to_array(dim="variable", name="UKV")
+dataset.to_array(dim="variable", name="UKV")
.to_dataset()
.astype(np.float16)
-.rename({'time': 'init_time'})
-.chunk({
-"init_time": 1,
-"step": 1,
-"y": len(dataset.y) // 2,
-"x": len(dataset.x) // 2,
-"variable": -1
-})
+.rename({"time": "init_time"})
+.chunk(
+{
+"init_time": 1,
+"step": 1,
+"y": len(dataset.y) // 2,
+"x": len(dataset.x) // 2,
+"variable": -1,
+}
+)
)

dataset.to_zarr(zarr_path, **to_zarr_kwargs)
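The branch above reduces to a create-then-append pattern: the encoding (compressor, time units) is only given when the store is created, and later writes just name the append dimension. A sketch with hypothetical datasets ds1 and ds2 of identical shape:

```python
ds1.to_zarr("nwp.zarr", encoding=encoding)       # first write creates the store, fixing compression and dtype
ds2.to_zarr("nwp.zarr", append_dim="init_time")  # subsequent writes extend along init_time
```

Note the chunk spec: one init_time and one step per chunk, each image split into 2x2 spatial quarters, and `"variable": -1` packing all variables into a single chunk, which is the "single Zarr chunk can include multiple variables" upside weighed in the preamble.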
@@ -331,7 +343,7 @@ def append_to_zarr(dataset: xr.Dataset, zarr_path: Union[str, Path]):
def load_grib_for_single_nwp_init_time(full_filenames: list[Path]) -> xr.Dataset:
datasets_for_nwp_init_datetime = []
for full_filename in full_filenames:
-print('Opening', full_filename)
+print("Opening", full_filename)
try:
dataset_for_filename = load_grib_file(full_filename)
except EOFError as e:
@@ -369,18 +381,18 @@ def load_grib_for_single_nwp_init_time(full_filenames: list[Path]) -> xr.Dataset
if n_filenames != 2:
print(n_filenames, "filenames found for", nwp_init_datetime_utc, ". Expected 2. Skipping!")
continue

# Load files.
full_filenames = map_datetime_to_grib_filename[nwp_init_datetime_utc]
ds = load_grib_for_single_nwp_init_time(full_filenames)

# Save to Zarr.
if ds is not None:
print("Saving to Zarr...")
append_to_zarr(
ds,
# "/home/jack/data/nwp.zarr"
"/mnt/storage_ssd/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/UK_Met_Office/UKV/zarr/test.zarr",
)

print("Done!")