Merge pull request #148 from openclimatefix/issue/147-use-backup
add option to use backup feed in click argument
peterdudfield authored Dec 21, 2022
2 parents 6ace16f + 15b2367 commit 731b419
Showing 5 changed files with 83 additions and 13 deletions.
1 change: 1 addition & 0 deletions requirements.txt
@@ -17,3 +17,4 @@ nowcasting_datamodel>=0.0.36
h5netcdf
psutil
eumdac
pyresample==1.25.0
43 changes: 34 additions & 9 deletions satip/app.py
@@ -91,6 +91,20 @@
help="Run Data Tailor Cleanup and exit",
type=click.BOOL,
)
@click.option(
"--use-backup",
envvar="USE_BACKUP",
default=False,
help="Option not to sue the RSS imaginary. If True, use the 15 mins data. ",
type=click.BOOL,
)
@click.option(
"--maximum-n-datasets",
envvar="MAXIMUM_N_DATASETS",
default=-1,
help="Set the maximum number of dataset to load, default gets them all",
type=click.INT,
)
def run(
api_key,
api_secret,
@@ -101,6 +115,8 @@ def run(
use_rescaler: bool = False,
start_time: str = pd.Timestamp.utcnow().isoformat(timespec="minutes").split("+")[0],
cleanup: bool = False,
use_backup: bool = False,
maximum_n_datasets: int = -1,
):
"""Run main application
@@ -114,10 +130,11 @@ def run(
use_rescaler: Rescale data to between 0 and 1 or not
start_time: Start time in UTC ISO Format
cleanup: Cleanup Data Tailor
use_backup: Use the 15-minutely data rather than RSS
maximum_n_datasets: Maximum number of datasets to load; the default (-1) gets them all
"""

logger.info(f'Running application and saving to "{save_dir}"')
using_backup = False
# 1. Get data from API, download native files
with tempfile.TemporaryDirectory() as tmpdir:
download_manager = DownloadManager(
@@ -144,14 +161,17 @@ def run(
f"Memory in use: {psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2} MB"
)
# Check if any RSS imagery is available, if not, fall back to 15 minutely data
if len(datasets) == 0:
logger.info("No RSS Imagery available, falling back to 15-minutely data")
if (len(datasets) == 0) or use_backup:
logger.info(
f"No RSS Imagery available or using backup ({use_backup=}), "
f"falling back to 15-minutely data"
)
datasets = download_manager.identify_available_datasets(
start_date=start_date.strftime("%Y-%m-%d-%H-%M-%S"),
end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H-%M-%S"),
product_id="EO:EUM:DAT:MSG:HRSEVIRI",
)
using_backup = True
use_backup = True
# Filter out ones that already exist
logger.info(
f"Memory in use: {psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2} MB"
@@ -166,8 +186,13 @@ def run(
logger.info("No files to download, exiting")
updated_data = False
else:

if maximum_n_datasets != -1:
logger.debug(f"Ony going to get at most {maximum_n_datasets} datasets")
datasets = datasets[0:maximum_n_datasets]

updated_data = True
if using_backup:
if use_backup:

download_manager.download_tailored_datasets(
datasets,
@@ -185,7 +210,7 @@ def run(
# 2. Load nat files to one Xarray Dataset
native_files = (
list(glob.glob(os.path.join(tmpdir, "*.nat")))
if not using_backup
if not use_backup
else list(glob.glob(os.path.join(tmpdir, "*HRSEVIRI*")))
)
logger.info(native_files)
@@ -194,7 +219,7 @@ def run(
native_files,
save_dir=save_dir,
use_rescaler=use_rescaler,
using_backup=using_backup,
using_backup=use_backup,
)
logger.info(
f"Memory in use: {psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2} MB"
@@ -208,12 +233,12 @@ def run(
f"Memory in use: {psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2} MB"
)

if not check_both_final_files_exists(save_dir=save_dir, using_backup=using_backup):
if not check_both_final_files_exists(save_dir=save_dir, using_backup=use_backup):
updated_data = True

if updated_data:
# Collate files into single NetCDF file
collate_files_into_latest(save_dir=save_dir, using_backup=using_backup)
collate_files_into_latest(save_dir=save_dir, using_backup=use_backup)
logger.info(
f"Memory in use: {psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2} MB"
)
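The collapsed hunks hide parts of run(), so here is a minimal, self-contained sketch of the new control flow. The helper names identify_rss and identify_hrseviri are illustrative stand-ins for the two DownloadManager.identify_available_datasets calls; this is a sketch, not the module's actual code.

# Sketch of the fallback / truncation logic added in satip/app.py (illustrative).
from typing import Callable, List, Tuple


def select_datasets(
    identify_rss: Callable[[], List[str]],
    identify_hrseviri: Callable[[], List[str]],
    use_backup: bool = False,
    maximum_n_datasets: int = -1,
) -> Tuple[List[str], bool]:
    """Return the datasets to download and whether the 15-minutely backup is in use."""
    datasets = identify_rss()

    # Fall back to the 15-minutely HRSEVIRI feed when no RSS imagery is available
    # or the backup is forced via --use-backup / USE_BACKUP.
    if len(datasets) == 0 or use_backup:
        datasets = identify_hrseviri()
        use_backup = True

    # Cap how many datasets are processed; -1 (the default) keeps them all.
    if maximum_n_datasets != -1:
        datasets = datasets[0:maximum_n_datasets]

    return datasets, use_backup


# e.g. forcing the backup feed and keeping a single dataset:
# select_datasets(lambda: ["rss-1"], lambda: ["hrs-1", "hrs-2"],
#                 use_backup=True, maximum_n_datasets=1)
# -> (["hrs-1"], True)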
2 changes: 1 addition & 1 deletion satip/data_store.py
@@ -14,7 +14,7 @@ def dateset_it_to_filename(dataset_id: str, tailor_id: str, dir) -> str:
"""

# these names are very sensitive, consider carefully when changing
if 'HRSEVIRI' not in tailor_id:
if "HRSEVIRI" not in tailor_id:
filename = f"{dir}/{dataset_id}_{tailor_id}.nat"
else:
filename = f"{dir}/{dataset_id}_EPCT_{tailor_id}"
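For context, the quote change above sits in dateset_it_to_filename, which maps a dataset id and Data Tailor id to a local file path. A sketch of the visible branch with an illustrative call (the trailing return is assumed, since the hunk cuts off before it):

# Sketch of the branch shown above in satip/data_store.py (illustrative only).
def dateset_it_to_filename(dataset_id: str, tailor_id: str, dir) -> str:
    # these names are very sensitive, consider carefully when changing
    if "HRSEVIRI" not in tailor_id:
        filename = f"{dir}/{dataset_id}_{tailor_id}.nat"
    else:
        filename = f"{dir}/{dataset_id}_EPCT_{tailor_id}"
    return filename  # assumed; not shown in the hunk


# e.g. (made-up ids):
# dateset_it_to_filename("dataset-1", "HRSEVIRI", "/tmp") -> "/tmp/dataset-1_EPCT_HRSEVIRI"
# dateset_it_to_filename("dataset-1", "cloudmask", "/tmp") -> "/tmp/dataset-1_cloudmask.nat"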
6 changes: 3 additions & 3 deletions satip/utils.py
@@ -13,6 +13,7 @@
import glob
import logging
import os
import secrets
import shutil
import subprocess
import tempfile
@@ -22,7 +23,6 @@
from zipfile import ZipFile

import fsspec
import secrets
import numcodecs
import numpy as np
import pandas as pd
@@ -351,7 +351,7 @@ def do_v15_rescaling(
dataarray = dataarray.reindex({"variable": variable_order}).transpose(
"time", "y_geostationary", "x_geostationary", "variable"
)
upper_bound = (2 ** 10) - 1
upper_bound = (2**10) - 1
new_max = maxs - mins

dataarray -= mins
@@ -862,7 +862,7 @@ def save_to_zarr_to_s3(dataset: xr.Dataset, filename: str):
# make sure variable is string
dataset = dataset.assign_coords({"variable": dataset.coords["variable"].astype(str)})

logger.debug(f'{dataset.time}')
logger.debug(f"{dataset.time}")

with zarr.ZipStore(path) as store:
dataset.to_zarr(store, compute=True, mode="w", encoding=encoding, consolidated=True)
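The (2**10) - 1 change above is formatting only, but it sits in do_v15_rescaling, which app.py's --use-rescaler help describes as rescaling data to between 0 and 1. A rough numpy sketch of that kind of per-channel min/max rescaling, under those assumptions (the real function works on an xarray DataArray with a variable dimension, and its full body is not shown here):

import numpy as np


def minmax_rescale(values: np.ndarray, mins: np.ndarray, maxs: np.ndarray) -> np.ndarray:
    """Per-channel min/max rescaling to [0, 1] (illustrative, not satip's exact code)."""
    upper_bound = (2**10) - 1   # 10-bit ceiling (1023), as in the hunk above
    new_max = maxs - mins       # per-channel dynamic range
    scaled = (np.minimum(values, upper_bound) - mins) / new_max
    return np.clip(scaled, 0.0, 1.0)  # keep the result inside [0, 1]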
44 changes: 44 additions & 0 deletions tests/test_app.py
@@ -32,7 +32,10 @@ def test_save_to_netcdf(): # noqa 103
False,
"--start-time",
datetime.datetime.utcnow().isoformat(),
"--maximum-n-datasets",
1,
],
catch_exceptions=False,
)
native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip")))
assert len(native_files) > 0
@@ -53,7 +56,10 @@ def test_save_to_netcdf_now(): # noqa 103
tmpdirname,
"--use-rescaler",
False,
"--maximum-n-datasets",
1,
],
catch_exceptions=False,
)
native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip")))
assert len(native_files) > 0
@@ -76,7 +82,10 @@ def test_cleanup_now(): # noqa 103
False,
"--cleanup",
True,
"--maximum-n-datasets",
1,
],
catch_exceptions=False,
)
native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip")))
assert len(native_files) == 0
@@ -100,7 +109,10 @@ def test_save_datatailor_to_disk(): # noqa 103
False,
"--start-time",
datetime.datetime.utcnow().isoformat(),
"--maximum-n-datasets",
1,
],
catch_exceptions=False,
)
native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip")))
assert len(native_files) > 0
@@ -124,7 +136,39 @@ def test_save_to_netcdf_rescaled(): # noqa 103
True,
"--start-time",
datetime.datetime.utcnow().isoformat(),
"--maximum-n-datasets",
1,
],
catch_exceptions=False,
)
native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip")))
assert len(native_files) > 0


@freeze_time("2022-06-28 12:00:00") # Use backup
def test_use_backup(): # noqa 103
user_key = os.environ.get("EUMETSAT_USER_KEY")
user_secret = os.environ.get("EUMETSAT_USER_SECRET")
with tempfile.TemporaryDirectory() as tmpdirname:
runner.invoke(
run,
[
"--api-key",
user_key,
"--api-secret",
user_secret,
"--save-dir",
tmpdirname,
"--use-rescaler",
False,
"--start-time",
datetime.datetime.utcnow().isoformat(),
"--use-backup",
True,
"--maximum-n-datasets",
1,
],
catch_exceptions=False,
)
native_files = list(glob.glob(os.path.join(tmpdirname, "*.zarr.zip")))
assert len(native_files) > 0

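Both new options also read environment variables (USE_BACKUP and MAXIMUM_N_DATASETS, via envvar=), so the same behaviour can be exercised without passing the flags. A hedged sketch, assuming run is imported from satip.app as in the tests above (illustrative, not part of this PR):

import os
import tempfile

from click.testing import CliRunner
from satip.app import run  # assumed import path, matching the tests above


def use_backup_via_env():
    # Drive the new options through their environment variables instead of CLI flags.
    runner = CliRunner()
    with tempfile.TemporaryDirectory() as tmpdirname:
        runner.invoke(
            run,
            [
                "--api-key",
                os.environ.get("EUMETSAT_USER_KEY"),
                "--api-secret",
                os.environ.get("EUMETSAT_USER_SECRET"),
                "--save-dir",
                tmpdirname,
                "--use-rescaler",
                False,
            ],
            env={"USE_BACKUP": "true", "MAXIMUM_N_DATASETS": "1"},
            catch_exceptions=False,
        )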