From d74854e27985ab3bb9b3e1b576fbc68b387c2e9a Mon Sep 17 00:00:00 2001
From: mgonnav
Date: Wed, 23 Oct 2024 11:10:28 -0500
Subject: [PATCH 1/2] docs: document automatic celery tasks

---
 estela-api/config/celery.py |  19 ++++-
 estela-api/core/tasks.py    | 107 ++++++++++++++++++++++++++++++++++++
 2 files changed, 125 insertions(+), 1 deletion(-)

diff --git a/estela-api/config/celery.py b/estela-api/config/celery.py
index ebed3e52..23f42a23 100644
--- a/estela-api/config/celery.py
+++ b/estela-api/config/celery.py
@@ -3,30 +3,47 @@
 from celery import Celery
 from django.conf import settings
+# Set the default Django settings module for the 'celery' program.
 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.base")
+# Initialize a new Celery application instance named "estela".
 app = Celery("estela")
+
+# Configure the Celery app to use Django settings with a specific namespace "CELERY".
+# All Celery-related configurations are expected to be prefixed with "CELERY_" in Django settings.
 app.config_from_object("django.conf:settings", namespace="CELERY")
+# Autodiscover and load tasks from all installed Django apps.
+# Celery will search for a `tasks.py` module in each Django app and register the tasks found there.
 app.autodiscover_tasks()
+# These tasks will be executed at regular intervals, as specified by the "schedule" argument (in seconds).
 app.conf.beat_schedule = {
+    # Task to run spider jobs that are IN_QUEUE every 2 minutes.
     "run-spider-jobs": {
         "task": "core.tasks.run_spider_jobs",
         "schedule": 120,
     },
+    # Task to check and update job status errors every minute.
     "check-and-update-job-status-errors": {
         "task": "core.tasks.check_and_update_job_status_errors",
         "schedule": 60,
     },
+    # Task to delete expired job data every hour.
     "delete-expired-jobs-data": {
         "task": "core.tasks.delete_expired_jobs_data",
         "schedule": 3600,
     },
 }
-
+# Dynamically import and update the beat schedule with periodic tasks from external applications.
+# External apps are specified in the Django settings under CELERY_EXTERNAL_IMPORTS.
+# Each external app must define its own Celery configuration and beat schedule.
 for import_name in settings.CELERY_EXTERNAL_IMPORTS:
+    # Import the Celery app configuration from the external application.
     module = __import__(f"{import_name}.celery", fromlist=["celery"])
     external_app = module.app
+
+    # Merge the external app's beat schedule with the current Celery app's beat schedule.
+    # This allows tasks from external apps to be included in the periodic task schedule.
     app.conf.beat_schedule.update(external_app.conf.beat_schedule)
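
For context on the external-apps loop above: an external app only needs to ship its own `celery.py` module exposing an `app` whose `conf.beat_schedule` is populated, and the loop then merges those entries into estela's schedule. A minimal sketch, assuming a hypothetical app called `external_billing` registered through CELERY_EXTERNAL_IMPORTS (the app and task names are invented for illustration and are not part of estela):

    # external_billing/celery.py, a hypothetical external app that would be
    # picked up via CELERY_EXTERNAL_IMPORTS = ["external_billing"] in Django settings.
    from celery import Celery

    app = Celery("external_billing")
    app.config_from_object("django.conf:settings", namespace="CELERY")

    # Entries defined here are merged into estela's schedule by the
    # app.conf.beat_schedule.update(...) call shown in the hunk above.
    app.conf.beat_schedule = {
        "compute-monthly-invoices": {
            "task": "external_billing.tasks.compute_monthly_invoices",  # illustrative task path
            "schedule": 3600,  # every hour, in seconds
        },
    }
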
diff --git a/estela-api/core/tasks.py b/estela-api/core/tasks.py
index 3d3577dc..3405d741 100644
--- a/estela-api/core/tasks.py
+++ b/estela-api/core/tasks.py
@@ -28,6 +28,16 @@
 def get_default_token(job):
+    """
+    Retrieves or creates an authentication token for the first user
+    associated with the given job's spider project.
+
+    Args:
+        job (SpiderJob): The job for which to get the user's token.
+
+    Returns:
+        str: The token key for the first user, or None if no user is found.
+    """
     user = job.spider.project.users.first()
     if not user:
         return None
@@ -37,6 +47,11 @@ def get_default_token(job):
 @celery_app.task
 def run_spider_jobs():
+    """
+    Task to run spider jobs that are in the IN_QUEUE_STATUS.
+    Selects jobs from the queue and updates their status to WAITING_STATUS.
+    Initializes job execution using the job manager.
+    """
     jobs = SpiderJob.objects.filter(status=SpiderJob.IN_QUEUE_STATUS)[
         : settings.RUN_JOBS_PER_LOT
     ]
@@ -59,6 +74,18 @@ def run_spider_jobs():
 def delete_data(pid, sid, jid, data_type):
+    """
+    Deletes specific data (items, requests, logs) associated with a job.
+
+    Args:
+        pid (str): Project ID.
+        sid (str): Spider ID.
+        jid (str): Job ID.
+        data_type (str): The type of data to delete ('items', 'requests', 'logs').
+
+    Returns:
+        bool: False if connection to spiderdata_db_client cannot be established.
+    """
     if not spiderdata_db_client.get_connection():
         return False
     job = SpiderJob.objects.get(jid=jid)
@@ -76,6 +103,16 @@ def delete_data(pid, sid, jid, data_type):
 @celery_app.task(name="core.tasks.launch_job")
 def launch_job(sid_, data_, data_expiry_days=None, token=None):
+    """
+    Task to launch a spider job with the provided data and optional token.
+    Creates a job using SpiderJobCreateSerializer and passes the job to the job manager.
+
+    Args:
+        sid_ (int): Spider ID.
+        data_ (dict): Job data to be serialized and saved.
+        data_expiry_days (int, optional): Number of days before data expiry.
+        token (str, optional): Authentication token. If not provided, a default token is used.
+    """
     spider = Spider.objects.get(sid=sid_)
     if data_expiry_days is None:
@@ -122,6 +159,11 @@ def launch_job(sid_, data_, data_expiry_days=None, token=None):
 @celery_app.task(name="core.tasks.check_and_update_job_status_errors")
 def check_and_update_job_status_errors():
+    """
+    Task to check jobs with WAITING_STATUS and update their status to ERROR_STATUS
+    if no active or succeeded status is detected. Also updates job stats from Redis
+    and deletes Redis stats if applicable.
+    """
     jobs = SpiderJob.objects.filter(status=SpiderJob.WAITING_STATUS)[
         : settings.CHECK_JOB_ERRORS_BATCH_SIZE
     ]
@@ -146,6 +188,20 @@ def check_and_update_job_status_errors():
     retry_kwargs={"max_retries": None, "countdown": 600},
 )
 def record_project_usage_after_data_delete(project_id, job_id):
+    """
+    Task to record the project usage statistics after deleting job data.
+    Calculates the new usage records based on database size after data deletion.
+
+    Args:
+        project_id (int): The project ID.
+        job_id (int): The job ID.
+
+    Raises:
+        TaskError: If the connection to spiderdata_db_client cannot be established.
+
+    Returns:
+        str: JSON string with job data size and usage details.
+    """
     if not spiderdata_db_client.get_connection():
         raise TaskError("Could not get a connection to the database.")
@@ -186,6 +242,14 @@ def record_project_usage_after_data_delete(project_id, job_id):
 @celery_app.task()
 def delete_job_data(job_key):
+    """
+    Task to delete all job-related data including items, requests, and logs.
+    Updates job data status to DELETED_STATUS after deletion.
+    Calculates a new usage record after data deletion.
+
+    Args:
+        job_key (str): Unique identifier for the job, typically in the format "jid.sid.pid".
+    """
     jid, sid, pid = job_key.split(".")
     for data_type in ["items", "requests", "logs"]:
         delete_data(pid, sid, jid, data_type)
@@ -195,6 +259,10 @@ def delete_job_data(job_key):
 @celery_app.task(name="core.tasks.delete_expired_jobs_data")
 def delete_expired_jobs_data():
+    """
+    Task to delete job data for jobs that have passed their data expiry date.
+    This checks jobs with a PENDING_STATUS and triggers deletion if expiry is met.
+    """
     pending_data_delete_jobs = SpiderJob.objects.filter(
         data_status=DataStatus.PENDING_STATUS,
         status__in=[SpiderJob.COMPLETED_STATUS, SpiderJob.STOPPED_STATUS],
@@ -211,6 +279,19 @@ def delete_expired_jobs_data():
     retry_kwargs={"max_retries": None, "countdown": 600},
 )
 def record_project_usage_after_job_event(job_id):
+    """
+    Task to record the project usage statistics after a job event (e.g., job completion).
+    Calculates new usage records based on the job data and updates the project usage records.
+
+    Args:
+        job_id (int): The job ID.
+
+    Raises:
+        TaskError: If the connection to spiderdata_db_client cannot be established.
+
+    Returns:
+        str: JSON string with job data size and usage details.
+    """
     if not spiderdata_db_client.get_connection():
         raise TaskError("Could not get a connection to the database.")
@@ -230,14 +311,17 @@ def record_project_usage_after_job_event(job_id):
         str(project.pid), items_collection_name
     )
     unique_collection = False
+
     requests_collection_name = "{}-{}-job_requests".format(job.spider.sid, job.jid)
     requests_data_size = spiderdata_db_client.get_dataset_size(
         str(project.pid), requests_collection_name
     )
+
     logs_collection_name = "{}-{}-job_logs".format(job.spider.sid, job.jid)
     logs_data_size = spiderdata_db_client.get_dataset_size(
         str(project.pid), logs_collection_name
     )
+
     # Tracking Proxy Usage
     proxy_details = {}
     for proxy_name, proxy_usage_name in settings.PROXY_PROVIDERS_TO_TRACK:
@@ -303,6 +387,16 @@ def record_project_usage_after_job_event(job_id):
     retry_kwargs={"max_retries": None, "countdown": 60},
 )
 def record_job_coverage_event(job_id):
+    """
+    Task to record job field coverage statistics after a job event.
+    Collects data on field coverage and updates the job's statistics document in the database.
+
+    Args:
+        job_id (int): The job ID.
+
+    Raises:
+        TaskError: If the connection to spiderdata_db_client cannot be established.
+    """
     if not spiderdata_db_client.get_connection():
         raise TaskError("Could not get a connection to the database.")
     job = SpiderJob.objects.get(jid=job_id)
@@ -339,6 +433,19 @@ def record_job_coverage_event(job_id):
 def get_chain_to_process_usage_data(after_delete=False, project_id=None, job_id=None):
+    """
+    Generates a Celery chain to process usage data after a job event or data deletion.
+    This function dynamically imports tasks from external apps specified in settings and chains
+    them together with either record_project_usage_after_data_delete or record_project_usage_after_job_event.
+
+    Args:
+        after_delete (bool): If True, processes usage data after a data deletion event.
+        project_id (int, optional): Project ID for the data deletion event.
+        job_id (int, optional): Job ID for the job event or data deletion.
+
+    Returns:
+        Celery chain: A chain of tasks to be executed sequentially.
+    """
     list_of_process_functions = []
     for external_app in settings.DJANGO_EXTERNAL_APPS:
         module = __import__(f"{external_app}.tasks", fromlist=["tasks"])
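
The chain returned by `get_chain_to_process_usage_data` is built from Celery's standard signature and chain primitives, which pass each task's return value to the next task in line. A minimal, self-contained sketch of that mechanism (the task names and payloads are invented; only `celery.chain` and the `.s()` signature API, both standard Celery, are assumed):

    from celery import Celery, chain

    app = Celery("example")

    @app.task
    def compute_usage(job_id):
        # First link: receives the job id and returns a usage payload.
        return {"job_id": job_id, "size_bytes": 1024}

    @app.task
    def record_usage(usage):
        # Second link: automatically receives the previous task's return value.
        return f"recorded usage for job {usage['job_id']}"

    # chain(...) composes the signatures; calling .apply_async() on the result
    # dispatches them so they run in order, each result feeding the next. This is
    # the same pattern the signal in the next patch uses, with a countdown delay.
    workflow = chain(compute_usage.s(42), record_usage.s())
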
From 9416add2b305300a8b99ef8692ad95c88395ff0d Mon Sep 17 00:00:00 2001
From: mgonnav
Date: Thu, 24 Oct 2024 10:31:36 -0500
Subject: [PATCH 2/2] docs: document signal on SpiderJob update

---
 estela-api/core/signals.py | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/estela-api/core/signals.py b/estela-api/core/signals.py
index 3f7b3f33..5e35f414 100644
--- a/estela-api/core/signals.py
+++ b/estela-api/core/signals.py
@@ -8,15 +8,38 @@
 @receiver(post_save, sender=SpiderJob, dispatch_uid="update_usage")
 def update_usage(sender, instance: SpiderJob, created, **kwargs):
+    """
+    Signal receiver that triggers after a SpiderJob instance is saved/updated.
+    When the status of the SpiderJob is one of COMPLETED, STOPPED, or ERROR,
+    this function initiates asynchronous tasks to process usage data and record job coverage.
+
+    Args:
+        sender (Model): The model class that sent the signal (SpiderJob).
+        instance (SpiderJob): The specific SpiderJob instance that was saved.
+        created (bool): Whether this instance was newly created.
+        **kwargs: Additional keyword arguments provided by the signal.
+
+    Functionality:
+    1. If the SpiderJob instance has a status of COMPLETED, STOPPED, or ERROR, it triggers two asynchronous tasks:
+        - A task chain to process usage data using `get_chain_to_process_usage_data`.
+        - A task to record the job's field coverage using `record_job_coverage_event`.
+    2. Both tasks are scheduled to run after a delay specified in the Django settings.
+        - The delay for processing usage data is controlled by `settings.COUNTDOWN_RECORD_PROJECT_USAGE_AFTER_JOB_EVENT`.
+        - The delay for recording job coverage is controlled by `settings.COUNTDOWN_RECORD_COVERAGE_AFTER_JOB_EVENT`.
+    """
+    # Check if the SpiderJob status is either COMPLETED, STOPPED, or ERROR
     if instance.status in [
         SpiderJob.COMPLETED_STATUS,
         SpiderJob.STOPPED_STATUS,
         SpiderJob.ERROR_STATUS,
     ]:
+        # Initiate the task chain to process usage data after the job event
         chain_of_usage_process = get_chain_to_process_usage_data(job_id=instance.jid)
         chain_of_usage_process.apply_async(
             countdown=settings.COUNTDOWN_RECORD_PROJECT_USAGE_AFTER_JOB_EVENT
         )
+
+        # Schedule the task to record job coverage statistics
         record_job_coverage_event.apply_async(
             args=[instance.jid],
             countdown=settings.COUNTDOWN_RECORD_COVERAGE_AFTER_JOB_EVENT,
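
For reference, `apply_async` is the Celery dispatch method that accepts scheduling options such as `countdown` (a delay in seconds), while `delay` is the shorthand without options; the signal above relies on `countdown` to defer both tasks. A small sketch with an invented task, assuming only standard Celery calling conventions:

    from celery import Celery

    app = Celery("example")
    # Execute tasks locally in this sketch so no message broker is required.
    app.conf.task_always_eager = True

    @app.task
    def record_event(event_id):
        return event_id

    # Immediate dispatch: these two calls are equivalent.
    record_event.delay(42)
    record_event.apply_async(args=[42])

    # Dispatch delayed by ten minutes, the same mechanism update_usage uses with
    # the COUNTDOWN_* values taken from the Django settings.
    record_event.apply_async(args=[42], countdown=600)
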