diff --git a/.dockerignore b/.dockerignore index 9106c441..d5d15798 100644 --- a/.dockerignore +++ b/.dockerignore @@ -6,4 +6,4 @@ docs literature logs nbs -.git \ No newline at end of file +.git diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index aac9b834..8580d981 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,9 +19,9 @@ repos: - id: pydocstyle args: [ - --convention=google, - "--add-ignore=D200,D202,D210,D212,D415", - "satip", + --convention=google, + "--add-ignore=D200,D202,D210,D212,D415", + "satip", ] - repo: https://github.com/PyCQA/flake8 rev: 4.0.1 @@ -29,12 +29,12 @@ repos: - id: flake8 args: [ - --max-line-length, - "100", - --extend-ignore=E203, - --per-file-ignores, - "__init__.py:F401", - "satip", + --max-line-length, + "100", + --extend-ignore=E203, + --per-file-ignores, + "__init__.py:F401", + "satip", ] - repo: https://github.com/PyCQA/isort rev: 5.10.1 @@ -52,4 +52,4 @@ repos: rev: v2.4.1 hooks: - id: prettier - types: [yaml] \ No newline at end of file + types: [yaml] diff --git a/satip/compression.py b/satip/compression.py index d607a162..f46a98d0 100644 --- a/satip/compression.py +++ b/satip/compression.py @@ -1,7 +1,8 @@ -import xarray as xr -import numpy as np from typing import Union +import numpy as np +import xarray as xr + class Compressor: def __init__( diff --git a/satip/download.py b/satip/download.py index 99e57d6c..a2191bfb 100644 --- a/satip/download.py +++ b/satip/download.py @@ -6,22 +6,23 @@ # ############ -import fsspec -import requests.exceptions -import yaml -from satip import eumetsat -import pandas as pd -import os -import math import logging -import subprocess +import math import multiprocessing -from itertools import repeat +import os +import subprocess import time +from datetime import datetime, timedelta +from itertools import repeat +from typing import Callable, List, Optional, Tuple + +import fsspec import numpy as np -from typing import Optional, List, Tuple, Callable +import pandas as pd +import requests.exceptions +import yaml -from datetime import datetime, timedelta +from satip import eumetsat _LOG = logging.getLogger("satip.download") _LOG.setLevel(logging.INFO) diff --git a/satip/eumetsat.py b/satip/eumetsat.py index aa713402..0dbce229 100644 --- a/satip/eumetsat.py +++ b/satip/eumetsat.py @@ -1,16 +1,15 @@ -import pandas as pd - -from typing import Union, List -import datetime -import zipfile import copy +import datetime import os -from io import BytesIO import re import urllib +import zipfile +from io import BytesIO +from typing import List, Union -from requests.auth import HTTPBasicAuth +import pandas as pd import requests +from requests.auth import HTTPBasicAuth from satip import utils diff --git a/satip/geospatial.py b/satip/geospatial.py index 6d7c111b..bbb73d0a 100644 --- a/satip/geospatial.py +++ b/satip/geospatial.py @@ -1,6 +1,6 @@ import datetime from numbers import Number -from typing import Tuple, List +from typing import List, Tuple import numpy as np import pyproj @@ -30,7 +30,7 @@ class Transformers: """ def __init__(self): - """ Init """ + """Init""" self._lat_lon_to_osgb = None self.make_transformers() @@ -44,7 +44,7 @@ def make_transformers(self): @property def lat_lon_to_osgb(self): - """ lat-lon to OSGB property """ + """lat-lon to OSGB property""" return self._lat_lon_to_osgb @@ -53,7 +53,7 @@ def lat_lon_to_osgb(self): def download_grids(): - """ The transformer grid sometimes need updating """ + """The transformer grid sometimes need updating""" pyproj.transformer.TransformerGroup(crs_from=WGS84, crs_to=OSGB).download_grids(verbose=True) transformers.make_transformers() diff --git a/satip/intermediate.py b/satip/intermediate.py index 2f9a153a..9714e32e 100644 --- a/satip/intermediate.py +++ b/satip/intermediate.py @@ -1,25 +1,25 @@ -from satip.utils import ( - load_native_to_dataset, - save_dataset_to_zarr, - check_if_timestep_exists, -) -from satip.eumetsat import eumetsat_filename_to_datetime, eumetsat_cloud_name_to_datetime -import os -import pandas as pd -from pathlib import Path import multiprocessing +import os from itertools import repeat +from pathlib import Path + +import pandas as pd import xarray as xr from tqdm import tqdm +from satip.eumetsat import eumetsat_cloud_name_to_datetime, eumetsat_filename_to_datetime +from satip.utils import check_if_timestep_exists, load_native_to_dataset, save_dataset_to_zarr + -def split_per_month(directory: str, - zarr_path: str, - hrv_zarr_path: str, - region: str, - temp_directory: str = "/mnt/ramdisk/", - spatial_chunk_size: int = 256, - temporal_chunk_size: int = 1, ): +def split_per_month( + directory: str, + zarr_path: str, + hrv_zarr_path: str, + region: str, + temp_directory: str = "/mnt/ramdisk/", + spatial_chunk_size: int = 256, + temporal_chunk_size: int = 1, +): """ Splits the Zarr creation into multiple, month-long Zarr files for parallel writing @@ -47,10 +47,11 @@ def split_per_month(directory: str, for month in month_directories: if not os.path.isdir(os.path.join(directory, year, month)): continue - month_directory = os.path.join(directory, year.split('/')[0], month.split('/')[0]) + month_directory = os.path.join(directory, year.split("/")[0], month.split("/")[0]) month_zarr_path = zarr_path + f"_{year.split('/')[0]}_{month.split('/')[0]}.zarr" - hrv_month_zarr_path = hrv_zarr_path + f"_{year.split('/')[0]}" \ - f"_{month.split('/')[0]}.zarr" + hrv_month_zarr_path = ( + hrv_zarr_path + f"_{year.split('/')[0]}" f"_{month.split('/')[0]}.zarr" + ) dirs.append(month_directory) zarrs.append(month_zarr_path) hrv_zarrs.append(hrv_month_zarr_path) @@ -58,15 +59,16 @@ def split_per_month(directory: str, if not zarr_exists: # Inital zarr path before then appending compressed_native_files = list(Path(month_directory).rglob("*.bz2")) - dataset, hrv_dataset = load_native_to_dataset(compressed_native_files[0], - temp_directory, - region) + dataset, hrv_dataset = load_native_to_dataset( + compressed_native_files[0], temp_directory, region + ) save_dataset_to_zarr(dataset, zarr_path=month_zarr_path, zarr_mode="w") save_dataset_to_zarr(hrv_dataset, zarr_path=hrv_month_zarr_path, zarr_mode="w") print(dirs) print(zarrs) pool = multiprocessing.Pool(processes=16) - for _ in tqdm(pool.imap_unordered( + for _ in tqdm( + pool.imap_unordered( wrapper, zip( dirs, @@ -75,17 +77,18 @@ def split_per_month(directory: str, repeat(temp_directory), repeat(region), repeat(spatial_chunk_size), - repeat(temporal_chunk_size) - ), - )): + repeat(temporal_chunk_size), + ), + ) + ): print("Month done") def wrapper(args): dirs, zarrs, hrv_zarrs, temp_directory, region, spatial_chunk_size, temporal_chunk_size = args - create_or_update_zarr_with_native_files(dirs, zarrs, hrv_zarrs, temp_directory, region, - spatial_chunk_size, - temporal_chunk_size) + create_or_update_zarr_with_native_files( + dirs, zarrs, hrv_zarrs, temp_directory, region, spatial_chunk_size, temporal_chunk_size + ) def create_or_update_zarr_with_native_files( @@ -136,7 +139,7 @@ def create_or_update_zarr_with_native_files( x_size_per_chunk=spatial_chunk_size, y_size_per_chunk=spatial_chunk_size, timesteps_per_chunk=temporal_chunk_size, - channel_chunk_size=11 + channel_chunk_size=11, ) save_dataset_to_zarr( hrv_dataset, @@ -144,7 +147,7 @@ def create_or_update_zarr_with_native_files( x_size_per_chunk=spatial_chunk_size, y_size_per_chunk=spatial_chunk_size, timesteps_per_chunk=temporal_chunk_size, - channel_chunk_size=1 + channel_chunk_size=1, ) except Exception as e: print(f"Failed with: {e}") @@ -155,9 +158,10 @@ def create_or_update_zarr_with_native_files( def pool_init(q): - global processed_queue # make queue global in workers + global processed_queue # make queue global in workers processed_queue = q + def native_wrapper(filename_and_area): filename, area = filename_and_area processed_queue.put(load_native_to_dataset(filename, area)) diff --git a/satip/utils.py b/satip/utils.py index ec7b7b2b..159d9d3c 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -1,19 +1,19 @@ -import pandas as pd - -from typing import Union, Any, Tuple - +import datetime +import logging import os import subprocess -import zarr -import xarray as xr +import warnings +from pathlib import Path +from typing import Any, Tuple, Union + import numpy as np +import pandas as pd +import xarray as xr +import zarr from satpy import Scene -from pathlib import Path -import datetime -import logging -from satip.geospatial import lat_lon_to_osgb, GEOGRAPHIC_BOUNDS + from satip.compression import Compressor, is_dataset_clean -import warnings +from satip.geospatial import GEOGRAPHIC_BOUNDS, lat_lon_to_osgb warnings.filterwarnings("ignore", message="divide by zero encountered in true_divide") warnings.filterwarnings("ignore", message="invalid value encountered in sin") @@ -51,8 +51,9 @@ def decompress(full_bzip_filename: Path, temp_pth: Path) -> str: return full_nat_filename -def load_native_to_dataset(filename: Path, temp_directory: Path, area: str) -> Union[Tuple[ - xr.DataArray, xr.DataArray], Tuple[None, None]]: +def load_native_to_dataset( + filename: Path, temp_directory: Path, area: str +) -> Union[Tuple[xr.DataArray, xr.DataArray], Tuple[None, None]]: """ Load compressed native files into an Xarray dataset, resampling to the same grid for the HRV channel, and replacing small chunks of NaNs with interpolated values, and add a time coordinate @@ -64,22 +65,25 @@ def load_native_to_dataset(filename: Path, temp_directory: Path, area: str) -> U Returns: Returns Xarray DataArray if script worked, else returns None """ - hrv_compressor = Compressor(variable_order=["HRV"], maxs=np.array([103.90016]), mins=np.array([-1.2278595])) - compressor = Compressor(mins=np.array( - [ - -2.5118103, - -64.83977, - 63.404694, - 2.844452, - 199.10002, - -17.254883, - -26.29155, - -1.1009827, - -2.4184198, - 199.57048, - 198.95093, - ] - ), + hrv_compressor = Compressor( + variable_order=["HRV"], maxs=np.array([103.90016]), mins=np.array([-1.2278595]) + ) + compressor = Compressor( + mins=np.array( + [ + -2.5118103, + -64.83977, + 63.404694, + 2.844452, + 199.10002, + -17.254883, + -26.29155, + -1.1009827, + -2.4184198, + 199.57048, + 198.95093, + ] + ), maxs=np.array( [ 69.60857, @@ -159,9 +163,9 @@ def convert_scene_to_dataarray(scene: Scene, band: str, area: str) -> xr.DataArr lon, lat = scene[band].attrs["area"].get_lonlats() osgb_x, osgb_y = lat_lon_to_osgb(lat, lon) dataset: xr.Dataset = scene.to_xarray_dataset() - osgb_y = osgb_y[:,0] - osgb_x = osgb_x[0,:] - dataset = dataset.assign_coords(x=osgb_x,y=osgb_y) + osgb_y = osgb_y[:, 0] + osgb_x = osgb_x[0, :] + dataset = dataset.assign_coords(x=osgb_x, y=osgb_y) # Round to the nearest 5 minutes dataset.attrs["end_time"] = pd.Timestamp(dataset.attrs["end_time"]).round("5 min") @@ -182,12 +186,7 @@ def convert_scene_to_dataarray(scene: Scene, band: str, area: str) -> xr.DataArr return dataarray -get_time_as_unix = ( - lambda da: pd.Series( - pd.to_datetime(da.time.values) - ) - .values -) +get_time_as_unix = lambda da: pd.Series(pd.to_datetime(da.time.values)).values def save_dataset_to_zarr( @@ -197,7 +196,7 @@ def save_dataset_to_zarr( timesteps_per_chunk: int = 1, y_size_per_chunk: int = 256, x_size_per_chunk: int = 256, - channel_chunk_size: int = 12 + channel_chunk_size: int = 12, ) -> None: """ Save an Xarray DataArray into a Zarr file @@ -235,7 +234,7 @@ def save_dataset_to_zarr( # One last check again just incase chunking causes any issues print("Failing clean check after chunking") return - dataarray = dataarray.fillna(-1) # Fill NaN with -1, even if none should exist + dataarray = dataarray.fillna(-1) # Fill NaN with -1, even if none should exist dataarray = xr.Dataset({"stacked_eumetsat_data": dataarray}) zarr_mode_to_extra_kwargs = { @@ -293,6 +292,7 @@ def check_if_timestep_exists(dt: datetime.datetime, zarr_dataset: xr.Dataset) -> else: return False + def create_markdown_table(table_info: dict, index_name: str = "Id") -> str: """ Returns a string for a markdown table, formatted @@ -331,10 +331,10 @@ def create_markdown_table(table_info: dict, index_name: str = "Id") -> str: # Cell def set_up_logging( - name: str, - log_dir: str, - main_logging_level: str = "DEBUG", - ) -> logging.Logger: + name: str, + log_dir: str, + main_logging_level: str = "DEBUG", +) -> logging.Logger: """ `set_up_logging` initialises and configures a custom logger for `satip`. The logging level of the file and @@ -386,7 +386,7 @@ def set_up_logging( logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARNING", "WARN", "INFO", "DEBUG", "NOTSET"] assert ( - main_logging_level in logging_levels + main_logging_level in logging_levels ), f"main_logging_level must be one of {', '.join(logging_levels)}" logger.setLevel(getattr(logging, main_logging_level)) diff --git a/scripts/convert_native_to_zarr.py b/scripts/convert_native_to_zarr.py index 85a945c3..a093a0de 100644 --- a/scripts/convert_native_to_zarr.py +++ b/scripts/convert_native_to_zarr.py @@ -1,6 +1,7 @@ -from satip.intermediate import create_or_update_zarr_with_native_files, split_per_month import click +from satip.intermediate import create_or_update_zarr_with_native_files, split_per_month + @click.command() @click.option( @@ -26,7 +27,7 @@ "-temp", default="/mnt/ramdisk/", prompt="Where to store temp directory", - ) +) @click.option( "--region", default="UK", diff --git a/scripts/get_raw_eumetsat_data.py b/scripts/get_raw_eumetsat_data.py index 4a2e8bf4..f98afe28 100644 --- a/scripts/get_raw_eumetsat_data.py +++ b/scripts/get_raw_eumetsat_data.py @@ -9,6 +9,7 @@ import click import pandas as pd + import satip.download NATIVE_FILESIZE_MB = 102.210123 diff --git a/scripts/move_files.py b/scripts/move_files.py index ea159c68..2bbe54f1 100644 --- a/scripts/move_files.py +++ b/scripts/move_files.py @@ -1,33 +1,41 @@ -import shutil import glob +import shutil -zarr_paths = list(glob.glob("/mnt/storage_ssd/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/satellite/EUMETSAT/SEVIRI_RSS/zarr/v2/eumetsat_zarr_*")) +zarr_paths = list( + glob.glob( + "/mnt/storage_ssd/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/satellite/EUMETSAT/SEVIRI_RSS/zarr/v2/eumetsat_zarr_*" + ) +) for zarr_path in zarr_paths: - dataset: xr.Dataset = xr.open_dataset(zarr_path, engine = 'zarr') + dataset: xr.Dataset = xr.open_dataset(zarr_path, engine="zarr") print(dataset) if dataset.coords["x"] == osgb_x: continue - dataset = dataset.assign_coords(x=osgb_x,y=osgb_y) + dataset = dataset.assign_coords(x=osgb_x, y=osgb_y) print(dataset) - dataset.to_zarr(zarr_path, mode='a', compute = True) + dataset.to_zarr(zarr_path, mode="a", compute=True) scene = Scene(filenames={"seviri_l1b_native": [filename]}) scene.load( [ "HRV", - ] - ) + ] +) -scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS['UK']) +scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS["UK"]) # Lat and Lon are the same for all the channels now -lon, lat = scene['HRV'].attrs["area"].get_lonlats() +lon, lat = scene["HRV"].attrs["area"].get_lonlats() osgb_x, osgb_y = lat_lon_to_osgb(lat, lon) -osgb_y = osgb_y[:,0] -osgb_x = osgb_x[0,:] -zarr_paths = list(glob.glob("/mnt/storage_ssd/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/satellite/EUMETSAT/SEVIRI_RSS/zarr/v2/hrv_eumetsat_zarr_*")) +osgb_y = osgb_y[:, 0] +osgb_x = osgb_x[0, :] +zarr_paths = list( + glob.glob( + "/mnt/storage_ssd/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/satellite/EUMETSAT/SEVIRI_RSS/zarr/v2/hrv_eumetsat_zarr_*" + ) +) for zarr_path in zarr_paths: - dataset: xr.Dataset = xr.open_dataset(zarr_path, engine = 'zarr') + dataset: xr.Dataset = xr.open_dataset(zarr_path, engine="zarr") print(dataset) - dataset = dataset.assign_coords(x=osgb_x,y=osgb_y) + dataset = dataset.assign_coords(x=osgb_x, y=osgb_y) print(dataset) - dataset.to_zarr(zarr_path, mode='a', compute = True) \ No newline at end of file + dataset.to_zarr(zarr_path, mode="a", compute=True) diff --git a/setup.py b/setup.py index 15d495fd..e9229191 100644 --- a/setup.py +++ b/setup.py @@ -1,8 +1,8 @@ -from setuptools import setup, find_packages - # read the contents of your README file from pathlib import Path +from setuptools import find_packages, setup + this_directory = Path(__file__).parent long_description = (this_directory / "README.md").read_text() install_requires = (this_directory / "requirements.txt").read_text().splitlines()