Merge pull request #166 from openclimatefix/issue/161-data-tailor-timeout

Add a timeout for the Data Tailor download step; this stops the service from hanging.
peterdudfield authored Mar 16, 2023
2 parents 453f50f + 12c7644 commit f674537
Showing 9 changed files with 224 additions and 90 deletions.
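
For context, the key change in this pull request bounds the Data Tailor status-polling loop with a wall-clock timeout (see satip/eumetsat.py below). The following is only a minimal sketch of that pattern; wait_for_job, get_status and the constants are hypothetical names for illustration, not the actual satip API.

import datetime
import time

# Assumed values for illustration; the real limit is DATA_TAILOR_TIMEOUT_LIMIT_MINUTES in satip/eumetsat.py
TIMEOUT_MINUTES = 15
POLL_SECONDS = 5

def wait_for_job(get_status) -> str:
    """Poll get_status() until it reports DONE, an error state, or the timeout elapses."""
    start = datetime.datetime.now(tz=datetime.timezone.utc)
    status = get_status()
    while status != "DONE":
        elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - start
        if elapsed > datetime.timedelta(minutes=TIMEOUT_MINUTES):
            # Give up instead of hanging forever; the caller decides how to recover.
            break
        if "ERROR" in status or "KILLED" in status:
            # The job has failed server-side, so there is nothing left to wait for.
            break
        time.sleep(POLL_SECONDS)
        status = get_status()
    return status

# Example with a stubbed status source:
statuses = iter(["QUEUED", "RUNNING", "DONE"])
print(wait_for_job(lambda: next(statuses)))  # prints "DONE"

With this pattern a customisation stuck in QUEUED or RUNNING no longer blocks the service indefinitely; the loop exits after the timeout so the caller can log the problem and carry on, which is what the diff below does.
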
53 changes: 0 additions & 53 deletions .github/workflows/plotting.yaml

This file was deleted.

15 changes: 13 additions & 2 deletions .github/workflows/workflows.yaml
@@ -26,6 +26,17 @@ jobs:
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f "setup.py" ]; then pip install -e .; else export PYTHONPATH=$PYTHONPATH:./src; fi
echo "PYTHONPATH=$PYTHONPATH" >> $GITHUB_ENV
- name: Setup with pytest-xdist
run: |
# let's get the string for how many CPUs to use with pytest
echo "Will be using 4 CPUs for pytest testing"
pip install pytest-xdist
#
# make PYTESTXDIST
export PYTESTXDIST="-n 4"
# echo results and save env var for other jobs
echo "pytest-xdist options that will be used are: $PYTESTXDIST"
echo "PYTESTXDIST=$PYTESTXDIST" >> $GITHUB_ENV
- name: Setup with pytest-cov
run: |
# let's make pytest run with coverage
@@ -45,12 +56,12 @@ jobs:
run: |
export EUMETSAT_USER_KEY="${{ secrets.EUMETSAT_USER_KEY }}"
export EUMETSAT_USER_SECRET="${{ secrets.EUMETSAT_USER_SECRET }}"
export PYTEST_COMMAND="pytest $PYTESTCOV $PYTESTXDIST -s"
export PYTEST_COMMAND="pytest $PYTESTCOV $PYTESTXDIST -s --log-level=DEBUG"
echo "Will be running this command: $PYTEST_COMMAND"
eval $PYTEST_COMMAND
- name: Show coverage
run: coverage report -m
- name: "Upload coverage to Codecov"
uses: codecov/codecov-action@v2
with:
fail_ci_if_error: true
fail_ci_if_error: false
26 changes: 20 additions & 6 deletions satip/app.py
@@ -129,7 +129,11 @@ def run(

try:

log.info(f'Running application and saving to "{save_dir}"', version=satip.__version__, memory=utils.getMemory())
log.info(
f'Running application and saving to "{save_dir}"',
version=satip.__version__,
memory=utils.getMemory(),
)
# 1. Get data from API, download native files
with tempfile.TemporaryDirectory() as tmpdir:
download_manager = DownloadManager(
@@ -143,7 +147,7 @@ def run(
download_manager.cleanup_datatailor()
return
start_date = pd.Timestamp(start_time, tz="UTC") - pd.Timedelta(history)
log.info(f'Fetching datasets for {start_date} - {start_time}', memory=utils.getMemory())
log.info(f"Fetching datasets for {start_date} - {start_time}", memory=utils.getMemory())
datasets = download_manager.identify_available_datasets(
start_date=start_date.strftime("%Y-%m-%d-%H-%M-%S"),
end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H-%M-%S"),
@@ -152,7 +156,8 @@ def run(
if (len(datasets) == 0) or use_backup:
log.warn(
f"No RSS Imagery available or using backup ({use_backup=}), "
f"falling back to 15-minutely data", memory=utils.getMemory()
f"falling back to 15-minutely data",
memory=utils.getMemory(),
)
datasets = download_manager.identify_available_datasets(
start_date=start_date.strftime("%Y-%m-%d-%H-%M-%S"),
@@ -163,14 +168,19 @@ def run(
# Filter out ones that already exist
# if both final files don't exist, then we should make sure we run the whole process
datasets = utils.filter_dataset_ids_on_current_files(datasets, save_dir)
log.info(f"Files to download after filtering: {len(datasets)}", memory=utils.getMemory())
log.info(
f"Files to download after filtering: {len(datasets)}", memory=utils.getMemory()
)

if len(datasets) == 0:
log.info("No files to download, exiting", memory=utils.getMemory())
updated_data = False
else:
if maximum_n_datasets != -1:
log.debug(f"Only going to get at most {maximum_n_datasets} datasets", memory=utils.getMemory())
log.debug(
f"Only going to get at most {maximum_n_datasets} datasets",
memory=utils.getMemory(),
)
datasets = datasets[0:maximum_n_datasets]

updated_data = True
@@ -191,7 +201,10 @@ def run(
if not use_backup
else list(glob.glob(os.path.join(tmpdir, "*HRSEVIRI*")))
)
log.debug("Saving native files to Zarr: " + native_files.__str__(), memory=utils.getMemory())
log.debug(
"Saving native files to Zarr: " + native_files.__str__(),
memory=utils.getMemory(),
)
# Save to S3
utils.save_native_to_zarr(
native_files,
@@ -222,6 +235,7 @@ def run(

except Exception as e:
log.error(f"Error caught during run: {e}", exc_info=True)
raise e


if __name__ == "__main__":
10 changes: 8 additions & 2 deletions satip/download.py
@@ -249,7 +249,9 @@ def _sanity_check_files_and_move_to_directory(directory: str, product_id: str) -
log.warn(
f"Error when sanity-checking {f}. Skipping this file. "
+ "Will be downloaded next time this script is run.",
file=f, filesize=file_size, expsize=CLOUD_FILESIZE_MB
file=f,
filesize=file_size,
expsize=CLOUD_FILESIZE_MB,
)
continue
else:
@@ -267,7 +269,11 @@ def _process_rss_images(
try:
file_size = eumetsat.get_filesize_megabytes(f)
if not math.isclose(file_size, NATIVE_FILESIZE_MB, abs_tol=1):
log.debug("RSS Image has the wrong size, skipping", filesize=file_size, expsize=NATIVE_FILESIZE_MB)
log.debug(
"RSS Image has the wrong size, skipping",
filesize=file_size,
expsize=NATIVE_FILESIZE_MB,
)
return

# Now that the file has been checked and can be opened,
93 changes: 78 additions & 15 deletions satip/eumetsat.py
@@ -42,6 +42,9 @@
# Data Tailor download endpoint
API_TAILORED_DOWNLOAD_ENDPOINT = API_ENDPOINT + "/epcs/download"

# Data Tailor time out
DATA_TAILOR_TIMEOUT_LIMIT_MINUTES = 15


# TODO: This function is not used anywhere in the code, suggest to remove.
def build_url_string(url, parameters):
@@ -157,7 +160,7 @@ def identify_available_datasets(
"""
log.info(
f"Identifying which dataset are available for {start_date} {end_date} {product_id}",
productID=product_id
productID=product_id,
)

r_json = query_data_products(start_date, end_date, product_id=product_id).json()
@@ -190,7 +193,10 @@ def identify_available_datasets(
datasets = datasets + batch_r_json["features"]

if num_total_results != len(datasets):
log.warn(f"Some features have not been appended - {len(datasets)} / {num_total_results}", productID=product_id)
log.warn(
f"Some features have not been appended - {len(datasets)} / {num_total_results}",
productID=product_id,
)

return datasets

@@ -335,7 +341,10 @@ def download_datasets(self, datasets, product_id="EO:EUM:DAT:MSG:MSG15-RSS"):

# Downloading specified datasets
if not dataset_ids:
log.info("No files will be downloaded. None were found in API search.", parent="DownloadManager")
log.info(
"No files will be downloaded. None were found in API search.",
parent="DownloadManager",
)
return

for dataset_id in dataset_ids:
@@ -354,7 +363,11 @@ def download_datasets(self, datasets, product_id="EO:EUM:DAT:MSG:MSG15-RSS"):
)
self.download_single_dataset(dataset_link)
except Exception as e:
log.error(f"Error downloading dataset with id {dataset_id}: {e}", exc_info=True, parent="DownloadManager")
log.error(
f"Error downloading dataset with id {dataset_id}: {e}",
exc_info=True,
parent="DownloadManager",
)

def download_tailored_date_range(
self,
@@ -408,7 +421,10 @@ def download_tailored_datasets(
log.debug(f"Dataset IDS: {dataset_ids}", parent="DownloadManager")
# Downloading specified datasets
if not dataset_ids:
log.info("No files will be downloaded. None were found in API search.", parent="DownloadManager")
log.info(
"No files will be downloaded. None were found in API search.",
parent="DownloadManager",
)
return

for dataset_id in dataset_ids:
@@ -539,42 +555,80 @@ def create_and_download_datatailor_data(
# copy to 'data_dir'
log.debug(
f"Copying file from {data_store_filename_remote} to {data_store_filename_local}",
parent="DownloadManager"
parent="DownloadManager",
)
fs.get(data_store_filename_remote, data_store_filename_local)

else:
log.debug(f"{data_store_filename_remote} does not exist, so will download it", parent="DownloadManager")
log.debug(
f"{data_store_filename_remote} does not exist, so will download it",
parent="DownloadManager",
)

log.debug("Making customisation, this can take ~1 minute", parent="DownloadManager")
chain = eumdac.tailor_models.Chain(
product=tailor_id,
format=file_format,
projection=projection,
roi=roi,
compression=compression,
)

datatailor = eumdac.DataTailor(eumdac.AccessToken((self.user_key, self.user_secret)))
customisation = datatailor.new_customisation(dataset_id, chain=chain)

# sometimes the customisation fails first time, so we try twice
try:
customisation = datatailor.new_customisation(dataset_id, chain=chain)
except Exception:
log.debug("Customisation did not succeed the first time, so trying again after 2 seconds")
time.sleep(2)
customisation = datatailor.new_customisation(dataset_id, chain=chain)

sleep_time = 5 # seconds
log.debug("Customisation: {customisation}", parent="DownloadManager")
log.debug(f"Customisation: {customisation}", parent="DownloadManager")
# Customisation Loop
now = datetime.datetime.now(tz=datetime.timezone.utc)
start = datetime.datetime.now(tz=datetime.timezone.utc)
status = datatailor.get_customisation(customisation._id).status
while status != "DONE":
status = datatailor.get_customisation(customisation._id).status
while (status != "DONE") & (
now - start < datetime.timedelta(minutes=DATA_TAILOR_TIMEOUT_LIMIT_MINUTES)
):

log.debug(
f"Checking if the file has been downloaded. Started at {start}. "
f"Time out is {DATA_TAILOR_TIMEOUT_LIMIT_MINUTES} minutes",
parent="DownloadManager",
)

# Get the status of the ongoing customisation
status = datatailor.get_customisation(customisation._id).status
now = datetime.datetime.now(tz=datetime.timezone.utc)
log.info(f"Status of ID {customisation._id} is {status}", parent="DownloadManager")

if "DONE" == status:
break
elif "ERROR" in status or "KILLED" in status:
log.info("UNSUCCESS, exiting", parent="DownloadManager")
break

time.sleep(sleep_time)

if status != "DONE":
log.info(
f"UNSUCCESS, Data Tailor service took more than {DATA_TAILOR_TIMEOUT_LIMIT_MINUTES} minutes. "
f"The service may fail later on",
parent="DownloadManager",
)
else:
log.info("Customisation has been made", parent="DownloadManager")

customisation = datatailor.get_customisation(customisation._id)
(out,) = fnmatch.filter(customisation.outputs, "*")
jobID = customisation._id
log.info(f"Downloading outputs from Data Tailor job {jobID}", parent="DownloadManager")
log.info(
f"Downloading outputs from Data Tailor job {jobID}. This can take ~2 minutes",
parent="DownloadManager",
)

with customisation.stream_output(
out,
@@ -584,13 +638,22 @@ def create_and_download_datatailor_data(
log.debug(f"Saved file to {filename}", parent="DownloadManager")

# save to native file data store
log.debug(f"Copying file from {filename} to {data_store_filename_remote}", parent="DownloadManager")
log.debug(
f"Copying file from {filename} to {data_store_filename_remote}",
parent="DownloadManager",
)
fs = fsspec.open(data_store_filename_remote).fs
fs.put(filename, data_store_filename_remote)
log.debug(f"Copied file from {filename} to {data_store_filename_remote}", parent="DownloadManager")
log.debug(
f"Copied file from {filename} to {data_store_filename_remote}",
parent="DownloadManager",
)

try:
log.info(f"Deleting job {jobID} from Data Tailor storage", parent="DownloadManager")
log.info(
f"Deleting job {jobID} from Data Tailor storage. This can take ~1 minute",
parent="DownloadManager",
)
customisation.delete()

except Exception as e:
7 changes: 5 additions & 2 deletions satip/scale_to_zero_to_one.py
@@ -96,8 +96,11 @@ def fit(self, dataset: xr.Dataset, dims: Iterable = ("time", "y", "x")) -> objec
self.maxs = dataset.max(dims).compute()
self.variable_order = dataset.coords["variable"].values

log.debug("Calculated new min and max values",
mins=self.mins, maxes=self.maxs, variableorder=self.variable_order
log.debug(
"Calculated new min and max values",
mins=self.mins,
maxes=self.maxs,
variableorder=self.variable_order,
)

return self