Commit dc87680

Split into HRV and all other Zarr stores (#23)
* Split into HRV and all other Zarr stores

* Add to script

* Remove call on import
jacobbieker authored Oct 27, 2021
1 parent 24060f0 commit dc87680
Showing 3 changed files with 98 additions and 26 deletions.
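
The net effect of the diff: each native file now yields two Zarr stores, one holding the eleven non-HRV channels and one holding HRV alone. As a hypothetical sketch (not repository code — the paths are the CLI defaults added in this commit, and the variable name comes from save_dataset_to_zarr), the resulting stores could be opened like this:

import xarray as xr

# Hypothetical sketch: open the two stores this commit produces after a
# conversion run. Paths are the script defaults shown further down.
non_hrv = xr.open_zarr("eumetsat_zarr.zarr")      # the 11 non-HRV channels
hrv = xr.open_zarr("hrv_eumetsat_zarr.zarr")      # the HRV channel alone
print(non_hrv["stacked_eumetsat_data"].dims, hrv["stacked_eumetsat_data"].dims)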
16 changes: 13 additions & 3 deletions satip/intermediate.py
@@ -15,6 +15,7 @@
 def create_or_update_zarr_with_native_files(
     directory: str,
     zarr_path: str,
+    hrv_zarr_path: str,
     region: str,
     spatial_chunk_size: int = 256,
     temporal_chunk_size: int = 1,
@@ -49,11 +50,12 @@ def create_or_update_zarr_with_native_files(
     # Check if zarr already exists
     if not zarr_exists:
         # Initial Zarr write; subsequent files are appended
-        dataset = native_wrapper((compressed_native_files[0], region))
+        dataset, hrv_dataset = native_wrapper((compressed_native_files[0], region))
         save_dataset_to_zarr(dataset, zarr_path=zarr_path, zarr_mode="w")
+        save_dataset_to_zarr(hrv_dataset, zarr_path=hrv_zarr_path, zarr_mode="w")

     pool = multiprocessing.Pool(processes=4)
-    for dataset in pool.imap_unordered(
+    for dataset, hrv_dataset in pool.imap_unordered(
         native_wrapper,
         zip(
             compressed_native_files[1:] if not zarr_exists else compressed_native_files,
@@ -67,11 +69,19 @@ def create_or_update_zarr_with_native_files(
             x_size_per_chunk=spatial_chunk_size,
             y_size_per_chunk=spatial_chunk_size,
             timesteps_per_chunk=temporal_chunk_size,
+            channel_chunk_size=11
         )
+        save_dataset_to_zarr(
+            hrv_dataset,
+            zarr_path=hrv_zarr_path,
+            x_size_per_chunk=spatial_chunk_size,
+            y_size_per_chunk=spatial_chunk_size,
+            timesteps_per_chunk=temporal_chunk_size,
+            channel_chunk_size=1
+        )
         del dataset


 def native_wrapper(filename_and_area):
     filename, area = filename_and_area
     return load_native_to_dataset(filename, area)
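
A minimal sketch of driving the updated entry point directly (the directory and region values here are hypothetical; the real CLI lives in scripts/convert_native_to_zarr.py below):

from satip.intermediate import create_or_update_zarr_with_native_files

# Hypothetical invocation: the function now takes both store paths and writes
# the non-HRV channels (channel chunk of 11) and HRV (channel chunk of 1).
create_or_update_zarr_with_native_files(
    directory="/data/eumetsat/native",        # assumed folder of compressed native files
    zarr_path="eumetsat_zarr.zarr",
    hrv_zarr_path="hrv_eumetsat_zarr.zarr",
    region="UK",                              # assumed key into GEOGRAPHIC_BOUNDS
)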

100 changes: 78 additions & 22 deletions satip/utils.py
@@ -1,11 +1,12 @@
 import pandas as pd

-from typing import Union, Any
+from typing import Union, Any, Tuple

 import os
 import subprocess
 import zarr
 import xarray as xr
+import numpy as np
 from satpy import Scene
 from pathlib import Path
 import datetime
@@ -47,7 +48,7 @@ def decompress(full_bzip_filename: Path, temp_pth: Path) -> str:
     return full_nat_filename


-def load_native_to_dataset(filename: Path, area: str) -> Union[xr.DataArray, None]:
+def load_native_to_dataset(filename: Path, area: str) -> Union[Tuple[xr.DataArray, xr.DataArray], Tuple[None, None]]:
     """
     Load compressed native files into an Xarray dataset, resampling to the same grid for the HRV channel,
     replacing small chunks of NaNs with interpolated values, and adding a time coordinate
Expand All @@ -58,17 +59,66 @@ def load_native_to_dataset(filename: Path, area: str) -> Union[xr.DataArray, Non
     Returns:
         The non-HRV and HRV Xarray DataArrays as a tuple if loading worked, else (None, None)
     """
-    compressor = Compressor()
+    hrv_compressor = Compressor(variable_order=["HRV"], maxs=np.array([103.90016]), mins=np.array([-1.2278595]))
+    compressor = Compressor(mins=np.array(
+        [
+            -2.5118103,
+            -64.83977,
+            63.404694,
+            2.844452,
+            199.10002,
+            -17.254883,
+            -26.29155,
+            -1.1009827,
+            -2.4184198,
+            199.57048,
+            198.95093,
+        ]
+    ),
+        maxs=np.array(
+            [
+                69.60857,
+                339.15588,
+                340.26526,
+                317.86752,
+                313.2767,
+                315.99194,
+                274.82297,
+                93.786545,
+                101.34922,
+                249.91806,
+                286.96323,
+            ]
+        ),
+        variable_order=[
+            "IR_016",
+            "IR_039",
+            "IR_087",
+            "IR_097",
+            "IR_108",
+            "IR_120",
+            "IR_134",
+            "VIS006",
+            "VIS008",
+            "WV_062",
+            "WV_073",
+        ],
+    )
     temp_directory = filename.parent
     try:
         # If decompression fails, pass
         decompressed_filename: str = decompress(filename, temp_directory)
     except subprocess.CalledProcessError:
-        return None
+        return None, None
     scene = Scene(filenames={"seviri_l1b_native": [decompressed_filename]})
-    scene.load(
+    hrv_scene = Scene(filenames={"seviri_l1b_native": [decompressed_filename]})
+    hrv_scene.load(
         [
             "HRV",
+        ]
+    )
+    scene.load(
+        [
             "IR_016",
             "IR_039",
             "IR_087",
@@ -82,17 +132,29 @@ def load_native_to_dataset
"WV_073",
]
)

# While we wnat to avoid resampling as much as possible,
# HRV is the only one different than the others, so to make it simpler, resample all to HRV resolution (3x the resolution)
scene = scene.resample()

# HRV covers a smaller portion of the disk than other bands, so use that as the bounds
# Selected bounds emprically for have no NaN values from off disk image, and covering the UK + a bit
scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
hrv_scene = hrv_scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
dataarray: xr.DataArray = convert_scene_to_dataarray(scene, band="IR_016", area=area)
hrv_dataarray: xr.DataArray = convert_scene_to_dataarray(hrv_scene, band="HRV", area=area)
# Delete file off disk
os.remove(decompressed_filename)

# If any NaNs still exist, then don't return it
if is_dataset_clean(dataarray) and is_dataset_clean(hrv_dataarray):
# Compress and return
dataarray = compressor.compress(dataarray)
hrv_dataarray = hrv_compressor.compress(hrv_dataarray)
return dataarray, hrv_dataarray
else:
return None, None


def convert_scene_to_dataarray(scene: Scene, band: str, area: str) -> xr.DataArray:
scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
# Lat and Lon are the same for all the channels now
lon, lat = scene["HRV"].attrs["area"].get_lonlats()
lon, lat = scene[band].attrs["area"].get_lonlats()
osgb_x, osgb_y = lat_lon_to_osgb(lat, lon)
dataset: xr.Dataset = scene.to_xarray_dataset()
# Add coordinate arrays, since x and y changes for each pixel, cannot replace dataset x,y coords with these directly
@@ -115,16 +177,7 @@ def load_native_to_dataset
dim="y", max_gap=2
)

# Delete file off disk
os.remove(decompressed_filename)

# If any NaNs still exist, then don't return it
if is_dataset_clean(dataarray):
# Compress and return
dataarray = compressor.compress(dataarray)
return dataarray
else:
return None
return dataarray


get_time_as_unix = (
@@ -143,6 +196,7 @@ def save_dataset_to_zarr(
     timesteps_per_chunk: int = 1,
     y_size_per_chunk: int = 256,
     x_size_per_chunk: int = 256,
+    channel_chunk_size: int = 12
 ) -> None:
     """
     Save an Xarray DataArray into a Zarr file
@@ -157,6 +211,8 @@ def save_dataset_to_zarr(
"""
dataarray = dataarray.transpose(*["time", "x", "y", "variable"])
# We convert the datetime to seconds since the Unix epoch as otherwise appending to the Zarr store results in the
# first timestep being duplicated, and the actual timestep being thrown away.
dataarray["time"] = get_time_as_unix(dataarray)

_, x_size, y_size, _ = dataarray.shape
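
get_time_as_unix itself is collapsed in this view; a hedged sketch of the kind of conversion the comment above describes (an assumption, not necessarily the repository's exact implementation):

import numpy as np
import pandas as pd
import xarray as xr

def time_as_unix_seconds(dataarray: xr.DataArray) -> np.ndarray:
    # Represent the time coordinate as integer seconds since the Unix epoch,
    # so appends to the Zarr store keep a stable, comparable time index.
    times = pd.to_datetime(dataarray["time"].values)
    return np.asarray((times - pd.Timestamp("1970-01-01")) // pd.Timedelta("1s"))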
@@ -171,7 +227,7 @@ def save_dataset_to_zarr(
         timesteps_per_chunk,
         y_size_per_chunk,
         x_size_per_chunk,
-        12,
+        channel_chunk_size,
     )
     dataarray = xr.Dataset({"stacked_eumetsat_data": dataarray.chunk(chunks)})

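The per-channel mins and maxs added to Compressor above suggest min-max scaling; a hedged sketch of that step (illustrative only — satip's actual Compressor lives elsewhere in the package and may also quantise or clip):

import numpy as np
import xarray as xr

class MinMaxCompressor:
    """Assumed behaviour: scale each channel into [0, 1] using per-channel bounds."""

    def __init__(self, variable_order, mins: np.ndarray, maxs: np.ndarray):
        self.variable_order = variable_order
        self.mins = mins
        self.maxs = maxs

    def compress(self, da: xr.DataArray) -> xr.DataArray:
        # Align the per-channel bounds on the "variable" coordinate so the
        # scaling broadcasts across the time, x, and y dimensions.
        mins = xr.DataArray(self.mins, dims=["variable"], coords={"variable": self.variable_order})
        maxs = xr.DataArray(self.maxs, dims=["variable"], coords={"variable": self.variable_order})
        return (da - mins) / (maxs - mins)
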
8 changes: 7 additions & 1 deletion scripts/convert_native_to_zarr.py
@@ -13,7 +13,13 @@
"--zarr_path",
"-zarr",
default="eumetsat_zarr.zarr",
prompt="The location for the Zarr file",
prompt="The location for the non-HRV Zarr file",
)
@click.option(
"--hrv_zarr_path",
"-hrv_zarr",
default="hrv_eumetsat_zarr.zarr",
prompt="The location for the HRV Zarr file",
)
@click.option(
"--region",
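
With the new option in place, a hypothetical run of the script looks like the following (only flags visible in this diff are shown; any other options are collapsed in this view):

python scripts/convert_native_to_zarr.py --zarr_path eumetsat_zarr.zarr --hrv_zarr_path hrv_eumetsat_zarr.zarr --region UK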
