Split into HRV and all other Zarr stores #23

Merged
merged 3 commits into from Oct 27, 2021
16 changes: 13 additions & 3 deletions satip/intermediate.py
@@ -15,6 +15,7 @@
def create_or_update_zarr_with_native_files(
directory: str,
zarr_path: str,
hrv_zarr_path: str,
region: str,
spatial_chunk_size: int = 256,
temporal_chunk_size: int = 1,
@@ -49,11 +50,12 @@ def create_or_update_zarr_with_native_files(
# Check if zarr already exists
if not zarr_exists:
# Initial Zarr write, before appending
dataset = native_wrapper((compressed_native_files[0], region))
dataset, hrv_dataset = native_wrapper((compressed_native_files[0], region))
save_dataset_to_zarr(dataset, zarr_path=zarr_path, zarr_mode="w")
save_dataset_to_zarr(hrv_dataset, zarr_path=hrv_zarr_path, zarr_mode="w")

pool = multiprocessing.Pool(processes=4)
for dataset in pool.imap_unordered(
for dataset, hrv_dataset in pool.imap_unordered(
native_wrapper,
zip(
compressed_native_files[1:] if not zarr_exists else compressed_native_files,
@@ -67,11 +69,19 @@
x_size_per_chunk=spatial_chunk_size,
y_size_per_chunk=spatial_chunk_size,
timesteps_per_chunk=temporal_chunk_size,
channel_chunk_size=11
)
save_dataset_to_zarr(
hrv_dataset,
zarr_path=hrv_zarr_path,
x_size_per_chunk=spatial_chunk_size,
y_size_per_chunk=spatial_chunk_size,
timesteps_per_chunk=temporal_chunk_size,
channel_chunk_size=1
)
del dataset


def native_wrapper(filename_and_area):
filename, area = filename_and_area
return load_native_to_dataset(filename, area)
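
Taken together, the intermediate-layer change means callers now supply two store paths. Below is a minimal sketch of how the updated entry point might be invoked; the directory, paths, and region are illustrative placeholders, not values from this PR:

from satip.intermediate import create_or_update_zarr_with_native_files

# Hypothetical call; paths and region are placeholders. The 11 non-HRV
# channels land in zarr_path, the single HRV channel in hrv_zarr_path.
create_or_update_zarr_with_native_files(
    directory="/data/eumetsat/native",       # compressed native files
    zarr_path="eumetsat_zarr.zarr",          # non-HRV store (channel chunk of 11)
    hrv_zarr_path="hrv_eumetsat_zarr.zarr",  # HRV store (channel chunk of 1)
    region="UK",
    spatial_chunk_size=256,
    temporal_chunk_size=1,
)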

100 changes: 78 additions & 22 deletions satip/utils.py
@@ -1,11 +1,12 @@
import pandas as pd

from typing import Union, Any
from typing import Union, Any, Tuple

import os
import subprocess
import zarr
import xarray as xr
import numpy as np
from satpy import Scene
from pathlib import Path
import datetime
@@ -47,7 +48,7 @@ def decompress(full_bzip_filename: Path, temp_pth: Path) -> str:
return full_nat_filename


def load_native_to_dataset(filename: Path, area: str) -> Union[xr.DataArray, None]:
def load_native_to_dataset(filename: Path, area: str) -> Union[Tuple[xr.DataArray, xr.DataArray], Tuple[None, None]]:
"""
Load compressed native files into Xarray DataArrays, returning the HRV channel separately from the other channels,
replacing small chunks of NaNs with interpolated values, and adding a time coordinate
@@ -58,17 +59,66 @@ def load_native_to_dataset(filename: Path, area: str) -> Union[xr.DataArray, None]:
Returns:
Returns two Xarray DataArrays (non-HRV and HRV) if the load succeeded, else (None, None)
"""
compressor = Compressor()
hrv_compressor = Compressor(variable_order=["HRV"], maxs=np.array([103.90016]), mins=np.array([-1.2278595]))
compressor = Compressor(mins=np.array(
[
-2.5118103,
-64.83977,
63.404694,
2.844452,
199.10002,
-17.254883,
-26.29155,
-1.1009827,
-2.4184198,
199.57048,
198.95093,
]
),
maxs=np.array(
[
69.60857,
339.15588,
340.26526,
317.86752,
313.2767,
315.99194,
274.82297,
93.786545,
101.34922,
249.91806,
286.96323,
]
),
variable_order=[
"IR_016",
"IR_039",
"IR_087",
"IR_097",
"IR_108",
"IR_120",
"IR_134",
"VIS006",
"VIS008",
"WV_062",
"WV_073",
],
)
temp_directory = filename.parent
try:
# If decompression fails, return None for both DataArrays
decompressed_filename: str = decompress(filename, temp_directory)
except subprocess.CalledProcessError:
return None
return None, None
scene = Scene(filenames={"seviri_l1b_native": [decompressed_filename]})
scene.load(
hrv_scene = Scene(filenames={"seviri_l1b_native": [decompressed_filename]})
hrv_scene.load(
[
"HRV",
]
)
scene.load(
[
"IR_016",
"IR_039",
"IR_087",
@@ -82,17 +132,29 @@
"WV_073",
]
)

# While we want to avoid resampling as much as possible,
# HRV is the only one different from the others, so to make it simpler, resample all to the HRV resolution (3x the resolution)
scene = scene.resample()

# HRV covers a smaller portion of the disk than other bands, so use that as the bounds
# Bounds selected empirically so there are no NaN values from the off-disk image, while covering the UK plus a bit
scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
hrv_scene = hrv_scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
dataarray: xr.DataArray = convert_scene_to_dataarray(scene, band="IR_016", area=area)
hrv_dataarray: xr.DataArray = convert_scene_to_dataarray(hrv_scene, band="HRV", area=area)
# Delete file off disk
os.remove(decompressed_filename)

# If any NaNs still exist, then don't return it
if is_dataset_clean(dataarray) and is_dataset_clean(hrv_dataarray):
# Compress and return
dataarray = compressor.compress(dataarray)
hrv_dataarray = hrv_compressor.compress(hrv_dataarray)
return dataarray, hrv_dataarray
else:
return None, None


def convert_scene_to_dataarray(scene: Scene, band: str, area: str) -> xr.DataArray:
scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
# Lat and Lon are the same for all the channels now
lon, lat = scene["HRV"].attrs["area"].get_lonlats()
lon, lat = scene[band].attrs["area"].get_lonlats()
osgb_x, osgb_y = lat_lon_to_osgb(lat, lon)
dataset: xr.Dataset = scene.to_xarray_dataset()
# Add coordinate arrays; since x and y change for each pixel, the dataset's x,y coords cannot be replaced with these directly
@@ -115,16 +177,7 @@ def load_native_to_dataset(filename: Path, area: str) -> Union[xr.DataArray, None]:
dim="y", max_gap=2
)

# Delete file off disk
os.remove(decompressed_filename)

# If any NaNs still exist, then don't return it
if is_dataset_clean(dataarray):
# Compress and return
dataarray = compressor.compress(dataarray)
return dataarray
else:
return None
return dataarray


get_time_as_unix = (
@@ -143,6 +196,7 @@ def save_dataset_to_zarr(
timesteps_per_chunk: int = 1,
y_size_per_chunk: int = 256,
x_size_per_chunk: int = 256,
channel_chunk_size: int = 12
) -> None:
"""
Save an Xarray DataArray into a Zarr file
@@ -157,6 +211,8 @@

"""
dataarray = dataarray.transpose(*["time", "x", "y", "variable"])
# We convert the datetime to seconds since the Unix epoch as otherwise appending to the Zarr store results in the
# first timestep being duplicated, and the actual timestep being thrown away.
dataarray["time"] = get_time_as_unix(dataarray)

_, x_size, y_size, _ = dataarray.shape
@@ -171,7 +227,7 @@
timesteps_per_chunk,
y_size_per_chunk,
x_size_per_chunk,
12,
channel_chunk_size,
)
dataarray = xr.Dataset({"stacked_eumetsat_data": dataarray.chunk(chunks)})

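A note on the new Compressor arguments: the per-channel mins and maxs line up with variable_order, which suggests plain per-channel min/max scaling. Here is a minimal sketch of that idea, assuming a "variable" dimension on the DataArray; the real satip Compressor (output range, clipping, dtype) may differ:

import numpy as np
import xarray as xr

def minmax_scale(dataarray: xr.DataArray, mins: np.ndarray, maxs: np.ndarray) -> xr.DataArray:
    # Sketch only: rescale each channel into [0, 1] using its empirical
    # extrema. The actual Compressor may clip out-of-range values or cast
    # to a smaller dtype after scaling.
    mins_da = xr.DataArray(mins, dims=["variable"], coords={"variable": dataarray["variable"]})
    maxs_da = xr.DataArray(maxs, dims=["variable"], coords={"variable": dataarray["variable"]})
    return (dataarray - mins_da) / (maxs_da - mins_da)

Keeping HRV in its own Compressor and store means each store's scaling arrays match exactly the channels it holds, which is also why the two stores get different channel chunk sizes.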
8 changes: 7 additions & 1 deletion scripts/convert_native_to_zarr.py
@@ -13,7 +13,13 @@
"--zarr_path",
"-zarr",
default="eumetsat_zarr.zarr",
prompt="The location for the Zarr file",
prompt="The location for the non-HRV Zarr file",
)
@click.option(
"--hrv_zarr_path",
"-hrv_zarr",
default="hrv_eumetsat_zarr.zarr",
prompt="The location for the HRV Zarr file",
)
@click.option(
"--region",
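
One last note, on the epoch-seconds comment in save_dataset_to_zarr: since get_time_as_unix itself is truncated in the diff above, the following is only a hedged sketch of the conversion that comment describes, not the actual helper:

import pandas as pd
import xarray as xr

def to_unix_seconds(dataarray: xr.DataArray) -> xr.DataArray:
    # Sketch of the idea: store the time coordinate as integer seconds since
    # the Unix epoch, so appending to an existing Zarr store does not
    # duplicate the first timestep and silently drop the real one.
    times = pd.DatetimeIndex(dataarray["time"].values)
    seconds = (times - pd.Timestamp("1970-01-01")) // pd.Timedelta("1s")
    return dataarray.assign_coords(time=seconds.values)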