diff --git a/openhexa/toolbox/dhis2/api.py b/openhexa/toolbox/dhis2/api.py
index daf7975..1e569da 100644
--- a/openhexa/toolbox/dhis2/api.py
+++ b/openhexa/toolbox/dhis2/api.py
@@ -52,7 +52,7 @@ def raise_if_error(response: requests.Response):
         if response.status_code != 200 and "json" in response.headers["content-type"]:
             msg = response.json()
             if msg.get("status") == "ERROR":
-                raise DHIS2Error(f"{msg.get('status')} {msg.get('httpStatusCode')}:" f" {msg.get('message')}")
+                raise DHIS2Error(f"{msg.get('status')} {msg.get('httpStatusCode')}: {msg.get('message')}")
 
         # raise with requests if no error message provided
         response.raise_for_status()
@@ -111,3 +111,8 @@ def merge_pages(pages: Sequence[requests.Response]) -> dict:
         for page in pages:
             merged_response[key] += page.json()[key]
         return merged_response
+
+    def post(self, endpoint: str, json: dict = None, params: dict = None) -> requests.Response:
+        r = self.session.post(f"{self.url}/{endpoint}", json=json, params=params)
+        self.raise_if_error(r)
+        return r
diff --git a/openhexa/toolbox/dhis2/dhis2.py b/openhexa/toolbox/dhis2/dhis2.py
index c9b96d3..957580e 100644
--- a/openhexa/toolbox/dhis2/dhis2.py
+++ b/openhexa/toolbox/dhis2/dhis2.py
@@ -81,6 +81,22 @@ def system_info(self) -> dict:
         r = self.client.api.get("system/info")
         return r.json()
 
+    def identifiable_objects(self, uid: str) -> dict:
+        """Get metadata of an identifiable object from its UID."""
+        cache_key = f"identifiableObject_{uid}"
+        if self.client.cache_dir:
+            with Cache(self.client.cache_dir) as cache:
+                if cache_key in cache:
+                    return json.loads(cache.get(cache_key))
+
+        r = self.client.api.get(f"identifiableObjects/{uid}")
+
+        if self.client.cache_dir:
+            with Cache(self.client.cache_dir) as cache:
+                cache.set(cache_key, json.dumps(r.json()))
+
+        return r.json()
+
     @use_cache("organisation_unit_levels")
     def organisation_unit_levels(self) -> List[dict]:
         """Get names of all organisation unit levels.
@@ -485,6 +501,7 @@ def __init__(self, client: DHIS2):
         self.MAX_DATA_ELEMENTS = 50
         self.MAX_ORG_UNITS = 50
         self.MAX_PERIODS = 1
+        self.MAX_POST_DATA_VALUES = 50
 
     def split_params(self, params: dict) -> List[dict]:
         """Split request parameters into chunks.
@@ -601,13 +618,132 @@ def get(
 
         return response
 
+    def _validate(self, data_values: List[dict]):
+        """Validate data values based on data element value type.
+
+        Supported: NUMBER, INTEGER, UNIT_INTERVAL, PERCENTAGE, INTEGER_POSITIVE,
+        INTEGER_NEGATIVE, INTEGER_ZERO_OR_POSITIVE, TEXT, LONG_TEXT, LETTER, BOOLEAN.
+        Not supported: FILE_RESOURCE, COORDINATE, PHONE_NUMBER, EMAIL, TRUE_ONLY, DATE,
+        DATETIME.
+        """
+        value_types = {}
+
+        for dv in data_values:
+            value = dv.get("value")
+            de_uid = dv.get("dataElement")
+
+            # keep a cache with value type for each data element
+            if de_uid in value_types:
+                value_type = value_types[de_uid]
+            else:
+                value_type = self.client.meta.identifiable_objects(de_uid).get("valueType")
+                value_types[de_uid] = value_type
+
+            for key in ["dataElement", "orgUnit", "period", "value", "categoryOptionCombo", "attributeOptionCombo"]:
+                if dv.get(key) is None:
+                    raise ValueError(f"Missing {key} key in data value")
+
+            if value_type == "INTEGER":
+                if not isinstance(value, int):
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "NUMBER":
+                if not isinstance(value, int) and not isinstance(value, float):
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "UNIT_INTERVAL":
+                if not isinstance(value, float) or not (value >= 0 and value <= 1):
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "PERCENTAGE":
+                if (not isinstance(value, float) and not isinstance(value, int)) or not (value >= 0 and value <= 100):
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "INTEGER_POSITIVE":
+                if not isinstance(value, int) or value <= 0:
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "INTEGER_NEGATIVE":
+                if not isinstance(value, int) or value >= 0:
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "INTEGER_ZERO_OR_POSITIVE":
+                if not isinstance(value, int) or value < 0:
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "TEXT":
+                if not isinstance(value, str):
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+                elif len(value) > 50000:
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "LONG_TEXT":
+                if not isinstance(value, str):
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "LETTER":
+                if not isinstance(value, str):
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+                elif len(value) != 1:
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            elif value_type == "BOOLEAN":
+                if not isinstance(value, bool):
+                    raise ValueError(f"Data value {value} is not a valid {value_type}")
+
+            else:
+                raise ValueError(f"Data value type {value_type} not supported")
+
     def post(
         self,
         data_values: List[dict],
         import_strategy: str = "CREATE",
         dry_run: bool = True,
-    ):
-        pass
+    ) -> dict:
+        """Push data values to a DHIS2 instance using the dataValueSets API endpoint.
+
+        Parameters
+        ----------
+        data_values : list of dict
+            Data values as a list of dictionaries with the following keys: dataElement,
+            period, orgUnit, categoryOptionCombo, attributeOptionCombo, and value.
+        import_strategy : str, optional (default="CREATE")
+            CREATE, UPDATE, CREATE_AND_UPDATE, or DELETE
+        dry_run : bool, optional
+            Whether to save changes on the server or just return the
+            import summary
+
+        Returns
+        -------
+        dict
+            Import counts summary
+        """
+        self._validate(data_values)
+
+        if import_strategy not in ("UPDATE", "CREATE", "CREATE_AND_UPDATE", "DELETE"):
+            raise ValueError("Invalid import strategy")
+
+        import_counts = {"imported": 0, "updated": 0, "ignored": 0, "deleted": 0}
+
+        for chunk in _split_list(data_values, self.MAX_POST_DATA_VALUES):
+            r = self.client.api.post(
+                endpoint="dataValueSets",
+                json={"dataValues": chunk},
+                params={"dryRun": dry_run, "importStrategy": import_strategy},
+            )
+
+            if "response" in r.json():
+                summary = r.json()["response"]
+            else:
+                summary = r.json()
+
+            if summary.get("status") != "SUCCESS":
+                raise DHIS2Error(summary.get("description"))
+
+            for key in ["imported", "updated", "ignored", "deleted"]:
+                import_counts[key] += summary["importCount"][key]
+
+        return import_counts
 
 
 class Analytics:
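
A minimal usage sketch of the new DataValueSets.post path. It assumes an already-initialised DHIS2 client that exposes this DataValueSets instance as `data_value_sets` (attribute name assumed here); the UIDs, period, and value below are placeholders, not values from this changeset.

# Sketch only: `dhis2` is an existing DHIS2 client; the `data_value_sets`
# attribute name and every UID below are assumptions for illustration.
data_values = [
    {
        "dataElement": "fbfJHSPpUQD",           # placeholder data element UID
        "period": "202401",                     # placeholder period
        "orgUnit": "DiszpKrYNg8",               # placeholder org unit UID
        "categoryOptionCombo": "HllvX50cXC0",   # placeholder category option combo UID
        "attributeOptionCombo": "HllvX50cXC0",  # placeholder attribute option combo UID
        "value": 12,                            # must match the data element value type
    },
]

# dry_run=True asks DHIS2 to report the import summary without persisting anything
summary = dhis2.data_value_sets.post(
    data_values,
    import_strategy="CREATE",
    dry_run=True,
)
print(summary)  # e.g. {"imported": 1, "updated": 0, "ignored": 0, "deleted": 0}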