From 8788aeb5ca43eea119d78ff408f2637ced443b94 Mon Sep 17 00:00:00 2001
From: Jacob Bieker
Date: Thu, 17 Aug 2023 08:27:51 +0100
Subject: [PATCH 1/6] Add checking datasets before each tailored download

Also small refactor to make name of function more pythonic
---
 satip/app.py   | 32 +++++++++++++-----------
 satip/utils.py | 66 +++++++++++++++++++++++++-------------------------
 2 files changed, 51 insertions(+), 47 deletions(-)

diff --git a/satip/app.py b/satip/app.py
index 5ad6f9e4..9035ffbb 100644
--- a/satip/app.py
+++ b/satip/app.py
@@ -133,7 +133,7 @@ def run(
         log.info(
             f'Running application and saving to "{save_dir}"',
             version=satip.__version__,
-            memory=utils.getMemory(),
+            memory=utils.get_memory(),
         )
         # 1. Get data from API, download native files
         with tempfile.TemporaryDirectory() as tmpdir:
@@ -144,11 +144,11 @@ def run(
                 native_file_dir=save_dir_native,
             )
             if cleanup:
-                log.debug("Running Data Tailor Cleanup", memory=utils.getMemory())
+                log.debug("Running Data Tailor Cleanup", memory=utils.get_memory())
                 download_manager.cleanup_datatailor()
                 return
             start_date = pd.Timestamp(start_time, tz="UTC") - pd.Timedelta(history)
-            log.info(f"Fetching datasets for {start_date} - {start_time}", memory=utils.getMemory())
+            log.info(f"Fetching datasets for {start_date} - {start_time}", memory=utils.get_memory())
             datasets = download_manager.identify_available_datasets(
                 start_date=start_date.strftime("%Y-%m-%d-%H-%M-%S"),
                 end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H-%M-%S"),
             )
@@ -158,7 +158,7 @@ def run(
                 log.warn(
                     f"No RSS Imagery available or using backup ({use_backup=}), "
                     f"falling back to 15-minutely data",
-                    memory=utils.getMemory(),
+                    memory=utils.get_memory(),
                 )
                 datasets = download_manager.identify_available_datasets(
                     start_date=start_date.strftime("%Y-%m-%d-%H-%M-%S"),
@@ -170,26 +170,30 @@ def run(
             # if both final files don't exist, then we should make sure we run the whole process
             datasets = utils.filter_dataset_ids_on_current_files(datasets, save_dir)
             log.info(
-                f"Files to download after filtering: {len(datasets)}", memory=utils.getMemory()
+                f"Files to download after filtering: {len(datasets)}", memory=utils.get_memory()
             )
             if len(datasets) == 0:
-                log.info("No files to download, exiting", memory=utils.getMemory())
+                log.info("No files to download, exiting", memory=utils.get_memory())
                 updated_data = False
             else:
                 if maximum_n_datasets != -1:
                     log.debug(
                         f"Ony going to get at most {maximum_n_datasets} datasets",
-                        memory=utils.getMemory(),
+                        memory=utils.get_memory(),
                     )
                     datasets = datasets[0:maximum_n_datasets]
                 random.shuffle(datasets)  # Shuffle so subsequent runs might download different data
                 updated_data = True
                 if use_backup:
-                    download_manager.download_tailored_datasets(
-                        datasets,
-                        product_id="EO:EUM:DAT:MSG:HRSEVIRI",
-                    )
+                    # Check before downloading each tailored dataset, as it can take awhile
+                    for dset in datasets:
+                        dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
+                        if len(dset) > 0:
+                            download_manager.download_tailored_datasets(
+                                [dset],
+                                product_id="EO:EUM:DAT:MSG:HRSEVIRI",
+                            )
                 else:
                     download_manager.download_datasets(
                         datasets,
@@ -204,7 +208,7 @@ def run(
             )
             log.debug(
                 "Saving native files to Zarr: " + native_files.__str__(),
-                memory=utils.getMemory(),
+                memory=utils.get_memory(),
             )
             # Save to S3
             utils.save_native_to_zarr(
@@ -224,7 +228,7 @@ def run(
         if updated_data:
            # Collate files into single NetCDF file
            utils.collate_files_into_latest(save_dir=save_dir, using_backup=use_backup)
-           log.debug("Collated files", memory=utils.getMemory())
+           log.debug("Collated files", memory=utils.get_memory())
 
            # 4. update table to show when this data has been pulled
            if db_url is not None:
@@ -232,7 +236,7 @@ def run(
                 with connection.get_session() as session:
                     update_latest_input_data_last_updated(session=session, component="satellite")
 
-        log.info("Finished Running application", memory=utils.getMemory())
+        log.info("Finished Running application", memory=utils.get_memory())
     except Exception as e:
         log.error(f"Error caught during run: {e}", exc_info=True)
diff --git a/satip/utils.py b/satip/utils.py
index 2fdead78..fe25067d 100644
--- a/satip/utils.py
+++ b/satip/utils.py
@@ -281,16 +281,16 @@ def convert_scene_to_dataarray(
     """
     if area not in GEOGRAPHIC_BOUNDS:
         raise ValueError(f"`area` must be one of {GEOGRAPHIC_BOUNDS.keys()}, not '{area}'")
-    log.debug("Starting scene conversion", memory=getMemory())
+    log.debug("Starting scene conversion", memory=get_memory())
     if area != "RSS":
         try:
             scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
         except NotImplementedError:
             # 15 minutely data by default doesn't work for some reason, have to resample it
             scene = scene.resample("msg_seviri_rss_1km" if band == "HRV" else "msg_seviri_rss_3km")
-            log.debug("Finished resample", memory=getMemory())
+            log.debug("Finished resample", memory=get_memory())
             scene = scene.crop(ll_bbox=GEOGRAPHIC_BOUNDS[area])
-    log.debug("Finished crop", memory=getMemory())
+    log.debug("Finished crop", memory=get_memory())
     # Remove acq time from all bands because it is not useful, and can actually
     # get in the way of combining multiple Zarr datasets.
     data_attrs = {}
@@ -301,7 +301,7 @@ def convert_scene_to_dataarray(
             data_attrs[new_name] = scene[channel].attrs[attr]
     dataset: xr.Dataset = scene.to_xarray_dataset()
     dataarray = dataset.to_array()
-    log.debug("Converted to dataarray", memory=getMemory())
+    log.debug("Converted to dataarray", memory=get_memory())
 
     # Lat and Lon are the same for all the channels now
     if calculate_osgb:
@@ -323,7 +323,7 @@ def convert_scene_to_dataarray(
         for name in ["x", "y"]:
             dataarray[name].attrs["coordinate_reference_system"] = "geostationary"
 
-    log.info("Calculated OSGB", memory=getMemory())
+    log.info("Calculated OSGB", memory=get_memory())
     # Round to the nearest 5 minutes
     dataarray.attrs.update(data_attrs)
     dataarray.attrs["end_time"] = pd.Timestamp(dataarray.attrs["end_time"]).round("5 min")
@@ -336,7 +336,7 @@ def convert_scene_to_dataarray(
         del dataarray["crs"]
     del scene
 
-    log.debug("Finished conversion", memory=getMemory())
+    log.debug("Finished conversion", memory=get_memory())
     return dataarray
 
 
@@ -373,10 +373,10 @@ def get_dataset_from_scene(filename: str, hrv_scaler, use_rescaler: bool, save_d
         Returns the Xarray dataset from the filename
     """
     if ".nat" in filename:
-        log.debug(f"Loading Native {filename}", memory=getMemory())
+        log.debug(f"Loading Native {filename}", memory=get_memory())
         hrv_scene = load_native_from_zip(filename)
     else:
-        log.debug(f"Loading HRIT {filename}", memory=getMemory())
+        log.debug(f"Loading HRIT {filename}", memory=get_memory())
         hrv_scene = load_hrit_from_zip(filename, sections=list(range(16, 25)))
     hrv_scene.load(
         [
@@ -385,11 +385,11 @@ def get_dataset_from_scene(filename: str, hrv_scaler, use_rescaler: bool, save_d
         generate=False,
     )
 
-    log.debug("Loaded HRV", memory=getMemory())
+    log.debug("Loaded HRV", memory=get_memory())
     hrv_dataarray: xr.DataArray = convert_scene_to_dataarray(
         hrv_scene, band="HRV", area="UK", calculate_osgb=True
     )
-    log.debug("Converted HRV to dataarray", memory=getMemory())
+    log.debug("Converted HRV to dataarray", memory=get_memory())
     del hrv_scene
     attrs = serialize_attrs(hrv_dataarray.attrs)
     if use_rescaler:
@@ -404,18 +404,18 @@ def get_dataset_from_scene(filename: str, hrv_scaler, use_rescaler: bool, save_d
         hrv_dataarray = hrv_dataarray.transpose(
             "time", "y_geostationary", "x_geostationary", "variable"
         )
-        log.info("Rescaled HRV", memory=getMemory())
+        log.info("Rescaled HRV", memory=get_memory())
     hrv_dataarray = hrv_dataarray.chunk((1, 512, 512, 1))
     hrv_dataset = hrv_dataarray.to_dataset(name="data")
     hrv_dataset.attrs.update(attrs)
-    log.debug("Converted HRV to DataArray", memory=getMemory())
+    log.debug("Converted HRV to DataArray", memory=get_memory())
     now_time = pd.Timestamp(hrv_dataset["time"].values[0]).strftime("%Y%m%d%H%M")
     save_file = os.path.join(save_dir, f"{'15_' if using_backup else ''}hrv_{now_time}.zarr.zip")
-    log.debug(f"Saving HRV netcdf in {save_file}", memory=getMemory())
+    log.debug(f"Saving HRV netcdf in {save_file}", memory=get_memory())
     save_to_zarr_to_s3(hrv_dataset, save_file)
     del hrv_dataset
     gc.collect()
-    log.debug("Saved HRV to NetCDF", memory=getMemory())
+    log.debug("Saved HRV to NetCDF", memory=get_memory())
 
 
 def get_nonhrv_dataset_from_scene(
@@ -444,11 +444,11 @@ def get_nonhrv_dataset_from_scene(
         ],
         generate=False,
     )
-    log.debug(f"Loaded non-hrv file: {filename}", memory=getMemory())
+    log.debug(f"Loaded non-hrv file: {filename}", memory=get_memory())
     dataarray: xr.DataArray = convert_scene_to_dataarray(
         scene, band="IR_016", area="UK", calculate_osgb=True
     )
-    log.debug(f"Converted non-HRV file {filename} to dataarray", memory=getMemory())
+    log.debug(f"Converted non-HRV file {filename} to dataarray", memory=get_memory())
     del scene
     attrs = serialize_attrs(dataarray.attrs)
     if use_rescaler:
@@ -503,17 +503,17 @@ def get_nonhrv_dataset_from_scene(
     dataarray = dataarray.transpose("time", "y_geostationary", "x_geostationary", "variable")
     dataarray = dataarray.chunk((1, 256, 256, 1))
     dataset = dataarray.to_dataset(name="data")
-    log.debug("Converted non-HRV to dataset", memory=getMemory())
+    log.debug("Converted non-HRV to dataset", memory=get_memory())
     del dataarray
     dataset.attrs.update(attrs)
-    log.debug("Deleted return list", memory=getMemory())
+    log.debug("Deleted return list", memory=get_memory())
     now_time = pd.Timestamp(dataset["time"].values[0]).strftime("%Y%m%d%H%M")
     save_file = os.path.join(save_dir, f"{'15_' if using_backup else ''}{now_time}.zarr.zip")
-    log.debug(f"Saving non-HRV netcdf in {save_file}", memory=getMemory())
+    log.debug(f"Saving non-HRV netcdf in {save_file}", memory=get_memory())
     save_to_zarr_to_s3(dataset, save_file)
     del dataset
     gc.collect()
-    log.debug(f"Saved non-HRV file {save_file}", memory=getMemory())
+    log.debug(f"Saved non-HRV file {save_file}", memory=get_memory())
 
 
 def load_hrit_from_zip(filename: str, sections: list) -> Scene:
@@ -573,7 +573,7 @@ def save_native_to_zarr(
     log.debug(
         f"Converting from {'HRIT' if using_backup else 'native'} to zarr in {save_dir}",
-        memory=getMemory(),
+        memory=get_memory(),
     )
 
     scaler = ScaleToZeroToOne(
@@ -625,24 +625,24 @@ def save_native_to_zarr(
         variable_order=["HRV"], maxs=np.array([103.90016]), mins=np.array([-1.2278595])
     )
     for f in list_of_native_files:
-        log.debug(f"Processing {f}", memory=getMemory())
+        log.debug(f"Processing {f}", memory=get_memory())
         if "EPCT" in f:
-            log.debug(f"Processing HRIT file {f}", memory=getMemory())
+            log.debug(f"Processing HRIT file {f}", memory=get_memory())
             if "HRV" in f:
-                log.debug(f"Processing HRV {f}", memory=getMemory())
+                log.debug(f"Processing HRV {f}", memory=get_memory())
                 get_dataset_from_scene(f, hrv_scaler, use_rescaler, save_dir, using_backup)
             else:
-                log.debug(f"Processing non-HRV {f}", memory=getMemory())
+                log.debug(f"Processing non-HRV {f}", memory=get_memory())
                 get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, using_backup)
         else:
             if "HRV" in bands:
-                log.debug(f"Processing HRV {f}", memory=getMemory())
+                log.debug(f"Processing HRV {f}", memory=get_memory())
                 get_dataset_from_scene(f, hrv_scaler, use_rescaler, save_dir, using_backup)
-            log.debug(f"Processing non-HRV {f}", memory=getMemory())
+            log.debug(f"Processing non-HRV {f}", memory=get_memory())
             get_nonhrv_dataset_from_scene(f, scaler, use_rescaler, save_dir, using_backup)
-    log.debug(f"Finished processing files: {list_of_native_files}", memory=getMemory())
+    log.debug(f"Finished processing files: {list_of_native_files}", memory=get_memory())
 
 
 def save_dataarray_to_zarr(
@@ -794,7 +794,7 @@ def save_to_zarr_to_s3(dataset: xr.Dataset, filename: str):
     """
     gc.collect()
-    log.info(f"Saving file to {filename}", memory=getMemory())
+    log.info(f"Saving file to {filename}", memory=get_memory())
 
     with tempfile.TemporaryDirectory() as dir:
         # save locally
@@ -804,15 +804,15 @@ def save_to_zarr_to_s3(dataset: xr.Dataset, filename: str):
         # make sure variable is string
         dataset = dataset.assign_coords({"variable": dataset.coords["variable"].astype(str)})
-        log.debug(f"Dataset time: {dataset.time}", memory=getMemory())
+        log.debug(f"Dataset time: {dataset.time}", memory=get_memory())
 
         with zarr.ZipStore(path) as store:
             dataset.to_zarr(store, compute=True, mode="w", encoding=encoding, consolidated=True)
 
         new_times = xr.open_dataset(f"zip::{path}", engine="zarr").time
-        log.debug(f"New times for {path}: {new_times}", memory=getMemory())
+        log.debug(f"New times for {path}: {new_times}", memory=get_memory())
 
-        log.debug(f"Saved to temporary file {path}, now pushing to {filename}", memory=getMemory())
+        log.debug(f"Saved to temporary file {path}, now pushing to {filename}", memory=get_memory())
         # save to s3
         filesystem = fsspec.open(filename).fs
         filesystem.put(path, filename)
@@ -1074,7 +1074,7 @@ def collate_files_into_latest(save_dir: str, using_backup: bool = False):
         log.debug(f"{filename} {new_times}")
 
 
-def getMemory() -> str:
+def get_memory() -> str:
     """
     Gets memory of process as a string
     """

From 3c72ae1c8aad4b9041c7a5cc8341aa329e773dc6 Mon Sep 17 00:00:00 2001
From: Jacob Bieker
Date: Thu, 17 Aug 2023 08:31:01 +0100
Subject: [PATCH 2/6] Fix lint

---
 satip/app.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/satip/app.py b/satip/app.py
index 9035ffbb..f38071a0 100644
--- a/satip/app.py
+++ b/satip/app.py
@@ -148,7 +148,8 @@ def run(
                 download_manager.cleanup_datatailor()
                 return
             start_date = pd.Timestamp(start_time, tz="UTC") - pd.Timedelta(history)
-            log.info(f"Fetching datasets for {start_date} - {start_time}", memory=utils.get_memory())
+            log.info(f"Fetching datasets for {start_date} - {start_time}",
+                     memory=utils.get_memory())
             datasets = download_manager.identify_available_datasets(
                 start_date=start_date.strftime("%Y-%m-%d-%H-%M-%S"),
                 end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H-%M-%S"),

From ff42b12623e1a72b21e3804f474477f6883ba3cf Mon Sep 17 00:00:00 2001
From: Jacob Bieker
Date: Thu, 17 Aug 2023 14:14:28 +0100
Subject: [PATCH 3/6] Readd things

---
 satip/app.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/satip/app.py b/satip/app.py
index f38071a0..8acc9285 100644
--- a/satip/app.py
+++ b/satip/app.py
@@ -196,10 +196,14 @@ def run(
                                 product_id="EO:EUM:DAT:MSG:HRSEVIRI",
                             )
                 else:
-                    download_manager.download_datasets(
-                        datasets,
-                        product_id="EO:EUM:DAT:MSG:MSG15-RSS",
-                    )
+                    # Check before downloading each tailored dataset, as it can take awhile
+                    for dset in datasets:
+                        dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
+                        if len(dset) > 0:
+                            download_manager.download_datasets(
+                                [dset],
+                                product_id="EO:EUM:DAT:MSG:MSG15-RSS",
+                            )
 
             # 2. Load nat files to one Xarray Dataset
             native_files = (

From 96f9f05ce11cdeb9b280ac8328d09665031cf66e Mon Sep 17 00:00:00 2001
From: Jacob Bieker
Date: Thu, 17 Aug 2023 14:51:01 +0100
Subject: [PATCH 4/6] Remove extra list?

---
 satip/app.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/satip/app.py b/satip/app.py
index 8acc9285..683eff06 100644
--- a/satip/app.py
+++ b/satip/app.py
@@ -189,19 +189,19 @@ def run(
                 if use_backup:
                     # Check before downloading each tailored dataset, as it can take awhile
                     for dset in datasets:
-                        dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
+                        dset = utils.filter_dataset_ids_on_current_files(dset, save_dir)
                         if len(dset) > 0:
                             download_manager.download_tailored_datasets(
-                                [dset],
+                                dset,
                                 product_id="EO:EUM:DAT:MSG:HRSEVIRI",
                             )
                 else:
                     # Check before downloading each tailored dataset, as it can take awhile
                     for dset in datasets:
-                        dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
+                        dset = utils.filter_dataset_ids_on_current_files(dset, save_dir)
                         if len(dset) > 0:
                             download_manager.download_datasets(
-                                [dset],
+                                dset,
                                 product_id="EO:EUM:DAT:MSG:MSG15-RSS",
                             )

From d2eb402691f9b63f3a650406b9daca27dd9c0395 Mon Sep 17 00:00:00 2001
From: Jacob Bieker
Date: Thu, 17 Aug 2023 14:59:17 +0100
Subject: [PATCH 5/6] Add prints

---
 satip/app.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/satip/app.py b/satip/app.py
index 683eff06..135a038f 100644
--- a/satip/app.py
+++ b/satip/app.py
@@ -189,6 +189,7 @@ def run(
                 if use_backup:
                     # Check before downloading each tailored dataset, as it can take awhile
                     for dset in datasets:
+                        print(dset)
                         dset = utils.filter_dataset_ids_on_current_files(dset, save_dir)
                         if len(dset) > 0:
                             download_manager.download_tailored_datasets(
@@ -198,6 +199,7 @@ def run(
                 else:
                     # Check before downloading each tailored dataset, as it can take awhile
                     for dset in datasets:
+                        print(dset)
                         dset = utils.filter_dataset_ids_on_current_files(dset, save_dir)
                         if len(dset) > 0:
                             download_manager.download_datasets(

From 9a8f482a4d96c2c06f7132de9ab2d921bf6d7a33 Mon Sep 17 00:00:00 2001
From: Jacob Bieker
Date: Thu, 17 Aug 2023 15:26:18 +0100
Subject: [PATCH 6/6] Only wrap list once

---
 satip/app.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/satip/app.py b/satip/app.py
index 135a038f..ac31a8f2 100644
--- a/satip/app.py
+++ b/satip/app.py
@@ -189,8 +189,7 @@ def run(
                 if use_backup:
                     # Check before downloading each tailored dataset, as it can take awhile
                     for dset in datasets:
-                        print(dset)
-                        dset = utils.filter_dataset_ids_on_current_files(dset, save_dir)
+                        dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
                         if len(dset) > 0:
                             download_manager.download_tailored_datasets(
                                 dset,
@@ -199,8 +198,7 @@ def run(
                 else:
                     # Check before downloading each tailored dataset, as it can take awhile
                     for dset in datasets:
-                        print(dset)
-                        dset = utils.filter_dataset_ids_on_current_files(dset, save_dir)
+                        dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
                         if len(dset) > 0:
                             download_manager.download_datasets(
                                 dset,
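
For reference, and not part of the patches above: a minimal sketch of how the download step in
satip/app.py reads once all six patches are applied, assuming (as the patches imply but do not
state) that utils.filter_dataset_ids_on_current_files takes a list of dataset IDs and returns
only those whose output files are not already present in save_dir:

    if use_backup:
        # Re-check each dataset right before downloading, since tailored downloads are slow
        for dset in datasets:
            dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
            if len(dset) > 0:
                download_manager.download_tailored_datasets(
                    dset,
                    product_id="EO:EUM:DAT:MSG:HRSEVIRI",
                )
    else:
        # Same per-dataset check for the RSS native files
        for dset in datasets:
            dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
            if len(dset) > 0:
                download_manager.download_datasets(
                    dset,
                    product_id="EO:EUM:DAT:MSG:MSG15-RSS",
                )

The net effect of patches 3-6 is that each branch wraps a single dataset ID in a list exactly
once (for the filter call) and then passes the filtered list straight to the download call.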