Skip to content

Commit

Permalink
refactor: improve test readability (#21)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Kanitz <[email protected]>
  • Loading branch information
athith-g and uniqueg authored Jul 26, 2024
1 parent 164f8ad commit 2d627da
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ pylint = "*"
mypy = "*"

[tool.pylint."MESSAGES CONTROL"]
disable = ["logging-fstring-interpolation"]
disable = ["logging-fstring-interpolation", "missing-timeout"]
94 changes: 37 additions & 57 deletions tests/tasks/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,92 +1,72 @@
"""Tests for I/O and decryption tasks"""

from time import sleep
import json
import requests
from time import sleep

import pytest
import requests

from task_bodies import (
output_dir,
uppercase_task_body,
decryption_task_body,
uppercase_task_with_decryption_body
)
from tests.utils import timeout

TES_URL = "http://localhost:8090/ga4gh/tes/v1"
HEADERS = {"accept": "application/json", "Content-Type": "application/json"}
WAIT_STATUSES = ("UNKNOWN", "INITIALIZING", "RUNNING", "QUEUED")
INPUT_TEXT = "hello world from the input!"
TIME_LIMIT = 60


def create_task(tasks_body):
"""Creates task with the given task body."""
return requests.post(
url=f"{TES_URL}/tasks", headers=HEADERS, json=tasks_body, timeout=TIME_LIMIT
)
def wait_for_file_download(filename):
"""Waits for file with given filename to download."""
while not (output_dir/filename).exists():
sleep(1)


def get_task(task_id):
"""Retrieves list of tasks."""
return requests.get(
url=f"{TES_URL}/tasks/{task_id}", headers=HEADERS, timeout=TIME_LIMIT
@pytest.fixture(name="task")
def fixture_task(request):
"""Returns response received after creating task."""
return requests.post(
url=f"{TES_URL}/tasks", headers=HEADERS, json=request.param
)


def get_task_state(task_id):
"""Retrieves state of task until completion."""
def wait_for_task_completion():
nonlocal task_state
elapsed_seconds = 0
get_response = get_task(task_id)
task_state = json.loads(get_response.text)["state"]
while task_state in WAIT_STATUSES:
if elapsed_seconds >= TIME_LIMIT:
raise requests.Timeout(f"Task did not complete within {TIME_LIMIT} seconds.")
sleep(1)
elapsed_seconds += 1
get_response = get_task(task_id)
task_state = json.loads(get_response.text)["state"]

task_state = ""
wait_for_task_completion()
return task_state


@pytest.fixture(name="post_response")
def fixture_post_response(request):
"""Returns response received after creating task."""
return create_task(request.param)

@pytest.fixture(name="final_task_info")
@timeout(time_limit=60)
def fixture_final_task_info(task):
"""Returns task information after completion."""
assert task.status_code == 200
task_id = json.loads(task.text)["id"]
task_info = None
for _ in range(30):
task_info = requests.get(
url=f"{TES_URL}/tasks/{task_id}", headers=HEADERS
)
task_state = json.loads(task_info.text)["state"]
if task_state not in WAIT_STATUSES:
break
sleep(1)

@pytest.fixture(name="task_state")
def fixture_task_state(post_response):
"""Returns state of task after completion."""
task_id = json.loads(post_response.text)["id"]
return get_task_state(task_id)
return json.loads(task_info.text)


@pytest.mark.parametrize("post_response,filename,expected_output", [
@pytest.mark.parametrize("task,filename,expected_output", [
(uppercase_task_body, "hello-upper.txt", INPUT_TEXT.upper()),
(decryption_task_body, "hello-decrypted.txt", INPUT_TEXT),
(uppercase_task_with_decryption_body, "hello-upper-decrypt.txt", INPUT_TEXT.upper())
], indirect=['post_response'])
def test_task(post_response, task_state, filename, expected_output):
], indirect=["task"])
@timeout(time_limit=10)
def test_task(task, final_task_info, filename, expected_output): # pylint: disable=unused-argument
"""Test tasks for successful completion and intended behavior."""
assert post_response.status_code == 200
assert task_state == "COMPLETE"
assert final_task_info["state"] == "COMPLETE"

elapsed_seconds = 0
while not (output_dir/filename).exists():
if elapsed_seconds == TIME_LIMIT:
raise FileNotFoundError(f"{filename} did not download to {output_dir} "
f"within {TIME_LIMIT} seconds.")
sleep(1)
elapsed_seconds += 1
wait_for_file_download(filename)

with open(output_dir/filename, encoding="utf-8") as f:
output = f.read()
assert output == expected_output
assert len(output) == len(expected_output)
if "upper" in filename:
assert output.isupper()
true_result = output.isupper() if "upper" in filename else not output.isupper()
assert true_result
20 changes: 20 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
""" Utility functions for tests."""
from functools import wraps
import signal


def timeout(time_limit):
"""Decorator that enforces a time limit on a function."""
def handler(signum, frame):
raise TimeoutError(f"Task did not complete within {time_limit} seconds.")

def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, handler)
signal.alarm(time_limit)
result = func(*args, **kwargs)
signal.alarm(0)
return result
return wrapper
return decorator

0 comments on commit 2d627da

Please sign in to comment.