Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tests): Add automated test for recording functionality #803

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ src

dist/
build/
openadapt/error.log
114 changes: 114 additions & 0 deletions tests/openadapt/test_recording.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Module for testing the recording module."""

import multiprocessing
import time
import os
import pytest
from openadapt import record, playback, utils, video
from openadapt.config import config
from openadapt.db import crud
from openadapt.models import Recording, ActionEvent
from loguru import logger

# Maximum seconds to wait both for the 'record.started' signal and, later,
# for the recorder to finish shutting down.
RECORD_STARTED_TIMEOUT = 360  # Increased timeout to 6 minutes



def test_record_functionality():
    """End-to-end test of the recording pipeline.

    Starts ``record.record`` in a child process, waits for its
    ``record.started`` signal, lets it capture data briefly, signals it to
    stop, then asserts the expected database rows and output files exist.

    Raises:
        pytest.fail: if the recorder never starts or never stops in time.
    """
    logger.info("Starting test_record_functionality")

    # Pipe for receiving status messages from the recording process.
    parent_conn, child_conn = multiprocessing.Pipe()

    # terminate_processing tells the recorder to stop; terminate_recording is
    # set by the recorder once its teardown has completed.
    terminate_processing = multiprocessing.Event()
    terminate_recording = multiprocessing.Event()

    record_process = multiprocessing.Process(
        target=record.record,
        args=(
            "Test recording",
            terminate_processing,
            terminate_recording,
            child_conn,
            False,
        ),
    )

    try:
        record_process.start()
        logger.info("Recording process started")

        # Wait for the 'record.started' signal; fail if it never arrives.
        start_time = time.time()
        while time.time() - start_time < RECORD_STARTED_TIMEOUT:
            if parent_conn.poll(1):  # 1 second timeout for poll
                message = parent_conn.recv()
                logger.info(f"Received message: {message}")
                if message["type"] == "record.started":
                    logger.info("Received 'record.started' signal")
                    break
            else:
                logger.debug("No message received, continuing to wait...")
        else:
            # while/else: runs only when the loop exhausted without break.
            logger.error("Timed out waiting for 'record.started' signal")
            pytest.fail("Timed out waiting for 'record.started' signal")

        # Let the recorder capture some data before stopping it.
        time.sleep(5)

        logger.info("Stopping the recording")
        terminate_processing.set()  # Signal the recording to stop

        # Event.wait returns False on timeout, so the return value is the
        # success flag; no separate is_set() check is needed.
        logger.info("Waiting for recording to stop")
        if not terminate_recording.wait(timeout=RECORD_STARTED_TIMEOUT):
            logger.error("Recording did not stop within the expected time")
            pytest.fail("Recording did not stop within the expected time")

        logger.info("Recording stopped successfully")

        # Assert database state: the newest recording is the one we started.
        with crud.get_new_session(read_and_write=True) as session:
            recording = session.query(Recording).order_by(Recording.id.desc()).first()
            assert recording is not None, "No recording was created in the database"
            assert recording.task_description == "Test recording"
            logger.info("Database assertions passed")

        # Assert filesystem state: video file (when enabled) and the
        # performance plot must exist on disk.
        video_path = video.get_video_file_path(recording.timestamp)
        if config.RECORD_VIDEO:
            assert os.path.exists(video_path), f"Video file not found at {video_path}"
            logger.info(f"Video file found at {video_path}")
        else:
            logger.info("Video recording is disabled in the configuration")

        performance_plot_path = utils.get_performance_plot_file_path(
            recording.timestamp
        )
        assert os.path.exists(performance_plot_path), (
            f"Performance plot not found at {performance_plot_path}"
        )
        logger.info(f"Performance plot found at {performance_plot_path}")

        # Assert that at least one action event was recorded.
        with crud.get_new_session(read_and_write=True) as session:
            action_events = crud.get_action_events(session, recording)
            assert len(action_events) > 0, "No action events were recorded"
            logger.info(f"Number of action events recorded: {len(action_events)}")

        logger.info("All assertions passed")

    except Exception as e:
        logger.exception(f"An error occurred during the test: {e}")
        raise

    finally:
        # Close both pipe ends (the original leaked them) and make sure the
        # child process cannot outlive the test.
        parent_conn.close()
        child_conn.close()
        if record_process.is_alive():
            logger.info("Terminating recording process")
            record_process.terminate()
            # Bounded join: terminate() sends SIGTERM, which the child may
            # ignore or handle slowly; fall back to kill() so the test run
            # can never hang here indefinitely.
            record_process.join(timeout=30)
            if record_process.is_alive():
                logger.error("Recording process ignored terminate(); killing it")
                record_process.kill()
                record_process.join()
        logger.info("Test completed")


if __name__ == "__main__":
    # pytest.main returns an exit code; propagate it so that running this
    # file directly reports failure to the shell/CI instead of always exiting 0.
    raise SystemExit(pytest.main([__file__]))