Ghrccloud 5519/fastapi #1 (Open)

wants to merge 3 commits into base: main
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__
*.py[cod]
11 changes: 11 additions & 0 deletions .env.example
@@ -0,0 +1,11 @@
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

EA_FASTAPI_S3_URL=
EA_FASTAPI_S3_BUCKET=
EA_FASTAPI_S3_OBJECT_URL=
EA_FASTAPI_FILE_DOWNLOAD_PATH=

MACHINE=urs.earthdata.nasa.gov
EARTHDATA_USERNAME=
EARTHDATA_PASSWORD=
4 changes: 3 additions & 1 deletion .gitignore
@@ -1 +1,3 @@
folder
.env
__pycache__/
*.py[cod]
10 changes: 10 additions & 0 deletions Dockerfile
@@ -0,0 +1,10 @@
FROM osgeo/gdal:ubuntu-small-latest
RUN apt-get update && apt-get -y install python3-pip --fix-missing
WORKDIR /app
COPY requirements.txt .
COPY requirements1.txt .
RUN pip install -r requirements.txt
RUN pip install -r requirements1.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
33 changes: 33 additions & 0 deletions README.md
@@ -0,0 +1,33 @@
# ea-fastapi

In this repo, FastAPI is used as a wrapper around the earthaccess library. Four endpoints are defined, which can be exercised and tracked in Postman:
1. start_download - Downloads files from the search result locally and uploads them to S3. Returns a unique 8-character job id.
2. status - Returns the latest status of the job.
3. get_file_path - Returns the S3 paths of the uploaded granules.
4. get_metadata - Displays the metadata of the granules.

## Pre-requisites

AWS and Earthdata accounts are required to use this wrapper application; the credentials for these accounts should be entered in the .env file (see .env.example).
Postman and Docker need to be installed.

## Steps to Use

Run the following commands:

`docker compose build`

`docker compose up`

Once the Docker container is up and running, the following endpoints accept GET/POST/PUT requests at http://0.0.0.0:8080/:

| Request type | Endpoint | Parameters |
| --- | --- | --- |
| GET | / | N/A |
| PUT | /start_download | short_name, date_range, bounding_box (coordinates can be entered manually or passed as text in the request body), concept_id (optional) |
| POST | /status | job_id |
| POST | /get_file_path | job_id |
| POST | /get_metadata | job_id |

To check status over a WebSocket, choose the 'WebSocket' request type in Postman and connect to ws://localhost:8080/ws.
Send 'Disconnect' in Messages to disconnect.
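As an illustration, the query string for /start_download can be assembled with the standard library. This is only a sketch: the base URL comes from the docker-compose port mapping above, and the dataset short name and coordinates are hypothetical example values.

```python
from urllib.parse import urlencode

BASE_URL = "http://0.0.0.0:8080"  # host port 8080 is mapped to container port 8000

# Example query for /start_download; parameter names come from the endpoint table.
# List parameters (date_range, bounding_box) repeat the key once per value.
params = [
    ("short_name", "ATL03"),       # hypothetical dataset short name
    ("date_range", "2020-01-01"),
    ("date_range", "2020-01-31"),
    ("bounding_box", -10.0),
    ("bounding_box", -5.0),
    ("bounding_box", 10.0),
    ("bounding_box", 5.0),
]

url = f"{BASE_URL}/start_download?{urlencode(params)}"
print(url)
```

Sending a PUT request to this URL (e.g. from Postman) returns the 8-character job id used by the other endpoints.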
11 changes: 11 additions & 0 deletions config.py
@@ -0,0 +1,11 @@
from dotenv import load_dotenv
import os

load_dotenv()

s3Url = os.environ.get('EA_FASTAPI_S3_URL')
s3BucketName = os.environ.get('EA_FASTAPI_S3_BUCKET')
s3ObjectUrl = os.environ.get('EA_FASTAPI_S3_OBJECT_URL')
fileDownloadPath = os.environ.get('EA_FASTAPI_FILE_DOWNLOAD_PATH')
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
9 changes: 9 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,9 @@
version: '3.8'
services:
api:
container_name: "docker-fastapi"
build: .
ports:
- 8080:8000
volumes:
- .:/usr/src/app
56 changes: 56 additions & 0 deletions main.py
@@ -0,0 +1,56 @@
import random
from typing import Dict
from fastapi import Body, FastAPI, Query, WebSocket
import string

from src import *

from http import HTTPStatus
from fastapi import BackgroundTasks

# Dict as job storage
jobs: Dict[str, Job] = {}
app = FastAPI()

# Authenticate user and allow login
login_handler()

# Root endpoint
@app.get("/")
async def root():
return await root_handler()

manager = ConnectionManager()

# Establish WebSocket connection
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
return await websocket_endpoint_handler(websocket, manager)

# Start a download job in the background
@app.put("/start_download", status_code=HTTPStatus.ACCEPTED)
async def start_download(background_tasks: BackgroundTasks, short_name: str | None = None, date_range: list = Query([]), concept_id: str | None = None, bounding_box: list | None = Query(None), bounding_box_geojson: str | None = Body(None)):

# Create a new Job instance and add it to the jobs dictionary
current_job = Job()
current_job.uid = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
jobs[current_job.uid] = current_job

# Add the download job to the background tasks
background_tasks.add_task(download_handler, current_job, short_name, tuple(date_range), concept_id, bounding_box, bounding_box_geojson, manager)
return current_job.uid

# Check the status of a job
@app.post("/status")
async def status(uid: str):
return await status_handler(uid, jobs)

# Get the files associated with a job
@app.post("/get_file_path")
async def get_file_path(uid: str):
return await file_path_handler(uid, jobs)

# Display metadata associated with a job
@app.post("/get_metadata")
async def get_metadata(uid: str):
return await metadata_handler(uid, jobs)
7 changes: 7 additions & 0 deletions requirements.txt
@@ -0,0 +1,7 @@
fastapi[all]  # includes websockets and uvicorn
earthaccess
fiona==1.8.22
xarray
geopandas
python-dotenv
h5netcdf
2 changes: 2 additions & 0 deletions requirements1.txt
@@ -0,0 +1,2 @@
boto3
dask
9 changes: 9 additions & 0 deletions src/__init__.py
@@ -0,0 +1,9 @@
from src.utils import callWithNonNoneArgs
from src.status_handler import status_handler
from src.download_handler import download_handler
from src.metadata_handler import metadata_handler
from src.file_path_handler import file_path_handler
from src.root_handler import root_handler
from src.login_handler import login_handler
from src.websocket_handler import websocket_endpoint_handler, ConnectionManager
from src.job_model import Job
91 changes: 91 additions & 0 deletions src/download_handler.py
@@ -0,0 +1,91 @@
import earthaccess as ea
from src.utils import callWithNonNoneArgs
import boto3
import os
import geopandas as gpd
import asyncio
import logging

from config import s3Url, s3ObjectUrl,s3BucketName, fileDownloadPath, aws_access_key_id, aws_secret_access_key
from src.websocket_handler import ConnectionManager

# TO-DO: s3_upload could be moved into a separate class (like ConnectionManager in main.py)
# so that any error while setting up S3 is caught up front.
# CHECK: should the S3 status be known only to the backend? The frontend client need not know
# what is being done in S3.

# upload file to S3 and return the S3 URL
def s3_upload(file, uid, status, manager):
s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
file_name = os.path.split(file)[1]
key=f'{uid}/{file_name}'
print(file, key)
status = f"{uid}: uploading {file_name} to S3"
asyncio.run(manager.broadcast(status))

try:
s3_client.upload_file(file, s3BucketName, key)
except Exception as e:
logging.exception(e)
return ""

return f"{s3ObjectUrl}/{key}"

def download_handler(current_job, short_name = None, date_range = None, concept_id = None, bounding_box = None, bounding_box_geojson = None, manager: ConnectionManager = None):
current_job.status = f"{current_job.uid}: querying data"
asyncio.run(manager.broadcast(current_job.status))

# Assign bounding_box with geojson coordinates
if bounding_box is None:
if bounding_box_geojson is not None:
gdf = gpd.read_file(bounding_box_geojson, driver='GeoJSON')
xmin, ymin, xmax, ymax = gdf.total_bounds
bounding_box = (xmin, ymin, xmax, ymax)

else:
bounding_box = (-180.0, -90.0, 180.0, 90.0)

# Call the search_data function with non-None arguments
results = callWithNonNoneArgs(ea.search_data,
concept_id = concept_id,
short_name=short_name,
cloud_hosted=True,
temporal=date_range,
bounding_box=bounding_box,
count=3  # limit result count for testing purposes
)

current_job.result_granules = results
current_job.progress = 0
print("results ", results)

if not results:
current_job.progress = 100
current_job.status = f"{current_job.uid}: No files to download"
asyncio.run(manager.broadcast(current_job.status))
return

# CHECK: download progress percentage should come from earthaccess itself; it can't be computed
# generically with external libraries. S3 upload progress may be available separately.
remote_files = []
for result in results:
current_job.progress += 1
current_job.status = f"{current_job.uid}: in progress - downloading files {current_job.progress}/{len(results)}"
asyncio.run(manager.broadcast(current_job.status))

# download from CMR to a local path
result_file = ea.download(granules=result, local_path=fileDownloadPath)
print("result_file ", result_file)

response = s3_upload(result_file[0], current_job.uid, current_job.status, manager)
if (len(response) == 0):
current_job.status = f"{current_job.uid}: File could not be uploaded to S3"
asyncio.run(manager.broadcast(current_job.status))
return
# Upload the downloaded file to S3 and get the S3 URL
remote_files.append(response)

current_job.files = remote_files
current_job.status = f"{current_job.uid}: Download completed"
asyncio.run(manager.broadcast(current_job.status))
print("remote files", remote_files)
5 changes: 5 additions & 0 deletions src/file_path_handler.py
@@ -0,0 +1,5 @@
async def file_path_handler(uid, jobs):
if uid in jobs:
return jobs[uid].files
else:
return "Job not available"
16 changes: 16 additions & 0 deletions src/job_model.py
@@ -0,0 +1,16 @@
from typing import List
from pydantic import BaseModel
import earthaccess as ea

class Job(BaseModel):
# A unique 8-character uid is assigned in main.py rather than generated by the Pydantic model
uid: str = ""
status: str = "in_progress"
progress: int = 0
files: list[str] = [] # List of files downloaded
data: list[str] = [] # Data within the files downloaded
result_granules: List[ea.results.DataGranule] = [] # Store results of ea.search_data()

# Type of result_granules is not in-built
class Config:
arbitrary_types_allowed = True
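The comment above notes that the uid is assigned outside the model. An alternative would be to generate it at construction time with Pydantic's `Field(default_factory=...)` — a sketch using the same random scheme as main.py; `JobSketch` and `make_uid` are invented names for the demo, not part of this PR:

```python
import random
import string
from pydantic import BaseModel, Field

def make_uid() -> str:
    # Same scheme as main.py: 8 random lowercase letters/digits
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))

class JobSketch(BaseModel):
    # default_factory runs once per instance, so each Job gets its own uid
    uid: str = Field(default_factory=make_uid)
    status: str = "in_progress"
    progress: int = 0

job = JobSketch()
print(job.uid)
```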
13 changes: 13 additions & 0 deletions src/login_handler.py
@@ -0,0 +1,13 @@
import earthaccess as ea
from dotenv import load_dotenv

def login_handler():
# Load Earthdata credentials from the .env file into environment variables
load_dotenv()
auth = ea.login(strategy="environment", persist=True)

# Check if the authentication is an instance of ea.Auth and if it's authenticated
if isinstance(auth, ea.Auth) and auth.authenticated:
print("Login successful")
else:
raise Exception("Login failed. Check if valid credentials are provided")
19 changes: 19 additions & 0 deletions src/metadata_handler.py
@@ -0,0 +1,19 @@
import earthaccess as ea
import xarray as xr

async def metadata_handler(uid, jobs):
# Check if the specified job UID exists in the jobs dictionary
if uid in jobs:
current_job = jobs[uid]
print("current_job.result_granules: ", current_job.result_granules)
#CHECK: s3url - give region; http_url(from result_granules.data?) - give provider
#can't use fcx-downloader http url as it has no provider
fileset = ea.open(current_job.result_granules)
print("fileset: ", fileset)

# Open through xarray
current_job.data = xr.open_mfdataset(fileset, decode_times=False).to_dict(data=False)
print("current_job.data ", current_job.data)
return current_job.data
else:
return "Job not available"
2 changes: 2 additions & 0 deletions src/root_handler.py
@@ -0,0 +1,2 @@
async def root_handler():
return {"message": "Welcome to fcx-downloader!"}
6 changes: 6 additions & 0 deletions src/status_handler.py
@@ -0,0 +1,6 @@
async def status_handler(uid, jobs):
if uid in jobs:
return jobs[uid].status
else:
return "Job not available"

3 changes: 3 additions & 0 deletions src/utils.py
@@ -0,0 +1,3 @@
def callWithNonNoneArgs(f, *args, **kwargs):
kwargsNotNone = {k: v for k, v in kwargs.items() if v is not None}
return f(*args, **kwargsNotNone)
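`callWithNonNoneArgs` drops every keyword argument that is `None` before calling `f`, which is what lets `start_download` pass all of its optional parameters straight through to `ea.search_data`. A standalone illustration — `search` here is a hypothetical stand-in that just echoes the keys it received:

```python
def callWithNonNoneArgs(f, *args, **kwargs):
    # Forward only the keyword arguments that were actually provided
    kwargsNotNone = {k: v for k, v in kwargs.items() if v is not None}
    return f(*args, **kwargsNotNone)

def search(**kwargs):
    # Hypothetical stand-in for ea.search_data: report which kwargs arrived
    return sorted(kwargs)

received = callWithNonNoneArgs(search, short_name="ATL03", concept_id=None, count=3)
print(received)  # concept_id was None, so it is not forwarded
```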
34 changes: 34 additions & 0 deletions src/websocket_handler.py
@@ -0,0 +1,34 @@
from fastapi import WebSocket, WebSocketDisconnect

class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []

async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)

async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)

# Returns the latest status automatically
async def websocket_endpoint_handler(websocket, manager):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
if data == "Disconnect":
manager.disconnect(websocket)
await websocket.close()
return
else:
await manager.send_personal_message(f"Unrecognised message: {data}", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
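The broadcast loop above simply fans a message out to every connected socket. That pattern can be exercised without FastAPI by substituting a stub that records sent text — a minimal sketch, where `ConnectionManagerSketch` and `FakeSocket` are invented for the demo and stand in for ConnectionManager and a real WebSocket:

```python
import asyncio

class ConnectionManagerSketch:
    """Same broadcast pattern as ConnectionManager, minus the FastAPI types."""
    def __init__(self):
        self.active_connections = []

    async def broadcast(self, message: str):
        # Send the same status message to every connected client
        for connection in self.active_connections:
            await connection.send_text(message)

class FakeSocket:
    # Stub standing in for a fastapi WebSocket; records messages instead of sending
    def __init__(self):
        self.sent = []

    async def send_text(self, message: str):
        self.sent.append(message)

manager = ConnectionManagerSketch()
a, b = FakeSocket(), FakeSocket()
manager.active_connections += [a, b]
asyncio.run(manager.broadcast("abc123: querying data"))
```

This mirrors how download_handler pushes job status strings to all WebSocket listeners via `manager.broadcast`.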