From 442ef24283c2db2f6dc2a12ed3004d9a1f98562e Mon Sep 17 00:00:00 2001 From: Jacob Bieker Date: Wed, 27 Oct 2021 16:35:02 +0100 Subject: [PATCH 1/3] Split into HRV and all other Zarr stores --- satip/intermediate.py | 16 ++++++- satip/utils.py | 100 ++++++++++++++++++++++++++++++++---------- 2 files changed, 92 insertions(+), 24 deletions(-) diff --git a/satip/intermediate.py b/satip/intermediate.py index c60d64a7..1e3272a1 100644 --- a/satip/intermediate.py +++ b/satip/intermediate.py @@ -15,6 +15,7 @@ def create_or_update_zarr_with_native_files( directory: str, zarr_path: str, + hrv_zarr_path: str, region: str, spatial_chunk_size: int = 256, temporal_chunk_size: int = 1, @@ -49,11 +50,12 @@ def create_or_update_zarr_with_native_files( # Check if zarr already exists if not zarr_exists: # Inital zarr path before then appending - dataset = native_wrapper((compressed_native_files[0], region)) + dataset, hrv_dataset = native_wrapper((compressed_native_files[0], region)) save_dataset_to_zarr(dataset, zarr_path=zarr_path, zarr_mode="w") + save_dataset_to_zarr(hrv_dataset, zarr_path=hrv_zarr_path, zarr_mode="w") pool = multiprocessing.Pool(processes=4) - for dataset in pool.imap_unordered( + for dataset, hrv_dataset in pool.imap_unordered( native_wrapper, zip( compressed_native_files[1:] if not zarr_exists else compressed_native_files, @@ -67,6 +69,15 @@ def create_or_update_zarr_with_native_files( x_size_per_chunk=spatial_chunk_size, y_size_per_chunk=spatial_chunk_size, timesteps_per_chunk=temporal_chunk_size, + channel_chunk_size=11 + ) + save_dataset_to_zarr( + hrv_dataset, + zarr_path=hrv_zarr_path, + x_size_per_chunk=spatial_chunk_size, + y_size_per_chunk=spatial_chunk_size, + timesteps_per_chunk=temporal_chunk_size, + channel_chunk_size=1 ) del dataset @@ -75,3 +86,4 @@ def native_wrapper(filename_and_area): filename, area = filename_and_area return load_native_to_dataset(filename, area) +create_or_update_zarr_with_native_files("/home/jacob/Development/Satip", "zarr_test.zarr", "hrv_zarr_test.zarr", "UK") diff --git a/satip/utils.py b/satip/utils.py index ab7c0e6a..2c51ff41 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -1,11 +1,12 @@ import pandas as pd -from typing import Union, Any +from typing import Union, Any, Tuple import os import subprocess import zarr import xarray as xr +import numpy as np from satpy import Scene from pathlib import Path import datetime @@ -47,7 +48,7 @@ def decompress(full_bzip_filename: Path, temp_pth: Path) -> str: return full_nat_filename -def load_native_to_dataset(filename: Path, area: str) -> Union[xr.DataArray, None]: +def load_native_to_dataset(filename: Path, area: str) -> Union[Tuple[xr.DataArray, xr.DataArray], Tuple[None, None]]: """ Load compressed native files into an Xarray dataset, resampling to the same grid for the HRV channel, and replacing small chunks of NaNs with interpolated values, and add a time coordinate @@ -58,17 +59,66 @@ def load_native_to_dataset(filename: Path, area: str) -> Union[xr.DataArray, Non Returns: Returns Xarray DataArray if script worked, else returns None """ - compressor = Compressor() + hrv_compressor = Compressor(variable_order=["HRV"], maxs=np.array([103.90016]), mins=np.array([-1.2278595])) + compressor = Compressor(mins=np.array( + [ + -2.5118103, + -64.83977, + 63.404694, + 2.844452, + 199.10002, + -17.254883, + -26.29155, + -1.1009827, + -2.4184198, + 199.57048, + 198.95093, + ] + ), + maxs=np.array( + [ + 69.60857, + 339.15588, + 340.26526, + 317.86752, + 313.2767, + 315.99194, + 274.82297, + 93.786545, + 101.34922, + 249.91806, + 286.96323, + ] + ), + variable_order=[ + "IR_016", + "IR_039", + "IR_087", + "IR_097", + "IR_108", + "IR_120", + "IR_134", + "VIS006", + "VIS008", + "WV_062", + "WV_073", + ], + ) temp_directory = filename.parent try: # IF decompression fails, pass decompressed_filename: str = decompress(filename, temp_directory) except subprocess.CalledProcessError: - return None + return None, None scene = Scene(filenames={"seviri_l1b_native": [decompressed_filename]}) - scene.load( + hrv_scene = Scene(filenames={"seviri_l1b_native": [decompressed_filename]}) + hrv_scene.load( [ "HRV", + ] + ) + scene.load( + [ "IR_016", "IR_039", "IR_087", @@ -82,17 +132,29 @@ def load_native_to_dataset(filename: Path, area: str) -> Union[xr.DataArray, Non "WV_073", ] ) - - # While we wnat to avoid resampling as much as possible, - # HRV is the only one different than the others, so to make it simpler, resample all to HRV resolution (3x the resolution) - scene = scene.resample() - # HRV covers a smaller portion of the disk than other bands, so use that as the bounds # Selected bounds emprically for have no NaN values from off disk image, and covering the UK + a bit scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area]) + hrv_scene = hrv_scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area]) + dataarray: xr.DataArray = convert_scene_to_dataarray(scene, band="IR_016", area=area) + hrv_dataarray: xr.DataArray = convert_scene_to_dataarray(hrv_scene, band="HRV", area=area) + # Delete file off disk + os.remove(decompressed_filename) + + # If any NaNs still exist, then don't return it + if is_dataset_clean(dataarray) and is_dataset_clean(hrv_dataarray): + # Compress and return + dataarray = compressor.compress(dataarray) + hrv_dataarray = hrv_compressor.compress(hrv_dataarray) + return dataarray, hrv_dataarray + else: + return None, None + +def convert_scene_to_dataarray(scene: Scene, band: str, area: str) -> xr.DataArray: + scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area]) # Lat and Lon are the same for all the channels now - lon, lat = scene["HRV"].attrs["area"].get_lonlats() + lon, lat = scene[band].attrs["area"].get_lonlats() osgb_x, osgb_y = lat_lon_to_osgb(lat, lon) dataset: xr.Dataset = scene.to_xarray_dataset() # Add coordinate arrays, since x and y changes for each pixel, cannot replace dataset x,y coords with these directly @@ -115,16 +177,7 @@ def load_native_to_dataset(filename: Path, area: str) -> Union[xr.DataArray, Non dim="y", max_gap=2 ) - # Delete file off disk - os.remove(decompressed_filename) - - # If any NaNs still exist, then don't return it - if is_dataset_clean(dataarray): - # Compress and return - dataarray = compressor.compress(dataarray) - return dataarray - else: - return None + return dataarray get_time_as_unix = ( @@ -143,6 +196,7 @@ def save_dataset_to_zarr( timesteps_per_chunk: int = 1, y_size_per_chunk: int = 256, x_size_per_chunk: int = 256, + channel_chunk_size: int = 12 ) -> None: """ Save an Xarray DataArray into a Zarr file @@ -157,6 +211,8 @@ def save_dataset_to_zarr( """ dataarray = dataarray.transpose(*["time", "x", "y", "variable"]) + # We convert the datetime to seconds since the Unix epoch as otherwise appending to the Zarr store results in the + # first timestep being duplicated, and the actual timestep being thrown away. dataarray["time"] = get_time_as_unix(dataarray) _, x_size, y_size, _ = dataarray.shape @@ -171,7 +227,7 @@ def save_dataset_to_zarr( timesteps_per_chunk, y_size_per_chunk, x_size_per_chunk, - 12, + channel_chunk_size, ) dataarray = xr.Dataset({"stacked_eumetsat_data": dataarray.chunk(chunks)}) From 81fcebe12cf541c8325e0a4d604eff324ae04ce9 Mon Sep 17 00:00:00 2001 From: Jacob Bieker Date: Wed, 27 Oct 2021 16:36:53 +0100 Subject: [PATCH 2/3] Add to script --- scripts/convert_native_to_zarr.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/scripts/convert_native_to_zarr.py b/scripts/convert_native_to_zarr.py index 2a04b923..0840e0ca 100644 --- a/scripts/convert_native_to_zarr.py +++ b/scripts/convert_native_to_zarr.py @@ -13,7 +13,13 @@ "--zarr_path", "-zarr", default="eumetsat_zarr.zarr", - prompt="The location for the Zarr file", + prompt="The location for the non-HRV Zarr file", +) +@click.option( + "--hrv_zarr_path", + "-hrv_zarr", + default="hrv_eumetsat_zarr.zarr", + prompt="The location for the HRV Zarr file", ) @click.option( "--region", From 0318169ac58655e35ad9bcae0d4d825ad1923904 Mon Sep 17 00:00:00 2001 From: Jacob Bieker Date: Wed, 27 Oct 2021 16:38:44 +0100 Subject: [PATCH 3/3] Remove call on import --- satip/intermediate.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/satip/intermediate.py b/satip/intermediate.py index 1e3272a1..fa0be98f 100644 --- a/satip/intermediate.py +++ b/satip/intermediate.py @@ -85,5 +85,3 @@ def create_or_update_zarr_with_native_files( def native_wrapper(filename_and_area): filename, area = filename_and_area return load_native_to_dataset(filename, area) - -create_or_update_zarr_with_native_files("/home/jacob/Development/Satip", "zarr_test.zarr", "hrv_zarr_test.zarr", "UK")