diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..752d66f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +**/__pycache__/ +*.py[cod] +src/data/ +.DS_Store diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..06102cf --- /dev/null +++ b/.env.example @@ -0,0 +1,12 @@ +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= + +EA_FASTAPI_S3_URL=s3://fcx-downloader/ +EA_FASTAPI_S3_BUCKET=fcx-downloader +EA_FASTAPI_S3_OBJECT_URL=https://fcx-downloader.s3.amazonaws.com/ +EA_FASTAPI_CLOUDFRONT_URL=https://d1ntjpytjzo3fx.cloudfront.net +EA_FASTAPI_FILE_DOWNLOAD_PATH= + +MACHINE=urs.earthdata.nasa.gov +EARTHDATA_USERNAME= +EARTHDATA_PASSWORD= \ No newline at end of file diff --git a/.gitignore b/.gitignore index 9db6f38..6ab7976 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ -folder +.env +**/__pycache__/ +*.py[cod] +src/data/ +.DS_Store \ No newline at end of file diff --git a/.idea/EA-fastapi.iml b/.idea/EA-fastapi.iml new file mode 100644 index 0000000..7a78d6f --- /dev/null +++ b/.idea/EA-fastapi.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..1757c3b --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 0000000..80961f9 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,110 @@ + + + + + + + + + + + + + + + + { + "associatedIndex": 7 +} + + + + { + "keyToString": { + "ASKED_SHARE_PROJECT_CONFIGURATION_FILES": "true", + "Python.stl_viewer_python.executor": "Run", + "RunOnceActivity.ShowReadmeOnStart": "true", + "git-widget-placeholder": "GHRCCLOUD-6449", + "last_opened_file_path": "C:/Users/nselvaraj/github/EA-fastapi/src", + "node.js.detected.package.eslint": "true", + "node.js.detected.package.tslint": "true", + "node.js.selected.package.eslint": "(autodetect)", + "node.js.selected.package.tslint": "(autodetect)", + "nodejs_package_manager_path": "npm", + "settings.editor.selected.configurable": "com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable", + "vue.rearranger.settings.migration": "true" + } +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1724100864067 + + + + + + + + + \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4a22a37 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM ghcr.io/osgeo/gdal:ubuntu-small-3.9.1 +RUN apt-get update && apt-get -y install python3-pip --fix-missing +COPY requirements.txt . +COPY requirements-additional.txt . +# RUN ln -s /usr/include/hdf5/serial /usr/include/hdf5/include +# RUN export HDF5_DIR=/usr/include/hdf5 +# RUN pip install versioned-hdf5 --break-system-packages +RUN apt-get -y install pkg-config libhdf5-dev +RUN apt-get -y install python3.12-venv +RUN pip install --no-binary=h5py h5py --break-system-packages +RUN pip install -r requirements.txt --break-system-packages +RUN pip install -r requirements-additional.txt --break-system-packages +COPY . . 
+EXPOSE 8000
+CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f63ba38
--- /dev/null
+++ b/README.md
@@ -0,0 +1,30 @@
+# ea-fastapi
+
+This repo uses FastAPI as a wrapper around the EarthAccess library. Four endpoints are defined, which can be exercised from Postman:
+1. start_download - Downloads files from the search result locally and uploads them to S3. Returns a unique eight-character job id.
+2. status - Returns the latest status of the job.
+3. get_file_path - Returns the S3 paths of the uploaded granules.
+4. get_metadata - Displays the metadata of the granules.
+
+## Pre-requisites
+
+AWS and Earthdata accounts are required to use this wrapper application. The credentials for these accounts should be entered in the .env file.
+Postman and Docker need to be installed.
+
+## Steps to Use
+
+Run the following commands (replace `<image-name>` with a name of your choice):
+
+`docker build -t <image-name> .`
+
+`docker run -p 8000:8000 <image-name>`
+
+Once the Docker container is up and running, the following endpoints can be used to send GET/POST/PUT requests to http://0.0.0.0:8000/:
+
+| Request type | Endpoint | Parameters |
+| --- | --- | --- |
+| GET | / | N/A |
+| PUT | /start_download | short_name, date_range, bounding_box (coordinates can be entered manually or passed as text in the request body), concept_id (optional), isPangeoForge (optional) |
+| POST | /status | uid (the job id returned by start_download) |
+| POST | /get_file_path | uid |
+| POST | /get_metadata | uid |
diff --git a/build_and_docker.sh b/build_and_docker.sh
new file mode 100644
index 0000000..58eb920
--- /dev/null
+++ b/build_and_docker.sh
@@ -0,0 +1,4 @@
+docker build -t fcx-fastapi .
+docker stop fcx-fastapi-container
+docker rm fcx-fastapi-container
+docker run -d --name fcx-fastapi-container -p 80:80 fcx-fastapi
\ No newline at end of file
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..f76cfd6
--- /dev/null
+++ b/config.py
@@ -0,0 +1,14 @@
+from dotenv import load_dotenv
+import os
+
+load_dotenv()
+
+s3Url = os.environ.get('EA_FASTAPI_S3_URL')
+cloudfrontUrl = os.environ.get('EA_FASTAPI_CLOUDFRONT_URL')
+s3BucketName = os.environ.get('EA_FASTAPI_S3_BUCKET')
+s3ObjectUrl = os.environ.get('EA_FASTAPI_S3_OBJECT_URL')
+fileDownloadPath = os.environ.get('EA_FASTAPI_FILE_DOWNLOAD_PATH')
+aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
+aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
+username = os.environ.get('EARTHDATA_USERNAME')
+password = os.environ.get('EARTHDATA_PASSWORD')
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..39e42ba
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,9 @@
+version: '3.8'
+services:
+  api:
+    container_name: "docker-fastapi"
+    build: .
+    ports:
+      - 8000:8000
+    volumes:
+      - .:/usr/src/app
\ No newline at end of file
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..faff7db
--- /dev/null
+++ b/main.py
@@ -0,0 +1,81 @@
+import random
+from typing import Any, Dict
+from fastapi import Body, FastAPI, Query, WebSocket
+import string
+
+from starlette.middleware import Middleware
+from starlette.middleware.cors import CORSMiddleware
+
+from src import *
+from src.models.job_model import Job, Metadata, Coord
+from src.login_handler import login_handler
+from src.websocket_handler import ConnectionManager
+from src.root_handler import root_handler
+from http import HTTPStatus
+from fastapi import BackgroundTasks
+from src.websocket_handler import *
+from src.download_handler import *
+from src.file_path_handler import *
+from src.status_handler import *
+from src.metadata_handler import *
+
+# Enable CORS for all origins
+middleware = [
+    Middleware(
+        CORSMiddleware,
+        allow_origins=['*'],
+        allow_credentials=True,
+        allow_methods=['*'],
+        allow_headers=['*']
+    )
+]
+
+# In-memory dict used as job storage
+jobs: Dict[str, Job] = {}
+app = FastAPI(middleware=middleware)
+
+# Authenticate the user with Earthdata at startup
+login_handler()
+
+# Root endpoint
+@app.get("/")
+async def root():
+    return await root_handler()
+
+manager = ConnectionManager()
+
+# Establish WebSocket connection
+@app.websocket("/ws")
+async def websocket_endpoint(websocket: WebSocket):
+    return await websocket_endpoint_handler(websocket, manager)
+
+# Start a download job in the background
+@app.put("/start_download", status_code=HTTPStatus.ACCEPTED)
+async def start_download(background_tasks: BackgroundTasks, short_name: str, date_range: list = Query([]), concept_id: str | None = None, bounding_box: list | None = Query(None), isPangeoForge: bool | None = None):
+
+    # Create a new Job instance and add it to the jobs dictionary
+    # The job id must start with a letter because it is reused as the Pangeo Forge job name, which has that requirement
+    current_job = Job()
+    key1 = random.choices(string.ascii_lowercase, k=1)[0]
+    key2 = ''.join(random.choices(string.ascii_lowercase + string.digits, k=7))
+    current_job.uid = key1 + key2
+    jobs[current_job.uid] = current_job
+
+    # Add the download job to the background tasks
+    background_tasks.add_task(download_handler, current_job, short_name, tuple(date_range), concept_id, bounding_box, manager, isPangeoForge)
+    return current_job.uid
+
+# Check the status of a job
+@app.post("/status")
+async def status(uid: str):
+    return await status_handler(uid, jobs)
+
+# Get the files associated with a job
+@app.post("/get_file_path")
+async def get_file_path(uid: str):
+    return await file_path_handler(uid, jobs)
+
+# Display metadata associated with a job (EarthAccess only)
+@app.post("/get_metadata")
+async def get_metadata(uid: str):
+    return await metadata_handler(uid, jobs)
diff --git a/requirements-additional.txt b/requirements-additional.txt
new file mode 100644
index 0000000..8f8fac1
--- /dev/null
+++ b/requirements-additional.txt
@@ -0,0 +1,5 @@
+boto3
+dask==2024.3.1
+pangeo-forge-earthdatalogin==0.2
+pangeo-forge-recipes==0.10.5
+pangeo-forge-runner==0.10.2
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..16dddbe
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,7 @@
+fastapi[all]==0.104.1  # includes websockets, uvicorn
+earthaccess==0.7.1
+fiona==1.9.5
+xarray==2023.8.0
+geopandas==0.14.1
+python-dotenv==1.0.0
+h5netcdf==1.3.0
\ No newline at end of file
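For reference, here is a rough client-side sketch of driving the endpoints defined in main.py above, using the third-party `requests` package. Nothing in it is part of this repo: the host/port, the polling interval, and the `short_name`/`date_range`/`bounding_box` values are placeholders chosen only for illustration.

```python
# Illustrative only: assumes the container is running locally on port 8000
# and that `requests` is installed in the client environment.
import time
import requests

BASE = "http://localhost:8000"

# Kick off a download job; parameter names mirror the /start_download signature in main.py.
resp = requests.put(
    f"{BASE}/start_download",
    params={
        "short_name": "EXAMPLE_SHORT_NAME",            # hypothetical collection short name
        "date_range": ["2020-01-01", "2020-01-02"],
        "bounding_box": [-90.0, 20.0, -80.0, 30.0],
    },
)
job_id = resp.json()  # the eight-character job id returned by the endpoint

# Poll the job status until a terminal status is reached, then fetch the uploaded file paths.
# A real client would also treat the upload-failure statuses as terminal.
while True:
    status = requests.post(f"{BASE}/status", params={"uid": job_id}).json()
    print(status)
    if status in ("Job completed", "No granules found", "Job not available"):
        break
    time.sleep(5)

print(requests.post(f"{BASE}/get_file_path", params={"uid": job_id}).json())
```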
diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/bake.sh b/src/bake.sh
new file mode 100644
index 0000000..4f8185b
--- /dev/null
+++ b/src/bake.sh
@@ -0,0 +1 @@
+pangeo-forge-runner bake --config=$CONFIG_FILE --repo=$REPO --Bake.job_name=$JOB_NAME --prune
\ No newline at end of file
diff --git a/src/download_handler.py b/src/download_handler.py
new file mode 100644
index 0000000..d442276
--- /dev/null
+++ b/src/download_handler.py
@@ -0,0 +1,56 @@
+import earthaccess as ea
+from src.utils.pangeo_forge_utils import handlePangeoForge
+from src.utils.earthaccess_utils import handleEarthAccess
+from src.utils.utils import callWithNonNoneArgs
+import asyncio
+
+from src.websocket_handler import ConnectionManager
+
+from src.models.status_model import JobStatus, Status, JobType
+
+def download_handler(current_job, short_name = None, date_range = None, concept_id = None, bounding_box = None, manager: ConnectionManager = None, isPangeoForge = False):
+    """
+    Handles the download process for data based on the specified parameters, supporting both the EarthAccess and Pangeo Forge workflows.
+
+    Args:
+        current_job (Job): The current job object containing information about the job.
+        short_name (str): Short name identifier for the dataset.
+        date_range (tuple): Date range (start, end) for the dataset query.
+        concept_id (str, optional): The unique identifier for the dataset, if applicable.
+        bounding_box (tuple, optional): Tuple specifying the geographic bounding box for the dataset query.
+        manager (ConnectionManager): WebSocket connection manager used to broadcast status updates.
+        isPangeoForge (bool): Flag to select the Pangeo Forge workflow; otherwise the EarthAccess workflow is used.
+
+    This function sets the job status, performs an EarthAccess search with the non-None parameters,
+    and processes the results according to the selected workflow (Pangeo Forge or EarthAccess).
+    It also broadcasts status updates through the specified connection manager.
+ """ + jobType = JobType.PANGEO_FORGE if isPangeoForge else JobType.EARTH_ACCESS + current_job.status = "Querying data" + asyncio.run(manager.broadcast(Status(jobType, current_job.uid, current_job.status))) + + # Assign bounding_box with geojson coordinates + if(bounding_box is None): + bounding_box = (-180.0, -90.0, 180.0, 90.0) + else: + bounding_box = tuple(bounding_box) + + results = callWithNonNoneArgs(ea.search_data, + concept_id = concept_id, + short_name=short_name, + cloud_hosted=True, + temporal=date_range, + bounding_box=bounding_box, + count=3 + ) + + if (results == []): + current_job.progress = 100 + current_job.status = JobStatus.NO_GRANULES + current_job.completed = True + asyncio.run(manager.broadcast(Status(jobType, current_job.uid, current_job.status))) + return + + if (isPangeoForge): + handlePangeoForge(results, current_job, manager) + else: + handleEarthAccess(results, current_job, manager) \ No newline at end of file diff --git a/src/feedstock/__init__.py b/src/feedstock/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/feedstock/meta.yaml b/src/feedstock/meta.yaml new file mode 100644 index 0000000..ec24ef3 --- /dev/null +++ b/src/feedstock/meta.yaml @@ -0,0 +1,3 @@ +recipes: + - id: earthaccess-ghrc + object: "recipe:recipe" \ No newline at end of file diff --git a/src/feedstock/recipe.py b/src/feedstock/recipe.py new file mode 100644 index 0000000..e3785ad --- /dev/null +++ b/src/feedstock/recipe.py @@ -0,0 +1,41 @@ +import aiohttp +import apache_beam as beam +import os +import sys +from pathlib import Path + +__file__ = "recipe.py" +DIR = Path(__file__).parent.absolute().as_posix() +sys.path.append(DIR) +from config import username, password + +from pangeo_forge_recipes.patterns import ConcatDim, FilePattern +from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr + +urls_file_path = f"{Path(__file__).parent.absolute().as_posix()}/src/feedstock/url_list.txt" + +outfile = open(urls_file_path, 'r') +lines = outfile.readlines() +count_lines = len(lines) +range_lines = range(count_lines) + +os.remove(urls_file_path) + +def make_url(time): + return lines[time] + +concat_dim = ConcatDim("time", range_lines) +pattern = FilePattern(make_url, concat_dim) + +# for index, url in pattern.items(): +# print(url) + +recipe = ( + beam.Create(pattern.items()) + | OpenURLWithFSSpec(open_kwargs={'client_kwargs': {"auth" : aiohttp.BasicAuth(username, password)}}) + | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"}) + | StoreToZarr( + store_name="zarr", + combine_dims=pattern.combine_dim_keys, + ) +) \ No newline at end of file diff --git a/src/feedstock/requirements.txt b/src/feedstock/requirements.txt new file mode 100644 index 0000000..b7e3a16 --- /dev/null +++ b/src/feedstock/requirements.txt @@ -0,0 +1,2 @@ +pangeo-forge-recipes==0.10.0 +apache-beam \ No newline at end of file diff --git a/src/file_path_handler.py b/src/file_path_handler.py new file mode 100644 index 0000000..ef6e218 --- /dev/null +++ b/src/file_path_handler.py @@ -0,0 +1,25 @@ +from src.models.status_model import JobStatus + +async def file_path_handler(uid, jobs): + """ + Asynchronously retrieves the file paths from the job registry if the job is completed. + + Args: + uid (str): Unique identifier for the job. + jobs (dict): Dictionary containing job statuses and data. 
+ + Returns: + A list of file paths if available, or a job status indicating the current state or issues (e.g., not available, in progress, no granules). + """ + if uid not in jobs: + return JobStatus.NOT_AVAILABLE + + if jobs[uid].completed == False: + print(JobStatus.IN_PROGRESS) + return JobStatus.IN_PROGRESS + + if len(jobs[uid].files) == 0: + print(JobStatus.NO_GRANULES) + return JobStatus.NO_GRANULES + + return jobs[uid].files \ No newline at end of file diff --git a/src/local_config.py b/src/local_config.py new file mode 100644 index 0000000..7d1c8b9 --- /dev/null +++ b/src/local_config.py @@ -0,0 +1,23 @@ +# Let's put all our data on the same dir as this config file +from pathlib import Path +import os +HERE = Path(__file__).parent + +DATA_PREFIX = HERE / 'data' +os.makedirs(DATA_PREFIX, exist_ok=True) + +# Target output should be partitioned by job id +c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem" +c.TargetStorage.root_path = f"{DATA_PREFIX}/{{job_name}}" + +c.InputCacheStorage.fsspec_class = c.TargetStorage.fsspec_class +c.InputCacheStorage.fsspec_args = c.TargetStorage.fsspec_args + +# Input data cache should *not* be partitioned by job id, as we want to get the datafile +# from the source only once +c.InputCacheStorage.root_path = f"{DATA_PREFIX}/cache/input" + +c.MetadataCacheStorage.fsspec_class = c.TargetStorage.fsspec_class +c.MetadataCacheStorage.fsspec_args = c.TargetStorage.fsspec_args +# Metadata cache should be per job, as kwargs changing can change metadata +c.MetadataCacheStorage.root_path = f"{DATA_PREFIX}/{{job_name}}/cache/metadata" \ No newline at end of file diff --git a/src/login_handler.py b/src/login_handler.py new file mode 100644 index 0000000..481c023 --- /dev/null +++ b/src/login_handler.py @@ -0,0 +1,13 @@ +import earthaccess as ea +from dotenv import load_dotenv + +def login_handler(): + # For .netrc to load environment variables from .env file + load_dotenv() + auth = ea.login(strategy="environment", persist=True) + + # Check if the authentication is an instance of ea.Auth and if it's authenticated + if isinstance(auth, ea.Auth) and auth.authenticated: + print("Login successful") + else: + raise Exception("Login failed. Check if valid credentials are provided") diff --git a/src/metadata_handler.py b/src/metadata_handler.py new file mode 100644 index 0000000..f05e4a8 --- /dev/null +++ b/src/metadata_handler.py @@ -0,0 +1,37 @@ +import earthaccess as ea +import xarray as xr + +from src.models.status_model import JobStatus + +async def metadata_handler(uid, jobs): + """ + Asynchronously retrieves and processes the metadata for the specified job using EarthAccess API and xarray. + + Args: + uid (str): Unique identifier for the job. + jobs (dict): Dictionary of jobs which may contain granule results and other metadata. + + Returns: + A dictionary containing processed metadata or a job status; or an error message if metadata cannot be parsed. + """ + try: + if uid not in jobs: + return JobStatus.NOT_AVAILABLE + + if not jobs[uid].completed: + return JobStatus.IN_PROGRESS + + if len(jobs[uid].files) == 0: + return JobStatus.NO_GRANULES + + current_job = jobs[uid] + + # Display metadata of one granule + fileset = ea.open([current_job.result_granules[0]]) + + # Open through xarray + current_job.data = xr.open_mfdataset(fileset, decode_times=False).to_dict(data=False) + return current_job.data + + except Exception: + return "Metadata could not be parsed, ensure that the files are in NetCDF format." 
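For orientation, the dictionary returned by /get_metadata is whatever xarray's `Dataset.to_dict(data=False)` produces for the opened granule, which is also the shape the `Metadata`/`Coord` models below expect. A rough, purely illustrative sketch (the variable names, attributes, and sizes are made up and depend entirely on the granule):

```python
# Illustrative shape of a /get_metadata response; actual keys depend on the dataset.
example_metadata = {
    "coords": {
        "time": {"dims": ["time"], "attrs": {"units": "seconds"}, "dtype": "float64", "shape": [24]},
        "lat":  {"dims": ["lat"],  "attrs": {"units": "degrees_north"}, "dtype": "float32", "shape": [1800]},
    },
    "attrs": {"title": "..."},           # global attributes of the dataset
    "dims": {"time": 24, "lat": 1800},
    "data_vars": {
        "precipitation": {"dims": ["time", "lat"], "attrs": {}, "dtype": "float32", "shape": [24, 1800]},
    },
}
```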
\ No newline at end of file diff --git a/src/models/__init__.py b/src/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/models/job_model.py b/src/models/job_model.py new file mode 100644 index 0000000..842cc44 --- /dev/null +++ b/src/models/job_model.py @@ -0,0 +1,27 @@ +from typing import List, Dict +from pydantic import BaseModel +import earthaccess as ea +import json + +class Coord(BaseModel): + dims: List[str] + attrs: Dict[str, str] + dtype: str + shape: List[int] + +class Metadata(BaseModel): + coords: Dict[str, Coord] + +class Job(BaseModel): + # unique uid with length=8 can't be created with Pydantic model + uid: str = "" + status: str = "in_progress" + progress: int = 0 + completed: bool = False + files: list[str] = [] # List of files downloaded + data: Metadata = {} + result_granules: List[ea.results.DataGranule] = [] # Store results of ea.search_data() + + # Type of result_granules is not in-built + class Config: + arbitrary_types_allowed = True \ No newline at end of file diff --git a/src/models/status_model.py b/src/models/status_model.py new file mode 100644 index 0000000..87ba2f1 --- /dev/null +++ b/src/models/status_model.py @@ -0,0 +1,18 @@ +from enum import Enum +from dataclasses import dataclass + +class JobType(str, Enum): + EARTH_ACCESS = "EA" + PANGEO_FORGE = "PF" + +class JobStatus(str, Enum): + NOT_AVAILABLE = "Job not available" + IN_PROGRESS = "Job in progress" + NO_GRANULES = "No granules found" + JOB_COMPLETE = "Job completed" + +@dataclass +class Status(): + jobType: JobType = JobType.EARTH_ACCESS + uid: str = "" + status: str = "" \ No newline at end of file diff --git a/src/root_handler.py b/src/root_handler.py new file mode 100644 index 0000000..11d29ed --- /dev/null +++ b/src/root_handler.py @@ -0,0 +1,2 @@ +async def root_handler(): + return {"Welcome to fcx-downloader!"} \ No newline at end of file diff --git a/src/status_handler.py b/src/status_handler.py new file mode 100644 index 0000000..f2551f9 --- /dev/null +++ b/src/status_handler.py @@ -0,0 +1,18 @@ +from src.models.status_model import JobStatus + +async def status_handler(uid, jobs): + """ + Asynchronously retrieves the status of a specified job. + + Args: + uid (str): Unique identifier for the job. + jobs (dict): Dictionary containing job details and statuses. + + Returns: + The current status of the job if available; otherwise, returns a status indicating the job is not available. 
+ """ + if uid in jobs: + return jobs[uid].status + + return JobStatus.NOT_AVAILABLE + \ No newline at end of file diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/earthaccess_utils.py b/src/utils/earthaccess_utils.py new file mode 100644 index 0000000..70c0fce --- /dev/null +++ b/src/utils/earthaccess_utils.py @@ -0,0 +1,56 @@ +import boto3 +import logging +import earthaccess as ea +import asyncio +import os +from src.models.status_model import JobType, Status, JobStatus + +from config import cloudfrontUrl, s3BucketName, fileDownloadPath, aws_access_key_id, aws_secret_access_key + +# Uploads file to S3 and returns the CloudFront URL if successful +def s3_upload_earthaccess(file, uid, manager): + s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key) + file_name = os.path.split(file)[1] + key=f'{uid}/{file_name}' + status = f"Uploading {file_name} to S3" + asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, uid, status))) + + try: + s3_client.upload_file(file, s3BucketName, key) + except Exception as e: + logging.exception(e) + return "" + + return f"{cloudfrontUrl}/{key}" + +# Downloads result granules and uploads to S3(Main EarthAccess function) +def handleEarthAccess(results, current_job, manager): + print("In handleEarthAccess") + current_job.result_granules = results + current_job.progress = 0 + + # CHECK: ea should tell about calculating percentage, can't use external libraries as this is unique for each case. + # might be available for s3 upload, % for downloading from EA should be given by EA + remote_files = [] + for result in results: + current_job.progress += 1 + current_job.status = f"In progress - downloading files {current_job.progress}/{len(results)}" + asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, current_job.uid, current_job.status))) + + # download from CMR to a local path + result_file = ea.download(granules=result,local_path=fileDownloadPath) + response = s3_upload_earthaccess(result_file[0], current_job.uid, manager) + if (len(response) == 0): + current_job.status = "File could not be uploaded to S3" + asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, current_job.uid, current_job.status))) + current_job.completed = True + return + + # Upload the downloaded file to S3 and get the S3 URL + remote_files.append(response) + + current_job.files = remote_files + current_job.completed = True + current_job.status = JobStatus.JOB_COMPLETE + print(current_job.completed, current_job.status) + asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, current_job.uid, current_job.status))) \ No newline at end of file diff --git a/src/utils/pangeo_forge_utils.py b/src/utils/pangeo_forge_utils.py new file mode 100644 index 0000000..f24cb14 --- /dev/null +++ b/src/utils/pangeo_forge_utils.py @@ -0,0 +1,122 @@ +import asyncio +import os +from pathlib import Path +import subprocess +import s3fs +from config import cloudfrontUrl, s3BucketName, aws_access_key_id, aws_secret_access_key +from zipfile import ZipFile +import boto3 + +from src.models.status_model import JobType, Status, JobStatus + +def get_all_file_paths(directory): + """ + Collects and returns all file paths within the specified directory, including its subdirectories. 
+ """ + file_paths = [] + for root, directories, files in os.walk(directory): + for filename in files: + filepath = os.path.join(root, filename) + file_paths.append(filepath) + + return file_paths + +def zip_zarr_files(zip_zarr_path, zarr_path): + """ + Zips all zarr files located in the specified directory into a single zip file. + """ + file_paths = get_all_file_paths(zarr_path) + with ZipFile(zip_zarr_path,'w') as zip: + for file in file_paths: + zip.write(file, "./" + "/".join(file.split('/')[-2:])) + +def upload_file_to_s3(file_name, object_name): + """ + Uploads file to S3 and returns the CloudFront URL if successful. + """ + s3_client = boto3.client('s3') + try: + response = s3_client.upload_file(file_name, s3BucketName, object_name) + if response == None: + print(f"Upload Successful") + return f"{cloudfrontUrl}/{object_name}" + except Exception as e: + print(f"Error uploading file: {e}") + return "" + return "" + +def s3_upload_pangeoforge(dir, uid, manager): + """ + Uploads a directory to S3 and broadcasts the upload status. + """ + status = f"Uploading {dir} to S3" + asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, uid, status))) + + s3_file = s3fs.S3FileSystem(anon=False, key=aws_access_key_id, secret=aws_secret_access_key) + s3_path = f"{s3BucketName}/{uid}" + s3_file.put(dir, s3_path, recursive=True) + + return f"{cloudfrontUrl}/{uid}" + +# Main PangeoForge function +def handlePangeoForge(results, current_job, manager): + """ + Processes result granules, generates zipped zarr files and uploads to S3. + """ + data_link_list = [] + dir_path = f"{Path(__file__).parent.parent.absolute().as_posix()}/feedstock" + + for granule in results: + for asset in granule.data_links(): + if (".nc" in asset): + data_link_list.append(asset) + + with open(f"{dir_path}/url_list.txt", 'w') as f: + for line in data_link_list: + f.write(f"{line}\n") + + current_job.status = "Granules found, generating zarr files" + asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status))) + + DIR = Path(__file__).parent.parent.absolute().as_posix() + bake_script = f"{DIR}/bake.sh" + zarr_path = f"{DIR}/data/{current_job.uid}/zarr" + + # following pangeo forge documentation + cmd = ["sh", bake_script] + env = os.environ.copy() | { + "REPO": f"{DIR}", + "CONFIG_FILE": f"{DIR}/local_config.py", + "JOB_NAME": current_job.uid, + } + + # show live output + proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, env=env, text=True) + while (line := proc.stdout.readline()) != "": + print(line) + + if(len(os.listdir(zarr_path)) > 0): + current_job.status = "Generated zarr files" + asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status))) + + zip_zarr_path = f"{DIR}/data/{current_job.uid}/{current_job.uid}.zip" + zip_zarr_files(zip_zarr_path, zarr_path) + + response = upload_file_to_s3(zip_zarr_path, f"{current_job.uid}.zip") + if (len(response) == 0): + current_job.status = "Failed uploading zarr to S3" + asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status))) + current_job.completed = True + return + else: + current_job.status = "Zarr files could not be generated" + current_job.completed = True + asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status))) + return + + remote_files = [] #similar to EA as FE expects array not string path + remote_files.append(response) + current_job.files = remote_files + current_job.completed = True + current_job.status = 
JobStatus.JOB_COMPLETE + asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status))) \ No newline at end of file diff --git a/src/utils/utils.py b/src/utils/utils.py new file mode 100644 index 0000000..094f8e0 --- /dev/null +++ b/src/utils/utils.py @@ -0,0 +1,6 @@ +def callWithNonNoneArgs(f, *args, **kwargs): + """ + Calls a function, passing through only those keyword arguments that are not None. + """ + kwargsNotNone = {k: v for k, v in kwargs.items() if v is not None} + return f(*args, **kwargsNotNone) \ No newline at end of file diff --git a/src/websocket_handler.py b/src/websocket_handler.py new file mode 100644 index 0000000..ffa4aa5 --- /dev/null +++ b/src/websocket_handler.py @@ -0,0 +1,52 @@ +import dataclasses +import json +from fastapi import WebSocket, WebSocketDisconnect + +from src.models.status_model import Status + +status = Status() + +class ConnectionManager: + """ + Manages WebSocket connections, allowing for connection handling, messaging, and broadcasting. + """ + # Initializes the ConnectionManager with an empty list for storing active WebSocket connections + def __init__(self): + self.active_connections: list[WebSocket] = [] + + # Asynchronously accepts a new WebSocket connection and adds it to the list of active connections + async def connect(self, websocket: WebSocket): + await websocket.accept() + self.active_connections.append(websocket) + + # Removes a WebSocket connection from the list of active connections + def disconnect(self, websocket: WebSocket): + self.active_connections.remove(websocket) + + # Asynchronously sends a message to a specified WebSocket + async def send_personal_message(self, message: str, websocket: WebSocket): + await websocket.send_text(message) + + # Asynchronously sends a broadcast message to all active WebSocket connections + async def broadcast(self, status): + message = json.dumps(dataclasses.asdict(status)) + for connection in self.active_connections: + await connection.send_text(message) + +async def websocket_endpoint_handler(websocket, manager): + """ + Asynchronously handles incoming WebSocket connections and messages, + and manages connection events such as connect, disconnect, and messaging. + """ + await manager.connect(websocket) + try: + while True: + data = await websocket.receive_text() + if(data == "Disconnect"): + manager.disconnect(websocket) + await websocket.close() + return + else: + await manager.send_personal_message(f"Unrecognised message: {data}", websocket) + except WebSocketDisconnect: + manager.disconnect(websocket)
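As a companion to the ConnectionManager above, here is a minimal sketch of a client that watches the broadcast status messages (each one is a JSON-encoded `Status` dataclass with `jobType`, `uid`, and `status` fields). It assumes the third-party `websockets` package and a server on localhost:8000, neither of which is pinned anywhere in this repo, and it treats only the success/empty statuses as terminal.

```python
# Illustrative client for the /ws endpoint; requires `pip install websockets`.
import asyncio
import json
import websockets

async def watch_status():
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        while True:
            # Each broadcast is json.dumps(dataclasses.asdict(Status)).
            update = json.loads(await ws.recv())
            print(update["uid"], update["status"])
            if update["status"] in ("Job completed", "No granules found"):
                await ws.send("Disconnect")  # tells the server to close this connection
                break

asyncio.run(watch_status())
```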