Query on OTF List (#560)
jterry64 authored Aug 7, 2024
1 parent 0053319 commit ae21eaa
Showing 23 changed files with 837 additions and 152 deletions.
2 changes: 1 addition & 1 deletion .isort.cfg
@@ -2,4 +2,4 @@
line_length = 88
multi_line_output = 3
include_trailing_comma = True
known_third_party = _pytest,aenum,affine,aiohttp,alembic,asgi_lifespan,async_lru,asyncpg,aws_utils,boto3,botocore,click,docker,ee,errors,fastapi,fiona,gdal_utils,geoalchemy2,geojson,gfw_pixetl,gino,gino_starlette,google,httpx,httpx_auth,logger,logging_utils,moto,numpy,orjson,osgeo,pandas,pendulum,pglast,psutil,psycopg2,pydantic,pyproj,pytest,pytest_asyncio,rasterio,shapely,sqlalchemy,sqlalchemy_utils,starlette,tileputty,typer
known_third_party = _pytest,aenum,affine,alembic,asgi_lifespan,async_lru,asyncpg,aws_utils,boto3,botocore,click,docker,ee,errors,fastapi,fiona,gdal_utils,geoalchemy2,geojson,gfw_pixetl,gino,gino_starlette,google,httpx,httpx_auth,logger,logging_utils,moto,numpy,orjson,osgeo,pandas,pendulum,pglast,psutil,psycopg2,pydantic,pyproj,pytest,pytest_asyncio,rasterio,shapely,sqlalchemy,sqlalchemy_utils,starlette,tileputty,typer
12 changes: 12 additions & 0 deletions app/main.py
@@ -31,6 +31,7 @@
versions,
)
from .routes.geostore import geostore as geostore_top
from .routes.jobs import job
from .routes.tasks import task

################
@@ -161,6 +162,16 @@ async def rve_error_handler(
for r in analysis_routers:
app.include_router(r, prefix="/analysis")


###############
# JOB API
###############

job_routes = (job.router,)
for r in job_routes:
app.include_router(r, prefix="/job")


###############
# HEALTH API
###############
@@ -185,6 +196,7 @@ async def rve_error_handler(
{"name": "Geostore", "description": geostore.__doc__},
{"name": "Tasks", "description": task.__doc__},
{"name": "Analysis", "description": analysis.__doc__},
{"name": "Job", "description": job.__doc__},
{"name": "Health", "description": health.__doc__},
]
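
The wiring above follows FastAPI's standard include_router pattern. A minimal standalone sketch of the same idea (the route shown is hypothetical, for illustration only; the real handlers live in app/routes/jobs/job.py):

from fastapi import APIRouter, FastAPI

router = APIRouter()

@router.get("/{job_id}")
async def get_job(job_id: str):
    # Hypothetical handler, for illustration only.
    return {"job_id": job_id, "status": "pending"}

app = FastAPI()
app.include_router(router, prefix="/job")  # exposes GET /job/{job_id}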

9 changes: 8 additions & 1 deletion app/models/pydantic/query.py
@@ -2,13 +2,20 @@

from app.models.enum.creation_options import Delimiters
from app.models.pydantic.base import StrictBaseModel
from app.models.pydantic.geostore import Geometry
from app.models.pydantic.geostore import FeatureCollection, Geometry


class QueryRequestIn(StrictBaseModel):
geometry: Optional[Geometry]
sql: str


class QueryBatchRequestIn(StrictBaseModel):
feature_collection: Optional[FeatureCollection]
uri: Optional[str]
id_field: str
sql: str


class CsvQueryRequestIn(QueryRequestIn):
delimiter: Delimiters = Delimiters.comma
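
For context, a batch query request built from the new QueryBatchRequestIn model might look like this (a sketch with made-up values; since feature_collection and uri are both optional, it assumes only one of them is supplied):

from app.models.pydantic.query import QueryBatchRequestIn

request = QueryBatchRequestIn(
    uri="s3://my-bucket/features.csv",  # hypothetical feature list location
    id_field="fid",                     # field identifying each feature
    sql="SELECT SUM(area__ha) FROM results",
)
print(request.dict(exclude_none=True))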
18 changes: 18 additions & 0 deletions app/models/pydantic/user_job.py
@@ -0,0 +1,18 @@
from typing import Optional
from uuid import UUID

from pydantic import BaseModel

from .responses import Response


class UserJob(BaseModel):
job_id: UUID
status: str = "pending"
download_link: Optional[str] = None
failed_geometries_link: Optional[str] = None
progress: Optional[str] = "0%"


class UserJobResponse(Response):
data: UserJob
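
A freshly created job serialized through these models would look roughly like this (a sketch; the UUID is made up, and it assumes the inherited Response model requires only data):

from uuid import UUID

from app.models.pydantic.user_job import UserJob, UserJobResponse

job = UserJob(job_id=UUID("12345678-1234-5678-1234-567812345678"))
print(UserJobResponse(data=job).json())
# status defaults to "pending" and progress to "0%"; the download links
# stay null until the batch job completes.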
44 changes: 28 additions & 16 deletions app/routes/assets/asset.py
@@ -29,8 +29,8 @@

from app.models.pydantic.responses import Response
from app.settings.globals import API_URL
from ..datasets.downloads import _get_presigned_url

from ...authentication.token import get_manager
from ...crud import assets
from ...crud import metadata as metadata_crud
from ...crud import tasks
@@ -49,6 +49,7 @@
asset_metadata_factory,
)
from ...models.pydantic.assets import AssetResponse, AssetType, AssetUpdateIn
from ...models.pydantic.authentication import User
from ...models.pydantic.change_log import ChangeLog, ChangeLogResponse
from ...models.pydantic.creation_options import (
CreationOptions,
@@ -69,11 +70,9 @@
from ...utils.paginate import paginate_collection
from ...utils.path import infer_srid_from_grid, split_s3_path
from ..assets import asset_response
from ..tasks import paginated_tasks_response, tasks_response

from ...authentication.token import get_manager
from ...models.pydantic.authentication import User
from ..datasets import _get_presigned_url
from ..datasets.dataset import get_owner
from ..tasks import paginated_tasks_response, tasks_response

router = APIRouter()

@@ -111,7 +110,8 @@ async def update_asset(
) -> AssetResponse:
"""Update Asset metadata.
Only the dataset's owner or a user with `ADMIN` user role can do this operation.
Only the dataset's owner or a user with `ADMIN` user role can do
this operation.
"""

try:
@@ -322,7 +322,7 @@ async def get_tiles_info(asset_id: UUID = Path(...)):
if asset.asset_type != AssetType.raster_tile_set:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Tiles information only available for raster tile sets"
detail="Tiles information only available for raster tile sets",
)

bucket, asset_key = split_s3_path(asset.asset_uri)
@@ -383,12 +383,16 @@ async def get_field_metadata(*, asset_id: UUID = Path(...), field_name: str):
response_model=FieldMetadataResponse,
)
async def update_field_metadata(
*, asset_id: UUID = Path(...), field_name: str, request: FieldMetadataUpdate,
*,
asset_id: UUID = Path(...),
field_name: str,
request: FieldMetadataUpdate,
user: User = Depends(get_manager),
):
"""Update the field metadata for an asset.
Only the dataset's owner or a user with `ADMIN` user role can do this operation.
Only the dataset's owner or a user with `ADMIN` user role can do
this operation.
"""

try:
@@ -434,7 +438,8 @@ async def get_metadata(asset_id: UUID = Path(...)):
async def create_metadata(*, asset_id: UUID = Path(...), request: AssetMetadata):
"""Create metadata record for an asset.
Only the dataset's owner or a user with `ADMIN` user role can do this operation.
Only the dataset's owner or a user with `ADMIN` user role can do
this operation.
"""
input_data = request.dict(exclude_none=True, by_alias=True)
asset = await assets.get_asset(asset_id)
@@ -457,11 +462,16 @@ async def create_metadata(*, asset_id: UUID = Path(...), request: AssetMetadata)
tags=["Assets"],
response_model=AssetMetadataResponse,
)
async def update_metadata(*, asset_id: UUID = Path(...), request: AssetMetadataUpdate,
user: User = Depends(get_manager)):
async def update_metadata(
*,
asset_id: UUID = Path(...),
request: AssetMetadataUpdate,
user: User = Depends(get_manager),
):
"""Update metadata record for an asset.
Only the dataset's owner or a user with `ADMIN` user role can do this operation.
Only the dataset's owner or a user with `ADMIN` user role can do
this operation.
"""

input_data = request.dict(exclude_none=True, by_alias=True)
@@ -488,11 +498,13 @@ async def update_metadata(*, asset_id: UUID = Path(...), request: AssetMetadataU
tags=["Assets"],
response_model=AssetMetadataResponse,
)
async def delete_metadata(asset_id: UUID = Path(...),
user: User = Depends(get_manager)):
async def delete_metadata(
asset_id: UUID = Path(...), user: User = Depends(get_manager)
):
"""Delete an asset's metadata record.
Only the dataset's owner or a user with `ADMIN` user role can do this operation.
Only the dataset's owner or a user with `ADMIN` user role can do
this operation.
"""
try:
asset = await assets.get_asset(asset_id)
97 changes: 96 additions & 1 deletion app/routes/datasets/__init__.py
@@ -1,5 +1,8 @@
from typing import Any, Dict, List
from collections import defaultdict
from typing import Any, Dict, List, Sequence
from urllib.parse import urlparse

from botocore.exceptions import ClientError
from fastapi import HTTPException

from ...crud import assets
@@ -11,6 +14,20 @@
from ...tasks.raster_tile_set_assets.raster_tile_set_assets import (
raster_tile_set_validator,
)
from ...utils.aws import get_aws_files, get_s3_client
from ...utils.google import get_gs_files
from ...utils.path import split_s3_path

SUPPORTED_FILE_EXTENSIONS: Sequence[str] = (
".csv",
".geojson",
".gpkg",
".ndjson",
".shp",
".tif",
".tsv",
".zip",
)


async def verify_version_status(dataset, version):
@@ -82,3 +99,81 @@ async def validate_creation_options(
await validator[input_data["asset_type"]](dataset, version, input_data)
except KeyError:
pass


# I cannot seem to satisfy mypy WRT the type of this default dict. Last thing I tried:
# DefaultDict[str, Callable[[str, str, int, int, ...], List[str]]]
source_uri_lister_constructor = defaultdict((lambda: lambda w, x, limit=None, exit_after_max=None, extensions=None: list())) # type: ignore
source_uri_lister_constructor.update(**{"gs": get_gs_files, "s3": get_aws_files}) # type: ignore
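
The defaultdict above acts as a scheme dispatcher: "s3" and "gs" map to real listing functions, while any other scheme falls back to a lambda returning an empty list, so sources with unknown schemes get flagged as invalid below. A standalone sketch of the same pattern (with a stand-in lister):

from collections import defaultdict

def fake_s3_lister(bucket, prefix, **kwargs):
    # Stand-in for get_aws_files; the real function also honors
    # limit, exit_after_max, and extensions.
    return [f"s3://{bucket}/{prefix}example.tif"]

lister = defaultdict(lambda: lambda bucket, prefix, **kwargs: [])
lister.update({"s3": fake_s3_lister})

print(lister["s3"]("my-bucket", "some/prefix/"))   # one fake key
print(lister["ftp"]("my-bucket", "some/prefix/"))  # [] -> flagged invalid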


def _verify_source_file_access(sources: List[str]) -> None:

# TODO:
# 1. Making the list functions asynchronous and using asyncio.gather
# to check for valid sources in a non-blocking fashion would be good.
# Perhaps use the aioboto3 package for aws, gcloud-aio-storage for gcs.
# 2. It would be nice if the acceptable file extensions were passed
# into this function so we could say, for example, that there must be
# TIFFs found for a new raster tile set, but a CSV is required for a new
# vector tile set version. Even better would be to specify whether
# paths to individual files or "folders" (prefixes) are allowed.

invalid_sources: List[str] = list()

for source in sources:
url_parts = urlparse(source, allow_fragments=False)
list_func = source_uri_lister_constructor[url_parts.scheme.lower()]
bucket = url_parts.netloc
prefix = url_parts.path.lstrip("/")

# Allow pseudo-globbing: Tolerate a "*" at the end of a
# src_uri entry to allow partial prefixes (for example
# /bucket/prefix_part_1/prefix_fragment* will match
# /bucket/prefix_part_1/prefix_fragment_1.tif and
# /bucket/prefix_part_1/prefix_fragment_2.tif, etc.)
# If the prefix doesn't end in "*" or an acceptable file extension
# add a "/" to the end of the prefix to enforce it being a "folder".
new_prefix: str = prefix
if new_prefix.endswith("*"):
new_prefix = new_prefix[:-1]
elif not new_prefix.endswith("/") and not any(
[new_prefix.endswith(suffix) for suffix in SUPPORTED_FILE_EXTENSIONS]
):
new_prefix += "/"

if not list_func(
bucket,
new_prefix,
limit=10,
exit_after_max=1,
extensions=SUPPORTED_FILE_EXTENSIONS,
):
invalid_sources.append(source)

if invalid_sources:
raise HTTPException(
status_code=400,
detail=(
"Cannot access all of the source files (non-existent or access denied). "
f"Invalid sources: {invalid_sources}"
),
)
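
To make the prefix handling concrete, here is the normalization step extracted into a standalone sketch (mirroring the logic above, not the function itself), run on a few sample URIs:

from urllib.parse import urlparse

SUPPORTED = (".csv", ".geojson", ".gpkg", ".ndjson", ".shp", ".tif", ".tsv", ".zip")

def normalize_prefix(uri: str) -> str:
    prefix = urlparse(uri, allow_fragments=False).path.lstrip("/")
    if prefix.endswith("*"):  # pseudo-glob: drop the trailing star
        return prefix[:-1]
    if not prefix.endswith("/") and not prefix.endswith(SUPPORTED):
        return prefix + "/"   # force "folder" semantics
    return prefix

print(normalize_prefix("s3://bucket/tiles/frag*"))    # 'tiles/frag'
print(normalize_prefix("s3://bucket/data/file.tif"))  # 'data/file.tif'
print(normalize_prefix("s3://bucket/data/folder"))    # 'data/folder/'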


async def _get_presigned_url_from_path(path):
bucket, key = split_s3_path(path)
return await _get_presigned_url(bucket, key)


async def _get_presigned_url(bucket, key):
s3_client = get_s3_client()
try:
presigned_url = s3_client.generate_presigned_url(
"get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=900
)
except ClientError:
raise HTTPException(
status_code=404, detail="Requested resource does not exist."
)
return presigned_url
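
For reference, the helper wraps boto3's standard presigned-URL call; a minimal usage sketch (the bucket and key are hypothetical, and AWS credentials are assumed to be configured):

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client("s3")
try:
    url = s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": "my-bucket", "Key": "exports/result.csv"},
        ExpiresIn=900,  # same 15-minute expiry as the helper above
    )
    print(url)
except ClientError:
    print("could not sign the request")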
31 changes: 11 additions & 20 deletions app/routes/datasets/downloads.py
@@ -3,27 +3,26 @@
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID, uuid4

from aiohttp import ClientError
from fastapi import APIRouter, Depends, HTTPException, Query

# from fastapi.openapi.models import APIKey
from fastapi.responses import RedirectResponse

# from ...authentication.api_keys import get_api_key
from ...crud.assets import get_assets_by_filter
from ...crud.versions import get_version
from ...main import logger
from ...models.enum.assets import AssetType
from ...models.enum.creation_options import Delimiters
from ...models.enum.geostore import GeostoreOrigin
from ...models.enum.pixetl import Grid
from ...models.pydantic.downloads import DownloadCSVIn, DownloadJSONIn
from ...models.pydantic.geostore import GeostoreCommon
from ...responses import CSVStreamingResponse, ORJSONStreamingResponse
from ...utils.aws import get_s3_client
from ...utils.geostore import get_geostore
from ...utils.path import split_s3_path
from .. import dataset_version_dependency

# from ...authentication.api_keys import get_api_key
from . import _get_presigned_url
from .queries import _query_dataset_csv, _query_dataset_json

router: APIRouter = APIRouter()
@@ -37,7 +36,10 @@
async def download_json(
dataset_version: Tuple[str, str] = Depends(dataset_version_dependency),
sql: str = Query(..., description="SQL query."),
geostore_id: Optional[UUID] = Query(None, description="Geostore ID. The geostore must represent a Polygon or MultiPolygon."),
geostore_id: Optional[UUID] = Query(
None,
description="Geostore ID. The geostore must represent a Polygon or MultiPolygon.",
),
geostore_origin: GeostoreOrigin = Query(
GeostoreOrigin.gfw, description="Service to search first for geostore."
),
@@ -118,7 +120,10 @@ async def download_json_post(
async def download_csv(
dataset_version: Tuple[str, str] = Depends(dataset_version_dependency),
sql: str = Query(..., description="SQL query."),
geostore_id: Optional[UUID] = Query(None, description="Geostore ID. The geostore must represent a Polygon or MultiPolygon."),
geostore_id: Optional[UUID] = Query(
None,
description="Geostore ID. The geostore must represent a Polygon or MultiPolygon.",
),
geostore_origin: GeostoreOrigin = Query(
GeostoreOrigin.gfw, description="Service to search first for geostore."
),
@@ -316,20 +321,6 @@ async def _get_asset_url(dataset: str, version: str, asset_type: str) -> str:
return assets[0].asset_uri


async def _get_presigned_url(bucket, key):
s3_client = get_s3_client()
try:
presigned_url = s3_client.generate_presigned_url(
"get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=900
)
except ClientError as e:
logger.error(e)
raise HTTPException(
status_code=404, detail="Requested resource does not exist."
)
return presigned_url


async def _check_downloadability(dataset, version):
v = await get_version(dataset, version)
if not v.is_downloadable: