From b5024ddc44c183fc752fd4210dde7a9757f7eb54 Mon Sep 17 00:00:00 2001 From: Trey Stafford Date: Tue, 12 Nov 2024 14:28:43 -0700 Subject: [PATCH] WIP begin implementing use of harmony-py Subset orders can now be submitted and the job completes. Some next steps: * Support downloads from harmony * Support non-subset downloads. This might be a little more complicated, as it does not appear that the harmony API can support non-subset orders (it always uses spatial/temporal constraints to do subsetting, not just filtering). So we may need to use `earthaccess` to download our granule list instead. * Remove code supporting variable subsetting. This is not supported by Harmony, but could be in the future. --- icepyx/core/APIformatting.py | 28 ++--- icepyx/core/granules.py | 227 ++++++++++------------------------- icepyx/core/harmony.py | 43 ++++++- icepyx/core/query.py | 35 +++--- icepyx/core/spatial.py | 58 +++++++++ icepyx/core/types/api.py | 9 +- 6 files changed, 201 insertions(+), 199 deletions(-) diff --git a/icepyx/core/APIformatting.py b/icepyx/core/APIformatting.py index dc20c6878..307084057 100644 --- a/icepyx/core/APIformatting.py +++ b/icepyx/core/APIformatting.py @@ -1,13 +1,12 @@ """Generate and format information for submitting to API (CMR and NSIDC).""" import datetime as dt -from typing import Any, Generic, Literal, Optional, TypeVar, Union, overload +from typing import Any, Generic, Literal, Optional, TypeVar, overload from icepyx.core.exceptions import ExhaustiveTypeGuardException, TypeGuardException -from icepyx.core.types import ( +from icepyx.core.harmony import HarmonyTemporal +from icepyx.core.types.api import ( CMRParams, - EGIParamsSubset, - EGIRequiredParams, ) # ---------------------------------------------------------------------- @@ -38,18 +37,17 @@ def _fmt_temporal(start, end, key): assert isinstance(start, dt.datetime) assert isinstance(end, dt.datetime) - if key == "temporal": + if key == "temporal": # search option. fmt_timerange = ( start.strftime("%Y-%m-%dT%H:%M:%SZ") + "," + end.strftime("%Y-%m-%dT%H:%M:%SZ") ) - elif key == "time": - fmt_timerange = ( - start.strftime("%Y-%m-%dT%H:%M:%S") - + "," - + end.strftime("%Y-%m-%dT%H:%M:%S") - ) + elif key == "time": # subsetting option. + # Format for harmony. This will do subsetting. + # TODO: change `key` to something more clear. `temporal` is the key + # passed into Harmony, so this is very confusing! + fmt_timerange: HarmonyTemporal = {"start": start, "stop": end} else: raise ValueError("An invalid time key was submitted for formatting.") @@ -212,20 +210,22 @@ def __get__( self, instance: 'Parameters[Literal["required"]]', owner: Any, - ) -> EGIRequiredParams: ... + ): # -> EGIRequiredParams: ... + ... @overload def __get__( self, instance: 'Parameters[Literal["subset"]]', owner: Any, - ) -> EGIParamsSubset: ... + ): # -> EGIParamsSubset: ... + ... def __get__( self, instance: "Parameters", owner: Any, - ) -> Union[CMRParams, EGIRequiredParams, EGIParamsSubset]: + ) -> CMRParams: """ Returns the dictionary of formatted keys associated with the parameter object. diff --git a/icepyx/core/granules.py b/icepyx/core/granules.py index d6a519048..94d842fef 100644 --- a/icepyx/core/granules.py +++ b/icepyx/core/granules.py @@ -8,24 +8,24 @@ import re import time from typing import Union -from xml.etree import ElementTree as ET import zipfile import numpy as np import requests -from requests.compat import unquote import icepyx.core.APIformatting as apifmt from icepyx.core.auth import EarthdataAuthMixin -from icepyx.core.cmr import CMR_PROVIDER +from icepyx.core.cmr import CMR_PROVIDER, get_concept_id import icepyx.core.exceptions -from icepyx.core.types import ( +from icepyx.core.harmony import HarmonyApi +from icepyx.core.types.api import ( CMRParams, - EGIRequiredParamsDownload, - EGIRequiredParamsSearch, ) -from icepyx.core.urls import DOWNLOAD_BASE_URL, GRANULE_SEARCH_BASE_URL, ORDER_BASE_URL -from icepyx.uat import EDL_ACCESS_TOKEN +from icepyx.core.urls import DOWNLOAD_BASE_URL, GRANULE_SEARCH_BASE_URL + +# TODO: mix this into existing classes rather than declaring as a global +# variable. +HARMONY_API = HarmonyApi() def info(grans: list[dict]) -> dict[str, Union[int, float]]: @@ -191,7 +191,6 @@ def __init__( def get_avail( self, CMRparams: CMRParams, - reqparams: EGIRequiredParamsSearch, cloud: bool = False, ): """ @@ -222,9 +221,7 @@ def get_avail( query.Query.avail_granules """ - assert ( - CMRparams is not None and reqparams is not None - ), "Missing required input parameter dictionaries" + assert CMRparams is not None, "Missing required input parameter dictionary" # if not hasattr(self, 'avail'): self.avail = [] @@ -232,14 +229,12 @@ def get_avail( headers = { "Accept": "application/json", "Client-Id": "icepyx", - "Authorization": f"Bearer {EDL_ACCESS_TOKEN}", } # note we should also check for errors whenever we ping NSIDC-API - # make a function to check for errors params = apifmt.combine_params( CMRparams, - {k: reqparams[k] for k in ["short_name", "version", "page_size"]}, {"provider": CMR_PROVIDER}, ) @@ -292,7 +287,7 @@ def get_avail( def place_order( self, CMRparams: CMRParams, - reqparams: EGIRequiredParamsDownload, + reqparams, # : EGIRequiredParamsDownload, subsetparams, verbose, subset=True, @@ -337,10 +332,13 @@ def place_order( -------- query.Query.order_granules """ - raise icepyx.core.exceptions.RefactoringException + # raise icepyx.core.exceptions.RefactoringException self.get_avail(CMRparams, reqparams) + # TODO: the harmony API may not support non-subsetting. So we may need + # to provide a list of granules for harmony to download, or use a + # different API. if subset is False: request_params = apifmt.combine_params( CMRparams, reqparams, {"agent": "NO"} @@ -348,158 +346,63 @@ def place_order( else: request_params = apifmt.combine_params(CMRparams, reqparams, subsetparams) - order_fn = ".order_restart" + concept_id = get_concept_id( + product=request_params["short_name"], version=request_params["version"] + ) - total_pages = int(np.ceil(len(self.avail) / reqparams["page_size"])) - print( - "Total number of data order requests is ", - total_pages, - " for ", - len(self.avail), - " granules.", + # TODO: At this point, the request parameters have been formatted into + # strings. `harmony-py` expects python objects (e.g., `dt.datetime` for + # temporal values) + + # Place the order. + # TODO: there are probably other options we want to more generically + # expose here. E.g., instead of just accepting a `bounding_box` of a + # particular flavor, we want to be able to pass in a polygon? + job_id = HARMONY_API.place_order( + concept_id=concept_id, + # TODO: why the double-nested bbox dict here? + bounding_box=subsetparams["bbox"]["bbox"], + temporal=subsetparams["time"], ) - if reqparams["page_num"] > 0: - pagenums = [reqparams["page_num"]] - else: - pagenums = range(1, total_pages + 1) + # TODO/Question: should this be changed from a list to a single value? + # There will only be one harmony job per request (I think) + self.orderIDs = [job_id] + order_fn = ".order_restart" + with open(order_fn, "w") as fid: + json.dump({"orderIDs": self.orderIDs}, fid) - for page_num in pagenums: + print("order ID: ", job_id) + status = HARMONY_API.check_order_status(job_id) + print("Initial status of your harmony order request: ", status["status"]) + # TODO: confirm which status responses we might expect. "running", + # "paused", or "canceled" are documented here: + # https://harmony.earthdata.nasa.gov/docs#getting-job-status + # I have also seen `running` and `running_with_errors`. + while status["status"].startswith("running"): print( - "Data request ", - page_num, - " of ", - total_pages, - " is submitting to NSIDC", + "Your harmony order status is still ", + status["status"], + ". Please continue waiting... this may take a few moments.", ) - breakpoint() - request_params.update({"page_num": page_num}) - - request = self.session.get(ORDER_BASE_URL, params=request_params) - - # DevGoal: use the request response/number to do some error handling/ - # give the user better messaging for failures - # print(request.content) - # root = ET.fromstring(request.content) - # print([subset_agent.attrib for subset_agent in root.iter('SubsetAgent')]) - - if verbose is True: - print("Request HTTP response: ", request.status_code) - # print('Order request URL: ', request.url) - - # Raise bad request: Loop will stop for bad response code. - request.raise_for_status() - esir_root = ET.fromstring(request.content) - if verbose is True: - print("Order request URL: ", unquote(request.url)) - print( - "Order request response XML content: ", - request.content.decode("utf-8"), - ) - - # Look up order ID - orderlist = [] - for order in esir_root.findall("./order/"): - # if verbose is True: - # print(order) - orderlist.append(order.text) - orderID = orderlist[0] - print("order ID: ", orderID) - - # Create status URL - statusURL = f"{ORDER_BASE_URL}/{orderID}" - if verbose is True: - print("status URL: ", statusURL) - - # Find order status - request_response = self.session.get(statusURL) - if verbose is True: - print( - "HTTP response from order response URL: ", - request_response.status_code, - ) - - # Raise bad request: Loop will stop for bad response code. - request_response.raise_for_status() - request_root = ET.fromstring(request_response.content) - statuslist = [] - for status in request_root.findall("./requestStatus/"): - statuslist.append(status.text) - status = statuslist[0] - print("Initial status of your order request at NSIDC is: ", status) - - loop_root = None - # If status is already finished without going into pending/processing - if status.startswith("complete"): - loop_response = self.session.get(statusURL) - loop_root = ET.fromstring(loop_response.content) - - # Continue loop while request is still processing - while status == "pending" or status == "processing": - print( - "Your order status is still ", - status, - " at NSIDC. Please continue waiting... this may take a few moments.", - ) - # print('Status is not complete. Trying again') - time.sleep(10) - loop_response = self.session.get(statusURL) - - # Raise bad request: Loop will stop for bad response code. - loop_response.raise_for_status() - loop_root = ET.fromstring(loop_response.content) - - # find status - statuslist = [] - for status in loop_root.findall("./requestStatus/"): - statuslist.append(status.text) - status = statuslist[0] - # print('Retry request status is: ', status) - if status == "pending" or status == "processing": - continue - - if not isinstance(loop_root, ET.Element): - # The typechecker needs help knowing that at this point loop_root is - # set, as it can't tell that the conditionals above are supposed to be - # exhaustive. - raise icepyx.core.exceptions.ExhaustiveTypeGuardException - - # Order can either complete, complete_with_errors, or fail: - # Provide complete_with_errors error message: - if status == "complete_with_errors" or status == "failed": - messagelist = [] - for message in loop_root.findall("./processInfo/"): - messagelist.append(message.text) - print("Your order is: ", status) - print("NSIDC provided these error messages:") - pprint.pprint(messagelist) - - if status == "complete" or status == "complete_with_errors": - print("Your order is:", status) - messagelist = [] - for message in loop_root.findall("./processInfo/info"): - messagelist.append(message.text) - if messagelist != []: - print("NSIDC returned these messages") - pprint.pprint(messagelist) - if not hasattr(self, "orderIDs"): - self.orderIDs = [] - - self.orderIDs.append(orderID) - else: - print("Request failed.") - - # DevGoal: save orderIDs more frequently than just at the end for large orders - # (e.g. for len(reqparams['page_num']) > 5 or 10 or something) - # Save orderIDs to file to avoid resubmitting order in case kernel breaks down. - # save orderIDs for every 5 orders when more than 10 orders are submitted. - if reqparams["page_num"] >= 10: - with open(order_fn, "w") as fid: - json.dump({"orderIDs": self.orderIDs}, fid) - - # --- Output the final orderIDs - with open(order_fn, "w") as fid: - json.dump({"orderIDs": self.orderIDs}, fid) + # Requesting the status too often can result in a 500 error. + time.sleep(5) + status = HARMONY_API.check_order_status(job_id) + + if status["status"] == "complete_with_errors" or status["status"] == "failed": + print("Your order is: ", status["status"]) + print("Harmony provided these error messages:") + pprint.pprint(status["errors"]) + + # TODO: consider always printing the status message. There's no need for + # this check, and the message is relevant regardless of if there are + # errors or not. We could check for a failure status instead. + if status == "complete" or status == "complete_with_errors": + print("Your order is:", status["status"]) + print("Harmony returned this message:") + pprint.pprint(status["message"]) + else: + print(f"Request failed with status {status['status']}.") return self.orderIDs diff --git a/icepyx/core/harmony.py b/icepyx/core/harmony.py index 33274a5ac..270083c7e 100644 --- a/icepyx/core/harmony.py +++ b/icepyx/core/harmony.py @@ -1,10 +1,16 @@ -from typing import Any +import datetime as dt +from typing import Any, NotRequired, TypedDict, Union import harmony from icepyx.core.auth import EarthdataAuthMixin +class HarmonyTemporal(TypedDict): + start: NotRequired[dt.datetime] + stop: NotRequired[dt.datetime] + + class HarmonyApi(EarthdataAuthMixin): def __init__(self): # initialize authentication properties @@ -21,3 +27,38 @@ def get_capabilities(self, concept_id: str) -> dict[str, Any]: response = self.harmony_client.submit(capabilities_request) return response + + def place_order( + self, + concept_id: str, + # These are optional subset parameters + bounding_box: Union[harmony.BBox, None] = None, + temporal: Union[HarmonyTemporal, None] = None, + ) -> str: + """Places a Harmony order with the given parameters. + + Return a string representing a job ID. + + TODO/Question: it looks like this code will always use the provided + parameters to do subsetting. Are there cases where we just want the data + downloaded as whole granules? If so, we may need to use another API to + do so? + """ + collection = harmony.Collection(id=concept_id) + request = harmony.Request( + collection=collection, + spatial=bounding_box, + temporal=temporal, + ) + + if not request.is_valid(): + # TODO: consider more specific error class & message + raise RuntimeError("Failed to create valid request") + + job_id = self.harmony_client.submit(request) + + return job_id + + def check_order_status(self, job_id: str): + status = self.harmony_client.status(job_id) + return status diff --git a/icepyx/core/query.py b/icepyx/core/query.py index 573ca8b1b..5f93b0375 100644 --- a/icepyx/core/query.py +++ b/icepyx/core/query.py @@ -1,7 +1,7 @@ import datetime as dt from functools import cached_property import pprint -from typing import Optional, Union, cast +from typing import Optional, Union import geopandas as gpd import holoviews as hv @@ -17,11 +17,8 @@ import icepyx.core.is2ref as is2ref import icepyx.core.spatial as spat import icepyx.core.temporal as tp -from icepyx.core.types import ( +from icepyx.core.types.api import ( CMRParams, - EGIParamsSubset, - EGIRequiredParams, - EGIRequiredParamsDownload, ) import icepyx.core.validate_inputs as val from icepyx.core.variables import Variables as Variables @@ -597,7 +594,7 @@ def CMRparams(self) -> CMRParams: return self._CMRparams.fmted_keys @property - def reqparams(self) -> EGIRequiredParams: + def reqparams(self): # -> EGIRequiredParams: """ Display the required key:value pairs that will be submitted. It generates the dictionary if it does not already exist. @@ -613,8 +610,6 @@ def reqparams(self) -> EGIRequiredParams: >>> reg_a.reqparams # doctest: +SKIP {'short_name': 'ATL06', 'version': '006', 'page_size': 2000, 'page_num': 1, 'request_mode': 'async', 'include_meta': 'Y', 'client_string': 'icepyx'} """ - raise RefactoringException - if not hasattr(self, "_reqparams"): self._reqparams = apifmt.Parameters("required", reqtype="search") self._reqparams.build_params(product=self.product, version=self._version) @@ -624,7 +619,7 @@ def reqparams(self) -> EGIRequiredParams: # @property # DevQuestion: if I make this a property, I get a "dict" object is not callable # when I try to give input kwargs... what approach should I be taking? - def subsetparams(self, **kwargs) -> Union[EGIParamsSubset, dict[Never, Never]]: + def subsetparams(self, **kwargs): # -> Union[EGIParamsSubset, dict[Never, Never]]: """ Display the subsetting key:value pairs that will be submitted. It generates the dictionary if it does not already exist @@ -650,7 +645,7 @@ def subsetparams(self, **kwargs) -> Union[EGIParamsSubset, dict[Never, Never]]: {'time': '2019-02-20T00:00:00,2019-02-28T23:59:59', 'bbox': '-55.0,68.0,-48.0,71.0'} """ - raise RefactoringException + # raise RefactoringException if not hasattr(self, "_subsetparams"): self._subsetparams = apifmt.Parameters("subset") @@ -665,6 +660,7 @@ def subsetparams(self, **kwargs) -> Union[EGIParamsSubset, dict[Never, Never]]: else: # If the user has supplied a subset list of variables, append the # icepyx required variables to the Coverage dict + # TODO: this is not supported in Harmony. if "Coverage" in kwargs: var_list = [ "orbit_info/sc_orient", @@ -690,13 +686,13 @@ def subsetparams(self, **kwargs) -> Union[EGIParamsSubset, dict[Never, Never]]: self._subsetparams.build_params( geom_filepath=self._spatial._geom_file, extent_type=self._spatial._ext_type, - spatial_extent=self._spatial.fmt_for_EGI(), + spatial_extent=self._spatial.fmt_for_harmony(), **kwargs, ) else: self._subsetparams.build_params( extent_type=self._spatial._ext_type, - spatial_extent=self._spatial.fmt_for_EGI(), + spatial_extent=self._spatial.fmt_for_harmony(), **kwargs, ) @@ -1024,12 +1020,13 @@ def order_granules( . Retry request status is: complete """ - breakpoint() - raise RefactoringException - - if not hasattr(self, "reqparams"): - self.reqparams + # breakpoint() + # raise RefactoringException + # TODO: this probably shouldn't be mutated based on which method is being called... + # It is also very confusing to have both `self.reqparams` and + # `self._reqparams`, each of which does something different! + self.reqparams if self._reqparams._reqtype == "search": self._reqparams._reqtype = "download" @@ -1065,7 +1062,7 @@ def order_granules( tempCMRparams["readable_granule_name[]"] = gran self.granules.place_order( tempCMRparams, - cast(EGIRequiredParamsDownload, self.reqparams), + self.reqparams, self.subsetparams(**kwargs), verbose, subset, @@ -1075,7 +1072,7 @@ def order_granules( else: self.granules.place_order( self.CMRparams, - cast(EGIRequiredParamsDownload, self.reqparams), + self.reqparams, self.subsetparams(**kwargs), verbose, subset, diff --git a/icepyx/core/spatial.py b/icepyx/core/spatial.py index fef61846f..b282b559b 100644 --- a/icepyx/core/spatial.py +++ b/icepyx/core/spatial.py @@ -4,6 +4,7 @@ import warnings import geopandas as gpd +import harmony import numpy as np from numpy.typing import NDArray from shapely.geometry import Polygon, box @@ -821,3 +822,60 @@ def fmt_for_EGI(self) -> str: else: raise icepyx.core.exceptions.ExhaustiveTypeGuardException + + def fmt_for_harmony(self) -> dict[str, harmony.BBox]: + """ + Format the spatial extent input into format expected by `harmony-py`. + + Returns a dictionary with keys mapping to `harmony.Request` kwargs, with + values appropriately formated for the harmony request. + + `harmony-py` can take two different spatial parameters: + + * `spatial`: "Bounding box spatial constraints on the data or Well Known + Text (WKT) string describing the spatial constraints." The "Bounding + box" is expected to be a `harmony.BBox`. + * `shape`: "a file path to an ESRI Shapefile zip, GeoJSON file, or KML + file to use for spatial subsetting. Note: not all collections support + shapefile subsetting" + + Question: is `spatial` the same as `shape`, in terms of performance? If + so, we could be consistent and always turn the input into geojson and + pass that along to harmony. Otherwise we should choose `spatial` if the + extent_type is bounding, otherwise `shape`. + Answer: No! They're not the same. They map to different harmony + parameters and each is a different service. E.g., some collections may + have bounding box subsetting while others have shape subsetting (or + both). + TODO: think more about how we verify if certain inputs are valid for + harmony. E.g., do we need to check the capabilities of each and + cross-check that with user inputs to determine which action to take? + Also: Does `icepyx` always perform subsetting based on user input? If + not, how do we determine which parameters are for finding granules vs + performing subetting? + + Question: is there any way to pass in a geojson string directly, so that + we do not have to mock out a file just for harmony? Answer: no, not + direcly. `harmony-py` wants a path to a file on disk. We may want to + have the function that submits the request to harmony with `harmony-py` + accept something that's easily-serializable to a geojson file so that it + can manage the lifespan of the file. It would be best (I think) to avoid + writing tmp files to disk in this function, because it doesn't know when + the request gets made/when to cleanup the file. That means that we may + leave stray files on the user's computer. Ideally, we would be able to + pass `harmony-py` a bytes object (or a shapely Polygon!) + """ + # Begin with bounding box because this is the simplest case. + if self.extent_type == "bounding_box": + harmony_kwargs = { + "bbox": harmony.BBox( + w=self.extent[0], + s=self.extent[1], + e=self.extent[2], + n=self.extent[3], + ) + } + + return harmony_kwargs + else: + raise NotImplementedError diff --git a/icepyx/core/types/api.py b/icepyx/core/types/api.py index b29ba8fb4..9af9bb9d6 100644 --- a/icepyx/core/types/api.py +++ b/icepyx/core/types/api.py @@ -1,11 +1,14 @@ -from typing import Literal, TypedDict, Union +from typing import TypedDict, Union -from typing_extensions import NotRequired from pydantic import BaseModel +from typing_extensions import NotRequired CMRParamsBase = TypedDict( "CMRParamsBase", { + "short_name": str, + "version": str, + "page_size": int, "temporal": NotRequired[str], "options[readable_granule_name][pattern]": NotRequired[str], "options[spatial][or]": NotRequired[str], @@ -25,4 +28,4 @@ class CMRParamsWithPolygon(CMRParamsBase): CMRParams = Union[CMRParamsWithBbox, CMRParamsWithPolygon] -class HarmonyCoverageAPIParamsBase(BaseModel): +class HarmonyCoverageAPIParamsBase(BaseModel): ...