Add filtered index creation DAG (#1833)
* Add filtered index creation DAG

Still WIP; data refresh external task sensor does not work and the new DAG is missing logic for deleting previous filtered indexes and unit tests

* Only wait for filtered index creation if a run actually exists

* Delete the previous filtered index

* Reorganise and use more "airflow-y" approach for concurrency check

* Wait for any finished DAG state

* Fix typo

* Allow per-media-type filtered index timeout configuration

* Use more descriptive error

* Fix DAG trigger configuration passing method

* Update DAG docs

* Remove permalinks from argument documentation

* Update dag docs since generator fix

* Skip unnecessary linting steps
sarayourfriend authored May 1, 2023
1 parent 00a26a6 commit 4d6e995
Showing 13 changed files with 649 additions and 116 deletions.
124 changes: 118 additions & 6 deletions catalog/DAGs.md
@@ -30,10 +30,12 @@ The following are DAGs grouped by their primary tag:

## Data Refresh

| DAG ID | Schedule Interval |
| ------------------------------------------------------------- | ----------------- |
| [`audio_data_refresh`](#audio_data_refresh) | `@weekly` |
| [`create_filtered_audio_index`](#create_filtered_audio_index) | `None` |
| [`create_filtered_image_index`](#create_filtered_image_index) | `None` |
| [`image_data_refresh`](#image_data_refresh) | `None` |

## Database

@@ -103,6 +105,8 @@ The following is documentation associated with each DAG (where available):
1. [`airflow_log_cleanup`](#airflow_log_cleanup)
1. [`audio_data_refresh`](#audio_data_refresh)
1. [`check_silenced_dags`](#check_silenced_dags)
1. [`create_filtered_audio_index`](#create_filtered_audio_index)
1. [`create_filtered_image_index`](#create_filtered_image_index)
1. [`europeana_reingestion_workflow`](#europeana_reingestion_workflow)
1. [`europeana_workflow`](#europeana_workflow)
1. [`finnish_museums_workflow`](#finnish_museums_workflow)
@@ -196,7 +200,8 @@ process is necessary to make new content added to the Catalog by our provider
DAGs available to the API. You can read more in the
[README](https://github.com/WordPress/openverse-api/blob/main/ingestion_server/README.md).
Importantly, the data refresh TaskGroup is also configured to handle concurrency
requirements of the data refresh server. Finally, once the origin indexes have
been refreshed, the corresponding filtered index creation DAG is triggered.
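
As a rough sketch only (the actual wiring lives in the data refresh DAG factory and may
differ), the hand-off at the end of a data refresh could look something like the
following, with a hypothetical task id and the `force` conf flag that is documented
under the filtered index DAGs below:

```python
# Illustrative only: hand-off from a data refresh DAG to the corresponding
# filtered index creation DAG. The task id and conf shape are assumptions.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_filtered_index_creation = TriggerDagRunOperator(
    task_id="trigger_create_filtered_index",
    trigger_dag_id="create_filtered_audio_index",
    # `force` lets the filtered index DAG bypass its concurrency guard,
    # since the data refresh DAG already coordinates the two.
    conf={"force": True},
)
```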

You can find more background information on this process in the following issues
and related PRs:
Expand Down Expand Up @@ -229,6 +234,112 @@ has been resolved.

The DAG runs weekly.

## `create_filtered_audio_index`

### Create filtered index DAG factory

This module creates the filtered index creation DAGs for each media type using a
factory function.

Filtered index creation is handled by the ingestion server. The DAGs generated
by the `build_create_filtered_index_dag` function in this module are responsible
for triggering the ingestion server action to create and populate the filtered
index for a given media type. The DAG awaits the completion of the filtered
index creation and then points the filtered index alias for the media type to
the newly created index.
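
For orientation, a factory of this kind is typically consumed at module import time
roughly as follows (an illustration only; the real signature of
`build_create_filtered_index_dag` may take additional arguments, such as the
per-media-type timeout configuration mentioned in the commit messages):

```python
# Sketch of how the factory might be invoked for each media type.
# `build_create_filtered_index_dag` is assumed to be defined earlier in this module.
for media_type in ("audio", "image"):
    dag = build_create_filtered_index_dag(media_type)
    # Expose each generated DAG at module level so the Airflow scheduler discovers it.
    globals()[dag.dag_id] = dag
```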

#### When this DAG runs

The DAGs generated in this module are triggered by the data refresh DAGs.
Keeping this process separate from the data refresh DAGs, while still
triggering it from them, allows us to run filtered index creation independently of
the full data refresh. This is primarily useful in two cases: for testing
changes to the filtered index creation; and for re-running filtered index
creation if an urgent change to the sensitive terms calls for an immediate
recreation of the filtered indexes.
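
For reference, the generated DAGs are unscheduled (note the `None` schedule interval in
the table above), so a run only ever happens when one is triggered. A minimal sketch of
such a declaration, with illustrative arguments:

```python
from datetime import datetime

from airflow.models.dag import DAG

# Illustrative only: the factory supplies the real arguments.
dag = DAG(
    dag_id="create_filtered_audio_index",
    start_date=datetime(2023, 5, 1),  # placeholder start date
    schedule_interval=None,  # never scheduled; runs only when triggered
    catchup=False,
    max_active_runs=1,  # assumption: one filtered index creation at a time
)
```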

#### Race conditions

Because filtered index creation employs the `reindex` Elasticsearch API to
derive the filtered index from an existing index, we need to be mindful of the
race condition that potentially exists between the data refresh DAG and this
DAG. The race condition arises because the data refresh DAG always deletes the
previous index once the new index for the media type has been created.
Consider the situation where filtered index creation is triggered
to run during a data refresh. The filtered index is being derived from the
previous index for the media type. Once the data refresh is finished, it will
delete that index, causing the reindex to halt because suddenly it has no data
source from which to pull documents.

There are two mechanisms that prevent this from happening:

1. The filtered index creation DAGs are not allowed to run if a data refresh for
the media type is already running.
2. The data refresh DAGs will wait for any pre-existing filtered index creation
DAG runs for the media type to finish before continuing.

This ensures that neither DAG depends on or modifies the origin indexes
critical for creating the filtered indexes. A sketch of the first guard is shown below.
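
The first guard might be implemented roughly as follows (a sketch only; the task
generated by the factory may skip or fail, and its structure may differ):

```python
# Illustrative guard: refuse to run filtered index creation while a data
# refresh for the same media type has an active run.
from airflow.exceptions import AirflowSkipException
from airflow.models import DagRun
from airflow.utils.state import DagRunState


def prevent_concurrency_with_data_refresh(data_refresh_dag_id: str) -> None:
    running = DagRun.find(dag_id=data_refresh_dag_id, state=DagRunState.RUNNING)
    if running:
        raise AirflowSkipException(
            f"{data_refresh_dag_id} is currently running; "
            "skipping filtered index creation."
        )
```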

Because the data refresh DAG triggers the filtered index creation DAG, we do
allow a `force` param to be passed to the DAGs generated by this module. This
parameter is only for use by the data refresh DAG and should not be used when
manually triggering the DAG unless you are absolutely certain of what you are
doing.

## `create_filtered_image_index`

### Create filtered index DAG factory

This module creates the filtered index creation DAGs for each media type using a
factory function.

Filtered index creation is handled by the ingestion server. The DAGs generated
by the `build_create_filtered_index_dag` function in this module are responsible
for triggering the ingestion server action to create and populate the filtered
index for a given media type. The DAG awaits the completion of the filtered
index creation and then points the filtered index alias for the media type to
the newly created index.

#### When this DAG runs

The DAGs generated in this module are triggered by the data refresh DAGs.
Keeping this process separate from the data refresh DAGs, while still
triggering it from them, allows us to run filtered index creation independently of
the full data refresh. This is primarily useful in two cases: for testing
changes to the filtered index creation; and for re-running filtered index
creation if an urgent change to the sensitive terms calls for an immediate
recreation of the filtered indexes.

#### Race conditions

Because filtered index creation employs the `reindex` Elasticsearch API to
derive the filtered index from an existing index, we need to be mindful of the
race condition that potentially exists between the data refresh DAG and this
DAG. The race condition arises because the data refresh DAG always deletes the
previous index once the new index for the media type has been created.
Consider the situation where filtered index creation is triggered
to run during a data refresh. The filtered index is being derived from the
previous index for the media type. Once the data refresh is finished, it will
delete that index, causing the reindex to halt because suddenly it has no data
source from which to pull documents.

There are two mechanisms that prevent this from happening:

1. The filtered index creation DAGs are not allowed to run if a data refresh for
the media type is already running.
2. The data refresh DAGs will wait for any pre-existing filtered index creation
DAG runs for the media type to finish before continuing.

This ensures that neither DAG depends on or modifies the origin indexes
critical for creating the filtered indexes.

Because the data refresh DAG triggers the filtered index creation DAG, we do
allow a `force` param to be passed to the DAGs generated by this module. This
parameter is only for use by the data refresh DAG and should not be used when
manually triggering the DAG unless you are absolutely certain of what you are
doing.

## `europeana_reingestion_workflow`

Content Provider: Europeana
@@ -327,7 +438,8 @@ process is necessary to make new content added to the Catalog by our provider
DAGs available to the API. You can read more in the
[README](https://github.com/WordPress/openverse-api/blob/main/ingestion_server/README.md).
Importantly, the data refresh TaskGroup is also configured to handle concurrency
requirements of the data refresh server. Finally, once the origin indexes have
been refreshed, the corresponding filtered index creation DAG is triggered.

You can find more background information on this process in the following issues
and related PRs:
126 changes: 126 additions & 0 deletions catalog/dags/common/ingestion_server.py
@@ -0,0 +1,126 @@
import logging
import os
from datetime import timedelta
from urllib.parse import urlparse

from airflow.exceptions import AirflowException
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from requests import Response

from common.constants import XCOM_PULL_TEMPLATE


logger = logging.getLogger(__name__)


POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 15))


def response_filter_stat(response: Response) -> str:
    """
    Handle the response for the `get_current_index` task.
    This is used to extract the name of the current index that the concerned alias
    points to. This index name will be available via XCom in the downstream tasks.
    """
    index_name = response.json()["alt_names"]
    # Indices are named as '<media type>-<suffix>', so everything after the first
    # hyphen '-' is the suffix.
    _, index_suffix = index_name.split("-", maxsplit=1)
    return index_suffix


def response_filter_status_check_endpoint(response: Response) -> str:
    """
    Handle the response for the `trigger_task` task.
    This is used to grab the endpoint needed to poll for the status of the triggered
    data refresh. This information will then be available via XCom in the downstream
    tasks.
    """
    status_check_url = response.json()["status_check"]
    return urlparse(status_check_url).path


def response_check_wait_for_completion(response: Response) -> bool:
    """
    Handle the response for the `wait_for_completion` sensor.
    Processes the response to determine whether the task can complete.
    """
    data = response.json()

    if data["active"]:
        # The data refresh is still running. Poll again later.
        return False

    if data["error"]:
        raise AirflowException(
            "Ingestion server encountered an error during data refresh."
        )

    logger.info(f"Data refresh done with {data['progress']}% completed.")
    return True


def get_current_index(target_alias: str) -> SimpleHttpOperator:
    return SimpleHttpOperator(
        task_id="get_current_index",
        http_conn_id="data_refresh",
        endpoint=f"stat/{target_alias}",
        method="GET",
        response_check=lambda response: response.status_code == 200,
        response_filter=response_filter_stat,
    )


def trigger_task(
    action: str,
    model: str,
    data: dict | None = None,
) -> SimpleHttpOperator:
    data = {
        **(data or {}),
        "model": model,
        "action": action.upper(),
    }
    return SimpleHttpOperator(
        task_id=f"trigger_{action.lower()}",
        http_conn_id="data_refresh",
        endpoint="task",
        data=data,
        response_check=lambda response: response.status_code == 202,
        response_filter=response_filter_status_check_endpoint,
    )


def wait_for_task(
    action: str,
    task_trigger: SimpleHttpOperator,
    timeout: timedelta,
    poke_interval: int = POKE_INTERVAL,
) -> HttpSensor:
    return HttpSensor(
        task_id=f"wait_for_{action.lower()}",
        http_conn_id="data_refresh",
        endpoint=XCOM_PULL_TEMPLATE.format(task_trigger.task_id, "return_value"),
        method="GET",
        response_check=response_check_wait_for_completion,
        mode="reschedule",
        poke_interval=poke_interval,
        timeout=timeout.total_seconds(),
    )


def trigger_and_wait_for_task(
    action: str,
    model: str,
    timeout: timedelta,
    data: dict | None = None,
    poke_interval: int = POKE_INTERVAL,
) -> tuple[SimpleHttpOperator, HttpSensor]:
    trigger = trigger_task(action, model, data)
    waiter = wait_for_task(action, trigger, timeout, poke_interval)
    trigger >> waiter
    return trigger, waiter
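
To illustrate how these helpers compose inside a DAG, here is a hedged usage sketch;
the DAG id, action name, and timeout below are placeholders rather than the values
actually used by the filtered index creation DAG.

```python
from datetime import datetime, timedelta

from airflow.models.dag import DAG

from common.ingestion_server import trigger_and_wait_for_task

with DAG(
    dag_id="ingestion_server_helpers_example",  # placeholder DAG id
    start_date=datetime(2023, 5, 1),
    schedule_interval=None,
):
    # Trigger an ingestion server action and poll its status endpoint until
    # the action reports completion (or the timeout elapses).
    trigger, waiter = trigger_and_wait_for_task(
        action="CREATE_FILTERED_INDEX",  # placeholder action name
        model="audio",
        timeout=timedelta(hours=12),  # placeholder timeout
    )
```
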
29 changes: 29 additions & 0 deletions catalog/dags/common/sensors/utils.py
@@ -0,0 +1,29 @@
from datetime import datetime

from airflow.models import DagRun


def get_most_recent_dag_run(dag_id) -> list[datetime] | datetime:
    """
    Retrieve the most recent DAG run's execution date.
    For use as ``execution_date_fn`` argument to ``ExternalTaskSensor``.
    Adapted from https://stackoverflow.com/a/74017474
    CC BY-SA 4.0 by Stack Overflow user Nahid O.
    """
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    if dag_runs:
        return dag_runs[0].execution_date

    # If there are no DAG runs, return an empty list to indicate that
    # there are no execution dates to check.
    # This works because the sensor waits until the number
    # of runs for the execution dates in the ``allowed_states`` matches the
    # length of the list of execution dates to check. If there are no runs
    # for this DAG, then the only possible number of required states
    # we can have is 0. See ``ExternalTaskSensor::poke`` and
    # ``ExternalTaskSensor::get_count``, especially the handling
    # of ``dttm_filter`` for the relevant implementation details.
    return []
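
As the docstring notes, this helper is intended to be passed as the
``execution_date_fn`` of an ``ExternalTaskSensor``. A hedged sketch of that usage
(inside a DAG definition; the task id is illustrative, and any finished state is
accepted, per the "Wait for any finished DAG state" change above):

```python
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State

from common.sensors.utils import get_most_recent_dag_run

wait_for_data_refresh = ExternalTaskSensor(
    task_id="wait_for_audio_data_refresh",  # illustrative task id
    external_dag_id="audio_data_refresh",
    external_task_id=None,  # wait on the DAG run as a whole
    # Target the most recent run of the external DAG, whatever its logical date.
    execution_date_fn=lambda _: get_most_recent_dag_run("audio_data_refresh"),
    allowed_states=[State.SUCCESS, State.FAILED],  # any finished state
    mode="reschedule",
)
```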
