Merge pull request #190 from openclimatefix/jacob/tailored-checking
Add checking datasets before each tailored download
peterdudfield authored Aug 17, 2023
2 parents c34a28a + 9a8f482 commit 9741b22
Showing 2 changed files with 60 additions and 51 deletions.
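
The core change, visible in the satip/app.py diff below, is to re-check which datasets still need downloading immediately before each tailored download, instead of filtering the whole list only once at the start of the run. A minimal, self-contained sketch of that pattern follows; `already_downloaded` and `download_one` are hypothetical stand-ins for satip's `utils.filter_dataset_ids_on_current_files` and the download manager's download calls, not the real API.

```python
import random
from typing import Callable, List


def download_with_recheck(
    datasets: List[str],
    already_downloaded: Callable[[str], bool],
    download_one: Callable[[str], None],
) -> int:
    """Download each dataset, re-checking right before each download.

    Tailored downloads can take a long time, so a dataset that was missing when
    the run started may have appeared (e.g. produced by another worker) in the
    meantime; re-checking per item avoids duplicate work.
    """
    downloaded = 0
    random.shuffle(datasets)  # subsequent runs may grab different data first
    for dataset_id in datasets:
        if already_downloaded(dataset_id):  # skip work done since the run began
            continue
        download_one(dataset_id)
        downloaded += 1
    return downloaded


if __name__ == "__main__":
    done = {"a"}
    n = download_with_recheck(
        ["a", "b", "c"],
        already_downloaded=lambda d: d in done,
        download_one=lambda d: done.add(d),
    )
    print(f"downloaded {n} new datasets")  # downloaded 2 new datasets
```
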
45 changes: 27 additions & 18 deletions satip/app.py
@@ -133,7 +133,7 @@ def run(
log.info(
f'Running application and saving to "{save_dir}"',
version=satip.__version__,
- memory=utils.getMemory(),
+ memory=utils.get_memory(),
)
# 1. Get data from API, download native files
with tempfile.TemporaryDirectory() as tmpdir:
@@ -144,11 +144,12 @@
native_file_dir=save_dir_native,
)
if cleanup:
log.debug("Running Data Tailor Cleanup", memory=utils.getMemory())
log.debug("Running Data Tailor Cleanup", memory=utils.get_memory())
download_manager.cleanup_datatailor()
return
start_date = pd.Timestamp(start_time, tz="UTC") - pd.Timedelta(history)
log.info(f"Fetching datasets for {start_date} - {start_time}", memory=utils.getMemory())
log.info(f"Fetching datasets for {start_date} - {start_time}",
memory=utils.get_memory())
datasets = download_manager.identify_available_datasets(
start_date=start_date.strftime("%Y-%m-%d-%H-%M-%S"),
end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H-%M-%S"),
@@ -158,7 +159,7 @@
log.warn(
f"No RSS Imagery available or using backup ({use_backup=}), "
f"falling back to 15-minutely data",
- memory=utils.getMemory(),
+ memory=utils.get_memory(),
)
datasets = download_manager.identify_available_datasets(
start_date=start_date.strftime("%Y-%m-%d-%H-%M-%S"),
@@ -170,31 +171,39 @@
# if both final files don't exist, then we should make sure we run the whole process
datasets = utils.filter_dataset_ids_on_current_files(datasets, save_dir)
log.info(
f"Files to download after filtering: {len(datasets)}", memory=utils.getMemory()
f"Files to download after filtering: {len(datasets)}", memory=utils.get_memory()
)

if len(datasets) == 0:
log.info("No files to download, exiting", memory=utils.getMemory())
log.info("No files to download, exiting", memory=utils.get_memory())
updated_data = False
else:
if maximum_n_datasets != -1:
log.debug(
f"Ony going to get at most {maximum_n_datasets} datasets",
- memory=utils.getMemory(),
+ memory=utils.get_memory(),
)
datasets = datasets[0:maximum_n_datasets]
random.shuffle(datasets) # Shuffle so subsequent runs might download different data
updated_data = True
if use_backup:
- download_manager.download_tailored_datasets(
-     datasets,
-     product_id="EO:EUM:DAT:MSG:HRSEVIRI",
- )
+ # Check before downloading each tailored dataset, as it can take awhile
+ for dset in datasets:
+     dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
+     if len(dset) > 0:
+         download_manager.download_tailored_datasets(
+             dset,
+             product_id="EO:EUM:DAT:MSG:HRSEVIRI",
+         )
else:
- download_manager.download_datasets(
-     datasets,
-     product_id="EO:EUM:DAT:MSG:MSG15-RSS",
- )
+ # Check before downloading each tailored dataset, as it can take awhile
+ for dset in datasets:
+     dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
+     if len(dset) > 0:
+         download_manager.download_datasets(
+             dset,
+             product_id="EO:EUM:DAT:MSG:MSG15-RSS",
+         )

# 2. Load nat files to one Xarray Dataset
native_files = (
@@ -204,7 +213,7 @@ def run(
)
log.debug(
"Saving native files to Zarr: " + native_files.__str__(),
- memory=utils.getMemory(),
+ memory=utils.get_memory(),
)
# Save to S3
utils.save_native_to_zarr(
@@ -224,15 +233,15 @@ def run(
if updated_data:
# Collate files into single NetCDF file
utils.collate_files_into_latest(save_dir=save_dir, using_backup=use_backup)
log.debug("Collated files", memory=utils.getMemory())
log.debug("Collated files", memory=utils.get_memory())

# 4. update table to show when this data has been pulled
if db_url is not None:
connection = DatabaseConnection(url=db_url, base=Base_Forecast)
with connection.get_session() as session:
update_latest_input_data_last_updated(session=session, component="satellite")

log.info("Finished Running application", memory=utils.getMemory())
log.info("Finished Running application", memory=utils.get_memory())

except Exception as e:
log.error(f"Error caught during run: {e}", exc_info=True)
66 changes: 33 additions & 33 deletions satip/utils.py
@@ -281,16 +281,16 @@ def convert_scene_to_dataarray(
"""
if area not in GEOGRAPHIC_BOUNDS:
raise ValueError(f"`area` must be one of {GEOGRAPHIC_BOUNDS.keys()}, not '{area}'")
log.debug("Starting scene conversion", memory=getMemory())
log.debug("Starting scene conversion", memory=get_memory())
if area != "RSS":
try:
scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
except NotImplementedError:
# 15 minutely data by default doesn't work for some reason, have to resample it
scene = scene.resample("msg_seviri_rss_1km" if band == "HRV" else "msg_seviri_rss_3km")
log.debug("Finished resample", memory=getMemory())
log.debug("Finished resample", memory=get_memory())
scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
log.debug("Finished crop", memory=getMemory())
log.debug("Finished crop", memory=get_memory())
# Remove acq time from all bands because it is not useful, and can actually
# get in the way of combining multiple Zarr datasets.
data_attrs = {}
@@ -301,7 +301,7 @@
data_attrs[new_name] = scene[channel].attrs[attr]
dataset: xr.Dataset = scene.to_xarray_dataset()
dataarray = dataset.to_array()
log.debug("Converted to dataarray", memory=getMemory())
log.debug("Converted to dataarray", memory=get_memory())

# Lat and Lon are the same for all the channels now
if calculate_osgb:
@@ -323,7 +323,7 @@

for name in ["x", "y"]:
dataarray[name].attrs["coordinate_reference_system"] = "geostationary"
log.info("Calculated OSGB", memory=getMemory())
log.info("Calculated OSGB", memory=get_memory())
# Round to the nearest 5 minutes
dataarray.attrs.update(data_attrs)
dataarray.attrs["end_time"] = pd.Timestamp(dataarray.attrs["end_time"]).round("5 min")
@@ -336,7 +336,7 @@

del dataarray["crs"]
del scene
log.debug("Finished conversion", memory=getMemory())
log.debug("Finished conversion", memory=get_memory())
return dataarray


@@ -373,10 +373,10 @@ def get_dataset_from_scene(filename: str, hrv_scaler, use_rescaler: bool, save_d
Returns the Xarray dataset from the filename
"""
if ".nat" in filename:
log.debug(f"Loading Native {filename}", memory=getMemory())
log.debug(f"Loading Native {filename}", memory=get_memory())
hrv_scene = load_native_from_zip(filename)
else:
log.debug(f"Loading HRIT {filename}", memory=getMemory())
log.debug(f"Loading HRIT {filename}", memory=get_memory())
hrv_scene = load_hrit_from_zip(filename, sections=list(range(16, 25)))
hrv_scene.load(
[
@@ -385,11 +385,11 @@
generate=False,
)

log.debug("Loaded HRV", memory=getMemory())
log.debug("Loaded HRV", memory=get_memory())
hrv_dataarray: xr.DataArray = convert_scene_to_dataarray(
hrv_scene, band="HRV", area="UK", calculate_osgb=True
)
log.debug("Converted HRV to dataarray", memory=getMemory())
log.debug("Converted HRV to dataarray", memory=get_memory())
del hrv_scene
attrs = serialize_attrs(hrv_dataarray.attrs)
if use_rescaler:
@@ -404,18 +404,18 @@
hrv_dataarray = hrv_dataarray.transpose(
"time", "y_geostationary", "x_geostationary", "variable"
)
log.info("Rescaled HRV", memory=getMemory())
log.info("Rescaled HRV", memory=get_memory())
hrv_dataarray = hrv_dataarray.chunk((1, 512, 512, 1))
hrv_dataset = hrv_dataarray.to_dataset(name="data")
hrv_dataset.attrs.update(attrs)
log.debug("Converted HRV to DataArray", memory=getMemory())
log.debug("Converted HRV to DataArray", memory=get_memory())
now_time = pd.Timestamp(hrv_dataset["time"].values[0]).strftime("%Y%m%d%H%M")
save_file = os.path.join(save_dir, f"{'15_' if using_backup else ''}hrv_{now_time}.zarr.zip")
log.debug(f"Saving HRV netcdf in {save_file}", memory=getMemory())
log.debug(f"Saving HRV netcdf in {save_file}", memory=get_memory())
save_to_zarr_to_s3(hrv_dataset, save_file)
del hrv_dataset
gc.collect()
log.debug("Saved HRV to NetCDF", memory=getMemory())
log.debug("Saved HRV to NetCDF", memory=get_memory())


def get_nonhrv_dataset_from_scene(
@@ -444,11 +444,11 @@ def get_nonhrv_dataset_from_scene(
],
generate=False,
)
log.debug(f"Loaded non-hrv file: {filename}", memory=getMemory())
log.debug(f"Loaded non-hrv file: {filename}", memory=get_memory())
dataarray: xr.DataArray = convert_scene_to_dataarray(
scene, band="IR_016", area="UK", calculate_osgb=True
)
log.debug(f"Converted non-HRV file {filename} to dataarray", memory=getMemory())
log.debug(f"Converted non-HRV file {filename} to dataarray", memory=get_memory())
del scene
attrs = serialize_attrs(dataarray.attrs)
if use_rescaler:
@@ -503,17 +503,17 @@ def get_nonhrv_dataset_from_scene(
dataarray = dataarray.transpose("time", "y_geostationary", "x_geostationary", "variable")
dataarray = dataarray.chunk((1, 256, 256, 1))
dataset = dataarray.to_dataset(name="data")
log.debug("Converted non-HRV to dataset", memory=getMemory())
log.debug("Converted non-HRV to dataset", memory=get_memory())
del dataarray
dataset.attrs.update(attrs)
log.debug("Deleted return list", memory=getMemory())
log.debug("Deleted return list", memory=get_memory())
now_time = pd.Timestamp(dataset["time"].values[0]).strftime("%Y%m%d%H%M")
save_file = os.path.join(save_dir, f"{'15_' if using_backup else ''}{now_time}.zarr.zip")
log.debug(f"Saving non-HRV netcdf in {save_file}", memory=getMemory())
log.debug(f"Saving non-HRV netcdf in {save_file}", memory=get_memory())
save_to_zarr_to_s3(dataset, save_file)
del dataset
gc.collect()
log.debug(f"Saved non-HRV file {save_file}", memory=getMemory())
log.debug(f"Saved non-HRV file {save_file}", memory=get_memory())


def load_hrit_from_zip(filename: str, sections: list) -> Scene:
@@ -573,7 +573,7 @@ def save_native_to_zarr(

log.debug(
f"Converting from {'HRIT' if using_backup else 'native'} to zarr in {save_dir}",
- memory=getMemory(),
+ memory=get_memory(),
)

scaler = ScaleToZeroToOne(
@@ -625,24 +625,24 @@ def save_native_to_zarr(
variable_order=["HRV"], maxs=np.array([103.90016]), mins=np.array([-1.2278595])
)
for f in list_of_native_files:
log.debug(f"Processing {f}", memory=getMemory())
log.debug(f"Processing {f}", memory=get_memory())
if "EPCT" in f:
log.debug(f"Processing HRIT file {f}", memory=getMemory())
log.debug(f"Processing HRIT file {f}", memory=get_memory())
if "HRV" in f:
log.debug(f"Processing HRV {f}", memory=getMemory())
log.debug(f"Processing HRV {f}", memory=get_memory())
get_dataset_from_scene(f, hrv_scaler, use_rescaler, save_dir, using_backup)
else:
log.debug(f"Processing non-HRV {f}", memory=getMemory())
log.debug(f"Processing non-HRV {f}", memory=get_memory())
get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, using_backup)
else:
if "HRV" in bands:
log.debug(f"Processing HRV {f}", memory=getMemory())
log.debug(f"Processing HRV {f}", memory=get_memory())
get_dataset_from_scene(f, hrv_scaler, use_rescaler, save_dir, using_backup)

log.debug(f"Processing non-HRV {f}", memory=getMemory())
log.debug(f"Processing non-HRV {f}", memory=get_memory())
get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, using_backup)

log.debug(f"Finished processing files: {list_of_native_files}", memory=getMemory())
log.debug(f"Finished processing files: {list_of_native_files}", memory=get_memory())


def save_dataarray_to_zarr(
@@ -794,7 +794,7 @@ def save_to_zarr_to_s3(dataset: xr.Dataset, filename: str):
"""

gc.collect()
log.info(f"Saving file to {filename}", memory=getMemory())
log.info(f"Saving file to {filename}", memory=get_memory())

with tempfile.TemporaryDirectory() as dir:
# save locally
@@ -804,15 +804,15 @@ def save_to_zarr_to_s3(dataset: xr.Dataset, filename: str):
# make sure variable is string
dataset = dataset.assign_coords({"variable": dataset.coords["variable"].astype(str)})

log.debug(f"Dataset time: {dataset.time}", memory=getMemory())
log.debug(f"Dataset time: {dataset.time}", memory=get_memory())

with zarr.ZipStore(path) as store:
dataset.to_zarr(store, compute=True, mode="w", encoding=encoding, consolidated=True)

new_times = xr.open_dataset(f"zip::{path}", engine="zarr").time
log.debug(f"New times for {path}: {new_times}", memory=getMemory())
log.debug(f"New times for {path}: {new_times}", memory=get_memory())

log.debug(f"Saved to temporary file {path}, now pushing to {filename}", memory=getMemory())
log.debug(f"Saved to temporary file {path}, now pushing to {filename}", memory=get_memory())
# save to s3
filesystem = fsspec.open(filename).fs
filesystem.put(path, filename)
@@ -1074,7 +1074,7 @@ def collate_files_into_latest(save_dir: str, using_backup: bool = False):
log.debug(f"{filename} {new_times}")


- def getMemory() -> str:
+ def get_memory() -> str:
"""
Gets memory of process as a string
"""
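
The satip/utils.py changes above are a rename of `getMemory` to `get_memory` at its definition and every call site; the helper's body is collapsed in this view. For reference only, a typical implementation of such a process-memory helper (an assumption using psutil, not necessarily satip's actual code) might look like:

```python
import os

import psutil  # assumed dependency; the real helper body is not shown in this diff


def get_memory() -> str:
    """Return the current process's resident memory usage as a human-readable string."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return f"{rss_bytes / 1024 ** 2:.2f} MB"


if __name__ == "__main__":
    print(get_memory())  # e.g. "123.45 MB"
```
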
