
Commit

list csvs DAG
davedavemckay committed Jun 27, 2024
1 parent 6d39c7b commit 74423b3
Showing 2 changed files with 25 additions and 67 deletions.
90 changes: 24 additions & 66 deletions echo-side/dags/monitor.py
@@ -1,80 +1,38 @@
"""
Created on Tue Jun 14
@author: dmckay
"""
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from datetime import timedelta

new_keys = []
connection = S3Hook(aws_conn_id='EchoS3')
bucket_name = 'LSST-IR-FUSION-TESTSTRATEGY'

def run_on_new_file(**kwargs):
s3_hook = S3Hook(aws_conn_id='EchoS3')
bucket_name=bucket_name,
bucket_key='/',
wildcard_match_suffix='.csv',
all_keys = s3_hook.list_keys(bucket_name=bucket_name, prefix=bucket_key, delimiter='/', suffix=wildcard_match_suffix, apply_wildcard=True),
for key in all_keys:
if s3_hook.get_key(key).last_modified > kwargs['execution_date']:
new_keys.append(key)
for key in new_keys:
print(f'New key: {key}')

# Define default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

# Instantiate the DAG
dag = DAG(
f'monitor-{bucket_name}',
'list_backup_csvs',
default_args=default_args,
description=f'Monitor {bucket_name} S3 bucket for new CSV-formatted upload log files.',
schedule=timedelta(days=1),
description='List backup CSV files from S3 bucket',
schedule_interval=timedelta(days=1),
start_date=days_ago(1),
catchup=False,
)

s3_sensor = S3KeySensor(
task_id='s3_sensor',
bucket_name=bucket_name,
bucket_key='*.csv',
wildcard_match=True,
aws_conn_id='EchoS3',
timeout=1 * 60 * 60,
poke_interval=60,
# DockerOperator to run the container
list_csv_files = DockerOperator(
task_id='list_csv_files',
image='ghcr.io/lsst-uk/csd3-echo-somerville',
command='python csd3-echo-somerville/scripts/list_backup_csvs.py --bucket_name LSST-IR-FUSION-Butlers',
environment={
'ECHO_S3_ACCESS_KEY': Variable.get("ECHO_S3_ACCESS_KEY"),
'ECHO_S3_SECRET_KEY': Variable.get("ECHO_S3_SECRET_KEY"),
},
auto_remove=True,
dag=dag,
default_args=default_args,
)

run_on_new_file_op = PythonOperator(
task_id='run_on_new_file',
python_callable=run_on_new_file,
dag=dag,
default_args=default_args,
op_kwargs={'ds': '{{ ds }}'},
)

check_csv_ops = []
for key in new_keys:
check_csv_op = KubernetesPodOperator(
task_id=f"check_key_{key}",
name=f"check-key-{key}",
namespace="airflow",
image="ghcr.io/lsst-uk/csd3-echo-somerville:latest",
cmds=["python", "scripts/check_upload.py"],
env_vars={'ECHO_S3_ACCESS_KEY': connection.access_key, 'ECHO_S3_SECRET_KEY': connection.secret_key},
arguments=[bucket_name, key],
get_logs=True,
dag=dag,
)
check_csv_ops.append(check_csv_op)

#graph
s3_sensor >> run_on_new_file_op >> check_csv_ops
# Set the task sequence
list_csv_files
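
The resulting DAG can be sanity-checked by loading it through a DagBag. The sketch below is not part of the commit; it assumes an Airflow 2.x environment with the dags folder at echo-side/dags relative to the working directory, and the dag_id and task_id taken from the diff above.

# Minimal sketch: confirm the new DAG imports cleanly and exposes the expected task.
from airflow.models import DagBag

dagbag = DagBag(dag_folder='echo-side/dags', include_examples=False)

assert dagbag.import_errors == {}, dagbag.import_errors
dag = dagbag.get_dag('list_backup_csvs')
assert dag is not None
assert 'list_csv_files' in dag.task_ids
print('list_backup_csvs loaded with tasks:', dag.task_ids)
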
2 changes: 1 addition & 1 deletion scripts/list_backup_csvs.py
@@ -12,7 +12,7 @@
 import bucket_manager.bucket_manager as bm
 
 parser = argparse.ArgumentParser()
-parser.add_argument('bucket_name', type=str, help='The name of the S3 bucket.')
+parser.add_argument('--bucket_name', '-b', type=str, help='The name of the S3 bucket.')
 parser.add_argument('--download', action='store_true', default=False, help='Download the backup log.')
 args = parser.parse_args()
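
With the bucket name now an optional flag rather than a positional argument, the script is invoked as in the DockerOperator command above (--bucket_name LSST-IR-FUSION-Butlers, or -b for short). A minimal standalone sketch of the new parsing behaviour, mirroring the parser lines above; note that without required=True the attribute is None when the flag is omitted.

# Sketch of the changed argument handling (not part of the commit).
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--bucket_name', '-b', type=str, help='The name of the S3 bucket.')
parser.add_argument('--download', action='store_true', default=False, help='Download the backup log.')

args = parser.parse_args(['--bucket_name', 'LSST-IR-FUSION-Butlers'])
print(args.bucket_name, args.download)  # LSST-IR-FUSION-Butlers False

args = parser.parse_args([])
print(args.bucket_name)  # None -- the flag stays optional unless required=True is added
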

