From 263533e18e5c11043eccda4859b2e5aec3c8437c Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Wed, 24 Apr 2024 14:43:58 +0100 Subject: [PATCH 1/8] feat(scripts): Add modified satellite archival pipeline from gcp vm --- scripts/archival_pipeline.py | 714 +++++++++++++++++++++++++++++++++++ 1 file changed, 714 insertions(+) create mode 100755 scripts/archival_pipeline.py diff --git a/scripts/archival_pipeline.py b/scripts/archival_pipeline.py new file mode 100755 index 00000000..0d5beb9c --- /dev/null +++ b/scripts/archival_pipeline.py @@ -0,0 +1,714 @@ +"""Pipeline for downloading, processing, and saving archival satellite data. + +Consolidates the old cli_downloader, backfill_hrv and backfill_nonhrv scripts. +""" +import xarray as xr +import numpy as np +from typing import Literal +from tqdm import tqdm +import argparse +import multiprocessing +import os +import datetime as dt +import diskcache as dc +from ocf_blosc2 import Blosc2 +import pandas as pd +import subprocess +import sys +import logging +import dataclasses + +from satip.scale_to_zero_to_one import ScaleToZeroToOne +from satip.serialize import serialize_attrs +from satip.utils import convert_scene_to_dataarray + +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) +log = logging.getLogger(__name__) + + +@dataclasses.dataclass +class Config: + region: str + cadence: str + product_id: str + native_path: str + zarr_path: dict[str, str] + +CONFIGS: dict[str, Config] = { + "iodc": Config( + region="india", + cadence="15min", + product_id="EO:EUM:DAT:MSG:HRSEVIRI-IODC", + native_path="/mnt/disks/sat/native_files_india/", + zarr_path={ + "hrv": "/mnt/disks/sat/%Y_hrv_iodc.zarr", + "nonhrv": "/mnt/disks/sat/%Y_nonhrv_iodc.zarr", + }, + ), + "severi": Config( + region="europe", + cadence="5min", + product_id="EO:EUM:DAT:MSG:MSG15-RSS", + native_path="/mnt/disks/sat/native_files/", + zarr_path={ + "hrv": "/mnt/disks/sat/%Y_hrv.zarr", + "nonhrv": "/mnt/disks/sat/%Y_nonhrv.zarr", + }, + ), + # Optional + "odegree": Config( + region="europe, africa", + cadence="15min", + product_id="EO:EUM:DAT:MSG:HRSEVIRI", + native_path="/mnt/disks/sat/native_files_odegree/", + zarr_path={ + "hrv": "/mnt/disks/sat/%Y_hrv_odegree.zarr", + "nonhrv": "/mnt/disks/sat/%Y_nonhrv_odegree.zarr", + }, + ), +} + +@dataclasses.dataclass +class Channel: + minimum: float + maximum: float + variable: str + +# Approximate minimum and maximum pixel values per channel, for normalization +# * Caclulated by Jacob via the min and max of a snapshot of the data +CHANNELS: dict[str, list[Channel]] = { + # Non-HRV is 11 channels of different filters + "nonhrv": [ + Channel(-2.5118103, 69.60857, "IR_016"), + Channel(-64.83977, 339.15588, "IR_039"), + Channel(63.404694, 340.26526, "IR_087"), + Channel(2.844452, 317.86752, "IR_097"), + Channel(199.10002, 313.2767, "IR_108"), + Channel(-17.254883, 315.99194, "IR_120"), + Channel(-26.29155, 274.82297, "IR_134"), + Channel(-1.1009827, 93.786545, "VIS006"), + Channel(-2.4184198, 101.34922, "VIS008"), + Channel(199.57048, 249.91806, "WV_062"), + Channel(198.95093, 286.96323, "WV_073"), + ], + # HRV is one greyscale wideband filter channel + "hrv": [ + Channel(-1.2278595, 103.90016, "HRV"), + ], +} + +parser = argparse.ArgumentParser( + prog="EUMETSTAT Pipeline", + description="Downloads and processes data from EUMETSTAT", +) +parser.add_argument( + 'sat', + help="Which satellite to download data from", + type=str, + choices=list(CONFIGS.keys()), +) +parser.add_argument( + "--start_date", + 
help="Date to download from (YYYY-MM-DD)", + type=dt.date.fromisoformat, + required=True, +) +parser.add_argument( + "--end_date", + help="Date to download to (YYYY-MM-DD)", + type=dt.date.fromisoformat, + required=False, + default=str(dt.datetime.utcnow().date()), +) + +def download_scans( + sat_config: Config, + scan_times: list[pd.Timestamp], + ) -> list[pd.Timestamp]: + """Download satellite scans for a given config over the given scan times. + + Returns: + List of scan times that failed to download. + """ + failed_scan_times: list[pd.Timestamp] = [] + + # Get credentials from environment + consumer_key: str = os.environ["EUMETSAT_CONSUMER_KEY"] + consumer_secret: str = os.environ["EUMETSAT_CONSUMER_SECRET"] + + # Create progressbar + pbar = tqdm( + total=len(scan_times), + position=0 if scan_times[0] < scan_times[-1] else 1, + desc="forward" if scan_times[0] < scan_times[-1] else "backward" + ) + + for i, scan_time in enumerate(scan_times): + # Authenticate + # * Generates a new access token with short expiry + process = subprocess.run( + [ + "eumdac", + "--set-credentials", + consumer_key, + consumer_secret, + "\n" + ], + capture_output=True, + text=True, + ) + if process.returncode != 0: + log.warn(f"Unable to authenticate with eumdac: {process.stdout} {process.stderr}") + failed_scan_times.append(scan_time) + continue + + # Download + window_start: pd.Timestamp = scan_time - pd.Timedelta(sat_config.cadence) + window_end: pd.Timestamp = scan_time + pd.Timedelta(sat_config.cadence) + process = subprocess.run( + [ + "eumdac", + "download", + "-c", sat_config.product_id, + "-s", window_start.tz_localize(None).strftime("%Y-%m-%dT%H:%M:%S"), + "-e", window_end.tz_localize(None).strftime("%Y-%m-%dT%H:%M:%S"), + "-o", sat_config.native_path, + "--entry", + "*.nat", + "-y" + ], + capture_output=True, + text=True, + ) + if process.returncode != 0: + log.debug(f"Failed to download scans for scan_time {scan_time}") + failed_scan_times.append(scan_time) + + pbar.update(1) + + return failed_scan_times + +def process_scans( + sat_config: Config, + start: dt.date, + end: dt.date, + dstype: Literal["hrv", "nonhrv"], + ) -> str: + + # Check zarr file exists for the year + zarr_path = start.strftime(sat_config.zarr_path[dstype]) + if os.path.exists(zarr_path): + zarr_times = xr.open_zarr(zarr_path).sortby("time").time.values + last_zarr_time = zarr_times[-1] + else: + # Set dummy values for times already in zarr + last_zarr_time = dt.datetime(1970, 1, 1) + zarr_times = [last_zarr_time, last_zarr_time] + + # Get native files in order + native_files = list(glob.glob(f"{sat_config.native_path}*/*.nat")) + log.info(f"Found {len(native_files)} native files at {sat_config.native_path}") + native_files.sort() + + # Create progressbar + pbar = tqdm( + total=len(native_files), + position=0 if dstype == "hrv" else 1, + desc=f"{dstype}", + ) + + # Convert native files to xarray datasets + # * Append to the yearly zarr in hourly chunks + datasets: list[xr.Dataset] = [] + for f in native_files: + try: + dataset: xr.Dataset | None = _open_and_scale_data(zarr_times, f, dstype) + except Exception as e: + log.error(f"Exception: {e}") + continue + if dataset is not None: + dataset = _preprocess_function(dataset) + datasets.append(dataset) + # Append to zarrs in hourly chunks (12 sets of 5 minute datasets) + # * This is so zarr doesn't complain about mismatching chunk sizes + if len(datasets) == 12: + mode = "w" + if os.path.exists(zarr_path): + mode = "a" + write_to_zarr( + xr.concat(datasets, dim="time"), + 
zarr_path, + mode, + chunks={"time": 12,} + ) + datasets = [] + + pbar.update(1) + + # Consolidate zarr metadata + _rewrite_zarr_times(zarr_path) + + return dstype + +def _open_and_scale_data( + zarr_times: list[dt.datetime], + f: str, + dstype: Literal["hrv", "nonhrv"], + ) -> xr.Dataset | None: + """Opens a raw file and converts it to a normalised xarray dataset.""" + # Create a scaler according to the channel + scaler = ScaleToZeroToOne( + variable_order=[c.variable for c in CHANNELS[dstype]], + maxs=np.array([c.maximum for c in CHANNELS[dstype]]), + mins=np.array([c.minimum for c in CHANNELS[dstype]]), + ) + # The reader is the same for each satellite as the sensor is the same + # * Hence "severi" in all cases + scene = Scene(filenames={"severi_l1b_native": [f]}) + scene.load([c.variable for c in CHANNELS[dstype]]) + da: xr.DataArray = convert_scene_to_dataarray( + scene, band=CHANNELS[dstype][0].variable, area="RSS", calculate_osgb=False, + ) + + # Rescale the data, update the attributes, save as dataset + attrs = serialize_attrs(da.attrs) + da = scaler.rescale(da) + da.attrs.update(attrs) + da = da.transpose("time", "y_geostationary", "x_geostationary", "variable") + ds: xr.Dataset = da.to_dataset(name="data") + ds["data"] = ds.data.astype(np.float16) + + if dataset.time.values[0] in zarr_times: + log.debug(f"Skipping: {dataset.time.values[0]}") + return None + + return ds + +def _preprocess_function(xr_data: xr.Dataset) -> xr.Dataset: + """Updates the coordinates for the given dataset.""" + attrs = xr_data.attrs + y_coords = xr_data.coords["y_geostationary"].values + x_coords = xr_data.coords["x_geostationary"].values + x_dataarray = xr.DataArray( + data=np.expand_dims(xr_data.coords["x_geostationary"].values, axis=0), + dims=["time", "x_geostationary"], + coords=dict(time=xr_data.coords["time"].values, x_geostationary=x_coords), + ) + y_dataarray = xr.DataArray( + data=np.expand_dims(xr_data.coords["y_geostationary"].values, axis=0), + dims=["time", "y_geostationary"], + coords=dict(time=xr_data.coords["time"].values, y_geostationary=y_coords), + ) + xr_data["x_geostationary_coordinates"] = x_dataarray + xr_data["y_geostationary_coordinates"] = y_dataarray + xr_data.attrs = attrs + return xr_data + + +def _write_to_zarr(dataset, zarr_name, mode, chunks): + mode_extra_kwargs = { + "a": {"append_dim": "time"}, + "w": { + "encoding": { + "data": { + "compressor": Blosc2("zstd", clevel=5), + }, + "time": {"units": "nanoseconds since 1970-01-01"}, + } + }, + } + extra_kwargs = mode_extra_kwargs[mode] + dataset.isel(x_geostationary=slice(0,5548)).chunk(chunks).to_zarr( + zarr_name, compute=True, **extra_kwargs, consolidated=True, mode=mode + ) + + +def _rewrite_zarr_times(output_name): + # Combine time coords + ds = xr.open_zarr(output_name) + + # Prevent numcodecs string error + # See https://github.com/pydata/xarray/issues/3476#issuecomment-1205346130 + for v in list(ds.coords.keys()): + if ds.coords[v].dtype == object: + ds[v].encoding.clear() + + for v in list(ds.variables.keys()): + if ds[v].dtype == object: + ds[v].encoding.clear() + + del ds["data"] + # Need to remove these encodings to avoid chunking + del ds.time.encoding['chunks'] + del ds.time.encoding['preferred_chunks'] + ds.to_zarr(f"{output_name.split('.zarr')[0]}_coord.zarr", consolidated=True) + # Remove current time ones + shutil.rmtree(f"{output_name}/time/") + # Add new time ones + shutil.copytree(f"{output_name.split('.zarr')[0]}_coord.zarr/time", f"{output_name}/time") + + # Now replace the part of the .zmetadata 
with the part of the .zmetadata from the new coord one + with open(f"{output_name}/.zmetadata", "r") as f: + data = json.load(f) + with open(f"{output_name.split('.zarr')[0]}_coord.zarr/.zmetadata", "r") as f2: + coord_data = json.load(f2) + data["metadata"]["time/.zarray"] = coord_data["metadata"]["time/.zarray"] + with open(f"{output_name}/.zmetadata", "w") as f: + json.dump(data, f) + + +if __name__ == "__main__": + args = parser.parse_args() + prog_start = dt.datetime.utcnow() + log.info(f"{str(prog_start)}: Running with args: {args}") + + # Create a reusable cache + cache = dc.Cache('tmp') + + # Get config for desired satellite + sat_config = CONFIGS[args.sat] + + # Get start and end times for run + start: dt.date = args.start_date + end: dt.date = args.end_date + scan_times: list[pd.Timestamp] = pd.date_range(start=start, end=end, freq=sat_config.cadence).tolist() + + # Get average runtime from cache + secs_per_scan = cache.get('secs_per_scan', default=65) + expected_runtime = pd.Timedelta(secs_per_scan * len(scan_times), 'seconds') + log.info(f"Downloading {len(scan_times)} scans. Expected runtime: {str(expected_runtime)}") + + # Perform data download passes + # * Each pass has two simultaneous forward and backward download streams + for pass_num in [0, 1]: + pool = multiprocessing.Pool() + log.info(f"Performing pass {pass_num}") + results = pool.starmap( + download_scans, + [ + (sat_config, scan_times), + (sat_config, list(reversed(scan_times))), + ], + ) + pool.close() + pool.join() + for result in results: + log.info(f"Completed download with {len(result)} failed scan times.") + + + # Calculate the new average time per timestamp + runtime: dt.timedelta = dt.datetime.utcnow() - prog_start + new_average_secs_per_scan: int = int((secs_per_scan + (runtime.total_seconds() / len(scan_times))) / 2) + cache.set('secs_per_scan', new_average_secs_per_scan) + log.info(f"Completed download for args: {args} in {str(runtime)} (avg {new_average_secs_per_scan} secs per scan)") + + # Process the HRV and non-HRV data + pool = multiprocessing.Pool() + results = pool.starmap( + process_scans, + [ + (sat_config, start, end, "hrv"), + (sat_config, start, end, "nonhrv"), + ] + ) + pool.close() + pool.join() + for result in results: + log.info(f"Processed {result} data") + + pool + + +""" +Jacob's old script! 
+ +last_zarr_time = pd.Timestamp("2024-01-01T00:00:00.000000000") +#start_zarr_time = pd.Timestamp("2024-01-01T00:00:00.000000000") +start_date = ( + pd.Timestamp.utcnow().tz_convert("UTC").to_pydatetime().replace(tzinfo=None) +) +start_zarr_time = pd.Timestamp(start_date).to_pydatetime().replace(tzinfo=None) +last_zarr_time = pd.Timestamp(last_zarr_time).to_pydatetime().replace(tzinfo=None) +start_str = last_zarr_time.strftime("%Y-%m-%d") +end_str = start_zarr_time.strftime("%Y-%m-%d") +print(start_str) +print(end_str) +date_range = pd.date_range(start=start_str, end=end_str, freq="5min") +print(date_range) +for date in date_range: + start_date = pd.Timestamp(date) - pd.Timedelta("5min") + end_date = pd.Timestamp(date) + pd.Timedelta("5min") + process = subprocess.run( + [ + "eumdac", + "--set-credentials", + "SWdEnLvOlVTVGli1An1nKJ3NcV0a", + "gUQe0ej7H_MqQVGF4cd7wfQWcawa", + "\n" + ] + ) + end_date_time = end_date.tz_localize(None).strftime("%Y-%m-%dT%H:%M:%S") + start_date_time = start_date.tz_localize(None).strftime("%Y-%m-%dT%H:%M:%S") + + print(start_date_time) + print(end_date_time) + process = subprocess.run( + [ + "eumdac", + "download", + "-c", + "EO:EUM:DAT:MSG:MSG15-RSS", + "-s", + f"{end_date_time}", + "-e", + f"{end_date_time}", + "-o", + "/mnt/disks/sat/native_files/", + "--entry", + "*.nat", + "-y" + ] + ) +""" + +from ocf_blosc2 import Blosc2 +import datetime as dt +import xarray as xr +import satpy +from satpy import Scene +from satip.eumetsat import DownloadManager +from satip.scale_to_zero_to_one import ScaleToZeroToOne +from satip.serialize import serialize_attrs +from satip.utils import convert_scene_to_dataarray +import os +import numpy as np +import pandas as pd +import glob +import shutil +import json + + +def list_native_files(): + # Get native files in order + native_files = list(glob.glob("/mnt/disks/sat/native_files/*/*.nat")) + print(f"Native files: {len(native_files)}") + native_files.sort() + return native_files + + +def preprocess_function(xr_data: xr.Dataset) -> xr.Dataset: + attrs = xr_data.attrs + y_coords = xr_data.coords["y_geostationary"].values + x_coords = xr_data.coords["x_geostationary"].values + x_dataarray = xr.DataArray( + data=np.expand_dims(xr_data.coords["x_geostationary"].values, axis=0), + dims=["time", "x_geostationary"], + coords=dict(time=xr_data.coords["time"].values, x_geostationary=x_coords), + ) + y_dataarray = xr.DataArray( + data=np.expand_dims(xr_data.coords["y_geostationary"].values, axis=0), + dims=["time", "y_geostationary"], + coords=dict(time=xr_data.coords["time"].values, y_geostationary=y_coords), + ) + xr_data["x_geostationary_coordinates"] = x_dataarray + xr_data["y_geostationary_coordinates"] = y_dataarray + xr_data.attrs = attrs + return xr_data + + +def open_and_scale_data_hrv(zarr_times, f): + hrv_scaler = ScaleToZeroToOne( + variable_order=["HRV"], maxs=np.array([103.90016]), mins=np.array([-1.2278595]) + ) + + hrv_scene = Scene(filenames={"seviri_l1b_native": [f]}) + hrv_scene.load( + [ + "HRV", + ] + ) + hrv_dataarray: xr.DataArray = convert_scene_to_dataarray( + hrv_scene, band="HRV", area="RSS", calculate_osgb=False + ) + attrs = serialize_attrs(hrv_dataarray.attrs) + hrv_dataarray = hrv_scaler.rescale(hrv_dataarray) + hrv_dataarray.attrs.update(attrs) + hrv_dataarray = hrv_dataarray.transpose( + "time", "y_geostationary", "x_geostationary", "variable" + ) + hrv_dataset = hrv_dataarray.to_dataset(name="data") + hrv_dataset["data"] = hrv_dataset["data"].astype(np.float16) + + if hrv_dataset.time.values[0] in 
zarr_times: + print(f"Skipping: {hrv_dataset.time.values[0]}") + return None + + return hrv_dataset + + +def open_and_scale_data_nonhrv(zarr_times, f): + """Zarr path is the path to the zarr file to extend, f is native file to open and scale""" + scaler = ScaleToZeroToOne( + mins=np.array( + [ + -2.5118103, + -64.83977, + 63.404694, + 2.844452, + 199.10002, + -17.254883, + -26.29155, + -1.1009827, + -2.4184198, + 199.57048, + 198.95093, + ] + ), + maxs=np.array( + [ + 69.60857, + 339.15588, + 340.26526, + 317.86752, + 313.2767, + 315.99194, + 274.82297, + 93.786545, + 101.34922, + 249.91806, + 286.96323, + ] + ), + variable_order=[ + "IR_016", + "IR_039", + "IR_087", + "IR_097", + "IR_108", + "IR_120", + "IR_134", + "VIS006", + "VIS008", + "WV_062", + "WV_073", + ], + ) + + scene = Scene(filenames={"seviri_l1b_native": [f]}) + scene.load( + [ + "IR_016", + "IR_039", + "IR_087", + "IR_097", + "IR_108", + "IR_120", + "IR_134", + "VIS006", + "VIS008", + "WV_062", + "WV_073", + ] + ) + dataarray: xr.DataArray = convert_scene_to_dataarray( + scene, band="IR_016", area="RSS", calculate_osgb=False + ) + attrs = serialize_attrs(dataarray.attrs) + dataarray = scaler.rescale(dataarray) + dataarray.attrs.update(attrs) + dataarray = dataarray.transpose("time", "y_geostationary", "x_geostationary", "variable") + dataset = dataarray.to_dataset(name="data") + dataset["data"] = dataset.data.astype(np.float16) + + if dataset.time.values[0] in zarr_times: + print(f"Skipping: {dataset.time.values[0]}") + return None + + return dataset + + +def write_to_zarr(dataset, zarr_name, mode, chunks): + mode_extra_kwargs = { + "a": {"append_dim": "time"}, + "w": { + "encoding": { + "data": { + "compressor": Blosc2("zstd", clevel=5), + }, + "time": {"units": "nanoseconds since 1970-01-01"}, + } + }, + } + extra_kwargs = mode_extra_kwargs[mode] + dataset.isel(x_geostationary=slice(0,5548)).chunk(chunks).to_zarr( + zarr_name, compute=True, **extra_kwargs, consolidated=True, mode=mode, + ) + + +def rewrite_zarr_times(output_name): + # Combine time coords + ds = xr.open_zarr(output_name) + + # Prevent numcodecs string error + # See https://github.com/pydata/xarray/issues/3476#issuecomment-1205346130 + for v in list(ds.coords.keys()): + if ds.coords[v].dtype == object: + ds[v].encoding.clear() + for v in list(ds.variables.keys()): + if ds[v].dtype == object: + ds[v].encoding.clear() + + del ds["data"] + del ds["x_geostationary_coordinates"] + del ds["y_geostationary_coordinates"] + # Need to remove these encodings to avoid chunking + del ds.time.encoding['chunks'] + del ds.time.encoding['preferred_chunks'] + ds.to_zarr(f"{output_name.split('.zarr')[0]}_coord.zarr", consolidated=True) + # Remove current time ones + shutil.rmtree(f"{output_name}/time/") + # Add new time ones + shutil.copytree(f"{output_name.split('.zarr')[0]}_coord.zarr/time", f"{output_name}/time") + + # Now replace the part of the .zmetadata with the part of the .zmetadata from the new coord one + with open(f"{output_name}/.zmetadata", "r") as f: + data = json.load(f) + with open(f"{output_name.split('.zarr')[0]}_coord.zarr/.zmetadata", "r") as f2: + coord_data = json.load(f2) + data["metadata"]["time/.zarray"] = coord_data["metadata"]["time/.zarray"] + with open(f"{output_name}/.zmetadata", "w") as f: + json.dump(data, f) + + +if __name__ == "__main__": + zarr_path = "/mnt/disks/sat/2024_hrv.zarr" + if os.path.exists(zarr_path): + hrv_zarr_times = xr.open_zarr(zarr_path).sortby("time").time.values + last_zarr_time = hrv_zarr_times[-1] + else: + # Set dummy 
values for times already in zarr + last_zarr_time = dt.datetime(1970, 1, 1) + hrv_zarr_times = [last_zarr_time, last_zarr_time] + native_files = list_native_files() + hrv_datasets = [] + + for f in native_files: + try: + dataset = open_and_scale_data_hrv(hrv_zarr_times, f) + except Exception as e: + print(f"Exception: {e}") + continue + if dataset is not None: + dataset = preprocess_function(dataset) + hrv_datasets.append(dataset) + if len(hrv_datasets) == 12: + mode = "w" + if os.path.exists(zarr_path): + mode = "a" + write_to_zarr( + xr.concat(hrv_datasets, dim="time"), zarr_path, mode, chunks={"time": 12,} + ) + hrv_datasets = [] + + # Consolidate zarr metadata + rewrite_zarr_times(zarr_path) From dbaa4026c8f2d161a16b1936f8ddda862cafc69a Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Wed, 24 Apr 2024 14:51:05 +0100 Subject: [PATCH 2/8] fix(archive): Logging fix --- scripts/archival_pipeline.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scripts/archival_pipeline.py b/scripts/archival_pipeline.py index 0d5beb9c..b258d5bd 100755 --- a/scripts/archival_pipeline.py +++ b/scripts/archival_pipeline.py @@ -6,6 +6,7 @@ import numpy as np from typing import Literal from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm import argparse import multiprocessing import os @@ -358,6 +359,10 @@ def _rewrite_zarr_times(output_name): if __name__ == "__main__": + # Prevent logs interfering with progressbar + logging_redirect_tqdm(loggers=[log]) + + # Get running args args = parser.parse_args() prog_start = dt.datetime.utcnow() log.info(f"{str(prog_start)}: Running with args: {args}") From ba1250fd54fb53f186df39304cba3750da5aff5a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 24 Apr 2024 13:52:10 +0000 Subject: [PATCH 3/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- scripts/archival_pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/archival_pipeline.py b/scripts/archival_pipeline.py index b258d5bd..c7c5eaca 100755 --- a/scripts/archival_pipeline.py +++ b/scripts/archival_pipeline.py @@ -420,10 +420,10 @@ def _rewrite_zarr_times(output_name): pool.join() for result in results: log.info(f"Processed {result} data") - + pool - + """ Jacob's old script! From da92b7a10214c61e92e96ea58485de4ed965a262 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Thu, 25 Apr 2024 10:00:43 +0100 Subject: [PATCH 4/8] fix(scripts): Remove dead code from archive pipeline --- scripts/archival_pipeline.py | 264 ++--------------------------------- 1 file changed, 12 insertions(+), 252 deletions(-) diff --git a/scripts/archival_pipeline.py b/scripts/archival_pipeline.py index c7c5eaca..3c397bf9 100755 --- a/scripts/archival_pipeline.py +++ b/scripts/archival_pipeline.py @@ -2,22 +2,23 @@ Consolidates the old cli_downloader, backfill_hrv and backfill_nonhrv scripts. 
""" -import xarray as xr -import numpy as np -from typing import Literal -from tqdm import tqdm -from tqdm.contrib.logging import logging_redirect_tqdm import argparse +import dataclasses +import datetime as dt +import logging import multiprocessing import os -import datetime as dt -import diskcache as dc -from ocf_blosc2 import Blosc2 -import pandas as pd import subprocess import sys -import logging -import dataclasses +from typing import Literal + +import diskcache as dc +import numpy as np +import pandas as pd +import xarray as xr +from ocf_blosc2 import Blosc2 +from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm from satip.scale_to_zero_to_one import ScaleToZeroToOne from satip.serialize import serialize_attrs @@ -476,244 +477,3 @@ def _rewrite_zarr_times(output_name): ) """ -from ocf_blosc2 import Blosc2 -import datetime as dt -import xarray as xr -import satpy -from satpy import Scene -from satip.eumetsat import DownloadManager -from satip.scale_to_zero_to_one import ScaleToZeroToOne -from satip.serialize import serialize_attrs -from satip.utils import convert_scene_to_dataarray -import os -import numpy as np -import pandas as pd -import glob -import shutil -import json - - -def list_native_files(): - # Get native files in order - native_files = list(glob.glob("/mnt/disks/sat/native_files/*/*.nat")) - print(f"Native files: {len(native_files)}") - native_files.sort() - return native_files - - -def preprocess_function(xr_data: xr.Dataset) -> xr.Dataset: - attrs = xr_data.attrs - y_coords = xr_data.coords["y_geostationary"].values - x_coords = xr_data.coords["x_geostationary"].values - x_dataarray = xr.DataArray( - data=np.expand_dims(xr_data.coords["x_geostationary"].values, axis=0), - dims=["time", "x_geostationary"], - coords=dict(time=xr_data.coords["time"].values, x_geostationary=x_coords), - ) - y_dataarray = xr.DataArray( - data=np.expand_dims(xr_data.coords["y_geostationary"].values, axis=0), - dims=["time", "y_geostationary"], - coords=dict(time=xr_data.coords["time"].values, y_geostationary=y_coords), - ) - xr_data["x_geostationary_coordinates"] = x_dataarray - xr_data["y_geostationary_coordinates"] = y_dataarray - xr_data.attrs = attrs - return xr_data - - -def open_and_scale_data_hrv(zarr_times, f): - hrv_scaler = ScaleToZeroToOne( - variable_order=["HRV"], maxs=np.array([103.90016]), mins=np.array([-1.2278595]) - ) - - hrv_scene = Scene(filenames={"seviri_l1b_native": [f]}) - hrv_scene.load( - [ - "HRV", - ] - ) - hrv_dataarray: xr.DataArray = convert_scene_to_dataarray( - hrv_scene, band="HRV", area="RSS", calculate_osgb=False - ) - attrs = serialize_attrs(hrv_dataarray.attrs) - hrv_dataarray = hrv_scaler.rescale(hrv_dataarray) - hrv_dataarray.attrs.update(attrs) - hrv_dataarray = hrv_dataarray.transpose( - "time", "y_geostationary", "x_geostationary", "variable" - ) - hrv_dataset = hrv_dataarray.to_dataset(name="data") - hrv_dataset["data"] = hrv_dataset["data"].astype(np.float16) - - if hrv_dataset.time.values[0] in zarr_times: - print(f"Skipping: {hrv_dataset.time.values[0]}") - return None - - return hrv_dataset - - -def open_and_scale_data_nonhrv(zarr_times, f): - """Zarr path is the path to the zarr file to extend, f is native file to open and scale""" - scaler = ScaleToZeroToOne( - mins=np.array( - [ - -2.5118103, - -64.83977, - 63.404694, - 2.844452, - 199.10002, - -17.254883, - -26.29155, - -1.1009827, - -2.4184198, - 199.57048, - 198.95093, - ] - ), - maxs=np.array( - [ - 69.60857, - 339.15588, - 340.26526, - 317.86752, - 313.2767, - 
315.99194, - 274.82297, - 93.786545, - 101.34922, - 249.91806, - 286.96323, - ] - ), - variable_order=[ - "IR_016", - "IR_039", - "IR_087", - "IR_097", - "IR_108", - "IR_120", - "IR_134", - "VIS006", - "VIS008", - "WV_062", - "WV_073", - ], - ) - - scene = Scene(filenames={"seviri_l1b_native": [f]}) - scene.load( - [ - "IR_016", - "IR_039", - "IR_087", - "IR_097", - "IR_108", - "IR_120", - "IR_134", - "VIS006", - "VIS008", - "WV_062", - "WV_073", - ] - ) - dataarray: xr.DataArray = convert_scene_to_dataarray( - scene, band="IR_016", area="RSS", calculate_osgb=False - ) - attrs = serialize_attrs(dataarray.attrs) - dataarray = scaler.rescale(dataarray) - dataarray.attrs.update(attrs) - dataarray = dataarray.transpose("time", "y_geostationary", "x_geostationary", "variable") - dataset = dataarray.to_dataset(name="data") - dataset["data"] = dataset.data.astype(np.float16) - - if dataset.time.values[0] in zarr_times: - print(f"Skipping: {dataset.time.values[0]}") - return None - - return dataset - - -def write_to_zarr(dataset, zarr_name, mode, chunks): - mode_extra_kwargs = { - "a": {"append_dim": "time"}, - "w": { - "encoding": { - "data": { - "compressor": Blosc2("zstd", clevel=5), - }, - "time": {"units": "nanoseconds since 1970-01-01"}, - } - }, - } - extra_kwargs = mode_extra_kwargs[mode] - dataset.isel(x_geostationary=slice(0,5548)).chunk(chunks).to_zarr( - zarr_name, compute=True, **extra_kwargs, consolidated=True, mode=mode, - ) - - -def rewrite_zarr_times(output_name): - # Combine time coords - ds = xr.open_zarr(output_name) - - # Prevent numcodecs string error - # See https://github.com/pydata/xarray/issues/3476#issuecomment-1205346130 - for v in list(ds.coords.keys()): - if ds.coords[v].dtype == object: - ds[v].encoding.clear() - for v in list(ds.variables.keys()): - if ds[v].dtype == object: - ds[v].encoding.clear() - - del ds["data"] - del ds["x_geostationary_coordinates"] - del ds["y_geostationary_coordinates"] - # Need to remove these encodings to avoid chunking - del ds.time.encoding['chunks'] - del ds.time.encoding['preferred_chunks'] - ds.to_zarr(f"{output_name.split('.zarr')[0]}_coord.zarr", consolidated=True) - # Remove current time ones - shutil.rmtree(f"{output_name}/time/") - # Add new time ones - shutil.copytree(f"{output_name.split('.zarr')[0]}_coord.zarr/time", f"{output_name}/time") - - # Now replace the part of the .zmetadata with the part of the .zmetadata from the new coord one - with open(f"{output_name}/.zmetadata", "r") as f: - data = json.load(f) - with open(f"{output_name.split('.zarr')[0]}_coord.zarr/.zmetadata", "r") as f2: - coord_data = json.load(f2) - data["metadata"]["time/.zarray"] = coord_data["metadata"]["time/.zarray"] - with open(f"{output_name}/.zmetadata", "w") as f: - json.dump(data, f) - - -if __name__ == "__main__": - zarr_path = "/mnt/disks/sat/2024_hrv.zarr" - if os.path.exists(zarr_path): - hrv_zarr_times = xr.open_zarr(zarr_path).sortby("time").time.values - last_zarr_time = hrv_zarr_times[-1] - else: - # Set dummy values for times already in zarr - last_zarr_time = dt.datetime(1970, 1, 1) - hrv_zarr_times = [last_zarr_time, last_zarr_time] - native_files = list_native_files() - hrv_datasets = [] - - for f in native_files: - try: - dataset = open_and_scale_data_hrv(hrv_zarr_times, f) - except Exception as e: - print(f"Exception: {e}") - continue - if dataset is not None: - dataset = preprocess_function(dataset) - hrv_datasets.append(dataset) - if len(hrv_datasets) == 12: - mode = "w" - if os.path.exists(zarr_path): - mode = "a" - 
write_to_zarr( - xr.concat(hrv_datasets, dim="time"), zarr_path, mode, chunks={"time": 12,} - ) - hrv_datasets = [] - - # Consolidate zarr metadata - rewrite_zarr_times(zarr_path) From c6cd324c69d39b661d199a45f0b1621d5f594d7f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 25 Apr 2024 09:02:04 +0000 Subject: [PATCH 5/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- scripts/archival_pipeline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/archival_pipeline.py b/scripts/archival_pipeline.py index 3c397bf9..bc4afc4a 100755 --- a/scripts/archival_pipeline.py +++ b/scripts/archival_pipeline.py @@ -476,4 +476,3 @@ def _rewrite_zarr_times(output_name): ] ) """ - From 208e39bfba4663b589cba1737ab0669604d66c31 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Thu, 25 Apr 2024 11:15:08 +0100 Subject: [PATCH 6/8] fix(scripts): Archive pipeline imports --- scripts/archival_pipeline.py | 81 ++++++++---------------------------- 1 file changed, 17 insertions(+), 64 deletions(-) diff --git a/scripts/archival_pipeline.py b/scripts/archival_pipeline.py index bc4afc4a..d4e93f44 100755 --- a/scripts/archival_pipeline.py +++ b/scripts/archival_pipeline.py @@ -5,9 +5,12 @@ import argparse import dataclasses import datetime as dt +import json import logging import multiprocessing import os +import pathlib +import shutil import subprocess import sys from typing import Literal @@ -17,6 +20,7 @@ import pandas as pd import xarray as xr from ocf_blosc2 import Blosc2 +from satpy import Scene from tqdm import tqdm from tqdm.contrib.logging import logging_redirect_tqdm @@ -208,7 +212,7 @@ def process_scans( zarr_times = [last_zarr_time, last_zarr_time] # Get native files in order - native_files = list(glob.glob(f"{sat_config.native_path}*/*.nat")) + native_files = list(pathlib.Path(sat_config.native_path).glob("*/*.nat")) log.info(f"Found {len(native_files)} native files at {sat_config.native_path}") native_files.sort() @@ -224,7 +228,7 @@ def process_scans( datasets: list[xr.Dataset] = [] for f in native_files: try: - dataset: xr.Dataset | None = _open_and_scale_data(zarr_times, f, dstype) + dataset: xr.Dataset | None = _open_and_scale_data(zarr_times, f.as_posix(), dstype) except Exception as e: log.error(f"Exception: {e}") continue @@ -234,10 +238,12 @@ def process_scans( # Append to zarrs in hourly chunks (12 sets of 5 minute datasets) # * This is so zarr doesn't complain about mismatching chunk sizes if len(datasets) == 12: - mode = "w" if os.path.exists(zarr_path): mode = "a" - write_to_zarr( + else: + mode = "w" + log.debug(f"No zarr store found for year. 
Creating new store at {zarr_path}.") + _write_to_zarr( xr.concat(datasets, dim="time"), zarr_path, mode, @@ -266,7 +272,7 @@ def _open_and_scale_data( ) # The reader is the same for each satellite as the sensor is the same # * Hence "severi" in all cases - scene = Scene(filenames={"severi_l1b_native": [f]}) + scene = Scene(filenames={"seviri_l1b_native": [f]}) scene.load([c.variable for c in CHANNELS[dstype]]) da: xr.DataArray = convert_scene_to_dataarray( scene, band=CHANNELS[dstype][0].variable, area="RSS", calculate_osgb=False, @@ -280,8 +286,8 @@ def _open_and_scale_data( ds: xr.Dataset = da.to_dataset(name="data") ds["data"] = ds.data.astype(np.float16) - if dataset.time.values[0] in zarr_times: - log.debug(f"Skipping: {dataset.time.values[0]}") + if ds.time.values[0] in zarr_times: + log.debug(f"Skipping: {ds.time.values[0]}") return None return ds @@ -329,8 +335,8 @@ def _rewrite_zarr_times(output_name): # Combine time coords ds = xr.open_zarr(output_name) - # Prevent numcodecs string error - # See https://github.com/pydata/xarray/issues/3476#issuecomment-1205346130 + # Prevent numcodecs string error + # See https://github.com/pydata/xarray/issues/3476#issuecomment-1205346130 for v in list(ds.coords.keys()): if ds.coords[v].dtype == object: ds[v].encoding.clear() @@ -408,6 +414,8 @@ def _rewrite_zarr_times(output_name): cache.set('secs_per_scan', new_average_secs_per_scan) log.info(f"Completed download for args: {args} in {str(runtime)} (avg {new_average_secs_per_scan} secs per scan)") + log.info("Converting raw data to HRV and non-HRV Zarr Stores") + # Process the HRV and non-HRV data pool = multiprocessing.Pool() results = pool.starmap( @@ -421,58 +429,3 @@ def _rewrite_zarr_times(output_name): pool.join() for result in results: log.info(f"Processed {result} data") - - pool - - -""" -Jacob's old script! 
- -last_zarr_time = pd.Timestamp("2024-01-01T00:00:00.000000000") -#start_zarr_time = pd.Timestamp("2024-01-01T00:00:00.000000000") -start_date = ( - pd.Timestamp.utcnow().tz_convert("UTC").to_pydatetime().replace(tzinfo=None) -) -start_zarr_time = pd.Timestamp(start_date).to_pydatetime().replace(tzinfo=None) -last_zarr_time = pd.Timestamp(last_zarr_time).to_pydatetime().replace(tzinfo=None) -start_str = last_zarr_time.strftime("%Y-%m-%d") -end_str = start_zarr_time.strftime("%Y-%m-%d") -print(start_str) -print(end_str) -date_range = pd.date_range(start=start_str, end=end_str, freq="5min") -print(date_range) -for date in date_range: - start_date = pd.Timestamp(date) - pd.Timedelta("5min") - end_date = pd.Timestamp(date) + pd.Timedelta("5min") - process = subprocess.run( - [ - "eumdac", - "--set-credentials", - "SWdEnLvOlVTVGli1An1nKJ3NcV0a", - "gUQe0ej7H_MqQVGF4cd7wfQWcawa", - "\n" - ] - ) - end_date_time = end_date.tz_localize(None).strftime("%Y-%m-%dT%H:%M:%S") - start_date_time = start_date.tz_localize(None).strftime("%Y-%m-%dT%H:%M:%S") - - print(start_date_time) - print(end_date_time) - process = subprocess.run( - [ - "eumdac", - "download", - "-c", - "EO:EUM:DAT:MSG:MSG15-RSS", - "-s", - f"{end_date_time}", - "-e", - f"{end_date_time}", - "-o", - "/mnt/disks/sat/native_files/", - "--entry", - "*.nat", - "-y" - ] - ) -""" From f90b2d28989f23ac69212e80d61c2c798993ef6d Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Thu, 25 Apr 2024 11:45:04 +0100 Subject: [PATCH 7/8] fix(archive): Cache last processed time --- scripts/archival_pipeline.py | 88 ++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 39 deletions(-) diff --git a/scripts/archival_pipeline.py b/scripts/archival_pipeline.py index d4e93f44..1de55ad6 100755 --- a/scripts/archival_pipeline.py +++ b/scripts/archival_pipeline.py @@ -103,30 +103,6 @@ class Channel: ], } -parser = argparse.ArgumentParser( - prog="EUMETSTAT Pipeline", - description="Downloads and processes data from EUMETSTAT", -) -parser.add_argument( - 'sat', - help="Which satellite to download data from", - type=str, - choices=list(CONFIGS.keys()), -) -parser.add_argument( - "--start_date", - help="Date to download from (YYYY-MM-DD)", - type=dt.date.fromisoformat, - required=True, -) -parser.add_argument( - "--end_date", - help="Date to download to (YYYY-MM-DD)", - type=dt.date.fromisoformat, - required=False, - default=str(dt.datetime.utcnow().date()), -) - def download_scans( sat_config: Config, scan_times: list[pd.Timestamp], @@ -365,17 +341,48 @@ def _rewrite_zarr_times(output_name): json.dump(data, f) +parser = argparse.ArgumentParser( + prog="EUMETSTAT Pipeline", + description="Downloads and processes data from EUMETSTAT", +) +parser.add_argument( + 'sat', + help="Which satellite to download data from", + type=str, + choices=list(CONFIGS.keys()), +) +parser.add_argument( + "--start_date", + help="Date to download from (YYYY-MM-DD)", + type=dt.date.fromisoformat, + required=False, +) +parser.add_argument( + "--end_date", + help="Date to download to (YYYY-MM-DD)", + type=dt.date.fromisoformat, + required=False, + default=str(dt.datetime.utcnow().date()), +) + if __name__ == "__main__": # Prevent logs interfering with progressbar logging_redirect_tqdm(loggers=[log]) - - # Get running args - args = parser.parse_args() prog_start = dt.datetime.utcnow() - log.info(f"{str(prog_start)}: Running with args: {args}") + # Parse running args + args = parser.parse_args() # Create a reusable cache - cache = 
dc.Cache('tmp') + cache = dc.Cache(f'/mnt/disks/sat/.cache/{args.sat}') + + if args.start_date is None: + # Try to get the start date from the last cached datetime + try: + args.start_date = dt.date.fromisoformat(cache.get('latest_time')) + except Exception as e: + raise Exception("Can't get last runtime from cache. Pass start_date in manually.") + + log.info(f"{str(prog_start)}: Running with args: {args}") # Get config for desired satellite sat_config = CONFIGS[args.sat] @@ -386,7 +393,7 @@ def _rewrite_zarr_times(output_name): scan_times: list[pd.Timestamp] = pd.date_range(start=start, end=end, freq=sat_config.cadence).tolist() # Get average runtime from cache - secs_per_scan = cache.get('secs_per_scan', default=65) + secs_per_scan = cache.get('secs_per_scan', default=90) expected_runtime = pd.Timedelta(secs_per_scan * len(scan_times), 'seconds') log.info(f"Downloading {len(scan_times)} scans. Expected runtime: {str(expected_runtime)}") @@ -407,14 +414,7 @@ def _rewrite_zarr_times(output_name): for result in results: log.info(f"Completed download with {len(result)} failed scan times.") - - # Calculate the new average time per timestamp - runtime: dt.timedelta = dt.datetime.utcnow() - prog_start - new_average_secs_per_scan: int = int((secs_per_scan + (runtime.total_seconds() / len(scan_times))) / 2) - cache.set('secs_per_scan', new_average_secs_per_scan) - log.info(f"Completed download for args: {args} in {str(runtime)} (avg {new_average_secs_per_scan} secs per scan)") - - log.info("Converting raw data to HRV and non-HRV Zarr Stores") + log.info("Converting raw data to HRV and non-HRV Zarr Stores.") # Process the HRV and non-HRV data pool = multiprocessing.Pool() @@ -428,4 +428,14 @@ def _rewrite_zarr_times(output_name): pool.close() pool.join() for result in results: - log.info(f"Processed {result} data") + log.info(f"Processed {result} data.") + + # Save the last processed time to cache + cache.set('latest_time', end.isoformat()) + + # Calculate the new average time per timestamp + runtime: dt.timedelta = dt.datetime.utcnow() - prog_start + new_average_secs_per_scan: int = int((secs_per_scan + (runtime.total_seconds() / len(scan_times))) / 2) + cache.set('secs_per_scan', new_average_secs_per_scan) + log.info(f"Completed archive for args: {args}. ({new_average_secs_per_scan} seconds per scan).") + From 1382f74f9ad02d68d10c84faa364d0936988ecc7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 25 Apr 2024 10:47:16 +0000 Subject: [PATCH 8/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- scripts/archival_pipeline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/archival_pipeline.py b/scripts/archival_pipeline.py index 1de55ad6..a698d2e5 100755 --- a/scripts/archival_pipeline.py +++ b/scripts/archival_pipeline.py @@ -438,4 +438,3 @@ def _rewrite_zarr_times(output_name): new_average_secs_per_scan: int = int((secs_per_scan + (runtime.total_seconds() / len(scan_times))) / 2) cache.set('secs_per_scan', new_average_secs_per_scan) log.info(f"Completed archive for args: {args}. ({new_average_secs_per_scan} seconds per scan).") -
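
For anyone reading the final version of scripts/archival_pipeline.py assembled by the patches above, the central trick in process_scans is that scans are buffered into batches of 12 five-minute datasets (one hour) before being written, so that every append to the yearly zarr store uses the same time chunk size. The snippet below is a minimal, self-contained sketch of that write/append pattern only, using a synthetic dataset; the dimension sizes, the /tmp/demo_hrv.zarr path and the make_hourly_batch helper are illustrative placeholders, not part of the pipeline.

import numpy as np
import pandas as pd
import xarray as xr
from ocf_blosc2 import Blosc2


def make_hourly_batch(start: str) -> xr.Dataset:
    """Build a synthetic hour of 12 five-minute scans (placeholder shapes)."""
    times = pd.date_range(start=start, periods=12, freq="5min")
    data = np.random.rand(12, 8, 8, 1).astype(np.float16)
    return xr.Dataset(
        {"data": (("time", "y_geostationary", "x_geostationary", "variable"), data)},
        coords={"time": times},
    )


zarr_path = "/tmp/demo_hrv.zarr"  # placeholder path, not the pipeline's store
for i, hour_start in enumerate(["2024-01-01T00:00", "2024-01-01T01:00"]):
    ds = make_hourly_batch(hour_start)
    if i == 0:
        # First batch: create the store, setting the compressor and time encoding
        # as in _write_to_zarr's "w" branch.
        ds.chunk({"time": 12}).to_zarr(
            zarr_path,
            mode="w",
            consolidated=True,
            encoding={
                "data": {"compressor": Blosc2("zstd", clevel=5)},
                "time": {"units": "nanoseconds since 1970-01-01"},
            },
        )
    else:
        # Subsequent batches: append along time with a matching chunk size,
        # as in the "a" branch.
        ds.chunk({"time": 12}).to_zarr(
            zarr_path, mode="a", append_dim="time", consolidated=True
        )

Assuming the EUMETSAT_CONSUMER_KEY and EUMETSAT_CONSUMER_SECRET environment variables are set, the pipeline itself would be invoked along the lines of "python scripts/archival_pipeline.py iodc --start_date 2024-01-01", per the argparse definitions in the final patches.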