Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dhis2): push data values #9

Merged
merged 3 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion openhexa/toolbox/dhis2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def raise_if_error(response: requests.Response):
if response.status_code != 200 and "json" in response.headers["content-type"]:
msg = response.json()
if msg.get("status") == "ERROR":
raise DHIS2Error(f"{msg.get('status')} {msg.get('httpStatusCode')}:" f" {msg.get('message')}")
raise DHIS2Error(f"{msg.get('status')} {msg.get('httpStatusCode')}: {msg.get('message')}")

# raise with requests if no error message provided
response.raise_for_status()
Expand Down Expand Up @@ -111,3 +111,8 @@ def merge_pages(pages: Sequence[requests.Response]) -> dict:
for page in pages:
merged_response[key] += page.json()[key]
return merged_response

def post(self, endpoint: str, json: dict = None, params: dict = None) -> requests.Response:
r = self.session.post(f"{self.url}/{endpoint}", json=json, params=params)
self.raise_if_error(r)
return r
140 changes: 138 additions & 2 deletions openhexa/toolbox/dhis2/dhis2.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ def system_info(self) -> dict:
r = self.client.api.get("system/info")
return r.json()

def identifiable_objects(self, uid: str) -> dict:
"""Get metadata from element UID"""
cache_key = f"identifiableObject_{uid}"
if self.client.cache_dir:
with Cache(self.client.cache_dir) as cache:
if cache_key in cache:
return json.loads(cache.get(cache_key))

r = self.client.api.get(f"identifiableObjects/{uid}")

if self.client.cache_dir:
with Cache(self.client.cache_dir) as cache:
cache.set(cache_key, json.dumps(r.json()))

return r.json()

@use_cache("organisation_unit_levels")
def organisation_unit_levels(self) -> List[dict]:
"""Get names of all organisation unit levels.
Expand Down Expand Up @@ -485,6 +501,7 @@ def __init__(self, client: DHIS2):
self.MAX_DATA_ELEMENTS = 50
self.MAX_ORG_UNITS = 50
self.MAX_PERIODS = 1
self.MAX_POST_DATA_VALUES = 50

def split_params(self, params: dict) -> List[dict]:
"""Split request parameters into chunks.
Expand Down Expand Up @@ -601,13 +618,132 @@ def get(

return response

def _validate(self, data_values: List[dict]):
"""Validate data values based on data element value type.

Supported: NUMBER, INTEGER, UNIT_INTERVAL, PERCENTAGE, INTEGER_POSITIVE,
INTEGER_NEGATIVE, INTEGER_ZERO_OR_POSITIVE, TEXT, LONG_TEXT, LETTER, BOOLEAN.
Not supported: FILE_RESOURCE, COORDINATE, PHONE_NUMBER, EMAIL, TRUE_ONLY, DATE,
DATETIME.
"""
value_types = {}

for dv in data_values:
value = dv.get("value")
de_uid = dv.get("dataElement")

# keep a cache with value type for each data element
if de_uid in value_types:
value_type = value_types[de_uid]
else:
value_type = self.client.meta.identifiable_objects(de_uid).get("valueType")
value_types[de_uid] = value_type

for key in ["dataElement", "orgUnit", "period", "value", "categoryOptionCombo", "attributeOptionCombo"]:
if not dv.get(key):
raise ValueError(f"Missing {key} key in data value")

if value_type == "INTEGER":
if not isinstance(value, int):
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "NUMBER":
if not isinstance(value, int) and not isinstance(value, float):
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "UNIT_INTERVAL":
if not isinstance(value, float) or not (value >= 0 and value <= 1):
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "PERCENTAGE":
if (not isinstance(value, float) and not isinstance(value, int)) or not (value >= 0 and value <= 100):
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "INTEGER_POSITIVE":
if not isinstance(value, int) and value <= 0:
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "INTEGER_NEGATIVE":
if not isinstance(value, int) and value >= 0:
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "INTEGER_ZERO_OR_POSITIVE":
if not isinstance(value, int) and value < 0:
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "TEXT":
if not isinstance(value, str):
raise ValueError(f"Data value {value} is not a valid {value_type}")
elif len(value) > 50000:
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "LONG_TEXT":
if not isinstance(value, str):
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "LETTER":
if not isinstance(value, str):
raise ValueError(f"Data value {value} is not a valid {value_type}")
elif len(value) != 1:
raise ValueError(f"Data value {value} is not a valid {value_type}")

elif value_type == "BOOLEAN":
if not isinstance(value, bool):
raise ValueError(f"Data value {value} is not a valid {value_type}")

else:
raise ValueError(f"Data value type {value_type} not supported")

def post(
self,
data_values: List[dict],
import_strategy: str = "CREATE",
dry_run: bool = True,
):
pass
) -> str:
"""Push data values to a DHIS2 instance using the dataValueSets API endpoint.

Parameters
----------
data_values : list of dict
Data values as a list of dictionnaries with the following keys: dataElement,
period, orgUnit, categoryOptionCombo, attributeOptionCombo, and value.
import_strategy : str, optional (default="CREATE")
CREATE, UPDATE, CREATE_AND_UPDATE, or DELETE
dry_run : bool, optional
Whether to save changes on the server or just return the
import summary

Return
------
dict
Import counts summary
"""
self._validate(data_values)

if import_strategy not in ("UPDATE", "CREATE", "CREATE_AND_UPDATE", "DELETE"):
raise ValueError("Invalid import strategy")

import_counts = {"imported": 0, "updated": 0, "ignored": 0, "deleted": 0}

for chunk in _split_list(data_values, self.MAX_POST_DATA_VALUES):
r = self.client.api.post(
endpoint="dataValueSets",
json={"dataValues": chunk},
params={"dryRun": dry_run, "importStrategy": import_strategy},
)

if "response" in r.json():
summary = r.json()["response"]
else:
summary = r.json()

if summary.get("status") != "SUCCESS":
raise DHIS2Error(summary.get("description"))

for key in ["imported", "updated", "ignored", "deleted"]:
import_counts[key] += summary["importCount"][key]

return import_counts


class Analytics:
Expand Down
Loading