diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 088a11ec..123179fa 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -7,9 +7,7 @@ permissions: on: push: branches: - - main - dev - - production pull_request: branches: - main @@ -36,12 +34,8 @@ jobs: - name: Set the environment based on the branch id: define_environment run: | - if [ "${{ github.ref }}" = "refs/heads/main" ]; then - echo "env_name=staging" >> $GITHUB_OUTPUT - elif [ "${{ github.ref }}" = "refs/heads/dev" ]; then + if [ "${{ github.ref }}" = "refs/heads/dev" ]; then echo "env_name=development" >> $GITHUB_OUTPUT - elif [ "${{ github.ref }}" = "refs/heads/production" ]; then - echo "env_name=production" >> $GITHUB_OUTPUT fi - name: Print the environment run: echo "The environment is ${{ steps.define_environment.outputs.env_name }}" @@ -71,11 +65,6 @@ jobs: role-session-name: "veda-airflow-github-${{ needs.define-environment.outputs.env_name }}-deployment" aws-region: us-west-2 - - name: Run MWAA deployment - uses: "./.github/actions/terraform-deploy" - with: - env_aws_secret_name: ${{ secrets.ENV_AWS_SECRET_NAME }} - - name: Run SM2A deployment # Flag to deploy SM2A if: ${{ vars.DEPLOY_SM2A }} = "true" diff --git a/dags/automated_transformation/README.md b/dags/automated_transformation/README.md new file mode 100644 index 00000000..da41c752 --- /dev/null +++ b/dags/automated_transformation/README.md @@ -0,0 +1,5 @@ +## Information about the folder +This folder contains the `automation DAG`, which transforms a given dataset into COGs. +The folder consists of two files: +- `automation_dag.py` - Contains the DAG architecture. +- `transformation_pipeline.py` - Contains the functions that transform a given netCDF file into a COG. The file checks whether the `transformation plugins` exist at the configured `plugins_uri` (a GitHub or `S3` location), fetches the plugin function, and uses it for the transformation. Statistics for the `netCDF` and `COG` files are also calculated, stored in a `JSON` file, and pushed to `S3` along with the `transformed COG`. \ No newline at end of file diff --git a/dags/automated_transformation/automation_dag.py b/dags/automated_transformation/automation_dag.py new file mode 100644 index 00000000..824a14c8 --- /dev/null +++ b/dags/automated_transformation/automation_dag.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +from airflow import DAG +from airflow.decorators import task +from airflow.models.param import Param +from airflow.operators.dummy_operator import DummyOperator +from slack_notifications import slack_fail_alert + +DAG_ID = "automate-cog-transformation" + +# Custom validation function + + +dag_run_config = { + "data_acquisition_method": Param( + "s3", enum=["s3"] + ), # To add Other protocols (HTTP, SFTP...)
+ "plugins_uri": Param( + "https://raw.githubusercontent.com/US-GHG-Center/ghgc-docs/refs/heads/main/", + type="string", + ), + "raw_data_bucket": "ghgc-data-store-develop", + "raw_data_prefix": Param( + "delivery/gpw", + type="string", + pattern="^[^/].*[^/]$", + ), + "dest_data_bucket": "ghgc-data-store-develop", + "data_prefix": Param("transformed_cogs", type="string", pattern="^[^/].*[^/]$"), + "collection_name": "gpw", + "nodata": Param(-9999, type="number"), + "ext": Param(".tif", type="string", pattern="^\\..*$"), +} + +with DAG( + dag_id=DAG_ID, + schedule=None, + catchup=False, + tags=["Transformation", "Report"], + params=dag_run_config, + on_failure_callback=slack_fail_alert, +) as dag: + start = DummyOperator(task_id="start", dag=dag) + end = DummyOperator(task_id="end", dag=dag) + + @task + def check_function_exists(ti): + from dags.automated_transformation.transformation_pipeline import ( + download_python_file, + ) + + config = ti.dag_run.conf + folder_name = "data_transformation_plugins" + file_name = f'{config.get("collection_name")}_transformation.py' + try: + plugin_url = f"{config['plugins_uri'].strip('/')}/{folder_name}/{file_name}" + download_python_file(uri=plugin_url, check_exist=True) + return f"The {file_name} exists in {folder_name} in this URL {plugin_url}." + except Exception as e: + raise Exception(f"Error checking file existence: {e}") + + @task + def discover_files(ti): + from dags.automated_transformation.transformation_pipeline import ( + get_all_s3_keys, + ) + + config = ti.dag_run.conf.copy() + bucket = config.get("raw_data_bucket") + model_name = config.get("raw_data_prefix") + ext = config.get("ext") # .nc as well + generated_list = get_all_s3_keys(bucket, model_name, ext) + chunk_size = int(len(generated_list) / 900) + 1 + return [ + generated_list[i : i + chunk_size] + for i in range(0, len(generated_list), chunk_size) + ] + + @task(max_active_tis_per_dag=1) + def process_files(file_url, **kwargs): + dag_run = kwargs.get("dag_run") + from dags.automated_transformation.transformation_pipeline import transform_cog + + config = dag_run.conf.copy() + raw_bucket_name = config.get("raw_data_bucket") + dest_data_bucket = config.get("dest_data_bucket") + data_prefix = config.get("data_prefix") + nodata = config.get("nodata") + collection_name = config.get("collection_name") + print(f"The file I am processing is {file_url}") + print("len of files", len(file_url)) + folder_name = "data_transformation_plugins" + file_name = f"{collection_name}_transformation.py" + plugin_url = f"{config['plugins_uri'].strip('/')}/{folder_name}/{file_name}" + + file_status = transform_cog( + file_url, + plugin_url=plugin_url, + nodata=nodata, + raw_data_bucket=raw_bucket_name, + dest_data_bucket=dest_data_bucket, + data_prefix=data_prefix, + collection_name=collection_name, + ) + return file_status + + @task + def generate_report(reports, **kwargs): + dag_run = kwargs.get("dag_run") + collection_name = dag_run.conf.get("collection_name") + count, failed_files = 0, [] + for report in reports: + if "failed" in report.values(): + failed_files.append(report) + elif "success" in report.values(): + count += 1 + + if failed_files: + raise Exception(f"Error generating COG file {failed_files}") + return { + "collection": collection_name, + "successes": count, + "failures": failed_files, + } + + urls = start >> check_function_exists() >> discover_files() + report_data = process_files.expand(file_url=urls) + generate_report(reports=report_data) >> end diff --git 
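For context on the fan-out above: `discover_files` splits the discovered keys into batches before `process_files.expand()` maps over them, and the `900` divisor appears intended to keep the number of mapped task instances under Airflow's default `max_map_length` of 1024. A minimal standalone sketch of that chunking (file names below are made up):

```python
# Standalone illustration of the chunking used by discover_files (assumed
# intent: stay under Airflow's default max_map_length of 1024 mapped tasks).
def chunk_keys(keys: list[str], max_batches: int = 900) -> list[list[str]]:
    chunk_size = int(len(keys) / max_batches) + 1
    return [keys[i: i + chunk_size] for i in range(0, len(keys), chunk_size)]


if __name__ == "__main__":
    sample = [f"delivery/gpw/file_{i}.tif" for i in range(2500)]  # made-up keys
    batches = chunk_keys(sample)
    print(len(batches), len(batches[0]))  # 834 batches, 3 keys in the first one
```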
a/dags/automated_transformation/transformation_pipeline.py b/dags/automated_transformation/transformation_pipeline.py new file mode 100644 index 00000000..96af662c --- /dev/null +++ b/dags/automated_transformation/transformation_pipeline.py @@ -0,0 +1,233 @@ +import importlib +import json +import os +import tempfile + +import boto3 +import numpy as np +import rasterio +import requests +import s3fs + + +def get_all_s3_keys(bucket, model_name, ext) -> list: + """Function fetches all the s3 keys from the given bucket and model name. + + Args: + bucket (str): Name of the bucket from where we want to fetch the data + model_name (str): Dataset name/folder name where the data is stored + ext (str): extension of the file that is to be fetched. + + Returns: + list : List of all the keys that match the given criteria + """ + session = boto3.session.Session() + s3_client = session.client("s3") + keys = [] + + kwargs = {"Bucket": bucket, "Prefix": f"{model_name}"} + try: + while True: + resp = s3_client.list_objects_v2(**kwargs) + print("response is ", resp) + for obj in resp["Contents"]: + if obj["Key"].endswith(ext) and "historical" not in obj["Key"]: + keys.append(obj["Key"]) + + try: + kwargs["ContinuationToken"] = resp["NextContinuationToken"] + except KeyError: + break + except Exception as ex: + raise Exception(f"Error returned is {ex}") + + print(f"Discovered {len(keys)}") + return keys + + +def download_python_file_from_s3(bucket_name, s3_key): + """ + Downloads a Python file from an S3 bucket and returns a temporary file path. + + Parameters: + - bucket_name (str): The name of the S3 bucket. + - s3_key (str): The key (path) to the file in the S3 bucket. + + Returns: + - str: Path to the temporary file. + """ + s3 = boto3.client("s3") + + # Create a temporary file + temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".py") + temp_file.close() # Close the file so it can be written to by boto3 + + # Download the S3 file to the temporary file location + s3.download_file(bucket_name, s3_key, temp_file.name) + print( + f"Downloaded {s3_key} from bucket {bucket_name} to temporary file {temp_file.name}" + ) + + return temp_file.name + + +def download_python_file(uri: str, check_exist=False): + if uri.startswith("s3://"): + # Remove the 's3://' prefix + s3_path = uri[5:] + # Split into bucket and key + parts = s3_path.split("/", 1) + bucket_name, key = parts + return download_python_file_from_s3(bucket_name=bucket_name, s3_key=key) + return download_python_file_from_github(url=uri, check_exist=check_exist) + + +def download_python_file_from_github(url, check_exist=False): + try: + # Send a GET request to the URL + response = requests.get(url) + response.raise_for_status() # Raise an error for HTTP errors + if check_exist: + return True + + # Extract the file name from the URL + file_name = os.path.basename(url) + + # Create a temporary directory and file with the same name + temp_dir = tempfile.gettempdir() + temp_file_path = os.path.join(temp_dir, file_name) + + # Write the content to the temporary file + with open(temp_file_path, "wb") as temp_file: + temp_file.write(response.content) + + print(f"File downloaded to: {temp_file_path}") + return temp_file_path + + except requests.exceptions.RequestException as e: + print(f"Error downloading the file: {e}") + return None + + +def load_function_from_file(file_path, function_name): + """ + Dynamically loads a function from a Python file. + + Parameters: + - file_path (str): Path to the Python file. 
+ - function_name (str): Name of the function to load. + + Returns: + - function: The loaded function object. + """ + # Load the module from the file path + spec = importlib.util.spec_from_file_location("dynamic_module", file_path) + dynamic_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(dynamic_module) + + # Return the function + return getattr(dynamic_module, function_name) + + +def transform_cog( + name_list, + nodata, + raw_data_bucket, + dest_data_bucket, + data_prefix, + collection_name, + plugin_url, +): + """This function calls the plugins (dataset specific transformation functions) and + generalizes the transformation of dataset to COGs. + + Args: + plugin_url: + name_list (str): List of the files to be transformed + nodata (str): Nodata value as mentioned by the data provider + raw_data_bucket (str): Name of the bucket where the raw data resides + dest_data_bucket (str): Name of the bucket where we want to store the tranformed cogs + raw_data_prefix (str): Folder where the netCDF files are stored in the bucket + collection_name (str): Name of the collection that would be used for the dataset + + Returns: + dict: Status and name of the file that is transformed + """ + + session = boto3.session.Session() + s3_client = session.client("s3") + json_dict = {} + function_name = f'{collection_name.replace("-", "_")}_transformation' + temp_file_path = download_python_file(plugin_url) + for name in name_list: + url = f"s3://{raw_data_bucket}/{name}" + fs = s3fs.S3FileSystem() + print("the url is", url) + with fs.open(url, mode="rb") as file_obj: + try: + transform_func = load_function_from_file(temp_file_path, function_name) + var_data_netcdf = transform_func(file_obj, name, nodata) + + for cog_filename, data in var_data_netcdf.items(): + # generate COG + min_value_netcdf = data.min().item() + max_value_netcdf = data.max().item() + std_value_netcdf = data.std().item() + mean_value_netcdf = data.mean().item() + COG_PROFILE = {"driver": "COG", "compress": "DEFLATE"} + with tempfile.NamedTemporaryFile() as temp_file: + data.rio.to_raster(temp_file.name, **COG_PROFILE) + s3_client.upload_file( + Filename=temp_file.name, + Bucket=dest_data_bucket, + Key=f"{data_prefix}/{collection_name}/{cog_filename}", + ) + raster_data = rasterio.open(temp_file.name).read() + raster_data[raster_data == -9999] = np.nan + min_value_cog = np.nanmin(raster_data) + max_value_cog = np.nanmax(raster_data) + mean_value_cog = np.nanmean(raster_data) + std_value_cog = np.nanstd(raster_data) + json_dict.update( + { + "original_file_url": name, + "transformed_filename": cog_filename, + "transformed_cog_s3uri": f"s3://{dest_data_bucket}/{data_prefix}/{cog_filename}", + "minimum_value_cog": f"{min_value_cog:.4f}", + "maximum_value_cog": f"{max_value_cog:.4f}", + "std_value_cog": f"{std_value_cog:.4f}", + "mean_value_cog": f"{mean_value_cog:.4f}", + "minimum_value_netcdf": f"{min_value_netcdf:.4f}", + "maximum_value_netcdf": f"{max_value_netcdf:.4f}", + "std_value_netcdf": f"{std_value_netcdf:.4f}", + "mean_value_netcdf": f"{mean_value_netcdf:.4f}", + } + ) + with tempfile.NamedTemporaryFile() as json_temp: + with open(json_temp.name, "w") as fp: + json.dump(json_dict, fp, indent=4) + print("JSON dictionary is ", json_dict) + + # Upload the file to the specified S3 bucket and folder + s3_client.upload_file( + Filename=json_temp.name, + Bucket=dest_data_bucket, + Key=f"{data_prefix}/{collection_name}/{cog_filename[:-4]}.json", + ExtraArgs={"ContentType": "application/json"}, + ) + status = { + 
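From the way `transform_cog` uses the downloaded plugin (`transform_func(file_obj, name, nodata)` followed by `data.rio.to_raster(...)` and min/max/mean/std calls), a plugin module is expected to expose a `<collection_name>_transformation` function that returns a mapping of COG filenames to rioxarray-enabled DataArrays. A hypothetical sketch of such a plugin; this is an inferred contract, not the actual `gpw` plugin from ghgc-docs:

```python
# Hypothetical plugin sketch (inferred contract, not the real gpw plugin).
import xarray as xr
import rioxarray  # noqa: F401  registers the .rio accessor used by transform_cog


def gpw_transformation(file_obj, name: str, nodata) -> dict:
    """Open one netCDF file object and return {cog_filename: DataArray} pairs."""
    ds = xr.open_dataset(file_obj)
    var_data_netcdf = {}
    for var in ds.data_vars:
        data = ds[var].rio.write_crs("EPSG:4326").rio.write_nodata(nodata)
        cog_filename = f"{name.split('/')[-1].rsplit('.', 1)[0]}_{var}.tif"
        var_data_netcdf[cog_filename] = data
    return var_data_netcdf
```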
"transformed_filename": cog_filename, + "statistics_file": f"{cog_filename.split('.')[0]}.json", + "s3uri": f"s3://{dest_data_bucket}/{data_prefix}/{collection_name}/{cog_filename}", + "status": "success", + } + + except Exception as ex: + status = { + "transformed_filename": name, + "status": "failed", + "reason": f"Error: {ex}", + } + finally: + os.remove(temp_file_path) + return status diff --git a/dags/example_dag.py b/dags/example_dag.py index 7bd11599..6bd2ce62 100644 --- a/dags/example_dag.py +++ b/dags/example_dag.py @@ -5,13 +5,14 @@ from airflow import DAG from airflow.operators.dummy_operator import DummyOperator as EmptyOperator from airflow.operators.python import PythonOperator - +import sys def log_task(text: str): logging.info(text) def discover_from_cmr_task(text): + print(f"Python version used {sys.version}") log_task(text) diff --git a/dags/generate_dags.py b/dags/generate_dags.py index 9eeab714..7bd2c31b 100644 --- a/dags/generate_dags.py +++ b/dags/generate_dags.py @@ -15,9 +15,9 @@ def generate_dags(): from pathlib import Path - - mwaa_stac_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True) - bucket = mwaa_stac_conf["EVENT_BUCKET"] + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + bucket = airflow_vars_json.get("EVENT_BUCKET") try: client = boto3.client("s3") diff --git a/dags/requirements.txt b/dags/requirements.txt index 8c9ec097..a9ab2a39 100644 --- a/dags/requirements.txt +++ b/dags/requirements.txt @@ -11,7 +11,6 @@ apache-airflow-providers-postgres==5.2.2 apache-airflow-providers-common-sql==1.2.0 typing-extensions==4.4.0 psycopg2-binary==2.9.5 -pypgstac==0.7.4 pyOpenSSL==22.0.0 stac-pydantic fsspec diff --git a/dags/slack_notifications.py b/dags/slack_notifications.py new file mode 100644 index 00000000..d57dfbf5 --- /dev/null +++ b/dags/slack_notifications.py @@ -0,0 +1,48 @@ +from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator +from airflow.models.variable import Variable + + +def slack_alert(context, circle, status): + slack_conn_id = "slack_connection_id" + ti = context.get("task_instance") + pocs = ti.dag_run.conf.get("pocs", []) + vars = Variable.get("aws_dags_variables", deserialize_json=True) + sm2a_base_url = f'https://{vars.get("SM2A_BASE_URL", "localhost:8080")}' + slack_msg = """ + :{circle}: Task {status}. 
+ *Task*: {task} + *Dag*: {dag} + *Execution Time*: {exec_date} + *Log Url*: {log_url} + + """.format( + status=status, + circle=circle, + task=ti.task_id, + dag=ti.dag_id, + exec_date=context.get("data_interval_start"), + log_url=ti.log_url.replace("http://localhost:8080", sm2a_base_url), + ) + if pocs: + pocs_mentions = " ".join([f"<@{poc}>" for poc in pocs]) + slack_msg = f"{slack_msg}\n*points of contact*: {pocs_mentions}" + print(f"{context=}") + _alert = SlackWebhookOperator( + task_id="slack_test", + slack_webhook_conn_id=slack_conn_id, + message=slack_msg, + username="airflow", + ) + return _alert.execute(context=context) + + +def slack_fail_alert(context): + return slack_alert(context=context, circle="red_circle", status="Failed") + + +def slack_success_alert(context): + return slack_alert(context=context, circle="large_green_circle", status="Succeeded") + + +def slack_warning_alert(context): + return slack_alert(context=context, circle="large_yellow_circle", status="Warning") diff --git a/dags/veda_data_pipeline/groups/collection_group.py b/dags/veda_data_pipeline/groups/collection_group.py index de4f2dd1..0a561175 100644 --- a/dags/veda_data_pipeline/groups/collection_group.py +++ b/dags/veda_data_pipeline/groups/collection_group.py @@ -32,28 +32,40 @@ def ingest_collection_task(ti): dataset (Dict[str, Any]): dataset dictionary (JSON) role_arn (str): role arn for Zarr collection generation """ + import json collection = ti.xcom_pull(task_ids='Collection.generate_collection') + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET") + stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL") return submission_handler( event=collection, endpoint="/collections", - cognito_app_secret=Variable.get("COGNITO_APP_SECRET"), - stac_ingestor_api_url=Variable.get("STAC_INGESTOR_API_URL"), + cognito_app_secret=cognito_app_secret, + stac_ingestor_api_url=stac_ingestor_api_url ) # NOTE unused, but useful for item ingests, since collections are a dependency for items def check_collection_exists_task(ti): + import json config = ti.dag_run.conf + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + stac_url = airflow_vars_json.get("STAC_URL") return check_collection_exists( - endpoint=Variable.get("STAC_URL", default_var=None), + endpoint=stac_url, collection_id=config.get("collection"), ) def generate_collection_task(ti): + import json config = ti.dag_run.conf - role_arn = Variable.get("ASSUME_ROLE_READ_ARN", default_var=None) + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN") # TODO it would be ideal if this also works with complete collections where provided - this would make the collection ingest more re-usable collection = generator.generate_stac( diff --git a/dags/veda_data_pipeline/groups/discover_group.py b/dags/veda_data_pipeline/groups/discover_group.py index 798f5f10..8cd2d205 100644 --- a/dags/veda_data_pipeline/groups/discover_group.py +++ b/dags/veda_data_pipeline/groups/discover_group.py @@ -1,34 +1,38 @@ -import time +from datetime import timedelta +import json import uuid from airflow.models.variable import Variable from airflow.models.xcom import LazyXComAccess -from airflow.operators.dummy_operator import DummyOperator as EmptyOperator -from airflow.decorators import task_group -from airflow.operators.python 
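The changes in this PR replace several per-key Airflow Variables with a single JSON `aws_dags_variables` Variable. A sketch of seeding it in a running Airflow environment, with placeholder values; the keys listed are the ones read by the updated tasks in this diff:

```python
import json
from airflow.models.variable import Variable

# Placeholder values; the keys are those referenced by the updated tasks.
Variable.set(
    "aws_dags_variables",
    json.dumps({
        "EVENT_BUCKET": "my-event-bucket",
        "COGNITO_APP_SECRET": "my-cognito-secret-name",
        "STAC_INGESTOR_API_URL": "https://ingest.example.com",
        "STAC_URL": "https://stac.example.com",
        "ASSUME_ROLE_READ_ARN": "arn:aws:iam::123456789012:role/read",
        "ASSUME_ROLE_WRITE_ARN": "arn:aws:iam::123456789012:role/write",
        "SM2A_BASE_URL": "airflow.example.com",
    }),
)

# Both read styles used in this diff yield the same dict:
assert json.loads(Variable.get("aws_dags_variables")) == Variable.get(
    "aws_dags_variables", deserialize_json=True
)
```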
import BranchPythonOperator, PythonOperator, ShortCircuitOperator -from airflow.utils.trigger_rule import TriggerRule -from airflow_multi_dagrun.operators import TriggerMultiDagRunOperator +from airflow.decorators import task from veda_data_pipeline.utils.s3_discovery import ( s3_discovery_handler, EmptyFileListError ) +from deprecated import deprecated group_kwgs = {"group_id": "Discover", "tooltip": "Discover"} -def discover_from_s3_task(ti, event={}, **kwargs): +@task(retries=1, retry_delay=timedelta(minutes=1)) +def discover_from_s3_task(ti=None, event={}, **kwargs): """Discover grouped assets/files from S3 in batches of 2800. Produce a list of such files stored on S3 to process. This task is used as part of the discover_group subdag and outputs data to EVENT_BUCKET. """ + payload = kwargs.get("payload", ti.dag_run.conf) config = { **event, - **ti.dag_run.conf, + **payload, } + # TODO test that this context var is available in taskflow last_successful_execution = kwargs.get("prev_start_date_success") if event.get("schedule") and last_successful_execution: config["last_successful_execution"] = last_successful_execution.isoformat() # (event, chunk_size=2800, role_arn=None, bucket_output=None): - MWAA_STAC_CONF = Variable.get("MWAA_STACK_CONF", deserialize_json=True) - read_assume_arn = Variable.get("ASSUME_ROLE_READ_ARN", default_var=None) + + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + event_bucket = airflow_vars_json.get("EVENT_BUCKET") + read_assume_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN") # Making the chunk size small, this helped us process large data faster than # passing a large chunk of 500 chunk_size = config.get("chunk_size", 500) @@ -36,84 +40,83 @@ def discover_from_s3_task(ti, event={}, **kwargs): return s3_discovery_handler( event=config, role_arn=read_assume_arn, - bucket_output=MWAA_STAC_CONF["EVENT_BUCKET"], + bucket_output=event_bucket, chunk_size=chunk_size ) except EmptyFileListError as ex: print(f"Received an exception {ex}") - return [] + # TODO test continued short circuit operator behavior (no files -> skip remaining tasks) + return {} + + +@task +def get_files_task(payload, ti=None): + """ + Get files from S3 produced by discovery or dataset tasks. + Handles both single payload and multiple payload scenarios. + """ + dag_run_id = ti.dag_run.run_id + results = [] + + # Handle multiple payloads (dataset and items case) + payloads = payload if isinstance(payload, list) else [payload] + + for item in payloads: + if isinstance(item, LazyXComAccess): # Dynamic task mapping case + payloads_xcom = item[0].pop("payload", []) + base_payload = item[0] + else: + payloads_xcom = item.pop("payload", []) + base_payload = item + + for indx, payload_xcom in enumerate(payloads_xcom): + results.append({ + "run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}", + **base_payload, + "payload": payload_xcom, + }) + return results -def get_files_to_process(ti): +@task +@deprecated(reason="Please use get_files_task function that handles both files and dataset files use cases") +def get_files_to_process(payload, ti=None): """Get files from S3 produced by the discovery task. Used as part of both the parallel_run_process_rasters and parallel_run_process_vectors tasks. 
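`get_files_task` turns each chunked payload key into a standalone run config carrying a unique `run_id`. A self-contained mirror of that fan-out, with made-up values, to show the shape of the records it emits:

```python
import uuid


def fan_out(base_payload: dict, dag_run_id: str) -> list[dict]:
    """Standalone mirror of the get_files_task record shape (illustration only)."""
    payload_keys = base_payload.pop("payload", [])
    return [
        {"run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}", **base_payload, "payload": key}
        for indx, key in enumerate(payload_keys)
    ]


print(fan_out(
    {"collection": "demo", "payload": ["s3://bucket/events/demo/chunk_0.json"]},
    dag_run_id="manual__2024-01-01T00:00:00",
))
```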
""" - dynamic_group_id = ti.task_id.split(".")[0] - payload = ti.xcom_pull(task_ids=f"{dynamic_group_id}.discover_from_s3") - if isinstance(payload, LazyXComAccess): + if isinstance(payload, LazyXComAccess): # if used as part of a dynamic task mapping payloads_xcom = payload[0].pop("payload", []) payload = payload[0] else: payloads_xcom = payload.pop("payload", []) dag_run_id = ti.dag_run.run_id - for indx, payload_xcom in enumerate(payloads_xcom): - time.sleep(2) - yield { - "run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}", - **payload, - "payload": payload_xcom, - } + return [{ + "run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}", + **payload, + "payload": payload_xcom, + } for indx, payload_xcom in enumerate(payloads_xcom)] -def vector_raster_choice(ti): - """Choose whether to process rasters or vectors based on the payload.""" - payload = ti.dag_run.conf - dynamic_group_id = ti.task_id.split(".")[0] - - if payload.get("vector"): - return f"{dynamic_group_id}.parallel_run_process_generic_vectors" - if payload.get("vector_eis"): - return f"{dynamic_group_id}.parallel_run_process_vectors" - return f"{dynamic_group_id}.parallel_run_process_rasters" - -@task_group -def subdag_discover(event={}): - discover_from_s3 = ShortCircuitOperator( - task_id="discover_from_s3", - python_callable=discover_from_s3_task, - op_kwargs={"text": "Discover from S3", "event": event}, - trigger_rule=TriggerRule.NONE_FAILED, - provide_context=True, - ) - - raster_vector_branching = BranchPythonOperator( - task_id="raster_vector_branching", - python_callable=vector_raster_choice, - ) - - run_process_raster = TriggerMultiDagRunOperator( - task_id="parallel_run_process_rasters", - trigger_dag_id="veda_ingest_raster", - python_callable=get_files_to_process, - ) - - run_process_vector = TriggerMultiDagRunOperator( - task_id="parallel_run_process_vectors", - trigger_dag_id="veda_ingest_vector", - python_callable=get_files_to_process, - ) - - run_process_generic_vector = TriggerMultiDagRunOperator( - task_id="parallel_run_process_generic_vectors", - trigger_dag_id="veda_generic_ingest_vector", - python_callable=get_files_to_process, - ) +@task +@deprecated(reason="Please use get_files_task airflow task instead. This will be removed in the new release") +def get_dataset_files_to_process(payload, ti=None): + """Get files from S3 produced by the dataset task. + This is different from the get_files_to_process task as it produces a combined structure from repeated mappings. 
+ """ + dag_run_id = ti.dag_run.run_id - # extra no-op, needed to run in dynamic mapping context - end_discover = EmptyOperator(task_id="end_discover", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,) - - discover_from_s3 >> raster_vector_branching >> [run_process_raster, run_process_vector, run_process_generic_vector] - run_process_raster >> end_discover - run_process_vector >> end_discover - run_process_generic_vector >> end_discover - + result = [] + for x in payload: + if isinstance(x, LazyXComAccess): # if used as part of a dynamic task mapping + payloads_xcom = x[0].pop("payload", []) + payload_0 = x[0] + else: + payloads_xcom = x.pop("payload", []) + payload_0 = x + for indx, payload_xcom in enumerate(payloads_xcom): + result.append({ + "run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}", + **payload_0, + "payload": payload_xcom, + }) + return result diff --git a/dags/veda_data_pipeline/groups/processing_group.py b/dags/veda_data_pipeline/groups/processing_group.py deleted file mode 100644 index 9a8382f9..00000000 --- a/dags/veda_data_pipeline/groups/processing_group.py +++ /dev/null @@ -1,95 +0,0 @@ -import json -import logging -from datetime import timedelta - -import smart_open -from airflow.models.variable import Variable -from airflow.operators.python import PythonOperator -from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator -from airflow.utils.task_group import TaskGroup -from veda_data_pipeline.utils.submit_stac import ( - submission_handler, -) - -group_kwgs = {"group_id": "Process", "tooltip": "Process"} - - -def log_task(text: str): - logging.info(text) - - -def submit_to_stac_ingestor_task(ti): - """Submit STAC items to the STAC ingestor API.""" - print("Submit STAC ingestor") - event = json.loads(ti.xcom_pull(task_ids=f"{group_kwgs['group_id']}.build_stac")) - success_file = event["payload"]["success_event_key"] - with smart_open.open(success_file, "r") as _file: - stac_items = json.loads(_file.read()) - - for item in stac_items: - submission_handler( - event=item, - endpoint="/ingestions", - cognito_app_secret=Variable.get("COGNITO_APP_SECRET"), - stac_ingestor_api_url=Variable.get("STAC_INGESTOR_API_URL"), - ) - return event - - -def subdag_process(): - with TaskGroup(**group_kwgs) as process_grp: - mwaa_stack_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True) - build_stac = EcsRunTaskOperator( - task_id="build_stac", - trigger_rule="none_failed", - cluster=f"{mwaa_stack_conf.get('PREFIX')}-cluster", - task_definition=f"{mwaa_stack_conf.get('PREFIX')}-tasks", - launch_type="FARGATE", - do_xcom_push=True, - execution_timeout=timedelta(minutes=60), - overrides={ - "containerOverrides": [ - { - "name": f"{mwaa_stack_conf.get('PREFIX')}-veda-stac-build", - "command": [ - "/usr/local/bin/python", - "handler.py", - "--payload", - "{}".format("{{ task_instance.dag_run.conf }}"), - ], - "environment": [ - { - "name": "EXTERNAL_ROLE_ARN", - "value": Variable.get( - "ASSUME_ROLE_READ_ARN", default_var="" - ), - }, - { - "name": "BUCKET", - "value": "veda-data-pipelines-staging-lambda-ndjson-bucket", - }, - { - "name": "EVENT_BUCKET", - "value": mwaa_stack_conf.get("EVENT_BUCKET"), - }, - ], - "memory": 2048, - "cpu": 1024, - }, - ], - }, - network_configuration={ - "awsvpcConfiguration": { - "securityGroups": mwaa_stack_conf.get("SECURITYGROUPS"), - "subnets": mwaa_stack_conf.get("SUBNETS"), - }, - }, - awslogs_group=mwaa_stack_conf.get("LOG_GROUP_NAME"), - awslogs_stream_prefix=f"ecs/{mwaa_stack_conf.get('PREFIX')}-veda-stac-build", # prefix 
with container name - ) - submit_to_stac_ingestor = PythonOperator( - task_id="submit_to_stac_ingestor", - python_callable=submit_to_stac_ingestor_task, - ) - build_stac >> submit_to_stac_ingestor - return process_grp diff --git a/dags/veda_data_pipeline/groups/processing_tasks.py b/dags/veda_data_pipeline/groups/processing_tasks.py new file mode 100644 index 00000000..4ce85e61 --- /dev/null +++ b/dags/veda_data_pipeline/groups/processing_tasks.py @@ -0,0 +1,60 @@ +from datetime import timedelta +import json +import logging +from copy import deepcopy +import smart_open +from airflow.models.variable import Variable +from airflow.decorators import task +from veda_data_pipeline.utils.submit_stac import submission_handler + +group_kwgs = {"group_id": "Process", "tooltip": "Process"} + + +def log_task(text: str): + logging.info(text) + +@task +def extract_discovery_items_from_payload(ti, payload=None, **kwargs): + discovery_items = ti.dag_run.conf.get("discovery_items") if not payload else payload.get("discovery_items") + return discovery_items + +@task +def remove_thumbnail_asset(ti): + payload = deepcopy(ti.dag_run.conf) + assets = payload.get("assets", {}) + if assets.get("thumbnail"): + assets.pop("thumbnail") + # if thumbnail was only asset, delete assets + if not assets: + payload.pop("assets") + return payload + +@task(retries=1, retry_delay=timedelta(minutes=1)) +def submit_to_stac_ingestor_task(built_stac: dict): + """Submit STAC items to the STAC ingestor API.""" + event = built_stac.copy() + success_file = event["payload"]["success_event_key"] + + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET") + stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL") + with smart_open.open(success_file, "r") as _file: + stac_items = json.loads(_file.read()) + + for item in stac_items: + submission_handler( + event=item, + endpoint="/ingestions", + cognito_app_secret=cognito_app_secret, + stac_ingestor_api_url=stac_ingestor_api_url, + ) + return event + + +@task(max_active_tis_per_dag=5) +def build_stac_task(payload): + from veda_data_pipeline.utils.build_stac.handler import stac_handler + airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True) + event_bucket = airflow_vars_json.get("EVENT_BUCKET") + return stac_handler(payload_src=payload, bucket_output=event_bucket) diff --git a/dags/veda_data_pipeline/groups/transfer_group.py b/dags/veda_data_pipeline/groups/transfer_group.py index a4235496..6eab6cc3 100644 --- a/dags/veda_data_pipeline/groups/transfer_group.py +++ b/dags/veda_data_pipeline/groups/transfer_group.py @@ -1,13 +1,9 @@ -from datetime import timedelta - from airflow.models.variable import Variable from airflow.operators.python import BranchPythonOperator, PythonOperator -from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator +import json from airflow.utils.task_group import TaskGroup from airflow.utils.trigger_rule import TriggerRule -from veda_data_pipeline.utils.transfer import ( - data_transfer_handler, -) +from airflow.decorators import task group_kwgs = {"group_id": "Transfer", "tooltip": "Transfer"} @@ -22,12 +18,36 @@ def cogify_choice(ti): return f"{group_kwgs['group_id']}.copy_data" -def transfer_data(ti): - """Transfer data from one S3 bucket to another; s3 copy, no need for docker""" +def cogify_copy_task(ti): + from veda_data_pipeline.utils.cogify_transfer.handler import cogify_transfer_handler config = 
ti.dag_run.conf - role_arn = Variable.get("ASSUME_ROLE_READ_ARN", default_var="") + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN") + return cogify_transfer_handler(event_src=config, external_role_arn=external_role_arn) + +@task +def transfer_data(ti, payload): + """Transfer data from one S3 bucket to another; s3 copy, no need for docker""" + from veda_data_pipeline.utils.transfer import ( + data_transfer_handler, + ) + # use task-provided payload if provided, otherwise fall back on ti values + # payload will generally have the same values expected by discovery, so some renames are needed when combining the dicts + config = { + **payload, + **ti.dag_run.conf, + "origin_bucket": payload.get("bucket", ti.dag_run.conf.get("origin_bucket", "veda-data-store")), + "origin_prefix": payload.get("prefix", ti.dag_run.conf.get("origin_prefix", "s3-prefix/")), + "target_bucket": payload.get("target_bucket", ti.dag_run.conf.get("target_bucket", "veda-data-store")), + "dry_run": payload.get("dry_run", ti.dag_run.conf.get("dry_run", False)), + } + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN") # (event, chunk_size=2800, role_arn=None, bucket_output=None): - return data_transfer_handler(event=config, role_arn=role_arn) + return data_transfer_handler(event=config, role_arn=external_role_arn) + # TODO: cogify_transfer handler is missing arg parser so this subdag will not work def subdag_transfer(): @@ -43,47 +63,10 @@ def subdag_transfer(): python_callable=transfer_data, op_kwargs={"text": "Copy files on S3"}, ) - - mwaa_stack_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True) - run_cogify_copy = EcsRunTaskOperator( + run_cogify_copy = PythonOperator( task_id="cogify_and_copy_data", trigger_rule="none_failed", - cluster=f"{mwaa_stack_conf.get('PREFIX')}-cluster", - task_definition=f"{mwaa_stack_conf.get('PREFIX')}-transfer-tasks", - launch_type="FARGATE", - do_xcom_push=True, - execution_timeout=timedelta(minutes=120), - overrides={ - "containerOverrides": [ - { - "name": f"{mwaa_stack_conf.get('PREFIX')}-veda-cogify-transfer", - "command": [ - "/usr/local/bin/python", - "handler.py", - "--payload", - "{}".format("{{ task_instance.dag_run.conf }}"), - ], - "environment": [ - { - "name": "EXTERNAL_ROLE_ARN", - "value": Variable.get( - "ASSUME_ROLE_READ_ARN", default_var="" - ), - }, - ], - "memory": 2048, - "cpu": 1024, - }, - ], - }, - network_configuration={ - "awsvpcConfiguration": { - "securityGroups": mwaa_stack_conf.get("SECURITYGROUPS"), - "subnets": mwaa_stack_conf.get("SUBNETS"), - }, - }, - awslogs_group=mwaa_stack_conf.get("LOG_GROUP_NAME"), - awslogs_stream_prefix=f"ecs/{mwaa_stack_conf.get('PREFIX')}-veda-cogify-transfer", # prefix with container name + python_callable=cogify_copy_task ) (cogify_branching >> [run_copy, run_cogify_copy]) diff --git a/dags/veda_data_pipeline/utils/build_stac/handler.py b/dags/veda_data_pipeline/utils/build_stac/handler.py new file mode 100644 index 00000000..a13350b0 --- /dev/null +++ b/dags/veda_data_pipeline/utils/build_stac/handler.py @@ -0,0 +1,133 @@ +import json +from typing import Any, Dict, TypedDict, Union +from uuid import uuid4 +import smart_open +from veda_data_pipeline.utils.build_stac.utils import events +from veda_data_pipeline.utils.build_stac.utils import stac +from concurrent.futures import 
ThreadPoolExecutor, as_completed + + + +class S3LinkOutput(TypedDict): + stac_file_url: str + + +def using_pool(objects, workers_count: int): + returned_results = [] + with ThreadPoolExecutor(max_workers=workers_count) as executor: + # Submit tasks to the executor + futures = {executor.submit(handler, obj): obj for obj in objects} + + for future in as_completed(futures): + try: + result = future.result() # Get result from future + returned_results.append(result) + except Exception as nex: + print(f"Error {nex} with object {futures[future]}") + + return returned_results + + +class StacItemOutput(TypedDict): + stac_item: Dict[str, Any] + + +def handler(event: Dict[str, Any]) -> Union[S3LinkOutput, StacItemOutput]: + """ + Handler for STAC Collection Item generation + + Arguments: + event - object with event parameters + { + "collection": "OMDOAO3e", + "id_regex": "_(.*).tif", + "assets": { + "OMDOAO3e_LUT": { + "title": "OMDOAO3e_LUT", + "description": "OMDOAO3e_LUT, described", + "href": "s3://climatedashboard-data/OMDOAO3e/OMDOAO3e_LUT.tif", + }, + "OMDOAO3e_LUT": { + "title": "OMDOAO3e_LUT", + "description": "OMDOAO3e_LUT, described", + "href": "s3://climatedashboard-data/OMDOAO3e/OMDOAO3e_LUT.tif", + } + } + } + + """ + + parsed_event = events.RegexEvent.parse_obj(event) + try: + stac_item = stac.generate_stac(parsed_event).to_dict() + except Exception as ex: + out_err: StacItemOutput = {"stac_item": {"error": f"{ex}", "event": event}} + return out_err + + output: StacItemOutput = {"stac_item": stac_item} + return output + + +def sequential_processing(objects): + returned_results = [] + for _object in objects: + result = handler(_object) + returned_results.append(result) + return returned_results + + +def write_outputs_to_s3(key, payload_success, payload_failures): + success_key = f"{key}/build_stac_output_{uuid4()}.json" + with smart_open.open(success_key, "w") as _file: + _file.write(json.dumps(payload_success)) + dead_letter_key = "" + if payload_failures: + dead_letter_key = f"{key}/dead_letter_events/build_stac_failed_{uuid4()}.json" + with smart_open.open(dead_letter_key, "w") as _file: + _file.write(json.dumps(payload_failures)) + return [success_key, dead_letter_key] + + +def stac_handler(payload_src: dict, bucket_output): + payload_event = payload_src.copy() + s3_event = payload_event.pop("payload") + collection = payload_event.get("collection", "not_provided") + key = f"s3://{bucket_output}/events/{collection}" + payload_success = [] + payload_failures = [] + with smart_open.open(s3_event, "r") as _file: + s3_event_read = _file.read() + event_received = json.loads(s3_event_read) + objects = event_received["objects"] + use_multithreading = payload_event.get("use_multithreading", True) + payloads = ( + using_pool(objects, workers_count=4) + if use_multithreading + else sequential_processing(objects) + ) + for payload in payloads: + stac_item = payload["stac_item"] + if "error" in stac_item: + payload_failures.append(stac_item) + else: + payload_success.append(stac_item) + success_key, dead_letter_key = write_outputs_to_s3( + key=key, payload_success=payload_success, payload_failures=payload_failures + ) + + # Silent dead letters are nice, but we want the Airflow UI to quickly alert us if something went wrong. + if len(payload_failures) != 0: + raise ValueError( + f"Some items failed to be processed. 
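`using_pool` above is the standard `submit`/`as_completed` fan-out; a self-contained version with a trivial stand-in for `handler` shows the pattern, including the per-item error handling that keeps one bad object from aborting the batch:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n: int) -> int:  # trivial stand-in for handler()
    return n * n


def run_pool(items, workers_count: int = 4):
    returned_results = []
    with ThreadPoolExecutor(max_workers=workers_count) as executor:
        futures = {executor.submit(square, item): item for item in items}
        for future in as_completed(futures):
            try:
                returned_results.append(future.result())
            except Exception as nex:  # log and continue, as in using_pool
                print(f"Error {nex} with object {futures[future]}")
    return returned_results


print(sorted(run_pool(range(5))))  # [0, 1, 4, 9, 16]
```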
Failures logged here: {dead_letter_key}" + ) + + return { + "payload": { + "success_event_key": success_key, + "failed_event_key": dead_letter_key, + "status": { + "successes": len(payload_success), + "failures": len(payload_failures), + }, + } + } diff --git a/dags/veda_data_pipeline/utils/build_stac/utils/__init__.py b/dags/veda_data_pipeline/utils/build_stac/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dags/veda_data_pipeline/utils/build_stac/utils/events.py b/dags/veda_data_pipeline/utils/build_stac/utils/events.py new file mode 100644 index 00000000..c7e835b9 --- /dev/null +++ b/dags/veda_data_pipeline/utils/build_stac/utils/events.py @@ -0,0 +1,19 @@ +from datetime import datetime +from typing import Dict, Literal, Optional + +from pydantic import BaseModel, Field + +INTERVAL = Literal["month", "year", "day"] + + +class RegexEvent(BaseModel, frozen=True): + collection: str + item_id: str + assets: Dict + + start_datetime: Optional[datetime] = None + end_datetime: Optional[datetime] = None + single_datetime: Optional[datetime] = None + + properties: Optional[Dict] = Field(default_factory=dict) + datetime_range: Optional[INTERVAL] = None diff --git a/dags/veda_data_pipeline/utils/build_stac/utils/regex.py b/dags/veda_data_pipeline/utils/build_stac/utils/regex.py new file mode 100644 index 00000000..6493e73d --- /dev/null +++ b/dags/veda_data_pipeline/utils/build_stac/utils/regex.py @@ -0,0 +1,91 @@ +import re +from datetime import datetime +from typing import Callable, Dict, Tuple, Union + +from dateutil.relativedelta import relativedelta + +from . import events + +DATERANGE = Tuple[datetime, datetime] + + +def _calculate_year_range(datetime_obj: datetime) -> DATERANGE: + start_datetime = datetime_obj.replace(month=1, day=1) + end_datetime = datetime_obj.replace(month=12, day=31) + return start_datetime, end_datetime + + +def _calculate_month_range(datetime_obj: datetime) -> DATERANGE: + start_datetime = datetime_obj.replace(day=1) + end_datetime = datetime_obj + relativedelta(day=31) + return start_datetime, end_datetime + + +def _calculate_day_range(datetime_obj: datetime) -> DATERANGE: + start_datetime = datetime_obj + end_datetime = datetime_obj + relativedelta(hour=23, minute=59, second=59) + return start_datetime, end_datetime + + +DATETIME_RANGE_METHODS: Dict[events.INTERVAL, Callable[[datetime], DATERANGE]] = { + "month": _calculate_month_range, + "year": _calculate_year_range, + "day": _calculate_day_range, +} + + +def extract_dates( + filename: str, datetime_range: events.INTERVAL +) -> Union[Tuple[datetime, datetime, None], Tuple[None, None, datetime]]: + """ + Extracts start & end or single date string from filename. + """ + DATE_REGEX_STRATEGIES = [ + (r"[_\.\-](\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", "%Y-%m-%dT%H:%M:%S"), + (r"[_\.\-](\d{8}T\d{6})", "%Y%m%dT%H%M%S"), + (r"[_\.\-](\d{4}_\d{2}_\d{2})", "%Y_%m_%d"), + (r"[_\.\-](\d{4}-\d{2}-\d{2})", "%Y-%m-%d"), + (r"[_\.\-](\d{8})", "%Y%m%d"), + (r"[_\.\-](\d{6})", "%Y%m"), + (r"[_\.\-](\d{4})", "%Y"), + ] + + # Find dates in filename + dates = [] + for pattern, dateformat in DATE_REGEX_STRATEGIES: + dates_found = re.compile(pattern).findall(filename) + if not dates_found: + continue + + for date_str in dates_found: + dates.append(datetime.strptime(date_str, dateformat)) + + break + + num_dates_found = len(dates) + + # No dates found + if not num_dates_found: + raise Exception( + f"No dates provided in {filename=}. " + "At least one date in format yyyy-mm-dd is required." 
+ ) + + # Many dates found + if num_dates_found > 1: + dates.sort() + start_datetime, *_, end_datetime = dates + return start_datetime, end_datetime, None + + # Single date found + single_datetime = dates[0] + + # Convert single date to range + if datetime_range: + start_datetime, end_datetime = DATETIME_RANGE_METHODS[datetime_range]( + single_datetime + ) + return start_datetime, end_datetime, None + + # Return single date + return None, None, single_datetime diff --git a/dags/veda_data_pipeline/utils/build_stac/utils/role.py b/dags/veda_data_pipeline/utils/build_stac/utils/role.py new file mode 100644 index 00000000..817c0ad3 --- /dev/null +++ b/dags/veda_data_pipeline/utils/build_stac/utils/role.py @@ -0,0 +1,10 @@ +import boto3 + + +def assume_role(role_arn, session_name): + sts = boto3.client("sts") + creds = sts.assume_role( + RoleArn=role_arn, + RoleSessionName=session_name, + ) + return creds["Credentials"] diff --git a/dags/veda_data_pipeline/utils/build_stac/utils/stac.py b/dags/veda_data_pipeline/utils/build_stac/utils/stac.py new file mode 100644 index 00000000..9ec69f14 --- /dev/null +++ b/dags/veda_data_pipeline/utils/build_stac/utils/stac.py @@ -0,0 +1,142 @@ +import os + +import pystac +import rasterio +from pystac.utils import datetime_to_str +from rasterio.session import AWSSession +from rio_stac import stac +from rio_stac.stac import PROJECTION_EXT_VERSION, RASTER_EXT_VERSION + + +from . import events, regex, role + + +def get_sts_session(): + if role_arn := os.environ.get("EXTERNAL_ROLE_ARN"): + creds = role.assume_role(role_arn, "veda-data-pipelines_build-stac") + return AWSSession( + aws_access_key_id=creds["AccessKeyId"], + aws_secret_access_key=creds["SecretAccessKey"], + aws_session_token=creds["SessionToken"], + ) + return + + +def create_item( + item_id, + bbox, + properties, + datetime, + collection, + assets, +) -> pystac.Item: + """ + Function to create a stac item from a COG using rio_stac + """ + # item + item = pystac.Item( + id=item_id, + geometry=stac.bbox_to_geom(bbox), + bbox=bbox, + collection=collection, + stac_extensions=[ + f"https://stac-extensions.github.io/raster/{RASTER_EXT_VERSION}/schema.json", + f"https://stac-extensions.github.io/projection/{PROJECTION_EXT_VERSION}/schema.json", + ], + datetime=datetime, + properties=properties, + ) + + # if we add a collection we MUST add a link + if collection: + item.add_link( + pystac.Link( + pystac.RelType.COLLECTION, + collection, + media_type=pystac.MediaType.JSON, + ) + ) + + for key, asset in assets.items(): + item.add_asset(key=key, asset=asset) + return item + + +def generate_stac(event: events.RegexEvent) -> pystac.Item: + """ + Generate STAC item from user provided datetime range or regex & filename + """ + start_datetime = end_datetime = single_datetime = None + if event.start_datetime and event.end_datetime: + start_datetime = event.start_datetime + end_datetime = event.end_datetime + single_datetime = None + elif single_datetime := event.single_datetime: + start_datetime = end_datetime = None + single_datetime = single_datetime + else: + # Having multiple assets, we try against all filenames. 
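For `extract_dates` in regex.py above, a compact, self-contained illustration of the filename-to-date approach, using two of the patterns from `DATE_REGEX_STRATEGIES` and a made-up filename:

```python
import re
from datetime import datetime

# Two strategies from DATE_REGEX_STRATEGIES, tried from most to least specific.
STRATEGIES = [
    (r"[_\.\-](\d{8}T\d{6})", "%Y%m%dT%H%M%S"),
    (r"[_\.\-](\d{8})", "%Y%m%d"),
]


def first_date(filename: str) -> datetime:
    for pattern, dateformat in STRATEGIES:
        dates_found = re.findall(pattern, filename)
        if dates_found:
            return datetime.strptime(dates_found[0], dateformat)
    raise ValueError(f"No date found in {filename!r}")


print(first_date("gpw_population_v4_20200101.tif"))  # 2020-01-01 00:00:00
```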
+ for asset_name, asset in event.assets.items(): + try: + filename = asset["href"].split("/")[-1] + start_datetime, end_datetime, single_datetime = regex.extract_dates( + filename, event.datetime_range + ) + break + except Exception: + continue + # Raise if dates can't be found + if not (start_datetime or end_datetime or single_datetime): + raise ValueError("No dates found in event config or by regex") + + properties = event.properties or {} + if start_datetime and end_datetime: + properties["start_datetime"] = datetime_to_str(start_datetime) + properties["end_datetime"] = datetime_to_str(end_datetime) + single_datetime = None + assets = {} + + rasterio_kwargs = {} + rasterio_kwargs["session"] = get_sts_session() + with rasterio.Env( + session=rasterio_kwargs.get("session"), + options={**rasterio_kwargs}, + ): + bboxes = [] + for asset_name, asset_definition in event.assets.items(): + with rasterio.open(asset_definition["href"]) as src: + # Get BBOX and Footprint + dataset_geom = stac.get_dataset_geom(src, densify_pts=0, precision=-1) + bboxes.append(dataset_geom["bbox"]) + + media_type = stac.get_media_type(src) + proj_info = { + f"proj:{name}": value + for name, value in stac.get_projection_info(src).items() + } + raster_info = {"raster:bands": stac.get_raster_info(src, max_size=1024)} + + # The default asset name for cogs is "cog_default", so we need to intercept 'default' + if asset_name == "default": + asset_name = "cog_default" + assets[asset_name] = pystac.Asset( + title=asset_definition["title"], + description=asset_definition["description"], + href=asset_definition["href"], + media_type=media_type, + roles=["data", "layer"], + extra_fields={**proj_info, **raster_info}, + ) + + minx, miny, maxx, maxy = zip(*bboxes) + bbox = [min(minx), min(miny), max(maxx), max(maxy)] + + create_item_response = create_item( + item_id=event.item_id, + bbox=bbox, + properties=properties, + datetime=single_datetime, + collection=event.collection, + assets=assets, + ) + return create_item_response diff --git a/dags/veda_data_pipeline/utils/cogify_transfer/handler.py b/dags/veda_data_pipeline/utils/cogify_transfer/handler.py new file mode 100644 index 00000000..0e9db5eb --- /dev/null +++ b/dags/veda_data_pipeline/utils/cogify_transfer/handler.py @@ -0,0 +1,86 @@ +import re +import tempfile + +import boto3 +from rio_cogeo.cogeo import cog_translate + + +def assume_role(role_arn, session_name="veda-airflow-pipelines_transfer_files"): + sts = boto3.client("sts") + credentials = sts.assume_role( + RoleArn=role_arn, + RoleSessionName=session_name, + ) + creds = credentials["Credentials"] + return { + "aws_access_key_id": creds["AccessKeyId"], + "aws_secret_access_key": creds.get("SecretAccessKey"), + "aws_session_token": creds.get("SessionToken"), + } + + +def get_matching_files(s3_client, bucket, prefix, regex_pattern): + matching_files = [] + + response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix) + while True: + for obj in response["Contents"]: + file_key = obj["Key"] + if re.match(regex_pattern, file_key): + matching_files.append(file_key) + + if "NextContinuationToken" in response: + response = s3_client.list_objects_v2( + Bucket=bucket, + Prefix=prefix, + ContinuationToken=response["NextContinuationToken"], + ) + else: + break + + return matching_files + + +def transfer_file(s3_client, file_key, local_file_path, destination_bucket, collection): + filename = file_key.split("/")[-1] + target_key = f"{collection}/{filename}" + s3_client.upload_file(local_file_path, destination_bucket, 
target_key) + + +def cogify_transfer_handler(event_src, external_role_arn=None): + event = event_src.copy() + kwargs = {} + if external_role_arn: + creds = assume_role(external_role_arn, "veda-data-pipelines_data-transfer") + kwargs = { + "aws_access_key_id": creds["aws_access_key_id"], + "aws_secret_access_key": creds["aws_secret_access_key"], + "aws_session_token": creds["aws_session_token"], + } + source_s3 = boto3.client("s3") + target_s3 = boto3.client("s3", **kwargs) + + origin_bucket = event.get("origin_bucket") + origin_prefix = event.get("origin_prefix") + regex_pattern = event.get("filename_regex") + target_bucket = event.get("target_bucket", "veda-data-store-staging") + collection = event.get("collection") + + matching_files = get_matching_files( + source_s3, origin_bucket, origin_prefix, regex_pattern + ) + if not event.get("dry_run"): + for origin_key in matching_files: + with tempfile.NamedTemporaryFile() as local_tif, tempfile.NamedTemporaryFile() as local_cog: + local_tif_path = local_tif.name + local_cog_path = local_cog.name + source_s3.download_file(origin_bucket, origin_key, local_tif_path) + cog_translate(local_tif_path, local_cog_path, quiet=True) + filename = origin_key.split("/")[-1] + destination_key = f"{collection}/{filename}" + target_s3.upload_file(local_cog_path, target_bucket, destination_key) + else: + print( + f"Would have copied {len(matching_files)} files from {origin_bucket} to {target_bucket}" + ) + print(f"Files matched: {matching_files}") diff --git a/dags/veda_data_pipeline/utils/cogify_transfer/requirements.txt b/dags/veda_data_pipeline/utils/cogify_transfer/requirements.txt new file mode 100644 index 00000000..56e091b1 --- /dev/null +++ b/dags/veda_data_pipeline/utils/cogify_transfer/requirements.txt @@ -0,0 +1,11 @@ +aws-lambda-powertools +awslambdaric +boto3 +pystac==1.4.0 +python-cmr +rasterio==1.3.3 +rio-cogeo==4.0.0 +shapely +smart-open==6.3.0 +pydantic==1.10.7 +typing-extensions==4.5.0 diff --git a/dags/veda_data_pipeline/utils/collection_generation.py b/dags/veda_data_pipeline/utils/collection_generation.py index abba2de5..e04e1356 100644 --- a/dags/veda_data_pipeline/utils/collection_generation.py +++ b/dags/veda_data_pipeline/utils/collection_generation.py @@ -27,6 +27,7 @@ class GenerateCollection: "is_periodic", "time_density", "type", + "transfer" ] def get_template(self, dataset: Dict[str, Any]) -> dict: @@ -97,14 +98,13 @@ def create_cog_collection(self, dataset: Dict[str, Any]) -> dict: # Override the extents if they exists if spatial_extent := dataset.get("spatial_extent"): - collection_stac["extent"]["spatial"] = {"bbox": [list(spatial_extent.values())]}, + collection_stac["extent"]["spatial"] = {"bbox": [list(spatial_extent.values())]} if temporal_extent := dataset.get("temporal_extent"): collection_stac["extent"]["temporal"] = { "interval": [ - # most of our data uses the Z suffix for UTC - isoformat() doesn't [ - datetime.fromisoformat(x).astimezone(timezone.utc).isoformat().replace("+00:00", "Z") + x if x else None for x in list(temporal_extent.values()) ] diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py index 5a275701..4b164298 100644 --- a/dags/veda_data_pipeline/utils/s3_discovery.py +++ b/dags/veda_data_pipeline/utils/s3_discovery.py @@ -216,7 +216,7 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No key = f"s3://{bucket_output}/events/{collection}" records = 0 out_keys = [] - discovered = 0 + discovered = [] kwargs = assume_role(role_arn=role_arn) if
role_arn else {} s3client = boto3.client("s3", **kwargs) @@ -277,13 +277,13 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No if records == chunk_size: out_keys.append(generate_payload(s3_prefix_key=key, payload=payload)) records = 0 - discovered += len(payload["objects"]) + discovered.append(len(payload["objects"])) payload["objects"] = [] records += 1 if payload["objects"]: out_keys.append(generate_payload(s3_prefix_key=key, payload=payload)) - discovered += len(payload["objects"]) + discovered.append(len(payload["objects"])) # We need to make sure the payload isn't too large for ECS overrides try: del event["assets"] diff --git a/dags/veda_data_pipeline/utils/submit_stac.py b/dags/veda_data_pipeline/utils/submit_stac.py index 1d4edfca..22542ab8 100644 --- a/dags/veda_data_pipeline/utils/submit_stac.py +++ b/dags/veda_data_pipeline/utils/submit_stac.py @@ -103,7 +103,7 @@ def submission_handler( cognito_app_secret=None, stac_ingestor_api_url=None, context=None, -) -> None: +) -> None | dict: if context is None: context = {} @@ -121,8 +121,7 @@ def submission_handler( secret_id=cognito_app_secret, base_url=stac_ingestor_api_url, ) - ingestor.submit(event=stac_item, endpoint=endpoint) - # print("Successfully submitted STAC item") + return ingestor.submit(event=stac_item, endpoint=endpoint) if __name__ == "__main__": diff --git a/dags/veda_data_pipeline/utils/transfer.py b/dags/veda_data_pipeline/utils/transfer.py index 20823f37..a56e2e23 100644 --- a/dags/veda_data_pipeline/utils/transfer.py +++ b/dags/veda_data_pipeline/utils/transfer.py @@ -44,6 +44,7 @@ def get_matching_files(s3_client, bucket, prefix, regex_pattern): def transfer_files_within_s3( s3_client, origin_bucket, matching_files, destination_bucket, collection ): + transfer_exceptions = False for file_key in matching_files: filename = file_key.split("/")[-1] # print(f"Transferring file: {filename}") @@ -66,12 +67,22 @@ def transfer_files_within_s3( ) except s3_client.exceptions.ClientError as err: if err.response["Error"]["Code"] == "404": - # print(f"Copying file: {filename}") + # File not found OK to copy s3_client.copy_object( CopySource=copy_source, Bucket=destination_bucket, Key=target_key ) + else: + msg = f"ClientError copying {filename=} {err=}" + print(msg) + transfer_exceptions = True + except Exception as e: + msg = f"Exception copying {filename=} {e=}" + print(msg) + transfer_exceptions = True + if transfer_exceptions: + raise Exception(f"{transfer_exceptions=}") def data_transfer_handler(event, role_arn=None): diff --git a/dags/veda_data_pipeline/utils/vector_ingest/handler.py b/dags/veda_data_pipeline/utils/vector_ingest/handler.py new file mode 100644 index 00000000..09e7d437 --- /dev/null +++ b/dags/veda_data_pipeline/utils/vector_ingest/handler.py @@ -0,0 +1,377 @@ +import base64 +from argparse import ArgumentParser +import boto3 +import os +import subprocess +import json +import smart_open +from urllib.parse import urlparse +import psycopg2 +import geopandas as gpd +from shapely import wkb +from geoalchemy2 import Geometry +import sqlalchemy +from sqlalchemy import create_engine, MetaData, Table, Column, inspect +import concurrent.futures +from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION, INTEGER, VARCHAR, TIMESTAMP + + +def download_file(file_uri: str, role_arn:[str, None]): + session = boto3.Session() + if role_arn: + sts = boto3.client("sts") + response = sts.assume_role( + RoleArn=role_arn, + RoleSessionName="airflow_vector_ingest", + ) + session = 
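The `transfer.py` change above swaps silent per-file failures for an aggregate-and-raise pattern: keep copying, remember that something failed, and fail the task once at the end so the error surfaces without hiding the remaining files. A minimal standalone sketch of that pattern:

```python
def copy_all(file_keys, copy_one):
    """Keep copying; raise once at the end if any copy failed (illustration only)."""
    transfer_exceptions = False
    for file_key in file_keys:
        try:
            copy_one(file_key)
        except Exception as e:
            print(f"Exception copying {file_key=} {e=}")
            transfer_exceptions = True
    if transfer_exceptions:
        raise Exception(f"{transfer_exceptions=}")


copy_all(["a.tif", "b.tif"], print)  # succeeds; pass a raising callable to see the final raise
```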
boto3.Session( + aws_access_key_id=response["Credentials"]["AccessKeyId"], + aws_secret_access_key=response["Credentials"]["SecretAccessKey"], + aws_session_token=response["Credentials"]["SessionToken"], + ) + s3 = session.client("s3") + + url_parse = urlparse(file_uri) + + bucket = url_parse.netloc + path = url_parse.path[1:] + filename = url_parse.path.split("/")[-1] + target_filepath = os.path.join("/tmp", filename) + + s3.download_file(bucket, path, target_filepath) + + print(f"downloaded {target_filepath}") + + + return target_filepath + + +def get_connection_string(secret: dict, as_uri: bool = False) -> str: + if as_uri: + return f"postgresql://{secret['username']}:{secret['password']}@{secret['host']}:5432/{secret['dbname']}" + else: + return f"PG:host={secret['host']} dbname={secret['dbname']} user={secret['username']} password={secret['password']}" + + +def get_gdf_schema(gdf, target_projection): + """map GeoDataFrame columns into a table schema + + :param gdf: GeoDataFrame from geopandas + :param target_projection: srid for the target table geometry column + :return: + """ + # map geodatafrome dtypes to sqlalchemy types + dtype_map = { + "int64": INTEGER, + "float64": DOUBLE_PRECISION, + "object": VARCHAR, + "datetime64": TIMESTAMP, + } + schema = [] + for column, dtype in zip(gdf.columns, gdf.dtypes): + if str(dtype) == "geometry": + # do not inpsect to retrieve geom type, just use generic GEOMETRY + # geom_type = str(gdf[column].geom_type.unique()[0]).upper() + geom_type = str(dtype).upper() + # do not taKe SRID from existing file for target table + # we always want to transform from file EPSG to Table EPSG() + column_type = Geometry(geometry_type=geom_type, srid=target_projection) + else: + dtype_str = str(dtype) + column_type = dtype_map.get(dtype_str.split("[")[0], VARCHAR) + + if column == "primarykey": + schema.append(Column(column.lower(), column_type, unique=True)) + else: + schema.append(Column(column.lower(), column_type)) + return schema + + +def ensure_table_exists( + db_metadata: MetaData, gpkg_file: str, target_projection: int, table_name: str +): + """create a table if it doesn't exist or just + validate GeoDataFrame columns against existing table + + :param db_metadata: instance of sqlalchemy.MetaData + :param gpkg_file: file path to GPKG + :param target_projection: srid for target DB table geometry column + :param table_name: name of table to create + :return: None + """ + gdf = gpd.read_file(gpkg_file) + gdf_schema = get_gdf_schema(gdf, target_projection) + engine = db_metadata.bind + try: + Table(table_name, db_metadata, autoload_with=engine) + except sqlalchemy.exc.NoSuchTableError: + Table(table_name, db_metadata, *gdf_schema) + db_metadata.create_all(engine) + + # validate gdf schema against existing table schema + insp = inspect(engine) + existing_columns = insp.get_columns(table_name) + existing_column_names = [col["name"] for col in existing_columns] + for column in gdf_schema: + if column.name not in existing_column_names: + raise ValueError( + f"your .gpkg seems to have a column={column.name} that does not exist in the existing table columns={existing_column_names}" + ) + + +def delete_region( + engine, + gpkg_path: str, + table_name: str, +): + gdf = gpd.read_file(gpkg_path) + if 'region' in gdf.columns: + region_name = gdf["region"].iloc[0] + with engine.connect() as conn: + with conn.begin(): + delete_sql = sqlalchemy.text( + f""" + DELETE FROM {table_name} WHERE region=:region_name + """ + ) + conn.execute(delete_sql, {'region_name': region_name}) 
+ else: + print(f"'region' column not found in {gpkg_path}. No records deleted.") + + +def upsert_to_postgis( + engine, + gpkg_path: str, + target_projection: int, + table_name: str, + batch_size: int = 10000, +): + """batch the GPKG file and upsert via threads + + :param engine: instance of sqlalchemy.Engine + :param gpkg_path: file path to GPKG + :param table_name: name of the target table + :param batch_size: upper limit of batch size + :return: + """ + gdf = gpd.read_file(gpkg_path) + source_epsg_code = gdf.crs.to_epsg() + if not source_epsg_code: + # assume NAD27 Equal Area for now :shrug: + # since that's what the default is for Fire Atlas team exports + # that's what PROJ4 does under the hood for 9311 :wethinksmirk: + source_epsg_code = 2163 + + # convert the `t` column to something suitable for sql insertion otherwise we get 'Timestamp()' + gdf["t"] = gdf["t"].dt.strftime("%Y-%m-%d %H:%M:%S") + # convert to WKB + gdf["geometry"] = gdf["geometry"].apply(lambda geom: wkb.dumps(geom, hex=True)) + + def upsert_batch(batch): + with engine.connect() as conn: + with conn.begin(): + for row in batch.to_dict(orient="records"): + # make sure all column names are lower case for keys and values + row = {k.lower(): v for k, v in row.items()} + columns = [col.lower() for col in batch.columns] + + non_geom_placeholders = ", ".join( + [f":{col}" for col in columns[:-1]] + ) + # NOTE: we need to escape `::geometry` so parameterized statements don't try to replace it + # because parametrized statements in sqlalchemy are `:` + geom_placeholder = f"ST_Transform(ST_SetSRID(ST_GeomFromWKB(:geometry\:\:geometry), {source_epsg_code}), {target_projection})" # noqa: W605 + upsert_sql = sqlalchemy.text( + f""" + INSERT INTO {table_name} ({', '.join([col for col in columns])}) + VALUES ({non_geom_placeholders},{geom_placeholder}) + ON CONFLICT (primarykey) + DO UPDATE SET {', '.join(f"{col}=EXCLUDED.{col}" for col in columns if col != 'primarykey')} + """ + ) + + # logging.debug(f"[ UPSERT SQL ]:\n{str(upsert_sql)}") + conn.execute(upsert_sql, row) + + batches = [gdf.iloc[i : i + batch_size] for i in range(0, len(gdf), batch_size)] + # set `max_workers` to something below max concurrent connections for postgresql + # https://www.postgresql.org/docs/14/runtime-config-connection.html + with concurrent.futures.ThreadPoolExecutor(max_workers=75) as executor: + executor.map(upsert_batch, batches) + + +def get_secret(secret_name: str, region_name: str = "us-west-2") -> None: + """Retrieve secrets from AWS Secrets Manager + + Args: + secret_name (str): name of aws secrets manager secret containing database connection secrets + + Returns: + secrets (dict): decrypted secrets in dict + """ + + # Create a Secrets Manager client + session = boto3.session.Session(region_name=region_name) + client = session.client(service_name="secretsmanager") + + # In this sample we only handle the specific exceptions for the 'GetSecretValue' API. + # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html + # We rethrow the exception by default. + + get_secret_value_response = client.get_secret_value(SecretId=secret_name) + + # Decrypts secret using the associated KMS key. + # Depending on whether the secret is a string or binary, one of these fields will be populated. 
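The core of `upsert_to_postgis` above is a per-row parameterized `INSERT ... ON CONFLICT (primarykey) DO UPDATE`, with the geometry bound as hex WKB and reprojected from the source SRID to the table SRID in SQL. Below is a minimal sketch of that statement shape; no database is required, and the table and column names are only examples. The doubled backslashes reach SQLAlchemy as `\:` escapes so the `::geometry` cast is not parsed as a bind parameter.

```python
# Sketch only (no database required): the shape of the per-row upsert built by
# upsert_to_postgis. Table/column names are examples; the doubled backslashes
# become "\:" escapes so SQLAlchemy renders a literal ::geometry cast instead
# of another bind parameter.
import sqlalchemy

columns = ["primarykey", "region", "t", "geometry"]  # geometry stays last
source_epsg, target_epsg = 2163, 4326

non_geom_placeholders = ", ".join(f":{col}" for col in columns[:-1])
geom_placeholder = (
    f"ST_Transform(ST_SetSRID(ST_GeomFromWKB(:geometry\\:\\:geometry), "
    f"{source_epsg}), {target_epsg})"
)
update_clause = ", ".join(
    f"{col}=EXCLUDED.{col}" for col in columns if col != "primarykey"
)

upsert_sql = sqlalchemy.text(
    f"INSERT INTO eis_fire_snapshot_perimeter_nrt ({', '.join(columns)}) "
    f"VALUES ({non_geom_placeholders}, {geom_placeholder}) "
    f"ON CONFLICT (primarykey) DO UPDATE SET {update_clause}"
)
print(upsert_sql)

# Each row is then bound by name, e.g.:
# conn.execute(upsert_sql, {"primarykey": "abc", "region": "conus",
#                           "t": "2024-01-01 00:00:00", "geometry": "<hex WKB>"})
```

In the handler itself these statements run over batches of up to 10,000 rows through a thread pool whose `max_workers` is kept below PostgreSQL's concurrent-connection limit.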
+ if "SecretString" in get_secret_value_response: + return json.loads(get_secret_value_response["SecretString"]) + else: + return json.loads(base64.b64decode(get_secret_value_response["SecretBinary"])) + + +def load_to_featuresdb( + filename: str, + collection: str, + vector_secret_name: str, + extra_flags: list = None, + target_projection: str = "EPSG:4326", +): + if extra_flags is None: + extra_flags = ["-overwrite", "-progress"] + + secret_name = vector_secret_name + + con_secrets = get_secret(secret_name) + connection = get_connection_string(con_secrets) + + print(f"running ogr2ogr import for collection: {collection}") + options = [ + "ogr2ogr", + "-f", + "PostgreSQL", + connection, + "-t_srs", + target_projection, + filename, + "-nln", + collection, + *extra_flags, + ] + out = subprocess.run( + options, + check=False, + capture_output=True, + ) + + if out.stderr: + error_description = f"Error: {out.stderr}" + print(error_description) + return {"status": "failure", "reason": error_description} + + return {"status": "success"} + + +def load_to_featuresdb_eis( + filename: str, + collection: str, + vector_secret_name: str, + target_projection: int = 4326, +): + """create table if not exists and upload GPKG + + :param filename: the file path to the downloaded GPKG + :param collection: the name of the collection + :param target_projection: srid for the target table + :return: None + """ + secret_name = vector_secret_name + conn_secrets = get_secret(secret_name) + connection_string = get_connection_string(conn_secrets, as_uri=True) + + # NOTE: about `collection.rsplit` below: + # + # EIS Fire team naming convention for outputs + # Snapshots: "snapshot_{layer_name}_nrt_{region_name}.gpkg" + # Lf_archive: "lf_{layer_name}_archive_{region_name}.gpkg" + # Lf_nrt: "lf_{layer_name}_nrt_{region_name}.gpkg" + # + # Insert/Alter on table call everything except the region name: + # e.g. `snapshot_perimeter_nrt_conus` this gets inserted into the table `eis_fire_snapshot_perimeter_nrt` + collection = collection.rsplit("_", 1)[0] + target_table_name = f"eis_fire_{collection}" + + engine = create_engine(connection_string) + metadata = MetaData() + metadata.bind = engine + + ensure_table_exists(metadata, filename, target_projection, target_table_name) + delete_region(engine, filename, target_table_name) + upsert_to_postgis(engine, filename, target_projection, target_table_name) + return {"status": "success"} + + +def alter_datetime_add_indexes_eis(collection: str,vector_secret_name: str ): + # NOTE: about `collection.rsplit` below: + # + # EIS Fire team naming convention for outputs + # Snapshots: "snapshot_{layer_name}_nrt_{region_name}.gpkg" + # Lf_archive: "lf_{layer_name}_archive_{region_name}.gpkg" + # Lf_nrt: "lf_{layer_name}_nrt_{region_name}.gpkg" + # + # Insert/Alter on table call everything except the region name: + # e.g. 
`snapshot_perimeter_nrt_conus` this gets inserted into the table `eis_fire_snapshot_perimeter_nrt` + collection = collection.rsplit("_", 1)[0] + + secret_name = vector_secret_name + conn_secrets = get_secret(secret_name) + conn = psycopg2.connect( + host=conn_secrets["host"], + dbname=conn_secrets["dbname"], + user=conn_secrets["username"], + password=conn_secrets["password"], + ) + + cur = conn.cursor() + cur.execute( + f"ALTER table eis_fire_{collection} " + f"ALTER COLUMN t TYPE TIMESTAMP USING t::timestamp without time zone; " + f"CREATE INDEX IF NOT EXISTS idx_eis_fire_{collection}_datetime ON eis_fire_{collection}(t);" + f"CREATE INDEX IF NOT EXISTS idx_eis_fire_{collection}_primarykey ON eis_fire_{collection}(primarykey);" + f"CREATE INDEX IF NOT EXISTS idx_eis_fire_{collection}_region ON eis_fire_{collection}(region);" + ) + conn.commit() + + +def handler(payload_src: dict, vector_secret_name: str, assume_role_arn: [str, None]): + + payload_event = payload_src.copy() + s3_event = payload_event.pop("payload") + with smart_open.open(s3_event, "r") as _file: + s3_event_read = _file.read() + event_received = json.loads(s3_event_read) + s3_objects = event_received["objects"] + status = list() + for s3_object in s3_objects: + href = s3_object["assets"]["default"]["href"] + collection = s3_object["collection"] + downloaded_filepath = download_file(href, assume_role_arn) + print(f"[ DOWNLOAD FILEPATH ]: {downloaded_filepath}") + print(f"[ COLLECTION ]: {collection}") + + s3_object_prefix = event_received["prefix"] + if s3_object_prefix.startswith("EIS/"): + coll_status = load_to_featuresdb_eis(downloaded_filepath, collection, vector_secret_name) + else: + coll_status = load_to_featuresdb(downloaded_filepath, collection, vector_secret_name) + + status.append(coll_status) + # delete file after ingest + os.remove(downloaded_filepath) + + if coll_status["status"] == "success" and s3_object_prefix.startswith("EIS/"): + alter_datetime_add_indexes_eis(collection, vector_secret_name) + elif coll_status["status"] != "success": + # bubble exception so Airflow shows it as a failure + raise Exception(coll_status["reason"]) + return status + + diff --git a/dags/veda_data_pipeline/utils/vector_ingest/requirements.txt b/dags/veda_data_pipeline/utils/vector_ingest/requirements.txt new file mode 100644 index 00000000..35d23946 --- /dev/null +++ b/dags/veda_data_pipeline/utils/vector_ingest/requirements.txt @@ -0,0 +1,7 @@ +smart-open==6.3.0 +psycopg2-binary==2.9.9 +requests==2.30.0 +boto3==1.26.129 +GeoAlchemy2==0.14.2 +geopandas==0.14.4 +SQLAlchemy==2.0.23 diff --git a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/dags/veda_data_pipeline/veda_dataset_pipeline.py index d456a80a..6c5f1fc6 100644 --- a/dags/veda_data_pipeline/veda_dataset_pipeline.py +++ b/dags/veda_data_pipeline/veda_dataset_pipeline.py @@ -1,36 +1,38 @@ import pendulum from airflow import DAG -from airflow.decorators import task +from airflow.models.param import Param +from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_task from airflow.operators.dummy_operator import DummyOperator as EmptyOperator -from airflow.utils.trigger_rule import TriggerRule from veda_data_pipeline.groups.collection_group import collection_task_group -from veda_data_pipeline.groups.discover_group import subdag_discover +from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task, build_stac_task, extract_discovery_items_from_payload, remove_thumbnail_asset -dag_doc_md = """ +template_dag_run_conf = { + 
"collection": "", + "data_type": "cog", + "description": "", + "discovery_items": + [ + { + "bucket": "", + "datetime_range": "", + "discovery": "s3", + "filename_regex": "", + "prefix": "" + } + ], + "is_periodic": Param(True, type="boolean"), + "license": "", + "time_density": "", + "title": "" +} + +dag_doc_md = f""" ### Dataset Pipeline Generates a collection and triggers the file discovery process #### Notes - This DAG can run with the following configuration
```json -{ - "collection": "collection-id", - "data_type": "cog", - "description": "collection description", - "discovery_items": - [ - { - "bucket": "veda-data-store-staging", - "datetime_range": "year", - "discovery": "s3", - "filename_regex": "^(.*).tif$", - "prefix": "example-prefix/" - } - ], - "is_periodic": true, - "license": "collection-LICENSE", - "time_density": "year", - "title": "collection-title" -} +{template_dag_run_conf} ``` """ @@ -42,39 +44,13 @@ "tags": ["collection", "discovery"], } -@task -def extract_discovery_items(**kwargs): - ti = kwargs.get("ti") - discovery_items = ti.dag_run.conf.get("discovery_items") - print(discovery_items) - return discovery_items - -template_dag_run_conf = { - "collection": "", - "data_type": "cog", - "description": "", - "discovery_items": - [ - { - "bucket": "", - "datetime_range": "", - "discovery": "s3", - "filename_regex": "", - "prefix": "" - } - ], - "is_periodic": "", - "license": "", - "time_density": "", - "title": "" -} - with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag: - start = EmptyOperator(task_id="start", dag=dag) - end = EmptyOperator(task_id="end", dag=dag) - - collection_grp = collection_task_group() - discover_grp = subdag_discover.expand(event=extract_discovery_items()) - - start >> collection_grp >> discover_grp >> end - + start = EmptyOperator(task_id="start") + end = EmptyOperator(task_id="end") + + mutated_payloads = start >> collection_task_group() >> remove_thumbnail_asset() + discovery_items = extract_discovery_items_from_payload(payload=mutated_payloads) + discover = discover_from_s3_task.partial(payload=mutated_payloads).expand(event=discovery_items) + get_files = get_files_task(payload=discover) + build_stac = build_stac_task.expand(payload=get_files) + submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) >> end diff --git a/dags/veda_data_pipeline/veda_discover_pipeline.py b/dags/veda_data_pipeline/veda_discover_pipeline.py index 37a5d520..0b0f8d1d 100644 --- a/dags/veda_data_pipeline/veda_discover_pipeline.py +++ b/dags/veda_data_pipeline/veda_discover_pipeline.py @@ -1,8 +1,9 @@ import pendulum from airflow import DAG from airflow.operators.dummy_operator import DummyOperator -from airflow.utils.trigger_rule import TriggerRule -from veda_data_pipeline.groups.discover_group import subdag_discover +from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_task + +from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task, build_stac_task dag_doc_md = """ ### Discover files from S3 @@ -46,7 +47,7 @@ "is_paused_upon_creation": False, } -templat_dag_run_conf = { +template_dag_run_conf = { "collection": "", "bucket": "", "prefix": "/", @@ -69,22 +70,33 @@ } -def get_discover_dag(id, event={}): - params_dag_run_conf = event or templat_dag_run_conf + + + +def get_discover_dag(id, event=None): + if not event: + event = {} + params_dag_run_conf = event or template_dag_run_conf with DAG( - id, - schedule_interval=event.get("schedule"), - params=params_dag_run_conf, - **dag_args + id, + schedule_interval=event.get("schedule"), + params=params_dag_run_conf, + **dag_args ) as dag: start = DummyOperator(task_id="Start", dag=dag) end = DummyOperator( - task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag + task_id="End", dag=dag ) + # define DAG using taskflow notation - discover_grp = subdag_discover(event) + discover = discover_from_s3_task(event=event) + get_files = get_files_task(payload=discover) + 
build_stac = build_stac_task.expand(payload=get_files) + # .output is needed coming from a non-taskflow operator + submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) - start >> discover_grp >> end + discover.set_upstream(start) + submit_stac.set_downstream(end) return dag diff --git a/dags/veda_data_pipeline/veda_generic_vector_pipeline.py b/dags/veda_data_pipeline/veda_generic_vector_pipeline.py new file mode 100644 index 00000000..b359d6dd --- /dev/null +++ b/dags/veda_data_pipeline/veda_generic_vector_pipeline.py @@ -0,0 +1,84 @@ +import pendulum +from airflow.decorators import task +from airflow import DAG +from airflow.models.variable import Variable +import json +from airflow.operators.dummy_operator import DummyOperator +from airflow.utils.trigger_rule import TriggerRule +from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_to_process + +dag_doc_md = """ +### Generic Ingest Vector +#### Purpose +This DAG is used to ingest vector data for use in the VEDA Features API + +#### Notes +- This DAG can run with the following configuration
+```json +{ + "collection": "", + "prefix": "transformed_csv/", + "bucket": "ghgc-data-store-develop", + "filename_regex": ".*.csv$", + "discovery": "s3", + "datetime_range": "month", + "vector": true, + "id_regex": "", + "id_template": "NIST_Urban_Testbed_test-{}", + "datetime_range": "", + "vector": true, + "x_possible": "longitude", + "y_possible": "latitude", + "source_projection": "EPSG:4326", + "target_projection": "EPSG:4326", + "extra_flags": ["-overwrite", "-lco", "OVERWRITE=YES"] + "discovered": 33, + "payload": "s3://data-pipeline-ghgc-dev-mwaa-597746869805/events/test_layer_name2/s3_discover_output_f88257e8-ee50-4a14-ace4-5612ae6ebf38.jsonn" +} +``` +- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines) +""" + +template_dag_run_conf = { + "collection": "", + "prefix": "/", + "bucket": "", + "filename_regex": "", + "id_template": "-{}", + "datetime_range": "|", + "vector": "false | true", + "x_possible": "", + "y_possible": "", + "source_projection": "", + "target_projection": "", + "extra_flags": "", + "payload": "", +} +dag_args = { + "start_date": pendulum.today("UTC").add(days=-1), + "schedule_interval": None, + "catchup": False, + "doc_md": dag_doc_md, +} + + +@task +def ingest_vector_task(payload): + from veda_data_pipeline.utils.vector_ingest.handler import handler + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + read_role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN") + vector_secret_name = airflow_vars_json.get("VECTOR_SECRET_NAME") + return handler(payload_src=payload, vector_secret_name=vector_secret_name, + assume_role_arn=read_role_arn) + + +with DAG(dag_id="veda_generic_ingest_vector", params=template_dag_run_conf, **dag_args) as dag: + + start = DummyOperator(task_id="Start", dag=dag) + end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag) + discover = discover_from_s3_task() + get_files = get_files_to_process(payload=discover) + vector_ingest = ingest_vector_task.expand(payload=get_files) + discover.set_upstream(start) + vector_ingest.set_downstream(end) diff --git a/dags/veda_data_pipeline/veda_process_generic_vector_pipeline.py b/dags/veda_data_pipeline/veda_process_generic_vector_pipeline.py deleted file mode 100644 index 226c3314..00000000 --- a/dags/veda_data_pipeline/veda_process_generic_vector_pipeline.py +++ /dev/null @@ -1,120 +0,0 @@ -import pendulum -from airflow import DAG -from airflow.models.variable import Variable -from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator -from airflow.operators.dummy_operator import DummyOperator -from airflow.utils.trigger_rule import TriggerRule - -from datetime import timedelta - -dag_doc_md = """ -### Generic Ingest Vector -#### Purpose -This DAG is supposed to be triggered by `veda_discover`. But you still can trigger this DAG manually or through an API - -#### Notes -- This DAG can run with the following configuration
-```json -{ - "collection": "", - "prefix": "transformed_csv/", - "bucket": "ghgc-data-store-develop", - "filename_regex": ".*.csv$", - "discovery": "s3", - "datetime_range": "month", - "vector": true, - "id_regex": "", - "id_template": "NIST_Urban_Testbed_test-{}", - "datetime_range": "", - "vector": true, - "x_possible": "longitude", - "y_possible": "latitude", - "source_projection": "EPSG:4326", - "target_projection": "EPSG:4326", - "extra_flags": ["-overwrite", "-lco", "OVERWRITE=YES"] - "discovered": 33, - "payload": "s3://data-pipeline-ghgc-dev-mwaa-597746869805/events/test_layer_name2/s3_discover_output_f88257e8-ee50-4a14-ace4-5612ae6ebf38.jsonn" -} -``` -- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines) -""" - -templat_dag_run_conf = { - "collection": "", - "prefix": "/", - "bucket": "", - "filename_regex": "", - "id_template": "-{}", - "datetime_range": "|", - "vector": "false | true", - "x_possible": "", - "y_possible": "", - "source_projection": "", - "target_projection": "", - "extra_flags": "", - "payload": "", -} -dag_args = { - "start_date": pendulum.today("UTC").add(days=-1), - "schedule_interval": None, - "catchup": False, - "doc_md": dag_doc_md, -} - -with DAG(dag_id="veda_generic_ingest_vector", params=templat_dag_run_conf, **dag_args) as dag: - start = DummyOperator(task_id="Start", dag=dag) - end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag) - - mwaa_stack_conf = Variable.get( - "MWAA_STACK_CONF", default_var={}, deserialize_json=True - ) - vector_ecs_conf = Variable.get("VECTOR_ECS_CONF", deserialize_json=True) - - generic_ingest_vector = EcsRunTaskOperator( - task_id="generic_ingest_vector", - trigger_rule=TriggerRule.NONE_FAILED, - cluster=f"{mwaa_stack_conf.get('PREFIX')}-cluster", - task_definition=f"{mwaa_stack_conf.get('PREFIX')}-generic-vector-tasks", - launch_type="FARGATE", - do_xcom_push=True, - execution_timeout=timedelta(minutes=120), - overrides={ - "containerOverrides": [ - { - "name": f"{mwaa_stack_conf.get('PREFIX')}-veda-generic_vector_ingest", - "command": [ - "/var/lang/bin/python", - "handler.py", - "--payload", - "{}".format("{{ task_instance.dag_run.conf }}"), - ], - "environment": [ - { - "name": "EXTERNAL_ROLE_ARN", - "value": Variable.get( - "ASSUME_ROLE_READ_ARN", default_var="" - ), - }, - { - "name": "AWS_REGION", - "value": mwaa_stack_conf.get("AWS_REGION"), - }, - { - "name": "VECTOR_SECRET_NAME", - "value": Variable.get("VECTOR_SECRET_NAME"), - }, - ], - }, - ], - }, - network_configuration={ - "awsvpcConfiguration": { - "securityGroups": vector_ecs_conf.get("VECTOR_SECURITY_GROUP") + mwaa_stack_conf.get("SECURITYGROUPS"), - "subnets": vector_ecs_conf.get("VECTOR_SUBNETS"), - }, - }, - awslogs_group=mwaa_stack_conf.get("LOG_GROUP_NAME"), - awslogs_stream_prefix=f"ecs/{mwaa_stack_conf.get('PREFIX')}-veda-generic-vector_ingest", # prefix with container name - ) - - start >> generic_ingest_vector >> end diff --git a/dags/veda_data_pipeline/veda_process_vector_pipeline.py b/dags/veda_data_pipeline/veda_process_vector_pipeline.py deleted file mode 100644 index 57099cc5..00000000 --- a/dags/veda_data_pipeline/veda_process_vector_pipeline.py +++ /dev/null @@ -1,108 +0,0 @@ -import pendulum -from airflow import DAG -from airflow.models.variable import Variable -from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator -from airflow.operators.dummy_operator import DummyOperator -from airflow.utils.trigger_rule import TriggerRule - -from datetime import 
timedelta - -dag_doc_md = """ -### Build and submit stac -#### Purpose -This DAG is supposed to be triggered by `veda_discover`. But you still can trigger this DAG manually or through an API - -#### Notes -- This DAG can run with the following configuration
-```json -{ - "collection": "geoglam", - "prefix": "geoglam/", - "bucket": "veda-data-store-staging", - "filename_regex": "^(.*).tif$", - "discovery": "s3", - "datetime_range": "month", - "upload": false, - "cogify": false, - "discovered": 33, - "payload": "s3://veda-uah-sit-mwaa-853558080719/events/geoglam/s3_discover_output_6c46b57a-7474-41fe-977a-19d164531cdc.json" -} -``` -- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines) -""" - -templat_dag_run_conf = { - "collection": "", - "prefix": "/", - "bucket": "", - "filename_regex": "", - "discovery": "|cmr", - "datetime_range": "|", - "upload": " | true", - "cogify": "false | true", - "payload": "> ingest_vector >> end diff --git a/dags/veda_data_pipeline/veda_promotion_pipeline.py b/dags/veda_data_pipeline/veda_promotion_pipeline.py new file mode 100644 index 00000000..02c8a15f --- /dev/null +++ b/dags/veda_data_pipeline/veda_promotion_pipeline.py @@ -0,0 +1,100 @@ +import pendulum +from airflow import DAG +from airflow.decorators import task +from airflow.operators.dummy_operator import DummyOperator as EmptyOperator +from airflow.models.variable import Variable +import json +from veda_data_pipeline.groups.collection_group import collection_task_group +from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_dataset_files_to_process +from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task, build_stac_task, extract_discovery_items_from_payload, remove_thumbnail_asset +from veda_data_pipeline.groups.transfer_group import transfer_data + +dag_doc_md = """ +### Promotion Pipeline +Generates a collection and triggers the file discovery process. +This DAG uses the same input ad `veda-dataset-pipeline` but adds the ability to transfer assets to the production bucket. +This will mutate the payload, so that item references will target the new asset locations. +#### Notes +- This DAG can run with the following configuration
+```json +{ + "collection": "collection-id", + "data_type": "cog", + "description": "collection description", + "discovery_items": + [ + { + "bucket": "veda-data-store-staging", + "datetime_range": "year", + "discovery": "s3", + "filename_regex": "^(.*).tif$", + "prefix": "example-prefix/" + } + ], + "is_periodic": true, + "license": "collection-LICENSE", + "time_density": "year", + "title": "collection-title" +} +``` +""" + +dag_args = { + "start_date": pendulum.today("UTC").add(days=-1), + "schedule_interval": None, + "catchup": False, + "doc_md": dag_doc_md, + "tags": ["collection", "discovery"], +} + +template_dag_run_conf = { + "collection": "", + "data_type": "cog", + "description": "", + "discovery_items": + [ + { + "bucket": "", + "datetime_range": "", + "discovery": "s3", + "filename_regex": "", + "prefix": "" + } + ], + "is_periodic": "", + "license": "", + "time_density": "", + "title": "", + "transfer": " # transfer assets to production bucket if true (false by default)", +} + +@task(max_active_tis_per_dag=3) +def transfer_assets_to_production_bucket(payload): + transfer_data(payload) + # if transfer complete, update discovery payload to reflect new bucket + payload.update({"bucket": "veda-data-store"}) + payload.update({"prefix": payload.get("collection")+"/"}) + return payload + +with DAG("veda_promotion_pipeline", params=template_dag_run_conf, **dag_args) as dag: + # ECS dependency variable + + start = EmptyOperator(task_id="start", dag=dag) + end = EmptyOperator(task_id="end", dag=dag) + + collection_grp = collection_task_group() + mutate_payload_task = remove_thumbnail_asset() + extract_from_payload = extract_discovery_items_from_payload() + + # asset transfer to production bucket + transfer_task = transfer_assets_to_production_bucket.expand(payload=extract_from_payload) + discover = discover_from_s3_task.partial(payload=mutate_payload_task).expand(event=transfer_task) + discover.set_upstream(collection_grp) # do not discover until collection exists + + get_files = get_dataset_files_to_process(payload=discover) # untangle mapped data format to get iterable payloads from discover step + build_stac = build_stac_task.expand(payload=get_files) + submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) + + collection_grp.set_upstream(start) + mutate_payload_task.set_upstream(start) + submit_stac.set_downstream(end) diff --git a/dags/veda_data_pipeline/veda_process_raster_pipeline.py b/dags/veda_data_pipeline/veda_vector_pipeline.py similarity index 52% rename from dags/veda_data_pipeline/veda_process_raster_pipeline.py rename to dags/veda_data_pipeline/veda_vector_pipeline.py index 2555c6a9..c264ea92 100644 --- a/dags/veda_data_pipeline/veda_process_raster_pipeline.py +++ b/dags/veda_data_pipeline/veda_vector_pipeline.py @@ -1,8 +1,11 @@ import pendulum +from airflow.decorators import task from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.utils.trigger_rule import TriggerRule -from veda_data_pipeline.groups.processing_group import subdag_process +from airflow.models.variable import Variable +from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_to_process +import json dag_doc_md = """ ### Build and submit stac @@ -19,8 +22,10 @@ "filename_regex": "^(.*).tif$", "discovery": "s3", "datetime_range": "month", + "upload": false, + "cogify": false, "discovered": 33, - "payload": "s3://veda-uah-sit-mwaa-853558080719/events/geoglam/s3_discover_output_6c46b57a-7474-41fe-977a-.json" + "payload": 
"s3://veda-uah-sit-mwaa-853558080719/events/geoglam/s3_discover_output_6c46b57a-7474-41fe-977a-19d164531cdc.json" } ``` - [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines) @@ -31,22 +36,36 @@ "prefix": "/", "bucket": "", "filename_regex": "", - "discovery": "", + "discovery": "|cmr", "datetime_range": "|", - "payload": " | true", + "cogify": "false | true" } dag_args = { - "max_active_runs": 20, "start_date": pendulum.today("UTC").add(days=-1), "schedule_interval": None, "catchup": False, "doc_md": dag_doc_md, } -with DAG(dag_id="veda_ingest_raster", params=template_dag_run_conf, **dag_args) as dag: - start = DummyOperator(task_id="Start", dag=dag) - end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag) - process_grp = subdag_process() +@task +def ingest_vector_task(payload): + from veda_data_pipeline.utils.vector_ingest.handler import handler + + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + read_role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN") + vector_secret_name = airflow_vars_json.get("VECTOR_SECRET_NAME") + return handler(payload_src=payload, vector_secret_name=vector_secret_name, + assume_role_arn=read_role_arn) - start >> process_grp >> end + +with DAG(dag_id="veda_ingest_vector", params=template_dag_run_conf, **dag_args) as dag: + start = DummyOperator(task_id="Start", dag=dag) + end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag) + discover = discover_from_s3_task() + get_files = get_files_to_process(payload=discover) + vector_ingest = ingest_vector_task.expand(payload=get_files) + discover.set_upstream(start) + vector_ingest.set_downstream(end) diff --git a/docker_tasks/build_stac/handler.py b/docker_tasks/build_stac/handler.py index 748e7eb4..bdfd9a1a 100644 --- a/docker_tasks/build_stac/handler.py +++ b/docker_tasks/build_stac/handler.py @@ -148,8 +148,8 @@ def stac_handler(payload_event): # For cloud watch log to work the task should stay alife for at least 30 s start = time() print(f"Start at {start}") - - payload_event = ast.literal_eval(args.payload) + print(args) + payload_event = json.loads(args.payload) building_stac_response = stac_handler(payload_event) response = json.dumps({**payload_event, **building_stac_response}) end = time() - start diff --git a/docker_tasks/vector_ingest/handler.py b/docker_tasks/vector_ingest/handler.py index 35ff27cb..a4b86ba5 100644 --- a/docker_tasks/vector_ingest/handler.py +++ b/docker_tasks/vector_ingest/handler.py @@ -2,7 +2,6 @@ from argparse import ArgumentParser import boto3 import os -import ast import subprocess import json import smart_open @@ -19,9 +18,11 @@ def download_file(file_uri: str): sts = boto3.client("sts") + print(f'Assuming role: {os.environ.get("EXTERNAL_ROLE_ARN")}') + role_arn = os.environ.get("EXTERNAL_ROLE_ARN") response = sts.assume_role( - RoleArn=os.environ.get("EXTERNAL_ROLE_ARN"), - RoleSessionName="sts-assume-114506680961", + RoleArn=role_arn, + RoleSessionName="airflow_vector_ingest", ) new_session = boto3.Session( aws_access_key_id=response["Credentials"]["AccessKeyId"], @@ -123,17 +124,19 @@ def delete_region( gpkg_path: str, table_name: str, ): - """delete all existing records by region name""" gdf = gpd.read_file(gpkg_path) - region_name = gdf["region"].iloc[0] - with engine.connect() as conn: - with conn.begin(): - delete_sql = sqlalchemy.text( - f""" - DELETE FROM {table_name} WHERE region='{region_name}' - """ - ) - conn.execute(delete_sql) 
+ if 'region' in gdf.columns: + region_name = gdf["region"].iloc[0] + with engine.connect() as conn: + with conn.begin(): + delete_sql = sqlalchemy.text( + f""" + DELETE FROM {table_name} WHERE region=:region_name + """ + ) + conn.execute(delete_sql, {'region_name': region_name}) + else: + print(f"'region' column not found in {gpkg_path}. No records deleted.") def upsert_to_postgis( @@ -348,7 +351,7 @@ def handler(): ) args = parser.parse_args() - payload_event = ast.literal_eval(args.payload) + payload_event = json.loads(args.payload) s3_event = payload_event.pop("payload") with smart_open.open(s3_event, "r") as _file: s3_event_read = _file.read() diff --git a/docker_tasks/vector_ingest/requirements.txt b/docker_tasks/vector_ingest/requirements.txt index 38263eed..35d23946 100644 --- a/docker_tasks/vector_ingest/requirements.txt +++ b/docker_tasks/vector_ingest/requirements.txt @@ -3,5 +3,5 @@ psycopg2-binary==2.9.9 requests==2.30.0 boto3==1.26.129 GeoAlchemy2==0.14.2 -geopandas==0.14.0 -SQLAlchemy==2.0.23 \ No newline at end of file +geopandas==0.14.4 +SQLAlchemy==2.0.23 diff --git a/infrastructure/aws_data.tf b/infrastructure/aws_data.tf index 2fe94f38..70bbe108 100644 --- a/infrastructure/aws_data.tf +++ b/infrastructure/aws_data.tf @@ -8,6 +8,7 @@ terraform { provider "aws" { + alias = "aws_current" profile = var.aws_profile region = var.aws_region } diff --git a/infrastructure/main.tf b/infrastructure/main.tf index 0f22eb64..4d7390d5 100644 --- a/infrastructure/main.tf +++ b/infrastructure/main.tf @@ -54,14 +54,14 @@ module "custom_policy" { vector_secret_name = var.vector_secret_name } -data "aws_subnets" "private" { +data "aws_subnets" "vector_aws_subnets" { filter { name = "vpc-id" values = [var.vector_vpc == null ? "" : var.vector_vpc] } tags = { - "Scope" = "private" + Scope = "private" } } @@ -109,8 +109,8 @@ resource "local_file" "mwaa_variables" { stac_ingestor_api_url = var.stac_ingestor_api_url stac_url = var.stac_url vector_secret_name = var.vector_secret_name - vector_subnet_1 = length(data.aws_subnets.subnet_ids.ids) > 0 ? data.aws_subnets.subnet_ids.ids[0] : "" - vector_subnet_2 = length(data.aws_subnets.subnet_ids.ids) > 0 ? data.aws_subnets.subnet_ids.ids[1] : "" + vector_subnet_1 = length(data.aws_subnets.vector_aws_subnets.ids) > 0 ? data.aws_subnets.vector_aws_subnets.ids[0] : data.aws_subnets.subnet_ids.ids[0] + vector_subnet_2 = length(data.aws_subnets.vector_aws_subnets.ids) > 0 ? data.aws_subnets.vector_aws_subnets.ids[1] : data.aws_subnets.subnet_ids.ids[1] vector_security_group = length(aws_security_group.vector_sg) > 0 ? 
aws_security_group.vector_sg[0].id : "" vector_vpc = var.vector_vpc }) @@ -180,7 +180,7 @@ resource "aws_iam_policy" "s3_bucket_access" { Action = [ "s3:*", ] - Effect = "Allow" + Effect = "Allow" Resource = [ "arn:aws:s3:::*", "arn:aws:s3:::*/*" @@ -201,7 +201,7 @@ resource "aws_iam_role_policy_attachment" "lambda_basic_execution" { } resource "aws_iam_role_policy_attachment" "lambda_s3_access" { - role = aws_iam_role.lambda_execution_role.name + role = aws_iam_role.lambda_execution_role.name policy_arn = aws_iam_policy.s3_bucket_access.arn } @@ -273,8 +273,9 @@ resource "null_resource" "update_workflows_lambda_image" { # API Gateway HTTP API resource "aws_apigatewayv2_api" "workflows_http_api" { - name = "${var.prefix}_workflows_http_api" - protocol_type = "HTTP" + name = "${var.prefix}_workflows_http_api" + protocol_type = "HTTP" + disable_execute_api_endpoint = var.disable_default_apigw_endpoint } # Lambda Integration for API Gateway @@ -305,19 +306,3 @@ resource "aws_lambda_permission" "api-gateway" { principal = "apigateway.amazonaws.com" source_arn = "${aws_apigatewayv2_api.workflows_http_api.execution_arn}/*/$default" } - -# Cloudfront update - -resource "null_resource" "update_cloudfront" { - triggers = { - always_run = "${timestamp()}" - } - - count = coalesce(var.cloudfront_id, false) != false ? 1 : 0 - - provisioner "local-exec" { - command = "${path.module}/cf_update.sh ${var.cloudfront_id} workflows_api_origin \"${aws_apigatewayv2_api.workflows_http_api.api_endpoint}\"" - } - - depends_on = [aws_apigatewayv2_api.workflows_http_api] -} diff --git a/infrastructure/terraform.tfvars.tmpl b/infrastructure/terraform.tfvars.tmpl index 0e49bc17..cf5e6613 100644 --- a/infrastructure/terraform.tfvars.tmpl +++ b/infrastructure/terraform.tfvars.tmpl @@ -17,7 +17,6 @@ vector_secret_name="${VECTOR_SECRET_NAME}" vector_security_group="${VECTOR_SECURITY_GROUP}" vector_vpc="${VECTOR_VPC:-null}" workflow_root_path="${WORKFLOW_ROOT_PATH}" -cloudfront_id="${VEDA_CLOUDFRONT_ID}" cognito_domain="${VEDA_COGNITO_DOMAIN}" client_id="${VEDA_CLIENT_ID}" userpool_id="${VEDA_USERPOOL_ID}" diff --git a/infrastructure/variables.tf b/infrastructure/variables.tf index 9658a6a3..25d08958 100644 --- a/infrastructure/variables.tf +++ b/infrastructure/variables.tf @@ -76,6 +76,11 @@ variable "vector_vpc" { default = "null" } +variable "deploy_vector_automation" { + type = bool + default = "false" +} + variable "data_access_role_arn" { type = string } @@ -93,10 +98,6 @@ variable "workflow_root_path" { default = "/api/workflows" } -variable "cloudfront_id" { - type = string -} - variable "cognito_domain" { type = string } @@ -129,3 +130,8 @@ variable "ecs_task_memory" { type = number default = 4096 } + +variable "disable_default_apigw_endpoint" { + type = bool + default = false +} \ No newline at end of file diff --git a/sm2a/.deploy_env_example b/sm2a/.deploy_env_example index 53247eed..336c4243 100644 --- a/sm2a/.deploy_env_example +++ b/sm2a/.deploy_env_example @@ -1,19 +1,28 @@ -AIRFLOW_UID=501 -PREFIX=**** -VPC_ID=**** -STATE_BUCKET_NAME=**** -STATE_BUCKET_KEY=**** -STATE_DYNAMO_TABLE=**** -PRIVATE_SUBNETS_TAGNAME=**** -PUBLIC_SUBNETS_TAGNAME=**** -AIRFLOW_FERNET_KEY=**** -AIRFLOW_DB_NAME=**** -AIRFLOW_DB_USERNAME=**** -AIRFLOW_DB_PASSWORD=**** -PERMISSION_BOUNDARIES_ARN=**** -DOMAIN_NAME=openveda.cloud -STAGE=**** -TF_VAR_gh_app_client_id=**** -TF_VAR_gh_app_client_secret=**** -TF_VAR_gh_team_name=**** -TF_VAR_subdomain=**** +AIRFLOW_UID=[REDACTED] +PREFIX=[REDACTED] +VPC_ID=[REDACTED] 
+STATE_BUCKET_NAME=[REDACTED] +STATE_BUCKET_KEY=[REDACTED] +STATE_DYNAMO_TABLE=[REDACTED] +PRIVATE_SUBNETS_TAGNAME=[REDACTED] +PUBLIC_SUBNETS_TAGNAME=[REDACTED] +AIRFLOW_FERNET_KEY=Q9O3TOkB8YVXwy78yjgrQSOJ5njoqr6d2jM69sHiaOs=[REDACTED] +AIRFLOW_DB_NAME=[REDACTED] +AIRFLOW_DB_USERNAME=[REDACTED] +AIRFLOW_DB_PASSWORD=[REDACTED] +PERMISSION_BOUNDARIES_ARN=[REDACTED] +DOMAIN_NAME=[REDACTED] +STAGE=[REDACTED] +TF_VAR_subdomain=[REDACTED] +TF_VAR_gh_app_client_id=[REDACTED] +TF_VAR_gh_app_client_secret=[REDACTED] +TF_VAR_gh_team_name=[REDACTED] +TF_VAR_event_bucket=[REDACTED] +TF_VAR_workflows_client_secret=[REDACTED] +TF_VAR_stac_ingestor_api_url=[REDACTED] +TF_VAR_stac_url=[REDACTED] +TF_VAR_vector_secret_name=[REDACTED] +TF_VAR_storage_bucket_name=[REDACTED] +TF_VAR_s3_invoke_filter_prefix=[REDACTED] +TF_VAR_sm2a_secret_manager_name=[REDACTED] +TF_VAR_target_dag_id=[REDACTED] diff --git a/sm2a/Makefile b/sm2a/Makefile index 57a4e020..b092e3ce 100644 --- a/sm2a/Makefile +++ b/sm2a/Makefile @@ -12,9 +12,9 @@ count_down = \ @echo "Spinning up the system please wait..."; \ secs=40 ;\ while [ $$secs -gt 0 ]; do \ - printf "%d\033[0K\r" $$secs; \ + printf "%d\033[0K\r" "$$secs"; \ sleep 1; \ - : $$((secs--)); \ + secs=$$((secs - 1)); \ done; .PHONY: @@ -24,6 +24,11 @@ count_down = \ all: sm2a-local-init sm2a-local-run +refresh: sm2a-local-build sm2a-local-run + +count_down_test: + $(count_down) + sm2a-local-run: sm2a-local-stop @echo "Running SM2A" docker compose up -d @@ -34,7 +39,6 @@ sm2a-local-run: sm2a-local-stop @echo "To use local SM2A with AWS update ${SM2A_FOLDER}/sm2a-local-config/.env AWS credentials" sm2a-local-init: - cp sm2a-local-config/env_example sm2a-local-config/.env cp -r ../dags . docker compose run --rm airflow-cli db init docker compose run --rm airflow-cli users create --email airflow@example.com --firstname airflow --lastname airflow --password airflow --username airflow --role Admin diff --git a/sm2a/airflow_worker/Dockerfile b/sm2a/airflow_worker/Dockerfile index 154cd1ae..4b391665 100644 --- a/sm2a/airflow_worker/Dockerfile +++ b/sm2a/airflow_worker/Dockerfile @@ -18,7 +18,6 @@ WORKDIR /opt/airflow RUN chown $UNAME:$GID /opt/airflow RUN apt-get -y update \ - && apt install -y python3-pip \ && apt-get install -y --no-install-recommends gcc libc6-dev libcurl4-openssl-dev libssl-dev \ && apt-get autoremove -yqq --purge \ && apt-get clean \ @@ -26,7 +25,17 @@ RUN apt-get -y update \ USER airflow -ENV PATH $PATH:/home/airflow/.local/bin + +RUN mkdir -p /home/airflow/miniconda3 \ + && curl -o /home/airflow/miniconda3/miniconda.sh https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \ + && bash /home/airflow/miniconda3/miniconda.sh -b -u -p /home/airflow/miniconda3 \ + && rm /home/airflow/miniconda3/miniconda.sh + +# Install python 3.11 +RUN /home/airflow/miniconda3/bin/conda create -n py11 --yes python=3.11 + +ENV PATH $PATH:/home/airflow/miniconda3/envs/py11/bin + COPY --chown=airflow:airflow airflow_worker/requirements.txt "${AIRFLOW_HOME}/requirements.txt" @@ -41,8 +50,6 @@ COPY --chown=airflow:airflow scripts "${AIRFLOW_HOME}/scripts" RUN cp ${AIRFLOW_HOME}/configuration/airflow.cfg* ${AIRFLOW_HOME}/. 
-RUN pip install pypgstac==0.7.4 - # ENV ENV AIRFLOW_HOME ${AIRFLOW_HOME} ENV TZ UTC diff --git a/sm2a/airflow_worker/requirements.txt b/sm2a/airflow_worker/requirements.txt index 7c66fbda..f8798eb4 100644 --- a/sm2a/airflow_worker/requirements.txt +++ b/sm2a/airflow_worker/requirements.txt @@ -14,6 +14,7 @@ smart-open airflow_multi_dagrun apache-airflow-providers-postgres apache-airflow-providers-common-sql +apache-airflow-providers-http typing-extensions pyOpenSSL stac-pydantic @@ -21,4 +22,13 @@ fsspec s3fs xarray xstac - +pystac +rasterio +rio-stac +GeoAlchemy2 +geopandas==0.14.4 +fiona==1.9.6 +h5netcdf +rioxarray +apache-airflow-providers-slack +apache-airflow-providers-slack[http] diff --git a/sm2a/infrastructure/.terraform.lock.hcl b/sm2a/infrastructure/.terraform.lock.hcl index b9eab3ab..2bdb5aef 100644 --- a/sm2a/infrastructure/.terraform.lock.hcl +++ b/sm2a/infrastructure/.terraform.lock.hcl @@ -1,6 +1,25 @@ # This file is maintained automatically by "terraform init". # Manual edits may be lost in future updates. +provider "registry.terraform.io/hashicorp/archive" { + version = "2.6.0" + hashes = [ + "h1:upAbF0KeKLAs3UImwwp5veC7jRcLnpKWVjkbd4ziWhM=", + "zh:29273484f7423b7c5b3f5df34ccfc53e52bb5e3d7f46a81b65908e7a8fd69072", + "zh:3cba58ec3aea5f301caf2acc31e184c55d994cc648126cac39c63ae509a14179", + "zh:55170cd17dbfdea842852c6ae2416d057fec631ba49f3bb6466a7268cd39130e", + "zh:7197db402ba35631930c3a4814520f0ebe980ae3acb7f8b5a6f70ec90dc4a388", + "zh:78d5eefdd9e494defcb3c68d282b8f96630502cac21d1ea161f53cfe9bb483b3", + "zh:8bf7fe0915d7fb152a3a6b9162614d2ec82749a06dba13fab3f98d33c020ec4f", + "zh:8ce811844fd53adb0dabc9a541f8cb43aacfa7d8e39324e4bd3592b3428f5bfb", + "zh:bca795bca815b8ac90e3054c0a9ab1ccfb16eedbb3418f8ad473fc5ad6bf0ef7", + "zh:d9355a18df5a36cf19580748b23249de2eb445c231c36a353709f8f40a6c8432", + "zh:dc32cc32cfd8abf8752d34f2a783de0d3f7200c573b885ecb64ece5acea173b4", + "zh:ef498e20391bf7a280d0fd6fd6675621c85fbe4e92f0f517ae4394747db89bde", + "zh:f2bc5226c765b0c8055a7b6207d0fe1eb9484e3ec8880649d158827ac6ed3b22", + ] +} + provider "registry.terraform.io/hashicorp/aws" { version = "4.67.0" constraints = "~> 4.0" diff --git a/sm2a/infrastructure/configuration/airflow.cfg b/sm2a/infrastructure/configuration/airflow.cfg deleted file mode 100755 index 49591f28..00000000 --- a/sm2a/infrastructure/configuration/airflow.cfg +++ /dev/null @@ -1,63 +0,0 @@ -[api] -auth_backends = airflow.api.auth.backend.basic_auth - -[core] -executor = CeleryExecutor -dags_are_paused_at_creation = true -load_examples = false -load_default_connections = false -# Allow airflow to run hundreds of tasks in parallel, because we will scale workers -# automatically. -# https://programmaticponderings.com/2020/12/29/amazon-managed-workflows-for-apache-airflow-configuration-understanding-amazon-mwaas-configuration-options/ -max_active_tasks_per_dag = 10000 -parallelism = 10000 - -[celery] -broker_url = sqs:// -celery_config_options = configuration.celery_config.CELERY_CONFIG - - -[github_enterprise] -api_rev = v3 -host = github.com -client_id = Iv23lil9JEmXAM6QJlFe -client_secret = 8cbd483d2cb4e73599dffba93dbd0295ef0830c5 -oauth_callback_route = /home -allowed_teams = VEDA - -[webserver] -authenticate = True -auth_backends = airflow.contrib.auth.backends.github_enterprise_auth -dag_default_view = grid -expose_config = true -dag_orientation = TB -warn_deployment_exposure = false - -# On ECS, you can deploy the CloudWatch agent as a sidecar to your application container to collect metrics. 
-# https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/deploy_servicelens_CloudWatch_agent_deploy_ECS.html -# https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html -# https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-custom-metrics-statsd.html -# https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-autoscaling.html -# https://docs.aws.amazon.com/mwaa/latest/userguide/access-metrics-cw-202.html#available-metrics-cw-v202 -# [metrics] -# statsd_on = true -# statsd_host = localhost -# statsd_port = 8125 -# statsd_prefix = airflow - -[scheduler] -catchup_by_default = false - -[logging] -# logging_config_class = configuration.logging_config.STDOUT_LOGGING_CONFIG -remote_logging = true -# We set this value as an environment variable -# remote_base_log_folder = - -[secrets] -# AWS Secrets Manager Backend -# https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/secrets-backends/aws-secrets-manager.html -# Setting full_url_mode to false allows us to use multiple fields when storing connections -# Source code: https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/secrets/secrets_manager.py -backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend -backend_kwargs = {"connections_prefix": "sm2a-dev/airflow/connections", "variables_prefix": "sm2a-dev/airflow/variables","connections_lookup_pattern": "_default$", "variables_lookup_pattern": "^aws_", "config_prefix": "sm2a-dev/airflow/config"} diff --git a/sm2a/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py b/sm2a/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py new file mode 100644 index 00000000..d48e22c6 --- /dev/null +++ b/sm2a/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py @@ -0,0 +1,61 @@ +import http.client +import json +import os +import uuid +from base64 import b64encode + +import boto3 + + +def lambda_handler(event, context): + secrets_client = boto3.client("secretsmanager") + sm2a_secret_manager_name = os.getenv("SM2A_SECRET_MANAGER_NAME") + dag_name = os.getenv("TARGET_DAG_ID") + storage_bucket = os.getenv("STORAGE_BUCKET") + try: + secret_response = secrets_client.get_secret_value( + SecretId=sm2a_secret_manager_name + ) + secret_data = json.loads(secret_response["SecretString"]) + sm2a_domain_name = secret_data["airflow_webserver_url"] + username = secret_data["airflow_admin_username"] + password = secret_data["airflow_admin_password"] + record = event["Records"][0] + print(record) + # Create the HTTP connection + conn = http.client.HTTPSConnection(sm2a_domain_name) + except Exception as ex: + return {"statusCode": 500, "body": json.dumps(f"Error: {ex}")} + + s3_event_key = record["s3"]["object"]["key"] + s3_filename_target = os.path.split(s3_event_key)[-1] + s3_filename_no_ext = os.path.splitext(s3_filename_target)[0] + bucket_key_prefix = os.path.dirname(s3_event_key) + data = { + "conf": { + "discovery": "s3", + "collection": s3_filename_no_ext, + "prefix": bucket_key_prefix, + "bucket": storage_bucket, + "filename_regex": f"^(.*){s3_filename_target}$", + "vector_eis": True, + }, + "dag_run_id": f"{dag_name}-{uuid.uuid4()}", + "note": "Run from S3 Event bridge", + } + headers = { + "Content-Type": "application/json", + "Authorization": "Basic " + + b64encode(f"{username}:{password}".encode()).decode(), + } + + conn.request("POST", f"/api/v1/dags/{dag_name}/dagRuns", json.dumps(data), headers) + + # Get the response + 
response = conn.getresponse() + response_data = response.read() + + # Close the connection + conn.close() + + return {"statusCode": response.status, "body": response_data.decode()} diff --git a/sm2a/infrastructure/locals.tf b/sm2a/infrastructure/locals.tf new file mode 100644 index 00000000..8cd5b4eb --- /dev/null +++ b/sm2a/infrastructure/locals.tf @@ -0,0 +1,12 @@ +provider "aws" { + alias = "aws_current" + region = var.aws_region +} + +data "aws_caller_identity" "current" {} +data "aws_region" "current" {} + +locals { + aws_region = data.aws_region.current.name + account_id = data.aws_caller_identity.current.account_id +} diff --git a/sm2a/infrastructure/main.tf b/sm2a/infrastructure/main.tf index f7b6aa22..dcfb0161 100644 --- a/sm2a/infrastructure/main.tf +++ b/sm2a/infrastructure/main.tf @@ -19,7 +19,7 @@ resource "random_password" "password" { module "sma-base" { - source = "https://github.com/NASA-IMPACT/self-managed-apache-airflow/releases/download/v1.1.4/self-managed-apache-airflow.zip" + source = "https://github.com/NASA-IMPACT/self-managed-apache-airflow/releases/download/v1.1.5/self-managed-apache-airflow.zip" project = var.project_name airflow_db = var.airflow_db fernet_key = var.fernet_key @@ -28,23 +28,22 @@ module "sma-base" { public_subnets_tagname = var.public_subnets_tagname vpc_id = var.vpc_id state_bucketname = var.state_bucketname - desired_max_workers_count = var.workers_configuration[var.stage].max_desired_workers + desired_max_workers_count = var.desired_max_workers_count airflow_admin_password = random_password.password.result airflow_admin_username = "admin" rds_publicly_accessible = var.rds_publicly_accessible permission_boundaries_arn = var.permission_boundaries_arn custom_worker_policy_statement = var.custom_worker_policy_statement - worker_cpu = var.workers_configuration[var.stage].cpu - worker_memory = var.workers_configuration[var.stage].memory + worker_cpu = tonumber(var.workers_cpu) + worker_memory = tonumber(var.workers_memory) number_of_schedulers = var.number_of_schedulers - scheduler_cpu = var.scheduler_cpu - scheduler_memory = var.scheduler_memory - rds_engine_version = var.rds_configuration[var.stage].rds_engine_version - rds_instance_class = var.rds_configuration[var.stage].rds_instance_class - rds_allocated_storage = var.rds_configuration[var.stage].rds_allocated_storage - rds_max_allocated_storage = var.rds_configuration[var.stage].rds_max_allocated_storage - workers_logs_retention_days = var.workers_configuration[var.stage].workers_logs_retention_days - airflow_custom_variables = var.airflow_custom_variables + scheduler_cpu = tonumber(var.scheduler_cpu) + scheduler_memory = tonumber(var.scheduler_memory) + rds_engine_version = var.rds_engine_version + rds_instance_class = var.rds_instance_class + rds_allocated_storage = tonumber(var.rds_allocated_storage) + rds_max_allocated_storage = tonumber(var.rds_max_allocated_storage) + workers_logs_retention_days = tonumber(var.workers_logs_retention_days) extra_airflow_task_common_environment = [ { @@ -53,7 +52,7 @@ module "sma-base" { }, { name = "AIRFLOW__CORE__DEFAULT_TASK_RETRIES" - value = var.workers_configuration[var.stage].task_retries + value = var.workers_task_retries }, { name = "GH_CLIENT_ID" @@ -69,7 +68,7 @@ module "sma-base" { }, { name = "GH_USER_TEAM_ID" - value = "csda-airflow-data-pipeline-users" + value = var.gh_user_team_id } @@ -82,6 +81,17 @@ module "sma-base" { domain_name = var.domain_name stage = var.stage subdomain = var.subdomain - worker_cmd = 
["/home/airflow/.local/bin/airflow", "celery", "worker"] + worker_cmd = ["airflow", "celery", "worker"] + + airflow_custom_variables = { + EVENT_BUCKET = var.state_bucketname + COGNITO_APP_SECRET = var.workflows_client_secret + STAC_INGESTOR_API_URL = var.stac_ingestor_api_url + STAC_URL = var.stac_url + VECTOR_SECRET_NAME = var.vector_secret_name + ASSUME_ROLE_READ_ARN = var.assume_role_read_arn + ASSUME_ROLE_WRITE_ARN = var.assume_role_write_arn + SM2A_BASE_URL = module.sma-base.airflow_url + } } diff --git a/sm2a/infrastructure/s3_event_bridge_lambda.tf b/sm2a/infrastructure/s3_event_bridge_lambda.tf new file mode 100644 index 00000000..f0aca022 --- /dev/null +++ b/sm2a/infrastructure/s3_event_bridge_lambda.tf @@ -0,0 +1,177 @@ +##################################################### + +##################################################### +# Execution Role +##################################################### +resource "aws_iam_role" "lambda_exec_role" { + provider = aws.aws_current + name = "lambda-exec-role-s3-event-bridge-veda-${var.stage}" + permissions_boundary = var.permission_boundaries_arn + + assume_role_policy = <