diff --git a/app/config.py b/app/config.py index bd6e655..1e847b4 100644 --- a/app/config.py +++ b/app/config.py @@ -5,6 +5,26 @@ BASEROW_PW = os.environ.get("BASEROW_PW") DATABASES = { + "test": + { + "db_name": "Test DB", + "db_id": "538", + "endpoints": { + "persons": { + "table_id": "3284", + "search_field_name": "name", + "search_field_id": "29742", + "id_field_name": "project_id" + }, + "places": { + "table_id": "3285", + "search_field_name": "name", + "search_field_id": "29745", + "id_field_name": "project_id" + } + }, + "zotero": "https://api.zotero.org/groups/5456180/items" + }, "emt": { "db_name": "EMT", diff --git a/app/main.py b/app/main.py index e8b025c..3f1a1cc 100644 --- a/app/main.py +++ b/app/main.py @@ -1,3 +1,4 @@ +import requests from typing import Union from fastapi import FastAPI, HTTPException, Request from fastapi_cache import FastAPICache @@ -7,6 +8,8 @@ from acdh_baserow_pyutils import BaseRowClient from app.config import DATABASES, BASEROW_PW, BASEROW_USER, BASEROW_URL from app.utils import make_ac_uris, populate_baserow_response +from app.zotero_utils import populate_zotero_response + app = FastAPI() @@ -20,11 +23,21 @@ async def root(request: Request): "endpoint": f"{request.url._url}db/{db_key}/tables", "autocompletes": [ make_ac_uris(db_key, key, request.url._url) - for key, value in x["endpoints"].items() + for key, _ in x["endpoints"].items() ], } for db_key, x in DATABASES.items() ] + for x in endpoints: + db_key = x["db_id"] + print(db_key) + try: + DATABASES[db_key]["zotero"] + print("###################") + except KeyError: + continue + zotero_ep = make_ac_uris(db_key, "zotero", request.url._url) + x["autocompletes"].append(zotero_ep) return { "message": "A baserow autocomplete service", @@ -42,6 +55,13 @@ async def query_endpoint( except KeyError: detail_msg = f"no baserow database with ID: <{db_id}> defined in config.py" raise HTTPException(status_code=404, detail=detail_msg) + if endpoint == "zotero": + zotero_api = cur_db["zotero"] + url = f"{zotero_api}?q={q}" + r = requests.get(url) + data = r.json() + result = populate_zotero_response(data, format=format) + return result cur_conf = cur_db["endpoints"][endpoint] br_table_id = cur_conf["table_id"] query_field_id = cur_conf["search_field_id"] diff --git a/app/test_main.py b/app/test_main.py index f45e1ed..7ac7f06 100644 --- a/app/test_main.py +++ b/app/test_main.py @@ -20,5 +20,40 @@ def test_read_main(): def test_correct_endpoint_number(): with TestClient(app) as client: response = client.get("/") - endpoints = response.json()['endpoints'] + endpoints = response.json()["endpoints"] assert len(endpoints) == len(DATABASES) + + +def test_ac_endpoint(): + db_id = "test" + endpoints = [key for key, _ in DATABASES[db_id]["endpoints"].items()] + for format in ["original", "teicompleter", "select2"]: + for x in endpoints: + with TestClient(app) as client: + response = client.get(f"/ac/{db_id}/{x}") + assert response.status_code == 422 + response = client.get(f"/ac/{db_id}/{x}?q=A&format={format}") + assert response.status_code == 200 + + +def test_nonexiting_ac_endpoint(): + with TestClient(app) as client: + response = client.get("/ac/quatsch/blödsinn?q=A") + assert response.status_code == 404 + + +def test_list_tables_endpoint(): + db_id = "test" + with TestClient(app) as client: + response = client.get(f"/db/{db_id}/tables") + assert response.status_code == 200 + with TestClient(app) as client: + response = client.get("/db/blödsinn/tables") + assert response.status_code == 404 + + +def test_zotero_ep(): + for format in ["original", "teicompleter", "select2"]: + with TestClient(app) as client: + response = client.get(f"/ac/test/zotero?q=Digital&format={format}") + assert response.status_code == 200 diff --git a/app/zotero_utils.py b/app/zotero_utils.py new file mode 100644 index 0000000..1790214 --- /dev/null +++ b/app/zotero_utils.py @@ -0,0 +1,42 @@ +def zotero_description(item_data: dict) -> dict: + item_title = item_data.get('title', 'no title provided') + item_place = item_data.get('place', 'no place provided') + item_date = item_data.get('date', 'no date provided') + item_id = f"#zotero_{item_data['key']}" + item_description = f"{item_title}, {item_place}, {item_date}" + return { + "id": item_id, + "value": item_description + } + + +def populate_zotero_response(data: list, format: str = "teicompleter") -> dict: + if format == 'select2': + result = { + "results": [], + } + for x in data: + item_data = zotero_description(x['data']) + item = { + "id": item_data['id'], + "text": item_data['value'] + } + result['results'].append(item) + return result + + elif format == 'original': + return { + "result": data + } + else: + result = { + "tc:suggestion": [] + } + for x in data: + item_data = zotero_description(x['data']) + item = { + "tc:value": item_data['id'], + "tc:description": item_data['value'] + } + result['tc:suggestion'].append(item) + return result diff --git a/run_tests.sh b/run_tests.sh index 15d2ca2..f3871c3 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -1,6 +1,7 @@ #/bin/bash -coverage run -m pytest -v +echo "Be aware that we are ignoring warnings, this might come back to you one day!" +coverage run -m pytest -v -p no:warnings coverage report coverage html xdg-open ./htmlcov/index.html \ No newline at end of file