+# ea-fastapi
+In this repo, FastAPI is used as a wrapper around EarthAccess library. There are 4 endpoints defined which can be validated in Postman for tracking,
+1. start_download - Downloads files from search result locally and uploads them to S3. Returns a unique job id of 8 characters.
+2. status - Returns latest status of the job.
+3. get_file_path - Returns S3 path of the uploaded granules.
+4. get_metadata - Displays the metadata of granules.
+## Pre-requisites
+Accounts in AWS and EarthData are necessary for using this wrapper application. The credentials for these accounts should be entered in .env file.
+Postman and Docker needs to be installed.
+## Steps to Use
+Run the following commands:
+`docker build -t .`
+`docker run -p 8000:8000 `
+Once the Docker container is up and running, the following endpoints can be used to send GET/POST/PUT requests from
+| Request type | Endpoint | Parameters |
+| --- | --- | --- |
+| GET | / | N/A |
+| PUT | /start_download | short_name, date_range, bounding_box(coordinates can be entered manually or passed as a text in Request Body), concept_id(optional) |
+| POST | /status | job_id |
+| POST | /get_file_path | job_id |
+| POST | /get_metadata | job_id |
+import random
+from typing import Any, Dict
+from fastapi import Body, FastAPI, Query, WebSocket
+import string
+from starlette.middleware import Middleware
+from starlette.middleware.cors import CORSMiddleware
+from src import *
+from src.models.job_model import Job, Metadata, Coord
+from src.login_handler import login_handler
+from src.websocket_handler import ConnectionManager
+from src.root_handler import root_handler
+from http import HTTPStatus
+from fastapi import BackgroundTasks
+from src.websocket_handler import *
+from src.download_handler import *
+from src.file_path_handler import *
+from src.status_handler import *
+from src.metadata_handler import *
+# Enable CORS for all origins
+middleware = [
+ Middleware(
+ CORSMiddleware,
+ allow_origins=['*'],
+ allow_credentials=True,
+ allow_methods=['*'],
+ allow_headers=['*']
+ )
+# Dict as job storage
+jobs: Dict[str, Job] = {}
+app = FastAPI(middleware=middleware)
+# Authenticate user and allow login
+# Root endpoint
+async def root():
+ return await root_handler()
+manager = ConnectionManager()
+# Establish WebSocket connection
+async def websocket_endpoint(websocket: WebSocket):
+ return await websocket_endpoint_handler(websocket, manager)
+# Start a download job in the background
+@app.put("/start_download", status_code=HTTPStatus.ACCEPTED)
+async def start_download(background_tasks: BackgroundTasks, short_name: str, date_range: list = Query([]) , concept_id: str | None = None, bounding_box: list | None = Query(None), isPangeoForge: bool | None = None):
+ # Create a new Job instance and add it to the jobs dictionary
+ # Job should always start with a letter because we use this for pangeo forge jobid which has this specific requirement
+ current_job = Job()
+ key1 = random.choices(string.ascii_lowercase, k=1)[0]
+ key2 = ''.join(random.choices(string.ascii_lowercase + string.digits, k=7))
+ current_job.uid = key1+key2
+ jobs[current_job.uid] = current_job
+ # Add the download job to the background tasks
+ background_tasks.add_task(download_handler, current_job, short_name, tuple(date_range), concept_id, bounding_box, manager, isPangeoForge)
+ return current_job.uid
+# Check the status of a job
+async def status(uid: str):
+ return await status_handler(uid, jobs)
+# Get the files associated with a job
+async def get_file_path(uid: str):
+ return await file_path_handler(uid, jobs)
+# Display metadata associated with a job(only earthaccess)
+async def get_metadata(uid: str):
+ return await metadata_handler(uid, jobs)
+import earthaccess as ea
+from src.utils.pangeo_forge_utils import handlePangeoForge
+from src.utils.earthaccess_utils import handleEarthAccess
+from src.utils.utils import callWithNonNoneArgs
+import asyncio
+from src.websocket_handler import ConnectionManager
+from src.models.status_model import JobStatus, Status, JobType
+def download_handler(current_job, short_name = None, date_range = None, concept_id = None, bounding_box = None, manager: ConnectionManager = None, isPangeoForge = False):
+ """
+ Handles the download process for data based on specified parameters, supporting both EarthAccess and pangeo-forge workflows.
+ Args:
+ current_job (Job): The current job object containing information about the job.
+ short_name (str): Short name identifier for the dataset.
+ date_range (str): String specifying the date range for the dataset query.
+ concept_id (str, optional): The unique identifier for the dataset, if applicable.
+ bounding_box (tuple, optional): Tuple specifying the geographic bounding box for the dataset query.
+ isPangeoForge (bool): Flag to determine if the job is for Pangeo Forge; otherwise, it's for Earth Access.
+ This function sets the job status, performs a EarthAccess search with non-None parameters,
+ and processes results according to the specified workflow (Pangeo Forge or Earth Access).
+ It also handles broadcasting status updates through the specified connection manager.
+ """
+ jobType = JobType.PANGEO_FORGE if isPangeoForge else JobType.EARTH_ACCESS
+ current_job.status = "Querying data"
+ asyncio.run(manager.broadcast(Status(jobType, current_job.uid, current_job.status)))
+ # Assign bounding_box with geojson coordinates
+ if(bounding_box is None):
+ bounding_box = (-180.0, -90.0, 180.0, 90.0)
+ else:
+ bounding_box = tuple(bounding_box)
+ results = callWithNonNoneArgs(ea.search_data,
+ concept_id = concept_id,
+ short_name=short_name,
+ cloud_hosted=True,
+ temporal=date_range,
+ bounding_box=bounding_box,
+ count=3
+ )
+ if (results == []):
+ current_job.progress = 100
+ current_job.status = JobStatus.NO_GRANULES
+ current_job.completed = True
+ asyncio.run(manager.broadcast(Status(jobType, current_job.uid, current_job.status)))
+ return
+ if (isPangeoForge):
+ handlePangeoForge(results, current_job, manager)
+ else:
+ handleEarthAccess(results, current_job, manager)
\ No newline at end of file
+import aiohttp
+import apache_beam as beam
+import os
+import sys
+from pathlib import Path
+__file__ = "recipe.py"
+DIR = Path(__file__).parent.absolute().as_posix()
+from config import username, password
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
+from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
+urls_file_path = f"{Path(__file__).parent.absolute().as_posix()}/src/feedstock/url_list.txt"
+outfile = open(urls_file_path, 'r')
+lines = outfile.readlines()
+count_lines = len(lines)
+range_lines = range(count_lines)
+def make_url(time):
+ return lines[time]
+concat_dim = ConcatDim("time", range_lines)
+pattern = FilePattern(make_url, concat_dim)
+# for index, url in pattern.items():
+# print(url)
+recipe = (
+ beam.Create(pattern.items())
+ | OpenURLWithFSSpec(open_kwargs={'client_kwargs': {"auth" : aiohttp.BasicAuth(username, password)}})
+ | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
+ | StoreToZarr(
+ store_name="zarr",
+ combine_dims=pattern.combine_dim_keys,
+ )
\ No newline at end of file
+from src.models.status_model import JobStatus
+async def file_path_handler(uid, jobs):
+ """
+ Asynchronously retrieves the file paths from the job registry if the job is completed.
+ Args:
+ uid (str): Unique identifier for the job.
+ jobs (dict): Dictionary containing job statuses and data.
+ Returns:
+ A list of file paths if available, or a job status indicating the current state or issues (e.g., not available, in progress, no granules).
+ """
+ if uid not in jobs:
+ return JobStatus.NOT_AVAILABLE
+ if jobs[uid].completed == False:
+ print(JobStatus.IN_PROGRESS)
+ return JobStatus.IN_PROGRESS
+ if len(jobs[uid].files) == 0:
+ print(JobStatus.NO_GRANULES)
+ return JobStatus.NO_GRANULES
+ return jobs[uid].files
\ No newline at end of file
+# Let's put all our data on the same dir as this config file
+from pathlib import Path
+import os
+HERE = Path(__file__).parent
+DATA_PREFIX = HERE / 'data'
+os.makedirs(DATA_PREFIX, exist_ok=True)
+# Target output should be partitioned by job id
+c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
+c.TargetStorage.root_path = f"{DATA_PREFIX}/{{job_name}}"
+c.InputCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
+c.InputCacheStorage.fsspec_args = c.TargetStorage.fsspec_args
+# Input data cache should *not* be partitioned by job id, as we want to get the datafile
+# from the source only once
+c.InputCacheStorage.root_path = f"{DATA_PREFIX}/cache/input"
+c.MetadataCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
+c.MetadataCacheStorage.fsspec_args = c.TargetStorage.fsspec_args
+# Metadata cache should be per job, as kwargs changing can change metadata
+c.MetadataCacheStorage.root_path = f"{DATA_PREFIX}/{{job_name}}/cache/metadata"
\ No newline at end of file
+import earthaccess as ea
+from dotenv import load_dotenv
+def login_handler():
+ # For .netrc to load environment variables from .env file
+ load_dotenv()
+ auth = ea.login(strategy="environment", persist=True)
+ # Check if the authentication is an instance of ea.Auth and if it's authenticated
+ if isinstance(auth, ea.Auth) and auth.authenticated:
+ print("Login successful")
+ else:
+ raise Exception("Login failed. Check if valid credentials are provided")
+import earthaccess as ea
+import xarray as xr
+from src.models.status_model import JobStatus
+async def metadata_handler(uid, jobs):
+ """
+ Asynchronously retrieves and processes the metadata for the specified job using EarthAccess API and xarray.
+ Args:
+ uid (str): Unique identifier for the job.
+ jobs (dict): Dictionary of jobs which may contain granule results and other metadata.
+ Returns:
+ A dictionary containing processed metadata or a job status; or an error message if metadata cannot be parsed.
+ """
+ try:
+ if uid not in jobs:
+ return JobStatus.NOT_AVAILABLE
+ if not jobs[uid].completed:
+ return JobStatus.IN_PROGRESS
+ if len(jobs[uid].files) == 0:
+ return JobStatus.NO_GRANULES
+ current_job = jobs[uid]
+ # Display metadata of one granule
+ fileset = ea.open([current_job.result_granules[0]])
+ # Open through xarray
+ current_job.data = xr.open_mfdataset(fileset, decode_times=False).to_dict(data=False)
+ return current_job.data
+ except Exception:
+ return "Metadata could not be parsed, ensure that the files are in NetCDF format."
\ No newline at end of file
+from typing import List, Dict
+from pydantic import BaseModel
+import earthaccess as ea
+import json
+class Coord(BaseModel):
+ dims: List[str]
+ attrs: Dict[str, str]
+ dtype: str
+ shape: List[int]
+class Metadata(BaseModel):
+ coords: Dict[str, Coord]
+class Job(BaseModel):
+ # unique uid with length=8 can't be created with Pydantic model
+ uid: str = ""
+ status: str = "in_progress"
+ progress: int = 0
+ completed: bool = False
+ files: list[str] = [] # List of files downloaded
+ data: Metadata = {}
+ result_granules: List[ea.results.DataGranule] = [] # Store results of ea.search_data()
+ # Type of result_granules is not in-built
+ class Config:
+ arbitrary_types_allowed = True
\ No newline at end of file
+from enum import Enum
+from dataclasses import dataclass
+class JobType(str, Enum):
+class JobStatus(str, Enum):
+ NOT_AVAILABLE = "Job not available"
+ IN_PROGRESS = "Job in progress"
+ NO_GRANULES = "No granules found"
+ JOB_COMPLETE = "Job completed"
+class Status():
+ jobType: JobType = JobType.EARTH_ACCESS
+ uid: str = ""
+ status: str = ""
\ No newline at end of file
+async def root_handler():
+ return {"Welcome to fcx-downloader!"}
\ No newline at end of file
+from src.models.status_model import JobStatus
+async def status_handler(uid, jobs):
+ """
+ Asynchronously retrieves the status of a specified job.
+ Args:
+ uid (str): Unique identifier for the job.
+ jobs (dict): Dictionary containing job details and statuses.
+ Returns:
+ The current status of the job if available; otherwise, returns a status indicating the job is not available.
+ """
+ if uid in jobs:
+ return jobs[uid].status
+ return JobStatus.NOT_AVAILABLE
\ No newline at end of file
+import boto3
+import logging
+import earthaccess as ea
+import asyncio
+import os
+from src.models.status_model import JobType, Status, JobStatus
+from config import cloudfrontUrl, s3BucketName, fileDownloadPath, aws_access_key_id, aws_secret_access_key
+# Uploads file to S3 and returns the CloudFront URL if successful
+def s3_upload_earthaccess(file, uid, manager):
+ s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)
+ file_name = os.path.split(file)[1]
+ key=f'{uid}/{file_name}'
+ status = f"Uploading {file_name} to S3"
+ asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, uid, status)))
+ try:
+ s3_client.upload_file(file, s3BucketName, key)
+ except Exception as e:
+ logging.exception(e)
+ return ""
+ return f"{cloudfrontUrl}/{key}"
+# Downloads result granules and uploads to S3(Main EarthAccess function)
+def handleEarthAccess(results, current_job, manager):
+ print("In handleEarthAccess")
+ current_job.result_granules = results
+ current_job.progress = 0
+ # CHECK: ea should tell about calculating percentage, can't use external libraries as this is unique for each case.
+ # might be available for s3 upload, % for downloading from EA should be given by EA
+ remote_files = []
+ for result in results:
+ current_job.progress += 1
+ current_job.status = f"In progress - downloading files {current_job.progress}/{len(results)}"
+ asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, current_job.uid, current_job.status)))
+ # download from CMR to a local path
+ result_file = ea.download(granules=result,local_path=fileDownloadPath)
+ response = s3_upload_earthaccess(result_file[0], current_job.uid, manager)
+ if (len(response) == 0):
+ current_job.status = "File could not be uploaded to S3"
+ asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, current_job.uid, current_job.status)))
+ current_job.completed = True
+ return
+ # Upload the downloaded file to S3 and get the S3 URL
+ remote_files.append(response)
+ current_job.files = remote_files
+ current_job.completed = True
+ current_job.status = JobStatus.JOB_COMPLETE
+ print(current_job.completed, current_job.status)
+ asyncio.run(manager.broadcast(Status(JobType.EARTH_ACCESS, current_job.uid, current_job.status)))
\ No newline at end of file
+import asyncio
+import os
+from pathlib import Path
+import subprocess
+import s3fs
+from config import cloudfrontUrl, s3BucketName, aws_access_key_id, aws_secret_access_key
+from zipfile import ZipFile
+import boto3
+from src.models.status_model import JobType, Status, JobStatus
+def get_all_file_paths(directory):
+ """
+ Collects and returns all file paths within the specified directory, including its subdirectories.
+ """
+ file_paths = []
+ for root, directories, files in os.walk(directory):
+ for filename in files:
+ filepath = os.path.join(root, filename)
+ file_paths.append(filepath)
+ return file_paths
+def zip_zarr_files(zip_zarr_path, zarr_path):
+ """
+ Zips all zarr files located in the specified directory into a single zip file.
+ """
+ file_paths = get_all_file_paths(zarr_path)
+ with ZipFile(zip_zarr_path,'w') as zip:
+ for file in file_paths:
+ zip.write(file, "./" + "/".join(file.split('/')[-2:]))
+def upload_file_to_s3(file_name, object_name):
+ """
+ Uploads file to S3 and returns the CloudFront URL if successful.
+ """
+ s3_client = boto3.client('s3')
+ try:
+ response = s3_client.upload_file(file_name, s3BucketName, object_name)
+ if response == None:
+ print(f"Upload Successful")
+ return f"{cloudfrontUrl}/{object_name}"
+ except Exception as e:
+ print(f"Error uploading file: {e}")
+ return ""
+ return ""
+def s3_upload_pangeoforge(dir, uid, manager):
+ """
+ Uploads a directory to S3 and broadcasts the upload status.
+ """
+ status = f"Uploading {dir} to S3"
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, uid, status)))
+ s3_file = s3fs.S3FileSystem(anon=False, key=aws_access_key_id, secret=aws_secret_access_key)
+ s3_path = f"{s3BucketName}/{uid}"
+ s3_file.put(dir, s3_path, recursive=True)
+ return f"{cloudfrontUrl}/{uid}"
+# Main PangeoForge function
+def handlePangeoForge(results, current_job, manager):
+ """
+ Processes result granules, generates zipped zarr files and uploads to S3.
+ """
+ data_link_list = []
+ dir_path = f"{Path(__file__).parent.parent.absolute().as_posix()}/feedstock"
+ for granule in results:
+ for asset in granule.data_links():
+ if (".nc" in asset):
+ data_link_list.append(asset)
+ with open(f"{dir_path}/url_list.txt", 'w') as f:
+ for line in data_link_list:
+ f.write(f"{line}\n")
+ current_job.status = "Granules found, generating zarr files"
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
+ DIR = Path(__file__).parent.parent.absolute().as_posix()
+ bake_script = f"{DIR}/bake.sh"
+ zarr_path = f"{DIR}/data/{current_job.uid}/zarr"
+ # following pangeo forge documentation
+ cmd = ["sh", bake_script]
+ env = os.environ.copy() | {
+ "REPO": f"{DIR}",
+ "CONFIG_FILE": f"{DIR}/local_config.py",
+ "JOB_NAME": current_job.uid,
+ }
+ # show live output
+ proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, env=env, text=True)
+ while (line := proc.stdout.readline()) != "":
+ print(line)
+ if(len(os.listdir(zarr_path)) > 0):
+ current_job.status = "Generated zarr files"
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
+ zip_zarr_path = f"{DIR}/data/{current_job.uid}/{current_job.uid}.zip"
+ zip_zarr_files(zip_zarr_path, zarr_path)
+ response = upload_file_to_s3(zip_zarr_path, f"{current_job.uid}.zip")
+ if (len(response) == 0):
+ current_job.status = "Failed uploading zarr to S3"
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
+ current_job.completed = True
+ return
+ else:
+ current_job.status = "Zarr files could not be generated"
+ current_job.completed = True
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
+ return
+ remote_files = [] #similar to EA as FE expects array not string path
+ remote_files.append(response)
+ current_job.files = remote_files
+ current_job.completed = True
+ current_job.status = JobStatus.JOB_COMPLETE
+ asyncio.run(manager.broadcast(Status(JobType.PANGEO_FORGE, current_job.uid, current_job.status)))
\ No newline at end of file
+def callWithNonNoneArgs(f, *args, **kwargs):
+ """
+ Calls a function, passing through only those keyword arguments that are not None.
+ """
+ kwargsNotNone = {k: v for k, v in kwargs.items() if v is not None}
+ return f(*args, **kwargsNotNone)
\ No newline at end of file
+import dataclasses
+import json
+from fastapi import WebSocket, WebSocketDisconnect
+from src.models.status_model import Status
+status = Status()
+class ConnectionManager:
+ """
+ Manages WebSocket connections, allowing for connection handling, messaging, and broadcasting.
+ """
+ # Initializes the ConnectionManager with an empty list for storing active WebSocket connections
+ def __init__(self):
+ self.active_connections: list[WebSocket] = []
+ # Asynchronously accepts a new WebSocket connection and adds it to the list of active connections
+ async def connect(self, websocket: WebSocket):
+ await websocket.accept()
+ self.active_connections.append(websocket)
+ # Removes a WebSocket connection from the list of active connections
+ def disconnect(self, websocket: WebSocket):
+ self.active_connections.remove(websocket)
+ # Asynchronously sends a message to a specified WebSocket
+ async def send_personal_message(self, message: str, websocket: WebSocket):
+ await websocket.send_text(message)
+ # Asynchronously sends a broadcast message to all active WebSocket connections
+ async def broadcast(self, status):
+ message = json.dumps(dataclasses.asdict(status))
+ for connection in self.active_connections:
+ await connection.send_text(message)
+async def websocket_endpoint_handler(websocket, manager):
+ """
+ Asynchronously handles incoming WebSocket connections and messages,
+ and manages connection events such as connect, disconnect, and messaging.
+ """
+ await manager.connect(websocket)
+ try:
+ while True:
+ data = await websocket.receive_text()
+ if(data == "Disconnect"):
+ manager.disconnect(websocket)
+ await websocket.close()
+ return
+ else:
+ await manager.send_personal_message(f"Unrecognised message: {data}", websocket)
+ except WebSocketDisconnect:
+ manager.disconnect(websocket)