diff --git a/echo-side/dags/monitor.py b/echo-side/dags/monitor.py
index f591dc0..da405f7 100644
--- a/echo-side/dags/monitor.py
+++ b/echo-side/dags/monitor.py
@@ -1,80 +1,38 @@
-"""
-Created on Tue Jun 14
-
-@author: dmckay
-"""
-import airflow
 from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from datetime import datetime, timedelta
-from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
+from airflow.operators.docker_operator import DockerOperator
+from airflow.utils.dates import days_ago
+from airflow.models import Variable
 from datetime import timedelta
 
-new_keys = []
-connection = S3Hook(aws_conn_id='EchoS3')
-bucket_name = 'LSST-IR-FUSION-TESTSTRATEGY'
-
-def run_on_new_file(**kwargs):
-    s3_hook = S3Hook(aws_conn_id='EchoS3')
-    bucket_name=bucket_name,
-    bucket_key='/',
-    wildcard_match_suffix='.csv',
-    all_keys = s3_hook.list_keys(bucket_name=bucket_name, prefix=bucket_key, delimiter='/', suffix=wildcard_match_suffix, apply_wildcard=True),
-    for key in all_keys:
-        if s3_hook.get_key(key).last_modified > kwargs['execution_date']:
-            new_keys.append(key)
-    for key in new_keys:
-        print(f'New key: {key}')
-
+# Define default arguments for the DAG
 default_args = {
     'owner': 'airflow',
-    'depends_on_past': False,
-    'start_date': datetime.now(),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
 }
 
+# Instantiate the DAG
 dag = DAG(
-    f'monitor-{bucket_name}',
+    'list_backup_csvs',
     default_args=default_args,
-    description=f'Monitor {bucket_name} S3 bucket for new CSV-formatted upload log files.',
-    schedule=timedelta(days=1),
+    description='List backup CSV files from S3 bucket',
+    schedule_interval=timedelta(days=1),
+    start_date=days_ago(1),
+    catchup=False,
 )
 
-s3_sensor = S3KeySensor(
-    task_id='s3_sensor',
-    bucket_name=bucket_name,
-    bucket_key='*.csv',
-    wildcard_match=True,
-    aws_conn_id='EchoS3',
-    timeout=1 * 60 * 60,
-    poke_interval=60,
+# DockerOperator to run the container
+list_csv_files = DockerOperator(
+    task_id='list_csv_files',
+    image='ghcr.io/lsst-uk/csd3-echo-somerville',
+    command='python csd3-echo-somerville/scripts/list_backup_csvs.py --bucket_name LSST-IR-FUSION-Butlers',
+    environment={
+        'ECHO_S3_ACCESS_KEY': Variable.get("ECHO_S3_ACCESS_KEY"),
+        'ECHO_S3_SECRET_KEY': Variable.get("ECHO_S3_SECRET_KEY"),
+    },
+    auto_remove=True,
     dag=dag,
-    default_args=default_args,
 )
 
-run_on_new_file_op = PythonOperator(
-    task_id='run_on_new_file',
-    python_callable=run_on_new_file,
-    dag=dag,
-    default_args=default_args,
-    op_kwargs={'ds': '{{ ds }}'},
-)
-
-check_csv_ops = []
-for key in new_keys:
-    check_csv_op = KubernetesPodOperator(
-        task_id=f"check_key_{key}",
-        name=f"check-key-{key}",
-        namespace="airflow",
-        image="ghcr.io/lsst-uk/csd3-echo-somerville:latest",
-        cmds=["python", "scripts/check_upload.py"],
-        env_vars={'ECHO_S3_ACCESS_KEY': connection.access_key, 'ECHO_S3_SECRET_KEY': connection.secret_key},
-        arguments=[bucket_name, key],
-        get_logs=True,
-        dag=dag,
-    )
-    check_csv_ops.append(check_csv_op)
-
-#graph
-s3_sensor >> run_on_new_file_op >> check_csv_ops
+# Set the task sequence
+list_csv_files
\ No newline at end of file
diff --git a/scripts/list_backup_csvs.py b/scripts/list_backup_csvs.py
index 411f43d..0249770 100644
--- a/scripts/list_backup_csvs.py
+++ b/scripts/list_backup_csvs.py
@@ -12,7 +12,7 @@ import bucket_manager.bucket_manager as bm
 
 
 parser = argparse.ArgumentParser()
-parser.add_argument('bucket_name', type=str, help='The name of the S3 bucket.')
+parser.add_argument('--bucket_name', '-b', type=str, help='The name of the S3 bucket.')
 parser.add_argument('--download', action='store_true', default=False, help='Download the backup log.')
 args = parser.parse_args()
 