dev to main BLOCKED by issue #252 #245

Open
wants to merge 135 commits into base: main from dev

Changes from all commits
135 commits
c1d67da
WIP: dynamic ingest mapping for continuous tracing
ividito Jul 19, 2024
55744c6
feat: use taskflow for concurrent task runs
ividito Jul 31, 2024
ee98cf7
clean up debug code
ividito Jul 31, 2024
6e151a9
Bugfix vector ingest
ividito Aug 1, 2024
82c724f
merge dev
smohiudd Aug 20, 2024
54b885d
add generic vector pipeline
smohiudd Aug 20, 2024
a7b51e1
remove process generic vector pipeline
smohiudd Aug 20, 2024
318d39a
Update dags/veda_data_pipeline/veda_generic_vector_pipeline.py
ividito Aug 27, 2024
0ab867b
Restructure dataset get_files output, add retries to some tasks to av…
ividito Sep 9, 2024
f1411fe
Changes to make vector ingest work in shared VPC environment
ividito Oct 4, 2024
b6b4b5e
added the statistics calculations in pipeline
Oct 16, 2024
42e88d7
generating JSON file for the stats and uploading it to S3
Oct 16, 2024
a5dcb51
using temp file for JSON
Oct 17, 2024
4db9b2c
feat:port event-driven vector automation to terraform
ividito Oct 17, 2024
ebb9993
fix: feature flag for vector automation lambda
ividito Oct 17, 2024
4eca40e
fix: adjust log group count
ividito Oct 17, 2024
f98add0
testing JSON update for multiple datasets
Oct 18, 2024
0005892
fix tf vector automation vars
smohiudd Oct 18, 2024
38bff94
update vars in event bridge tf
smohiudd Oct 18, 2024
90aed8f
Merge pull request #239 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
021f4ff
add new line
smohiudd Oct 18, 2024
8ec0cd4
Merge pull request #240 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
5678be5
tested the creation of JSON file for different datasets
Oct 18, 2024
abba102
fix tag filter
smohiudd Oct 18, 2024
006e989
Merge pull request #241 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
f181ca7
update vector subnet ref
smohiudd Oct 18, 2024
39401a0
Merge pull request #242 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
857e2fa
fix vector subnet ids
smohiudd Oct 18, 2024
9b43b89
Merge pull request #243 from NASA-IMPACT/fix/vector-automation
smohiudd Oct 18, 2024
e3870e7
conditional vector subnets
smohiudd Oct 21, 2024
266be89
feat: disable default api gateway endpoint for workflows api
botanical Oct 21, 2024
65f0a8c
Merge pull request #246 from NASA-IMPACT/fix/vector-ingest-subnets
smohiudd Oct 21, 2024
d4a57e3
feat: add variable to configure disabling default apigw endpoint
botanical Oct 21, 2024
b9a9323
Merge pull request #197 from NASA-IMPACT/fix/asset-handling
smohiudd Oct 22, 2024
d77d946
Update infrastructure/terraform.tfvars.tmpl
botanical Oct 23, 2024
7af892a
Update infrastructure/terraform.tfvars.tmpl
botanical Oct 23, 2024
6d3e29a
Using pythonoperators instead of ECS operator
amarouane-ABDELHAK Oct 24, 2024
61a797d
Merge pull request #247 from NASA-IMPACT/jt/issue-452-disable-default…
botanical Oct 24, 2024
686c4c8
Switching to pythonOperator
amarouane-ABDELHAK Oct 24, 2024
c190564
Merge pull request #249 from NASA-IMPACT/feature/use-pythonoperators
amarouane-ABDELHAK Oct 25, 2024
305dd43
test: temporarily use test buckets
anayeaye Oct 25, 2024
188f96e
feat:port event-driven vector automation to terraform
ividito Oct 17, 2024
a2e3bad
fix: feature flag for vector automation lambda
ividito Oct 17, 2024
12ba68c
fix: adjust log group count
ividito Oct 17, 2024
f532a6f
fix tf vector automation vars
smohiudd Oct 18, 2024
89d5251
update vars in event bridge tf
smohiudd Oct 18, 2024
9c636e0
add new line
smohiudd Oct 18, 2024
47b0707
fix tag filter
smohiudd Oct 18, 2024
8169ab6
update vector subnet ref
smohiudd Oct 18, 2024
66cfb79
fix vector subnet ids
smohiudd Oct 18, 2024
c449cb9
conditional vector subnets
smohiudd Oct 21, 2024
0df795e
WIP: dynamic ingest mapping for continuous tracing
ividito Jul 19, 2024
bfce3d5
feat: use taskflow for concurrent task runs
ividito Jul 31, 2024
31a3e84
clean up debug code
ividito Jul 31, 2024
a811fe2
Bugfix vector ingest
ividito Aug 1, 2024
3c034ae
add generic vector pipeline
smohiudd Aug 20, 2024
4a5dc02
remove process generic vector pipeline
smohiudd Aug 20, 2024
de8b69d
Update dags/veda_data_pipeline/veda_generic_vector_pipeline.py
ividito Aug 27, 2024
d28432d
Restructure dataset get_files output, add retries to some tasks to av…
ividito Sep 9, 2024
26feccf
Changes to make vector ingest work in shared VPC environment
ividito Oct 4, 2024
18470d0
feat: disable default api gateway endpoint for workflows api
botanical Oct 21, 2024
919e04b
feat: add variable to configure disabling default apigw endpoint
botanical Oct 21, 2024
81f94de
Update infrastructure/terraform.tfvars.tmpl
botanical Oct 23, 2024
7485696
Update infrastructure/terraform.tfvars.tmpl
botanical Oct 23, 2024
75fef32
Using pythonoperators instead of ECS operator
amarouane-ABDELHAK Oct 24, 2024
6f080a7
Switching to pythonOperator
amarouane-ABDELHAK Oct 24, 2024
dafb213
tested the creation of JSON file for different datasets
Oct 18, 2024
884c296
optimizing the automation
Oct 28, 2024
05cd27d
Merge branch 'dev' into feature/optimize_automated_cog_transformation
Oct 29, 2024
8d64aaa
testing the optimized approach for automation
Oct 29, 2024
3ada005
successfully tested the automation for gridded population dataset
Oct 29, 2024
a619098
Add S3 event bridge
amarouane-ABDELHAK Oct 29, 2024
4c9753f
Add S3 event bridge
amarouane-ABDELHAK Oct 29, 2024
59e1938
Add S3 event bridge
amarouane-ABDELHAK Oct 29, 2024
685e7db
Add env deployment example
amarouane-ABDELHAK Oct 30, 2024
dd0ea9e
Merge pull request #251 from NASA-IMPACT/feat/use-sm2a-in-eventbrige-…
amarouane-ABDELHAK Oct 30, 2024
a64099a
updated readme for the folder and resolved the failure bug
Nov 4, 2024
8a3c584
only execute deploy action on push to dev branch
anayeaye Nov 6, 2024
4a277d3
Merge pull request #254 from NASA-IMPACT/ci/remove-staging-action
anayeaye Nov 6, 2024
db73561
ci: do not automatically deploy mwaa to any environment
anayeaye Nov 6, 2024
c450be2
Merge pull request #255 from NASA-IMPACT/ci/remove-all-mwaa-deploy-ac…
anayeaye Nov 8, 2024
b258d41
Adding condition on s3 event bridge
amarouane-ABDELHAK Nov 13, 2024
eb5e32c
Adding condition on s3 event bridge
amarouane-ABDELHAK Nov 14, 2024
4909f43
Fix some missing variables
amarouane-ABDELHAK Nov 14, 2024
87a08e2
Remove .env file
amarouane-ABDELHAK Nov 15, 2024
cf19bd3
Remove .env file
amarouane-ABDELHAK Nov 15, 2024
1335a92
Remove the airflow config to be created by CICD
amarouane-ABDELHAK Nov 15, 2024
dbcd631
print all transfer exceptions
anayeaye Nov 21, 2024
32b2acb
Merge pull request #257 from NASA-IMPACT/feat/use-sm2a-in-eventbrige-…
amarouane-ABDELHAK Nov 22, 2024
7872364
Merge branch 'dev' into fix/reveal-transfer-exception
anayeaye Nov 22, 2024
4374f34
print client err on failed transfer
anayeaye Nov 22, 2024
091bc80
raise transfer exception
anayeaye Nov 22, 2024
4f14c96
update secret variable type to string and add assume role arns to air…
anayeaye Nov 22, 2024
ef7be91
Merge pull request #260 from NASA-IMPACT/fix/reveal-transfer-exception
anayeaye Nov 22, 2024
fe09938
fix(dockerfile) remove pypgstac install
anayeaye Nov 25, 2024
56738e9
fix(dag-requirements) remove pypgstac install
anayeaye Nov 25, 2024
66193a8
Merge pull request #262 from NASA-IMPACT/fix/remove-pypgstac
anayeaye Nov 25, 2024
4fbd229
let ingest api handle datetime validation and just pass through input…
anayeaye Nov 25, 2024
d9799cc
Merge pull request #263 from NASA-IMPACT/fix/collection-datetime-format
anayeaye Nov 26, 2024
926be29
temp print statement
smohiudd Nov 26, 2024
7e05d2f
Merge pull request #264 from NASA-IMPACT/fix/collection-print
smohiudd Nov 26, 2024
2974701
remove comma
smohiudd Nov 26, 2024
5d8757e
Merge pull request #265 from NASA-IMPACT/fix/collection-print
smohiudd Nov 26, 2024
64683af
Strip thumbnail assets from payload before discovery
ividito Nov 27, 2024
9d7c611
rename var
ividito Nov 27, 2024
3a48975
Update dags/veda_data_pipeline/veda_dataset_pipeline.py
ividito Nov 28, 2024
0e5cc7c
Fix callable
ividito Dec 3, 2024
53416d0
Refactor Dataset Pipeline
amarouane-ABDELHAK Dec 6, 2024
5a41003
Refactor Dataset Pipeline
amarouane-ABDELHAK Dec 6, 2024
cfb2bf2
Return status of the ingest
amarouane-ABDELHAK Dec 6, 2024
4e0416b
Update dags/veda_data_pipeline/groups/discover_group.py
amarouane-ABDELHAK Dec 11, 2024
534e571
Adding aut ingest
amarouane-ABDELHAK Dec 11, 2024
000a38c
Fix the bug of updating a pointer
amarouane-ABDELHAK Dec 11, 2024
64bea83
Merge branch 'refactor-dataset-pipeline' of https://github.com/NASA-I…
amarouane-ABDELHAK Dec 11, 2024
74573b7
Fix updating the copy of config instead of the pointer
amarouane-ABDELHAK Dec 11, 2024
e6ca7d4
Merge pull request #268 from NASA-IMPACT/refactor-dataset-pipeline
amarouane-ABDELHAK Dec 11, 2024
da43490
Fixing automation DAG
amarouane-ABDELHAK Dec 11, 2024
ac3d475
Fixing automation DAG
amarouane-ABDELHAK Dec 11, 2024
b584575
Fixing automation DAG
amarouane-ABDELHAK Dec 11, 2024
487f225
Add slack notification libraries
amarouane-ABDELHAK Dec 11, 2024
82f1109
Add slack notification libraries
amarouane-ABDELHAK Dec 11, 2024
4fdf934
Merge pull request #236 from NASA-IMPACT/feature/optimize_automated_c…
amarouane-ABDELHAK Dec 11, 2024
603a713
Update SM2A workers python version to 3.11
amarouane-ABDELHAK Dec 12, 2024
45175e0
Update the CMD for the workers
amarouane-ABDELHAK Dec 12, 2024
b5b02be
Merge pull request #271 from NASA-IMPACT/fix/update-workers-python-ve…
amarouane-ABDELHAK Dec 16, 2024
9a8d9d2
Add placeholder transfer to dataset DAG (no-op for now, just testing …
ividito Nov 13, 2024
95fe45a
More makefile adjustments, adjust control flow
ividito Nov 13, 2024
caad8ba
Reorder transfer <> discovery, change transfer task input
ividito Nov 20, 2024
4a5f166
Break out promotion to new DAG
ividito Dec 18, 2024
829fa24
Remove transfer flag
ividito Dec 18, 2024
dee4542
Light refactors to shared tasks, fixes to align promotion to dataset …
ividito Dec 18, 2024
46e66e6
set transfer dry run default back to false
ividito Dec 19, 2024
da0650c
Fix makefile, fix ti KeyError
ividito Dec 20, 2024
2f39ead
regex updated
siddharth0248 Dec 20, 2024
c22f94c
Merge pull request #274 from NASA-IMPACT/regex/sid
siddharth0248 Dec 20, 2024
13 changes: 1 addition & 12 deletions .github/workflows/cicd.yml

@@ -7,9 +7,7 @@ permissions:
 on:
   push:
     branches:
-      - main
       - dev
-      - production
   pull_request:
     branches:
       - main
@@ -36,12 +34,8 @@ jobs:
       - name: Set the environment based on the branch
        id: define_environment
        run: |
-          if [ "${{ github.ref }}" = "refs/heads/main" ]; then
-            echo "env_name=staging" >> $GITHUB_OUTPUT
-          elif [ "${{ github.ref }}" = "refs/heads/dev" ]; then
+          if [ "${{ github.ref }}" = "refs/heads/dev" ]; then
             echo "env_name=development" >> $GITHUB_OUTPUT
-          elif [ "${{ github.ref }}" = "refs/heads/production" ]; then
-            echo "env_name=production" >> $GITHUB_OUTPUT
           fi
       - name: Print the environment
        run: echo "The environment is ${{ steps.define_environment.outputs.env_name }}"
@@ -71,11 +65,6 @@ jobs:
           role-session-name: "veda-airflow-github-${{ needs.define-environment.outputs.env_name }}-deployment"
           aws-region: us-west-2

-      - name: Run MWAA deployment
-        uses: "./.github/actions/terraform-deploy"
-        with:
-          env_aws_secret_name: ${{ secrets.ENV_AWS_SECRET_NAME }}
-
       - name: Run SM2A deployment
        # Flag to deploy SM2A
        if: ${{ vars.DEPLOY_SM2A }} = "true"
5 changes: 5 additions & 0 deletions dags/automated_transformation/README.md
@@ -0,0 +1,5 @@
## Information about the folder
This folder contains the `automation DAG`, which transforms a given dataset into COGs.
The folder consists of two files:
- `automation_dag.py` - Defines the DAG architecture.
- `transformation_pipeline.py` - Contains the functions that transform a given netCDF file into a COG. It checks whether the `transformation plugins` exist at the configured `plugins_uri`, fetches the plugin function, and uses it for the transformation. Statistics for the `netCDF` and `COG` files are also calculated, stored in a `JSON` file, and pushed to `S3` along with the transformed `COG`.
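
The plugin-loading step described above boils down to: build the URL of `<collection_name>_transformation.py` under `data_transformation_plugins`, download it, and import the transformation function dynamically. Below is a minimal sketch of that pattern; it is not the actual `transformation_pipeline.py` implementation, and the helper name `fetch_transformation_plugin` is illustrative only.

# Sketch of the plugin-fetch pattern described in the README; not the actual
# transformation_pipeline.py code. fetch_transformation_plugin is a hypothetical name.
import importlib.util
import tempfile

import requests


def fetch_transformation_plugin(plugins_uri: str, collection_name: str):
    """Download <collection_name>_transformation.py and return it as a loaded module."""
    folder_name = "data_transformation_plugins"
    file_name = f"{collection_name}_transformation.py"
    plugin_url = f"{plugins_uri.strip('/')}/{folder_name}/{file_name}"

    response = requests.get(plugin_url, timeout=30)
    response.raise_for_status()  # fails early if the plugin does not exist

    # Write the module to a temp file so importlib can load it by path.
    with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp:
        tmp.write(response.content)
        plugin_path = tmp.name

    spec = importlib.util.spec_from_file_location(
        f"{collection_name}_transformation", plugin_path
    )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

A caller could then look up whatever transformation function the plugin defines (for example via `getattr(module, f"{collection_name}_transformation")`, if the plugin follows that naming convention) and pass it the netCDF object to produce the COG.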
128 changes: 128 additions & 0 deletions dags/automated_transformation/automation_dag.py
@@ -0,0 +1,128 @@
from __future__ import annotations

from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.dummy_operator import DummyOperator
from slack_notifications import slack_fail_alert

DAG_ID = "automate-cog-transformation"

# Custom validation function


dag_run_config = {
    "data_acquisition_method": Param(
        "s3", enum=["s3"]
    ),  # To add Other protocols (HTTP, SFTP...)
    "plugins_uri": Param(
        "https://raw.githubusercontent.com/US-GHG-Center/ghgc-docs/refs/heads/main/",
        type="string",
    ),
    "raw_data_bucket": "ghgc-data-store-develop",
    "raw_data_prefix": Param(
        "delivery/gpw",
        type="string",
        pattern="^[^/].*[^/]$",
    ),
    "dest_data_bucket": "ghgc-data-store-develop",
    "data_prefix": Param("transformed_cogs", type="string", pattern="^[^/].*[^/]$"),
    "collection_name": "gpw",
    "nodata": Param(-9999, type="number"),
    "ext": Param(".tif", type="string", pattern="^\\..*$"),
}

with DAG(
    dag_id=DAG_ID,
    schedule=None,
    catchup=False,
    tags=["Transformation", "Report"],
    params=dag_run_config,
    on_failure_callback=slack_fail_alert,
) as dag:
    start = DummyOperator(task_id="start", dag=dag)
    end = DummyOperator(task_id="end", dag=dag)

    @task
    def check_function_exists(ti):
        from dags.automated_transformation.transformation_pipeline import (
            download_python_file,
        )

        config = ti.dag_run.conf
        folder_name = "data_transformation_plugins"
        file_name = f'{config.get("collection_name")}_transformation.py'
        try:
            plugin_url = f"{config['plugins_uri'].strip('/')}/{folder_name}/{file_name}"
            download_python_file(uri=plugin_url, check_exist=True)
            return f"The {file_name} exists in {folder_name} in this URL {plugin_url}."
        except Exception as e:
            raise Exception(f"Error checking file existence: {e}")

    @task
    def discover_files(ti):
        from dags.automated_transformation.transformation_pipeline import (
            get_all_s3_keys,
        )

        config = ti.dag_run.conf.copy()
        bucket = config.get("raw_data_bucket")
        model_name = config.get("raw_data_prefix")
        ext = config.get("ext")  # .nc as well
        generated_list = get_all_s3_keys(bucket, model_name, ext)
        chunk_size = int(len(generated_list) / 900) + 1
        return [
            generated_list[i : i + chunk_size]
            for i in range(0, len(generated_list), chunk_size)
        ]

    @task(max_active_tis_per_dag=1)
    def process_files(file_url, **kwargs):
        dag_run = kwargs.get("dag_run")
        from dags.automated_transformation.transformation_pipeline import transform_cog

        config = dag_run.conf.copy()
        raw_bucket_name = config.get("raw_data_bucket")
        dest_data_bucket = config.get("dest_data_bucket")
        data_prefix = config.get("data_prefix")
        nodata = config.get("nodata")
        collection_name = config.get("collection_name")
        print(f"The file I am processing is {file_url}")
        print("len of files", len(file_url))
        folder_name = "data_transformation_plugins"
        file_name = f"{collection_name}_transformation.py"
        plugin_url = f"{config['plugins_uri'].strip('/')}/{folder_name}/{file_name}"

        file_status = transform_cog(
            file_url,
            plugin_url=plugin_url,
            nodata=nodata,
            raw_data_bucket=raw_bucket_name,
            dest_data_bucket=dest_data_bucket,
            data_prefix=data_prefix,
            collection_name=collection_name,
        )
        return file_status

    @task
    def generate_report(reports, **kwargs):
        dag_run = kwargs.get("dag_run")
        collection_name = dag_run.conf.get("collection_name")
        count, failed_files = 0, []
        for report in reports:
            if "failed" in report.values():
                failed_files.append(report)
            elif "success" in report.values():
                count += 1

        if failed_files:
            raise Exception(f"Error generating COG file {failed_files}")
        return {
            "collection": collection_name,
            "successes": count,
            "failures": failed_files,
        }

    urls = start >> check_function_exists() >> discover_files()
    report_data = process_files.expand(file_url=urls)
    generate_report(reports=report_data) >> end
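
For reference, a run of `automate-cog-transformation` can be triggered with a `conf` payload whose keys mirror `dag_run_config` above. The sketch below assumes the Airflow 2 stable REST API is enabled and reachable; the host, credentials, and bucket names are placeholders, not values from this PR.

# Hypothetical trigger of the DAG above via the Airflow 2 stable REST API.
# Host, credentials, and bucket/prefix values are placeholders.
import requests

AIRFLOW_HOST = "https://airflow.example.com"
DAG_ID = "automate-cog-transformation"

conf = {
    # Keys mirror dag_run_config; values here are illustrative only.
    "data_acquisition_method": "s3",
    "plugins_uri": "https://raw.githubusercontent.com/US-GHG-Center/ghgc-docs/refs/heads/main/",
    "raw_data_bucket": "example-raw-bucket",
    "raw_data_prefix": "delivery/gpw",
    "dest_data_bucket": "example-cog-bucket",
    "data_prefix": "transformed_cogs",
    "collection_name": "gpw",
    "nodata": -9999,
    "ext": ".nc",
}

resp = requests.post(
    f"{AIRFLOW_HOST}/api/v1/dags/{DAG_ID}/dagRuns",
    json={"conf": conf},
    auth=("user", "password"),  # placeholder basic-auth credentials
    timeout=30,
)
resp.raise_for_status()
print("Triggered run:", resp.json()["dag_run_id"])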