diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..752d66f
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,4 @@
+**/__pycache__/
+*.py[cod]
+src/data/
+.DS_Store
diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..06102cf
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,12 @@
+AWS_ACCESS_KEY_ID=
+AWS_SECRET_ACCESS_KEY=
+
+EA_FASTAPI_S3_URL=s3://fcx-downloader/
+EA_FASTAPI_S3_BUCKET=fcx-downloader
+EA_FASTAPI_S3_OBJECT_URL=https://fcx-downloader.s3.amazonaws.com/
+EA_FASTAPI_CLOUDFRONT_URL=https://d1ntjpytjzo3fx.cloudfront.net
+EA_FASTAPI_FILE_DOWNLOAD_PATH=
+
+MACHINE=urs.earthdata.nasa.gov
+EARTHDATA_USERNAME=
+EARTHDATA_PASSWORD=
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 9db6f38..6ab7976 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,5 @@
-folder
+.env
+**/__pycache__/
+*.py[cod]
+src/data/
+.DS_Store
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..4a22a37
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,12 @@
+FROM ghcr.io/osgeo/gdal:ubuntu-small-3.9.1
+RUN apt-get update && apt-get -y install python3-pip --fix-missing
+COPY requirements.txt .
+COPY requirements-additional.txt .
+RUN apt-get -y install pkg-config libhdf5-dev
+RUN apt-get -y install python3.12-venv
+RUN pip install --no-binary=h5py h5py --break-system-packages
+RUN pip install -r requirements.txt --break-system-packages
+RUN pip install -r requirements-additional.txt --break-system-packages
+COPY . .
+EXPOSE 8000
+CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f63ba38
--- /dev/null
+++ b/README.md
@@ -0,0 +1,38 @@
+# ea-fastapi
+
+This repo uses FastAPI as a wrapper around the EarthAccess library. Four endpoints are defined, which can be exercised from Postman (or curl; see the examples below):
+1. start_download - Downloads the files from a search result locally and uploads them to S3. Returns a unique 8-character job id.
+2. status - Returns the latest status of the job.
+3. get_file_path - Returns the S3 paths of the uploaded granules.
+4. get_metadata - Displays the metadata of the granules.
+
+## Pre-requisites
+
+AWS and Earthdata accounts are required to use this wrapper application. The credentials for these accounts should be entered in the .env file (see .env.example for the expected variables).
+Postman and Docker need to be installed.
+
+## Steps to Use
+
+Run the following commands:
+
+`docker build -t ea-fastapi .`
+
+`docker run -p 8000:8000 ea-fastapi`
+
+Once the Docker container is up and running, the following endpoints accept GET/POST/PUT requests at http://0.0.0.0:8000/:
+
+| Request type | Endpoint | Parameters |
+| --- | --- | --- |
+| GET | / | N/A |
+| PUT | /start_download | short_name, date_range, bounding_box (coordinates can be entered manually or passed as text in the request body), concept_id (optional), isPangeoForge (optional) |
+| POST | /status | uid |
+| POST | /get_file_path | uid |
+| POST | /get_metadata | uid |
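+
+Alternatively, the repo's docker-compose.yml can build and run the container: `docker-compose up --build`
+
+As a quick check, a download job can be started and then polled with curl; the values in angle brackets below are placeholders, not verified dataset names:
+
+`curl -X PUT "http://localhost:8000/start_download?short_name=<collection short name>&date_range=<start date>&date_range=<end date>"`
+
+`curl -X POST "http://localhost:8000/status?uid=<job id returned by start_download>"`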
diff --git a/build_and_docker.sh b/build_and_docker.sh
new file mode 100644
index 0000000..58eb920
--- /dev/null
+++ b/build_and_docker.sh
@@ -0,0 +1,4 @@
+docker build -t fcx-fastapi .
+docker stop fcx-fastapi-container
+docker rm fcx-fastapi-container
+docker run -d --name fcx-fastapi-container -p 80:8000 fcx-fastapi
\ No newline at end of file
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..f76cfd6
--- /dev/null
+++ b/config.py
@@ -0,0 +1,14 @@
+from dotenv import load_dotenv
+import os
+
+load_dotenv()
+
+s3Url = os.environ.get('EA_FASTAPI_S3_URL')
+cloudfrontUrl = os.environ.get('EA_FASTAPI_CLOUDFRONT_URL')
+s3BucketName = os.environ.get('EA_FASTAPI_S3_BUCKET')
+s3ObjectUrl = os.environ.get('EA_FASTAPI_S3_OBJECT_URL')
+fileDownloadPath = os.environ.get('EA_FASTAPI_FILE_DOWNLOAD_PATH')
+aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
+aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
+username = os.environ.get('EARTHDATA_USERNAME')
+password = os.environ.get('EARTHDATA_PASSWORD')
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..39e42ba
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,9 @@
+version: '3.8'
+services:
+ api:
+ container_name: "docker-fastapi"
+ build: .
+ ports:
+ - 8000:8000
+ volumes:
+ - .:/usr/src/app
\ No newline at end of file
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..faff7db
--- /dev/null
+++ b/main.py
@@ -0,0 +1,78 @@
+import random
+import string
+from http import HTTPStatus
+from typing import Dict
+
+from fastapi import BackgroundTasks, FastAPI, Query, WebSocket
+from starlette.middleware import Middleware
+from starlette.middleware.cors import CORSMiddleware
+
+from src.models.job_model import Job
+from src.login_handler import login_handler
+from src.root_handler import root_handler
+from src.websocket_handler import ConnectionManager, websocket_endpoint_handler
+from src.download_handler import download_handler
+from src.file_path_handler import file_path_handler
+from src.status_handler import status_handler
+from src.metadata_handler import metadata_handler
+
+# Enable CORS for all origins
+middleware = [
+ Middleware(
+ CORSMiddleware,
+ allow_origins=['*'],
+ allow_credentials=True,
+ allow_methods=['*'],
+ allow_headers=['*']
+ )
+]
+
+# Dict as job storage
+jobs: Dict[str, Job] = {}
+app = FastAPI(middleware=middleware)
+
+# Authenticate user and allow login
+login_handler()
+
+# Root endpoint
+@app.get("/")
+async def root():
+ return await root_handler()
+
+manager = ConnectionManager()
+
+# Establish WebSocket connection
+@app.websocket("/ws")
+async def websocket_endpoint(websocket: WebSocket):
+ return await websocket_endpoint_handler(websocket, manager)
+
+# Start a download job in the background
+@app.put("/start_download", status_code=HTTPStatus.ACCEPTED)
+async def start_download(background_tasks: BackgroundTasks, short_name: str, date_range: list = Query([]), concept_id: str | None = None, bounding_box: list | None = Query(None), isPangeoForge: bool | None = None):
+
+ # Create a new Job instance and add it to the jobs dictionary
+    # The job id must start with a letter because it is reused as the Pangeo Forge job name, which has that requirement
+ current_job = Job()
+ key1 = random.choices(string.ascii_lowercase, k=1)[0]
+ key2 = ''.join(random.choices(string.ascii_lowercase + string.digits, k=7))
+ current_job.uid = key1+key2
+ jobs[current_job.uid] = current_job
+
+ # Add the download job to the background tasks
+ background_tasks.add_task(download_handler, current_job, short_name, tuple(date_range), concept_id, bounding_box, manager, isPangeoForge)
+ return current_job.uid
+
+# Check the status of a job
+@app.post("/status")
+async def status(uid: str):
+ return await status_handler(uid, jobs)
+
+# Get the files associated with a job
+@app.post("/get_file_path")
+async def get_file_path(uid: str):
+ return await file_path_handler(uid, jobs)
+
+# Display metadata associated with a job (EarthAccess jobs only)
+@app.post("/get_metadata")
+async def get_metadata(uid: str):
+ return await metadata_handler(uid, jobs)
diff --git a/requirements-additional.txt b/requirements-additional.txt
new file mode 100644
index 0000000..8f8fac1
--- /dev/null
+++ b/requirements-additional.txt
@@ -0,0 +1,5 @@
+boto3
+dask==2024.3.1
+pangeo-forge-earthdatalogin==0.2
+pangeo-forge-recipes==0.10.5
+pangeo-forge-runner==0.10.2
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..16dddbe
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,7 @@
+fastapi[all]==0.104.1  # includes websockets, uvicorn
+earthaccess==0.7.1
+fiona==1.9.5
+xarray==2023.8.0
+geopandas==0.14.1
+python-dotenv==1.0.0
+h5netcdf==1.3.0
\ No newline at end of file
diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/bake.sh b/src/bake.sh
new file mode 100644
index 0000000..4f8185b
--- /dev/null
+++ b/src/bake.sh
@@ -0,0 +1,2 @@
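+# REPO, CONFIG_FILE and JOB_NAME are expected in the environment (set by pangeo_forge_utils.py)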
+pangeo-forge-runner bake --config="$CONFIG_FILE" --repo="$REPO" --Bake.job_name="$JOB_NAME" --prune
\ No newline at end of file
diff --git a/src/download_handler.py b/src/download_handler.py
new file mode 100644
index 0000000..d442276
--- /dev/null
+++ b/src/download_handler.py
@@ -0,0 +1,56 @@
+import earthaccess as ea
+from src.utils.pangeo_forge_utils import handlePangeoForge
+from src.utils.earthaccess_utils import handleEarthAccess
+from src.utils.utils import callWithNonNoneArgs
+import asyncio
+
+from src.websocket_handler import ConnectionManager
+
+from src.models.status_model import JobStatus, Status, JobType
+
+def download_handler(current_job, short_name = None, date_range = None, concept_id = None, bounding_box = None, manager: ConnectionManager = None, isPangeoForge = False):
+ """
+ Handles the download process for data based on specified parameters, supporting both EarthAccess and pangeo-forge workflows.
+
+ Args:
+ current_job (Job): The current job object containing information about the job.
+ short_name (str): Short name identifier for the dataset.
+        date_range (tuple): Tuple of (start, end) date strings for the dataset query.
+ concept_id (str, optional): The unique identifier for the dataset, if applicable.
+ bounding_box (tuple, optional): Tuple specifying the geographic bounding box for the dataset query.
+ isPangeoForge (bool): Flag to determine if the job is for Pangeo Forge; otherwise, it's for Earth Access.
+
+    This function sets the job status, performs an EarthAccess search with the non-None parameters,
+ and processes results according to the specified workflow (Pangeo Forge or Earth Access).
+ It also handles broadcasting status updates through the specified connection manager.
+ """
+ jobType = JobType.PANGEO_FORGE if isPangeoForge else JobType.EARTH_ACCESS
+ current_job.status = "Querying data"
+ asyncio.run(manager.broadcast(Status(jobType, current_job.uid, current_job.status)))
+
+    # Default to a global bounding box when none is provided
+ if(bounding_box is None):
+ bounding_box = (-180.0, -90.0, 180.0, 90.0)
+ else:
+ bounding_box = tuple(bounding_box)
+
+ results = callWithNonNoneArgs(ea.search_data,
+ concept_id = concept_id,
+ short_name=short_name,
+ cloud_hosted=True,
+ temporal=date_range,
+ bounding_box=bounding_box,
+ count=3
+ )
+
+    if not results:
+ current_job.progress = 100
+ current_job.status = JobStatus.NO_GRANULES
+ current_job.completed = True
+ asyncio.run(manager.broadcast(Status(jobType, current_job.uid, current_job.status)))
+ return
+
+ if (isPangeoForge):
+ handlePangeoForge(results, current_job, manager)
+ else:
+ handleEarthAccess(results, current_job, manager)
\ No newline at end of file
diff --git a/src/feedstock/__init__.py b/src/feedstock/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/feedstock/meta.yaml b/src/feedstock/meta.yaml
new file mode 100644
index 0000000..ec24ef3
--- /dev/null
+++ b/src/feedstock/meta.yaml
@@ -0,0 +1,3 @@
+recipes:
+ - id: earthaccess-ghrc
+ object: "recipe:recipe"
\ No newline at end of file
diff --git a/src/feedstock/recipe.py b/src/feedstock/recipe.py
new file mode 100644
index 0000000..e3785ad
--- /dev/null
+++ b/src/feedstock/recipe.py
@@ -0,0 +1,41 @@
+import aiohttp
+import apache_beam as beam
+import os
+import sys
+from pathlib import Path
+
+__file__ = "recipe.py"
+DIR = Path(__file__).parent.absolute().as_posix()
+sys.path.append(DIR)
+from config import username, password
+
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
+from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
+
+urls_file_path = f"{DIR}/src/feedstock/url_list.txt"
+
+with open(urls_file_path, 'r') as url_file:
+    lines = url_file.readlines()
+count_lines = len(lines)
+range_lines = range(count_lines)
+
+os.remove(urls_file_path)
+
+def make_url(time):
+    return lines[time].strip()  # strip the trailing newline left by readlines()
+
+concat_dim = ConcatDim("time", range_lines)
+pattern = FilePattern(make_url, concat_dim)
+
+# for index, url in pattern.items():
+# print(url)
+
+recipe = (
+ beam.Create(pattern.items())
+ | OpenURLWithFSSpec(open_kwargs={'client_kwargs': {"auth" : aiohttp.BasicAuth(username, password)}})
+ | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
+ | StoreToZarr(
+ store_name="zarr",
+ combine_dims=pattern.combine_dim_keys,
+ )
+)
\ No newline at end of file
diff --git a/src/feedstock/requirements.txt b/src/feedstock/requirements.txt
new file mode 100644
index 0000000..b7e3a16
--- /dev/null
+++ b/src/feedstock/requirements.txt
@@ -0,0 +1,2 @@
+pangeo-forge-recipes==0.10.0
+apache-beam
\ No newline at end of file
diff --git a/src/file_path_handler.py b/src/file_path_handler.py
new file mode 100644
index 0000000..ef6e218
--- /dev/null
+++ b/src/file_path_handler.py
@@ -0,0 +1,25 @@
+from src.models.status_model import JobStatus
+
+async def file_path_handler(uid, jobs):
+ """
+ Asynchronously retrieves the file paths from the job registry if the job is completed.
+
+ Args:
+ uid (str): Unique identifier for the job.
+ jobs (dict): Dictionary containing job statuses and data.
+
+ Returns:
+ A list of file paths if available, or a job status indicating the current state or issues (e.g., not available, in progress, no granules).
+ """
+ if uid not in jobs:
+ return JobStatus.NOT_AVAILABLE
+
+    if not jobs[uid].completed:
+ print(JobStatus.IN_PROGRESS)
+ return JobStatus.IN_PROGRESS
+
+ if len(jobs[uid].files) == 0:
+ print(JobStatus.NO_GRANULES)
+ return JobStatus.NO_GRANULES
+
+ return jobs[uid].files
\ No newline at end of file
diff --git a/src/local_config.py b/src/local_config.py
new file mode 100644
index 0000000..7d1c8b9
--- /dev/null
+++ b/src/local_config.py
@@ -0,0 +1,23 @@
+# Let's put all our data on the same dir as this config file
+from pathlib import Path
+import os
+HERE = Path(__file__).parent
+
+DATA_PREFIX = HERE / 'data'
+os.makedirs(DATA_PREFIX, exist_ok=True)
+
+# Target output should be partitioned by job id
+c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
+c.TargetStorage.root_path = f"{DATA_PREFIX}/{{job_name}}"
+
+c.InputCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
+c.InputCacheStorage.fsspec_args = c.TargetStorage.fsspec_args
+
+# Input data cache should *not* be partitioned by job id, as we want to get the datafile
+# from the source only once
+c.InputCacheStorage.root_path = f"{DATA_PREFIX}/cache/input"
+
+c.MetadataCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
+c.MetadataCacheStorage.fsspec_args = c.TargetStorage.fsspec_args
+# Metadata cache should be per job, as kwargs changing can change metadata
+c.MetadataCacheStorage.root_path = f"{DATA_PREFIX}/{{job_name}}/cache/metadata"
\ No newline at end of file
diff --git a/src/login_handler.py b/src/login_handler.py
new file mode 100644
index 0000000..481c023
--- /dev/null
+++ b/src/login_handler.py
@@ -0,0 +1,13 @@
+import earthaccess as ea
+from dotenv import load_dotenv
+
+def login_handler():
+    # Load Earthdata credentials from the .env file into environment variables
+ load_dotenv()
+ auth = ea.login(strategy="environment", persist=True)
+
+ # Check if the authentication is an instance of ea.Auth and if it's authenticated
+ if isinstance(auth, ea.Auth) and auth.authenticated:
+ print("Login successful")
+ else:
+ raise Exception("Login failed. Check if valid credentials are provided")
diff --git a/src/metadata_handler.py b/src/metadata_handler.py
new file mode 100644
index 0000000..f05e4a8
--- /dev/null
+++ b/src/metadata_handler.py
@@ -0,0 +1,37 @@
+import earthaccess as ea
+import xarray as xr
+
+from src.models.status_model import JobStatus
+
+async def metadata_handler(uid, jobs):
+ """
+ Asynchronously retrieves and processes the metadata for the specified job using EarthAccess API and xarray.
+
+ Args:
+ uid (str): Unique identifier for the job.
+ jobs (dict): Dictionary of jobs which may contain granule results and other metadata.
+
+ Returns:
+ A dictionary containing processed metadata or a job status; or an error message if metadata cannot be parsed.
+ """
+ try:
+ if uid not in jobs:
+ return JobStatus.NOT_AVAILABLE
+
+ if not jobs[uid].completed:
+ return JobStatus.IN_PROGRESS
+
+ if len(jobs[uid].files) == 0:
+ return JobStatus.NO_GRANULES
+
+ current_job = jobs[uid]
+
+ # Display metadata of one granule
+ fileset = ea.open([current_job.result_granules[0]])
+
+ # Open through xarray
+ current_job.data = xr.open_mfdataset(fileset, decode_times=False).to_dict(data=False)
+ return current_job.data
+
+ except Exception:
+ return "Metadata could not be parsed, ensure that the files are in NetCDF format."
\ No newline at end of file
diff --git a/src/models/__init__.py b/src/models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/models/job_model.py b/src/models/job_model.py
new file mode 100644
index 0000000..842cc44
--- /dev/null
+++ b/src/models/job_model.py
@@ -0,0 +1,26 @@
+from typing import List, Dict
+from pydantic import BaseModel
+import earthaccess as ea
+
+class Coord(BaseModel):
+ dims: List[str]
+ attrs: Dict[str, str]
+ dtype: str
+ shape: List[int]
+
+class Metadata(BaseModel):
+ coords: Dict[str, Coord]
+
+class Job(BaseModel):
+    # A unique 8-character uid cannot be generated by the Pydantic model itself; it is assigned in main.py
+ uid: str = ""
+ status: str = "in_progress"
+ progress: int = 0
+ completed: bool = False
+ files: list[str] = [] # List of files downloaded
+ data: Metadata = {}
+ result_granules: List[ea.results.DataGranule] = [] # Store results of ea.search_data()
+
+ # Type of result_granules is not in-built
+ class Config:
+ arbitrary_types_allowed = True
\ No newline at end of file
diff --git a/src/models/status_model.py b/src/models/status_model.py
new file mode 100644
index 0000000..87ba2f1
--- /dev/null
+++ b/src/models/status_model.py
@@ -0,0 +1,18 @@
+from enum import Enum
+from dataclasses import dataclass
+
+class JobType(str, Enum):
+ EARTH_ACCESS = "EA"
+ PANGEO_FORGE = "PF"
+
+class JobStatus(str, Enum):
+ NOT_AVAILABLE = "Job not available"
+ IN_PROGRESS = "Job in progress"
+ NO_GRANULES = "No granules found"
+ JOB_COMPLETE = "Job completed"
+
+@dataclass
+class Status:
+ jobType: JobType = JobType.EARTH_ACCESS
+ uid: str = ""
+ status: str = ""
\ No newline at end of file
diff --git a/src/root_handler.py b/src/root_handler.py
new file mode 100644
index 0000000..11d29ed
--- /dev/null
+++ b/src/root_handler.py
@@ -0,0 +1,2 @@
+async def root_handler():
+ return {"Welcome to fcx-downloader!"}
\ No newline at end of file
diff --git a/src/status_handler.py b/src/status_handler.py
new file mode 100644
index 0000000..f2551f9
--- /dev/null
+++ b/src/status_handler.py
@@ -0,0 +1,18 @@
+from src.models.status_model import JobStatus
+
+async def status_handler(uid, jobs):
+ """
+ Asynchronously retrieves the status of a specified job.
+
+ Args:
+ uid (str): Unique identifier for the job.
+ jobs (dict): Dictionary containing job details and statuses.
+
+ Returns:
+ The current status of the job if available; otherwise, returns a status indicating the job is not available.
+ """
+ if uid in jobs:
+ return jobs[uid].status
+
+ return JobStatus.NOT_AVAILABLE
+
\ No newline at end of file
diff --git a/src/utils/__init__.py b/src/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/utils/earthaccess_utils.py b/src/utils/earthaccess_utils.py
new file mode 100644
index 0000000..70c0fce
--- /dev/null
+++ b/src/utils/earthaccess_utils.py
@@ -0,0 +1,56 @@
+import boto3
+import logging
+import earthaccess as ea
+import asyncio
+import os
+from src.models.status_model import JobType, Status, JobStatus
+
+from config import cloudfrontUrl, s3BucketName, fileDownloadPath, aws_access_key_id, aws_secret_access_key
+
+# Uploads file to S3 and returns the CloudFront URL if successful
+def s3_upload_earthaccess(file, uid, manager):
+    s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
+    file_name = os.path.split(file)[1]
+    key = f'{uid}/{file_name}'
+ status = f"Uploading {file_name} to S3"
+ asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, uid, status)))
+
+ try:
+ s3_client.upload_file(file, s3BucketName, key)
+ except Exception as e:
+ logging.exception(e)
+ return ""
+
+ return f"{cloudfrontUrl}/{key}"
+
+# Downloads result granules and uploads them to S3 (main EarthAccess function)
+def handleEarthAccess(results, current_job, manager):
+ print("In handleEarthAccess")
+ current_job.result_granules = results
+ current_job.progress = 0
+
+    # TODO: a download-progress percentage would have to come from earthaccess itself;
+    # only the S3 upload portion could be measured here independently.
+ remote_files = []
+ for result in results:
+ current_job.progress += 1
+ current_job.status = f"In progress - downloading files {current_job.progress}/{len(results)}"
+ asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, current_job.uid, current_job.status)))
+
+ # download from CMR to a local path
+        result_file = ea.download(granules=result, local_path=fileDownloadPath)
+ response = s3_upload_earthaccess(result_file[0], current_job.uid, manager)
+ if (len(response) == 0):
+ current_job.status = "File could not be uploaded to S3"
+ asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, current_job.uid, current_job.status)))
+ current_job.completed = True
+ return
+
+ # Upload the downloaded file to S3 and get the S3 URL
+ remote_files.append(response)
+
+ current_job.files = remote_files
+ current_job.completed = True
+ current_job.status = JobStatus.JOB_COMPLETE
+ print(current_job.completed, current_job.status)
+ asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, current_job.uid, current_job.status)))
\ No newline at end of file
diff --git a/src/utils/pangeo_forge_utils.py b/src/utils/pangeo_forge_utils.py
new file mode 100644
index 0000000..f24cb14
--- /dev/null
+++ b/src/utils/pangeo_forge_utils.py
@@ -0,0 +1,122 @@
+import asyncio
+import os
+from pathlib import Path
+import subprocess
+import s3fs
+from config import cloudfrontUrl, s3BucketName, aws_access_key_id, aws_secret_access_key
+from zipfile import ZipFile
+import boto3
+
+from src.models.status_model import JobType, Status, JobStatus
+
+def get_all_file_paths(directory):
+ """
+ Collects and returns all file paths within the specified directory, including its subdirectories.
+ """
+ file_paths = []
+ for root, directories, files in os.walk(directory):
+ for filename in files:
+ filepath = os.path.join(root, filename)
+ file_paths.append(filepath)
+
+ return file_paths
+
+def zip_zarr_files(zip_zarr_path, zarr_path):
+ """
+ Zips all zarr files located in the specified directory into a single zip file.
+ """
+ file_paths = get_all_file_paths(zarr_path)
+ with ZipFile(zip_zarr_path,'w') as zip:
+ for file in file_paths:
+ zip.write(file, "./" + "/".join(file.split('/')[-2:]))
+
+def upload_file_to_s3(file_name, object_name):
+ """
+ Uploads file to S3 and returns the CloudFront URL if successful.
+ """
+ s3_client = boto3.client('s3')
+ try:
+ response = s3_client.upload_file(file_name, s3BucketName, object_name)
+        if response is None:
+            print("Upload successful")
+ return f"{cloudfrontUrl}/{object_name}"
+ except Exception as e:
+ print(f"Error uploading file: {e}")
+ return ""
+ return ""
+
+def s3_upload_pangeoforge(dir, uid, manager):
+ """
+ Uploads a directory to S3 and broadcasts the upload status.
+ """
+ status = f"Uploading {dir} to S3"
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, uid, status)))
+
+ s3_file = s3fs.S3FileSystem(anon=False, key=aws_access_key_id, secret=aws_secret_access_key)
+ s3_path = f"{s3BucketName}/{uid}"
+ s3_file.put(dir, s3_path, recursive=True)
+
+ return f"{cloudfrontUrl}/{uid}"
+
+# Main PangeoForge function
+def handlePangeoForge(results, current_job, manager):
+ """
+ Processes result granules, generates zipped zarr files and uploads to S3.
+ """
+ data_link_list = []
+ dir_path = f"{Path(__file__).parent.parent.absolute().as_posix()}/feedstock"
+
+ for granule in results:
+ for asset in granule.data_links():
+ if (".nc" in asset):
+ data_link_list.append(asset)
+
+ with open(f"{dir_path}/url_list.txt", 'w') as f:
+ for line in data_link_list:
+ f.write(f"{line}\n")
+
+ current_job.status = "Granules found, generating zarr files"
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
+
+ DIR = Path(__file__).parent.parent.absolute().as_posix()
+ bake_script = f"{DIR}/bake.sh"
+ zarr_path = f"{DIR}/data/{current_job.uid}/zarr"
+
+ # following pangeo forge documentation
+ cmd = ["sh", bake_script]
+ env = os.environ.copy() | {
+ "REPO": f"{DIR}",
+ "CONFIG_FILE": f"{DIR}/local_config.py",
+ "JOB_NAME": current_job.uid,
+ }
+
+ # show live output
+ proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, env=env, text=True)
+ while (line := proc.stdout.readline()) != "":
+ print(line)
+
+    if os.path.exists(zarr_path) and len(os.listdir(zarr_path)) > 0:
+ current_job.status = "Generated zarr files"
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
+
+ zip_zarr_path = f"{DIR}/data/{current_job.uid}/{current_job.uid}.zip"
+ zip_zarr_files(zip_zarr_path, zarr_path)
+
+ response = upload_file_to_s3(zip_zarr_path, f"{current_job.uid}.zip")
+ if (len(response) == 0):
+ current_job.status = "Failed uploading zarr to S3"
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
+ current_job.completed = True
+ return
+ else:
+ current_job.status = "Zarr files could not be generated"
+ current_job.completed = True
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
+ return
+
+    remote_files = []  # the frontend expects a list of file paths, as with EarthAccess jobs
+ remote_files.append(response)
+ current_job.files = remote_files
+ current_job.completed = True
+ current_job.status = JobStatus.JOB_COMPLETE
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
\ No newline at end of file
diff --git a/src/utils/utils.py b/src/utils/utils.py
new file mode 100644
index 0000000..094f8e0
--- /dev/null
+++ b/src/utils/utils.py
@@ -0,0 +1,7 @@
+def callWithNonNoneArgs(f, *args, **kwargs):
+ """
+ Calls a function, passing through only those keyword arguments that are not None.
+ """
+ kwargsNotNone = {k: v for k, v in kwargs.items() if v is not None}
+ return f(*args, **kwargsNotNone)
\ No newline at end of file
diff --git a/src/websocket_handler.py b/src/websocket_handler.py
new file mode 100644
index 0000000..ffa4aa5
--- /dev/null
+++ b/src/websocket_handler.py
@@ -0,0 +1,48 @@
+import dataclasses
+import json
+from fastapi import WebSocket, WebSocketDisconnect
+
+class ConnectionManager:
+ """
+ Manages WebSocket connections, allowing for connection handling, messaging, and broadcasting.
+ """
+ # Initializes the ConnectionManager with an empty list for storing active WebSocket connections
+ def __init__(self):
+ self.active_connections: list[WebSocket] = []
+
+ # Asynchronously accepts a new WebSocket connection and adds it to the list of active connections
+ async def connect(self, websocket: WebSocket):
+ await websocket.accept()
+ self.active_connections.append(websocket)
+
+ # Removes a WebSocket connection from the list of active connections
+ def disconnect(self, websocket: WebSocket):
+ self.active_connections.remove(websocket)
+
+ # Asynchronously sends a message to a specified WebSocket
+ async def send_personal_message(self, message: str, websocket: WebSocket):
+ await websocket.send_text(message)
+
+ # Asynchronously sends a broadcast message to all active WebSocket connections
+ async def broadcast(self, status):
+ message = json.dumps(dataclasses.asdict(status))
+ for connection in self.active_connections:
+ await connection.send_text(message)
+
+async def websocket_endpoint_handler(websocket, manager):
+ """
+ Asynchronously handles incoming WebSocket connections and messages,
+ and manages connection events such as connect, disconnect, and messaging.
+ """
+ await manager.connect(websocket)
+ try:
+ while True:
+ data = await websocket.receive_text()
+            if data == "Disconnect":
+ manager.disconnect(websocket)
+ await websocket.close()
+ return
+ else:
+ await manager.send_personal_message(f"Unrecognised message: {data}", websocket)
+ except WebSocketDisconnect:
+ manager.disconnect(websocket)