diff --git a/.isort.cfg b/.isort.cfg index 899fbc7f9..0d3ba851c 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -2,4 +2,4 @@ line_length = 88 multi_line_output = 3 include_trailing_comma = True -known_third_party = alembic,asyncpg,boto3,click,create_table,docker,fastapi,fiona,geoalchemy2,gino,gino_starlette,load,moto,pendulum,post_processing,psycopg2,pydantic,pytest,rasterio,requests,sqlalchemy,sqlalchemy_utils,starlette +known_third_party = alembic,asyncpg,boto3,click,docker,fastapi,fiona,geoalchemy2,gino,gino_starlette,moto,pendulum,psycopg2,pydantic,pytest,rasterio,requests,sqlalchemy,sqlalchemy_utils,starlette diff --git a/app/application.py b/app/application.py index e315766c2..b2a2de76e 100644 --- a/app/application.py +++ b/app/application.py @@ -18,9 +18,7 @@ class ContextualGino(Gino): - """ - Overide the Gino Metadata object to allow to dynamically change the binds - """ + """Override the Gino Metadata object to allow dynamically changing the binds.""" @property def bind(self): @@ -61,7 +59,7 @@ def __init__(self, method): self.method = method async def __aenter__(self): - """ initialize objects """ + """Initialize objects.""" try: e = CURRENT_ENGINE.get() except LookupError: diff --git a/app/crud/__init__.py b/app/crud/__init__.py index 034e0183e..ccabcf357 100644 --- a/app/crud/__init__.py +++ b/app/crud/__init__.py @@ -10,9 +10,8 @@ async def update_data( row: db.Model, input_data: Union[BaseModel, Dict[str, Any]] # type: ignore ) -> db.Model: # type: ignore - """ - Merge updated metadata filed with existing fields - """ + """Merge updated metadata fields with existing fields.""" + if isinstance(input_data, BaseModel): input_data = input_data.dict(skip_defaults=True) diff --git a/app/crud/datasets.py b/app/crud/datasets.py index c1ac19db0..d59333857 100644 --- a/app/crud/datasets.py +++ b/app/crud/datasets.py @@ -11,9 +11,8 @@ async def get_datasets() -> List[ORMDataset]: - """ - Get list of all datasets - """ + """Get list of all datasets.""" + rows = await db.all(all_datasets) return rows diff --git a/app/crud/versions.py b/app/crud/versions.py index 5a4f99581..97932cc2c 100644 --- a/app/crud/versions.py +++ b/app/crud/versions.py @@ -34,9 +34,8 @@ async def get_version(dataset: str, version: str) -> ORMVersion: async def get_latest_version(dataset) -> str: - """ - Fetch latest version number - """ + """Fetch latest version number.""" + latest: Optional[str] = await ORMVersion.select("version").where( ORMVersion.dataset == dataset ).where(ORMVersion.is_latest).gino.scalar() diff --git a/app/middleware.py b/app/middleware.py index e61296449..c9dc01417 100644 --- a/app/middleware.py +++ b/app/middleware.py @@ -7,9 +7,11 @@ async def set_db_mode(request: Request, call_next): """ + This middleware replaces the db engine depending on the request type. Read requests use the read only pool. Write requests use the write pool. + """ async with ContextEngine(request.method): @@ -19,7 +21,9 @@ async def redirect_latest(request: Request, call_next): """ + Redirect all GET requests using latest version to actual version number.
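For context on the `ContextualGino`/`ContextEngine` changes above: the pattern stores the active engine in a `contextvars.ContextVar` (the `CURRENT_ENGINE` referenced in the hunk) and swaps it per request. A minimal sketch, assuming a simple method-to-engine mapping and placeholder engine objects rather than the real Gino pools:

```python
from contextvars import ContextVar

# Placeholder engines; in the app these would be the Gino read/write pools.
READ_ENGINE = object()
WRITE_ENGINE = object()
CURRENT_ENGINE: ContextVar = ContextVar("engine")


class ContextEngine:
    """Async context manager that binds an engine to the current context."""

    def __init__(self, method: str):
        self.method = method

    async def __aenter__(self):
        try:
            engine = CURRENT_ENGINE.get()
        except LookupError:
            # Nothing bound yet: pick the pool based on the request method
            # (the exact mapping is an assumption, not shown in the hunk).
            write = self.method in ("PUT", "PATCH", "POST", "DELETE")
            engine = WRITE_ENGINE if write else READ_ENGINE
        self.token = CURRENT_ENGINE.set(engine)
        return engine

    async def __aexit__(self, exc_type, exc, tb):
        CURRENT_ENGINE.reset(self.token)
```

With this shape, the `set_db_mode` middleware only needs `async with ContextEngine(request.method):` around `call_next`, and `ContextualGino.bind` can look up `CURRENT_ENGINE` to route queries to the right pool.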
+ """ if request.method == "GET" and "latest" in request.url.path: diff --git a/app/models/orm/base.py b/app/models/orm/base.py index c51ac089b..3e0dbe1d8 100644 --- a/app/models/orm/base.py +++ b/app/models/orm/base.py @@ -1,6 +1,5 @@ from datetime import datetime -import geoalchemy2 from geoalchemy2 import Geometry from sqlalchemy.dialects.postgresql import ARRAY, JSONB, TEXT, UUID from sqlalchemy_utils import EmailType, generic_repr diff --git a/app/models/orm/migrations/env.py b/app/models/orm/migrations/env.py index 9035f7866..5ecd86f72 100644 --- a/app/models/orm/migrations/env.py +++ b/app/models/orm/migrations/env.py @@ -1,6 +1,6 @@ """ env.py - Alembic ENV module + Alembic ENV module isort:skip_file """ @@ -46,7 +46,7 @@ def exclude_tables_from_config(config_): exclude_tables = exclude_tables_from_config(config.get_section("alembic:exclude")) -def include_object(object, name, type_, reflected, compare_to): +def include_object(obj, name, type_, reflected, compare_to): if type_ == "table" and name in exclude_tables: return False else: @@ -54,7 +54,9 @@ def include_object(object, name, type_, reflected, compare_to): def run_migrations_offline(): - """Run migrations in 'offline' mode. + """ + + Run migrations in 'offline' mode. This configures the context with just a URL and not an Engine, though an Engine is acceptable @@ -77,7 +79,9 @@ def run_migrations_offline(): def run_migrations_online(): - """Run migrations in 'online' mode. + """ + + Run migrations in 'online' mode. In this scenario we need to create an Engine and associate a connection with the context. diff --git a/app/models/pydantic/creation_options.py b/app/models/pydantic/creation_options.py index 6b8019df8..1803a827f 100644 --- a/app/models/pydantic/creation_options.py +++ b/app/models/pydantic/creation_options.py @@ -1,9 +1,9 @@ from datetime import date from enum import Enum -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import List, Optional, Union from pydantic import BaseModel, Field -from pydantic.types import PositiveInt, constr +from pydantic.types import PositiveInt COLUMN_REGEX = r"^[a-z][a-zA-Z0-9_-]{2,}$" PARTITION_SUFFIX_REGEX = r"^[a-z0-9_-]{3,}$" diff --git a/app/models/pydantic/database.py b/app/models/pydantic/database.py index 2b7cd5fd8..c5ab92166 100644 --- a/app/models/pydantic/database.py +++ b/app/models/pydantic/database.py @@ -1,6 +1,6 @@ from typing import Any, Optional, Union -from pydantic import BaseModel, Field, Schema, fields, validator +from pydantic import BaseModel, Field, fields, validator from sqlalchemy.engine.url import URL from starlette.datastructures import Secret diff --git a/app/models/pydantic/jobs.py b/app/models/pydantic/jobs.py index 77955af50..f1fc934bf 100644 --- a/app/models/pydantic/jobs.py +++ b/app/models/pydantic/jobs.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional +from typing import Dict, List, Optional from pydantic import BaseModel diff --git a/app/models/pydantic/metadata.py b/app/models/pydantic/metadata.py index 4ce726fbe..05d1aa547 100644 --- a/app/models/pydantic/metadata.py +++ b/app/models/pydantic/metadata.py @@ -1,5 +1,5 @@ from datetime import date -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field diff --git a/app/models/pydantic/versions.py b/app/models/pydantic/versions.py index 642cc8020..0101bde65 100644 --- a/app/models/pydantic/versions.py +++ b/app/models/pydantic/versions.py @@ -1,12 +1,11 @@ -from enum import Enum 
-from typing import Any, Dict, List, Optional, Tuple, Union +from typing import List, Optional, Tuple -from pydantic import BaseModel, Field +from pydantic import BaseModel from .base import Base from .change_log import ChangeLog -from .creation_options import CreationOptions, VectorSourceCreationOptions -from .metadata import FieldMetadata, VersionMetadata +from .creation_options import CreationOptions +from .metadata import VersionMetadata from .sources import SourceType diff --git a/app/routes/__init__.py b/app/routes/__init__.py index 92aa671a7..03af7628d 100644 --- a/app/routes/__init__.py +++ b/app/routes/__init__.py @@ -1,12 +1,8 @@ -from typing import Tuple - import requests -from fastapi import Depends, Form, HTTPException, Path +from fastapi import Depends, HTTPException, Path from fastapi.logger import logger from fastapi.security import OAuth2PasswordBearer -from app.crud.versions import get_latest_version - DATASET_REGEX = r"^[a-z][a-z0-9_-]{2,}$" VERSION_REGEX = r"^v\d{1,8}\.?\d{1,3}\.?\d{1,3}$|^latest$" oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") @@ -35,20 +31,8 @@ async def version_dependency( return version -# async def version_dependency_form( -# version: str = Form(..., title="Dataset version", regex=VERSION_REGEX) -# ): -# -# if version == "latest": -# version = await get_latest_version -# -# return version - - async def is_admin(token: str = Depends(oauth2_scheme)) -> bool: - """ - Calls GFW API to authorize user - """ + """Calls GFW API to authorize user.""" headers = {"Authorization": f"Bearer {token}"} url = "https://production-api.globalforestwatch.org/auth/check-logged" diff --git a/app/routes/assets.py b/app/routes/assets.py index cdf0f32b2..f324ac42f 100644 --- a/app/routes/assets.py +++ b/app/routes/assets.py @@ -36,9 +36,8 @@ async def get_assets( version: str = Depends(version_dependency), asset_type: Optional[AssetType] = Query(None, title="Filter by Asset Type"), ): - """ - Get all assets for a given dataset version - """ + """Get all assets for a given dataset version.""" + rows: List[ORMAsset] = await assets.get_assets(dataset, version) # Filter rows by asset type @@ -65,9 +64,7 @@ async def get_asset( version: str = Depends(version_dependency), asset_id: UUID = Path(...), ): - """ - Get a specific asset - """ + """Get a specific asset.""" row: ORMAsset = await assets.get_asset(asset_id) if row.dataset != dataset and row.version != version: @@ -88,9 +85,7 @@ async def get_asset( async def get_assets_root( *, asset_type: Optional[AssetType] = Query(None, title="Filter by Asset Type") ): - """ - Get all assets - """ + """Get all assets.""" if asset_type: rows: List[ORMAsset] = await assets.get_assets_by_type(asset_type) else: @@ -106,9 +101,7 @@ async def get_assets_root( response_model=Asset, ) async def get_asset_root(*, asset_id: UUID = Path(...)): - """ - Get a specific asset - """ + """Get a specific asset.""" row: ORMAsset = await assets.get_asset(asset_id) return row @@ -129,10 +122,12 @@ async def add_new_asset( response: Response, ): """ + Add a new asset to a dataset version. Managed assets will be generated by the API itself. In that case, the Asset URI is read only and will be set automatically. If the asset is not managed, you need to specify an Asset URI to link to. + """ # row: ORMAsset = ... # response.headers["Location"] = f"/{dataset}/{version}/asset/{row.asset_id}" @@ -154,8 +149,10 @@ async def delete_asset( is_authorized: bool = Depends(is_admin), ): """ + Delete selected asset. 
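As a quick illustration of the path constraints in `app/routes/__init__.py`, here is what `DATASET_REGEX` and `VERSION_REGEX` accept (the sample values are mine, not from the test suite):

```python
import re

DATASET_REGEX = r"^[a-z][a-z0-9_-]{2,}$"
VERSION_REGEX = r"^v\d{1,8}\.?\d{1,3}\.?\d{1,3}$|^latest$"

assert re.match(DATASET_REGEX, "nasa_viirs_fire_alerts")
assert re.match(VERSION_REGEX, "v202003")
assert re.match(VERSION_REGEX, "v1.0.1")
assert re.match(VERSION_REGEX, "latest")
assert not re.match(VERSION_REGEX, "202003")  # versions must start with "v" or be "latest"
```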
For managed assets, all resources will be deleted. For non-managed assets, only the link will be deleted. + """ pass @@ -169,9 +166,7 @@ async def asset_history( request: ChangeLog, is_authorized: bool = Depends(is_admin), ): - """ - Log changes for given asset - """ + """Log changes for given asset.""" row = await assets.update_asset(asset_id, change_log=[request.dict()]) @@ -179,51 +174,11 @@ async def asset_history( async def _asset_response(data: ORMAsset) -> Dict[str, Any]: - """ - Serialize ORM response - """ + """Serialize ORM response.""" response = Asset.from_orm(data).dict(by_alias=True) return response -async def _create_database_table(): - # supported input types - # - table - # - vector - - # steps - # - create table and upload data into database - # - inherit from geostore (vector/ polygon only) - - # creation options: - # - indicies (which fields get an index, what kind of index?) - # - partitioning (how to partition table, on what field?) - # - cluster (how to cluster table, based on which index?) - # - force field types (don't let loader guess field types, but provide list of field types instead) - - # custom metadata - # - fields (field name, field alias, field description, field type, is filter, is feature info) - - raise NotImplementedError - - -async def _create_dynamic_vector_tile_cache(): - # supported input types - # - vector - - # steps - # - wait until database table is created - # - create dynamic vector tile asset entry to enable service - - # creation options: - # - default symbology/ legend - - # custom metadata - # - default symbology/ legend - - raise NotImplementedError - - async def _create_static_vector_tile_cache(): # supported input types # - vector diff --git a/app/routes/datasets.py b/app/routes/datasets.py index 2d730c579..4aa9c8273 100644 --- a/app/routes/datasets.py +++ b/app/routes/datasets.py @@ -27,9 +27,8 @@ "/", response_class=ORJSONResponse, tags=["Dataset"], response_model=List[Dataset] ) async def get_datasets(): - """ - Get list of all datasets - """ + """Get list of all datasets.""" + return await datasets.get_datasets() @@ -40,9 +39,8 @@ async def get_datasets(): response_model=Dataset, ) async def get_dataset(*, dataset: str = Depends(dataset_dependency)): - """ - Get basic metadata and available versions for a given dataset - """ + """Get basic metadata and available versions for a given dataset.""" + row: ORMDataset = await datasets.get_dataset(dataset) return await _dataset_response(dataset, row) @@ -61,9 +59,8 @@ async def create_dataset( is_authorized: bool = Depends(is_admin), response: Response, ): - """ - Create or update a dataset - """ + """Create or update a dataset.""" + new_dataset: ORMDataset = await datasets.create_dataset(dataset, **request.dict()) await db.status(CreateSchema(dataset)) @@ -89,7 +86,9 @@ async def update_dataset_metadata( is_authorized: bool = Depends(is_admin), ): """ + Partially update a dataset. Only metadata field can be updated. All other fields will be ignored. 
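The create endpoints above take a `Response` parameter so they can advertise the new resource via a `Location` header (see the commented `response.headers["Location"]` line in `add_new_asset`). A minimal sketch of that FastAPI pattern; the route path and payload model are illustrative:

```python
from fastapi import APIRouter, Response
from pydantic import BaseModel

router = APIRouter()


class DatasetIn(BaseModel):
    # Hypothetical minimal payload for illustration only.
    title: str


@router.put("/{dataset}", status_code=201)
async def create_dataset(dataset: str, request: DatasetIn, response: Response):
    # ... persist the dataset here, then point clients at the new resource.
    response.headers["Location"] = f"/{dataset}"
    return {"dataset": dataset, "title": request.title}
```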
+ """ row: ORMDataset = await datasets.update_dataset(dataset, request) @@ -108,9 +107,7 @@ async def delete_dataset( dataset: str = Depends(dataset_dependency), is_authorized: bool = Depends(is_admin), ): - """ - Delete a dataset - """ + """Delete a dataset.""" row: ORMDataset = await datasets.delete_dataset(dataset) await db.status(DropSchema(dataset)) diff --git a/app/routes/features.py b/app/routes/features.py index 57edc6879..84e20bf8e 100644 --- a/app/routes/features.py +++ b/app/routes/features.py @@ -20,8 +20,10 @@ async def get_features( z: int = Query(None, title="Zoom level", ge=0, le=22) ): """ + Retrieve list of features Add optional spatial filter using a point buffer (for info tool). + """ # return await get_features_by_location(db, dataset, version, lat, lng, z) pass @@ -38,9 +40,8 @@ async def get_feature( version: str = Depends(version_dependency), feature_id: int = Path(..., title="Feature ID", ge=0) ): - """ - Retrieve attribute values for a given feature - """ + """Retrieve attribute values for a given feature.""" + pass diff --git a/app/routes/geostore.py b/app/routes/geostore.py index 5b4275e18..09e1340c1 100644 --- a/app/routes/geostore.py +++ b/app/routes/geostore.py @@ -14,9 +14,7 @@ "/geostore", response_class=ORJSONResponse, tags=["Geostore"], ) async def add_new_geostore(): - """ - Add geostore feature to User geostore - """ + """Add geostore feature to User geostore.""" pass @@ -42,7 +40,9 @@ async def get_geostore( geostore_id: UUID = Path(..., title="geostore_id") ): """ + Retrieve GeoJSON representation for a given geostore ID of a dataset version. Obtain geostore ID from feature attributes. + """ pass diff --git a/app/routes/queries.py b/app/routes/queries.py index 9d8510ee1..cc84b0e6b 100644 --- a/app/routes/queries.py +++ b/app/routes/queries.py @@ -17,7 +17,6 @@ async def query_dataset( sql: str = Query(None, title="SQL query"), geostore_id: UUID = Query(None, title="Geostore ID") ): - """ - Execute a read ONLY SQL query on the given dataset version (if implemented) - """ + """Execute a read ONLY SQL query on the given dataset version (if implemented).""" + pass diff --git a/app/routes/versions.py b/app/routes/versions.py index 21eed3919..37c45324a 100644 --- a/app/routes/versions.py +++ b/app/routes/versions.py @@ -1,16 +1,6 @@ -import json -from typing import Any, Dict, List, Tuple -from typing.io import IO - -from fastapi import ( - APIRouter, - BackgroundTasks, - Depends, - File, - Form, - Response, - UploadFile, -) +from typing import Any, Dict, List + +from fastapi import APIRouter, BackgroundTasks, Depends, Response from fastapi.responses import ORJSONResponse from ..crud import versions @@ -18,23 +8,18 @@ from ..models.orm.versions import Version as ORMVersion from ..models.pydantic.change_log import ChangeLog from ..models.pydantic.versions import Version, VersionCreateIn, VersionUpdateIn -from ..routes import ( # version_dependency_form, - dataset_dependency, - is_admin, - version_dependency, -) -from ..settings.globals import BUCKET +from ..routes import dataset_dependency, is_admin, version_dependency from ..tasks.default_assets import create_default_asset router = APIRouter() description = """ - Datasets can have different versions. Versions aer usually - linked to different releases. Versions can be either mutable (data can change) or immutable (data - cannot change). By default versions are immutable. Every version needs one or many source files. - These files can be a remote, publicly accessible URL or an uploaded file. 
Based on the source file(s), - users can create additional assets and activate additional endpoints to view and query the dataset. - Available assets and endpoints to choose from depend on the source type. - """ +Datasets can have different versions. Versions are usually +linked to different releases. Versions can be either mutable (data can change) or immutable (data +cannot change). By default versions are immutable. Every version needs one or many source files. +These files can be a remote, publicly accessible URL or an uploaded file. Based on the source file(s), +users can create additional assets and activate additional endpoints to view and query the dataset. +Available assets and endpoints to choose from depend on the source type. +""" # TODO: @@ -52,9 +37,7 @@ async def get_version( dataset: str = Depends(dataset_dependency), version: str = Depends(version_dependency), ): - """ - Get basic metadata for a given version - """ + """Get basic metadata for a given version.""" row: ORMVersion = await versions.get_version(dataset, version) @@ -77,9 +60,7 @@ async def add_new_version( is_authorized: bool = Depends(is_admin), response: Response, ): - """ - Create or update a version for a given dataset - """ + """Create or update a version for a given dataset.""" async def callback(message: Dict[str, Any]) -> None: pass @@ -176,9 +157,11 @@ async def update_version( is_authorized: bool = Depends(is_admin), ): """ + Partially update a version of a given dataset. When using PATCH and uploading files, - this will overwrite the existing source(s) and trigger a complete update of all managed assets + this will overwrite the existing source(s) and trigger a complete update of all managed assets. + """ input_data = request.dict() @@ -201,9 +184,8 @@ async def delete_version( version: str = Depends(version_dependency), is_authorized: bool = Depends(is_admin), ): - """ - Delete a version - """ + """Delete a version.""" + row: ORMVersion = await versions.delete_version(dataset, version) # TODO: @@ -220,9 +202,8 @@ async def version_history( request: ChangeLog, is_authorized: bool = Depends(is_admin), ): - """ - Log changes for given dataset version - """ + """Log changes for given dataset version.""" + message = [request.dict()] return await versions.update_version(dataset, version, change_log=message) @@ -231,9 +212,7 @@ async def _version_response( dataset: str, version: str, data: ORMVersion ) -> Dict[str, Any]: - """ - Assure that version responses are parsed correctly and include associated assets - """ + """Ensure that version responses are parsed correctly and include associated assets.""" assets: List[ORMAsset] = await ORMAsset.select("asset_type", "asset_uri").where( ORMAsset.dataset == dataset diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py index 9caeea800..6ceea16c4 100644 --- a/app/tasks/__init__.py +++ b/app/tasks/__init__.py @@ -1,9 +1,8 @@ -from typing import Any, Dict, List, Tuple, Union +from typing import Any, Dict, List from ..application import ContextEngine, db from ..crud import assets from ..models.orm.queries.fields import fields -from ..models.pydantic.creation_options import Partitions from ..models.pydantic.metadata import FieldMetadata from ..settings.globals import ( WRITER_DBNAME, @@ -23,9 +22,7 @@ async def get_field_metadata(dataset: str, version: str) -> List[Dict[str, Any]]: - """ - Get field list for asset and convert into Metadata object - """ + """Get field list for asset and convert into Metadata object.""" rows = await
db.all(fields, dataset=dataset, version=version) field_metadata = list() @@ -42,17 +39,14 @@ async def get_field_metadata(dataset: str, version: str) -> List[Dict[str, Any]] async def update_asset_status(asset_id, status): - """ - Update status of asset - """ + """Update status of asset.""" + async with ContextEngine("PUT"): await assets.update_asset(asset_id, status=status) async def update_asset_field_metadata(dataset, version, asset_id): - """ - Update asset field metadata - """ + """Update asset field metadata.""" field_metadata: List[Dict[str, Any]] = await get_field_metadata(dataset, version) metadata = {"fields_": field_metadata} diff --git a/app/tasks/batch.py b/app/tasks/batch.py index f4658454f..3ca8793bd 100644 --- a/app/tasks/batch.py +++ b/app/tasks/batch.py @@ -1,4 +1,3 @@ -import logging from datetime import datetime from time import sleep from typing import Any, Awaitable, Callable, Dict, List, Optional, Set @@ -33,9 +32,11 @@ async def schedule( jobs: List[Job], callback: Callable[[Dict[str, Any]], Awaitable[None]] ) -> Dict[str, str]: """ + Submit multiple batch jobs at once. Submitted batch jobs can depend on each other. Dependent jobs need to be listed in `dependent_jobs` - and must have a `parents` attribute with the parent job names + and must have a `parents` attribute with the parent job names. + """ scheduled_jobs = dict() @@ -165,9 +166,7 @@ async def poll_jobs( def submit_batch_job( job: Job, depends_on: Optional[List[Dict[str, Any]]] = None ) -> str: - """ - Submit job to AWS Batch - """ + """Submit job to AWS Batch.""" client = get_batch_client() if depends_on is None: diff --git a/app/tasks/default_assets.py b/app/tasks/default_assets.py index 10d01970d..2e10064fb 100644 --- a/app/tasks/default_assets.py +++ b/app/tasks/default_assets.py @@ -1,6 +1,5 @@ -import os from datetime import datetime -from typing import Any, Awaitable, Callable, Dict, List, Optional +from typing import Any, Awaitable, Callable, Dict, Optional from typing.io import IO from ..application import ContextEngine @@ -60,9 +59,8 @@ async def create_default_asset( async def _inject_file(file_obj: IO, s3_uri: str) -> ChangeLog: - """ - Upload a file-like object to S3 data lake - """ + """ Upload a file-like object to S3 data lake """ + s3 = get_s3_client() bucket, path = split_s3_path(s3_uri) diff --git a/app/tasks/table_source_assets.py b/app/tasks/table_source_assets.py index 54c8be6ce..2224c0a91 100644 --- a/app/tasks/table_source_assets.py +++ b/app/tasks/table_source_assets.py @@ -1,8 +1,6 @@ import json from typing import Any, Dict, List -from fastapi.logger import logger - from app.application import ContextEngine from app.crud import assets from app.models.pydantic.assets import AssetTaskCreate diff --git a/app/tasks/vector_source_assets.py b/app/tasks/vector_source_assets.py index f6d5ced6c..c4b34a5a3 100644 --- a/app/tasks/vector_source_assets.py +++ b/app/tasks/vector_source_assets.py @@ -8,12 +8,7 @@ from app.models.pydantic.creation_options import VectorSourceCreationOptions from app.models.pydantic.jobs import GdalPythonImportJob, Job, PostgresqlClientJob from app.models.pydantic.metadata import DatabaseTableMetadata -from app.tasks import ( - get_field_metadata, - update_asset_field_metadata, - update_asset_status, - writer_secrets, -) +from app.tasks import update_asset_field_metadata, update_asset_status, writer_secrets from app.tasks.batch import execute @@ -25,7 +20,9 @@ async def vector_source_asset( metadata: Dict[str, Any], callback, ) -> ChangeLog: - assert 
len(source_uris) == 1, "Vector sources only support one input file" + + if len(source_uris) != 1: + raise AssertionError("Vector sources only support one input file") options = VectorSourceCreationOptions(**creation_options) diff --git a/app/utils/aws.py b/app/utils/aws.py index 4cb00171c..e273dda18 100644 --- a/app/utils/aws.py +++ b/app/utils/aws.py @@ -8,9 +8,11 @@ def client_constructor(service: str): """ + Using closure design for a client constructor This way we only need to create the client once in central location and it will be easier to mock + """ service_client = None diff --git a/batch/python/fire/create_table.py b/batch/python/fire/create_table.py deleted file mode 100644 index bb8312290..000000000 --- a/batch/python/fire/create_table.py +++ /dev/null @@ -1,61 +0,0 @@ -import os - -import pendulum -import psycopg2 -from pendulum.parsing.exceptions import ParserError - - -def get_sql(sql_tmpl, **kwargs): - with open(sql_tmpl, "r") as tmpl: - sql = tmpl.read().format(**kwargs) - print(sql) - return sql - - -def cli(): - connection = psycopg2.connect( - database=os.environ["POSTGRES_NAME"], - user=os.environ["POSTGRES_USERNAME"], - password=os.environ["POSTGRES_PASSWORD"], - port=os.environ["POSTGRES_PORT"], - host=os.environ["POSTGRES_HOST"], - ) - - years = range(2011, 2022) - schema = "nasa_viirs_fire_alerts" - table = "v202003" - - cursor = connection.cursor() - print("Create table") - - cursor.execute(get_sql("sql/create_table.sql.tmpl", schema=schema, table=table)) - - for year in years: - for week in range(1, 54): - try: - week = f"{week:02}" - start = pendulum.parse(f"{year}-W{week}").to_date_string() - end = pendulum.parse(f"{year}-W{week}").add(days=7).to_date_string() - print(f"Create partition for week {week}") - cursor.execute( - get_sql( - "sql/create_partitions.sql.tmpl", - schema=schema, - table=table, - year=year, - week=week, - start=start, - end=end, - ) - ) - except ParserError: - # Year has only 52 weeks - pass - - connection.commit() - cursor.close() - connection.close() - - -if __name__ == "__main__": - cli() diff --git a/batch/python/fire/load.py b/batch/python/fire/load.py deleted file mode 100644 index e8bdc3995..000000000 --- a/batch/python/fire/load.py +++ /dev/null @@ -1,73 +0,0 @@ -import os - -import click -import psycopg2 - - -@click.command() -@click.argument("input") -@click.option("--schema", default="nasa_viirs_fire_alerts") -@click.option("--table", default="v202003") -def cli(input, schema, table): - load(input, schema, table) - - -def load(input, schema="nasa_viirs_fire_alerts", table="v202003"): - - connection = psycopg2.connect( - database=os.environ["POSTGRES_NAME"], - user=os.environ["POSTGRES_USERNAME"], - password=os.environ["POSTGRES_PASSWORD"], - port=os.environ["POSTGRES_PORT"], - host=os.environ["POSTGRES_HOST"], - ) - - cursor = connection.cursor() - with open(input, "r") as f: - # Notice that we don't need the `csv` module. - next(f) # Skip the header row. 
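The `client_constructor` closure in `app/utils/aws.py` keeps one lazily created boto3 client per service, which is what makes it easy to mock in tests. The hunk only shows the docstring and the `service_client = None` line, so the following is a sketch of the presumed shape:

```python
import boto3


def client_constructor(service: str):
    """Return a callable that creates the boto3 client once and then reuses it."""
    service_client = None

    def client():
        nonlocal service_client
        if service_client is None:
            service_client = boto3.client(service)
        return service_client

    return client


# The helpers referenced elsewhere in this diff follow from it:
get_batch_client = client_constructor("batch")
get_s3_client = client_constructor("s3")
```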
- cursor.copy_from( - f, - f"{schema}.{table}", - columns=( - "iso", - "adm1", - "adm2", - "longitude", - "latitude", - "alert__date", - "alert__time_utc", - "confidence__cat", - "bright_ti4__K", - "bright_ti5__K", - "frp__MW", - "wdpa_protected_area__iucn_cat", - "is__regional_primary_forest", - "is__alliance_for_zero_extinction_site", - "is__key_biodiversity_area", - "is__landmark", - "gfw_plantation__type", - "is__gfw_mining", - "is__gfw_logging", - "rspo_oil_palm__certification_status", - "is__gfw_wood_fiber", - "is__peat_land", - "is__idn_forest_moratorium", - "is__gfw_oil_palm", - "idn_forest_area__type", - "per_forest_concession__type", - "is__gfw_oil_gas", - "is__mangroves_2016", - "is__intact_forest_landscapes_2016", - "bra_biome__name", - "alert__count", - ), - ) - - connection.commit() - cursor.close() - connection.close() - - -if __name__ == "__main__": - cli() diff --git a/batch/python/fire/post_processing.py b/batch/python/fire/post_processing.py deleted file mode 100644 index a3cf70122..000000000 --- a/batch/python/fire/post_processing.py +++ /dev/null @@ -1,150 +0,0 @@ -import concurrent.futures -import os -from typing import List, Tuple - -import pendulum -import psycopg2 -from pendulum.parsing.exceptions import ParserError -from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT -from psycopg2.pool import ThreadedConnectionPool - -A_POOL = None -YEARS = range(2011, 2022) -WEEKS = range(1, 54) -SCHEMA = "nasa_viirs_fire_alerts" -TABLE = "v202003" - - -def cli() -> None: - """ - Post processing of VIRRS fire data - -> update geographic columns - -> create indicies - -> cluster partitions - Tasks are run asynchronously for each partition - """ - - pool = get_pool() - weeks = _get_weeks() - - create_indicies() - - with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - executor.map(cluster, weeks) - - pool.closeall() - - -def get_pool() -> ThreadedConnectionPool: - """ - The database connection pool - """ - global A_POOL - if A_POOL is None: - A_POOL = psycopg2.pool.ThreadedConnectionPool( - 1, - 10, - database=os.environ["POSTGRES_NAME"], - user=os.environ["POSTGRES_USERNAME"], - password=os.environ["POSTGRES_PASSWORD"], - port=os.environ["POSTGRES_PORT"], - host=os.environ["POSTGRES_HOST"], - ) - return A_POOL - - -def create_indicies() -> None: - """ - This creates an invalid index. - It will be validated automatically, once all partitions are indexed and attached. 
- """ - pool = get_pool() - conn = pool.getconn() - - with conn.cursor() as cursor: - cursor.execute( - _get_sql("sql/update_geometry.sql.tmpl", schema=SCHEMA, table=TABLE,) - ) - - with conn.cursor() as cursor: - cursor.execute( - _get_sql( - "sql/create_indicies.sql.tmpl", - schema=SCHEMA, - table=TABLE, - column="geom", - index="gist", - ) - ) - - with conn.cursor() as cursor: - cursor.execute( - _get_sql( - "sql/create_indicies.sql.tmpl", - schema=SCHEMA, - table=TABLE, - column="geom_wm", - index="gist", - ) - ) - - with conn.cursor() as cursor: - cursor.execute( - _get_sql( - "sql/create_indicies.sql.tmpl", - schema=SCHEMA, - table=TABLE, - column="alert__date", - index="btree", - ) - ) - - -def cluster(weeks: Tuple[int, str]) -> None: - year = weeks[0] - week = weeks[1] - - pool = get_pool() - conn = pool.getconn() - cursor = conn.cursor() - - cursor.execute( - _get_sql( - "sql/cluster_partitions.sql.tmpl", - schema=SCHEMA, - table=TABLE, - year=year, - week=week, - ) - ) - - cursor.close() - - pool.putconn(conn) - - -def _get_sql(sql_tmpl, **kwargs) -> str: - with open(sql_tmpl, "r") as tmpl: - sql = tmpl.read().format(**kwargs) - print(sql) - return sql - - -def _get_weeks() -> List[Tuple[int, str]]: - weeks: List[Tuple[int, str]] = list() - for year in YEARS: - for week in WEEKS: - try: - # Check if year has that many weeks - pendulum.parse(f"{year}-W{week}") - - week_str = f"{week:02}" - weeks.append((year, week_str)) - except ParserError: - # Year has only 52 weeks - pass - return weeks - - -if __name__ == "__main__": - cli() diff --git a/batch/python/fire/seed.py b/batch/python/fire/seed.py deleted file mode 100644 index 6c8d5a54e..000000000 --- a/batch/python/fire/seed.py +++ /dev/null @@ -1,28 +0,0 @@ -import os - -import boto3 -from create_table import cli as create_table -from load import load -from post_processing import cli as post_processing - -s3 = boto3.client("s3") - - -def cli(): - create_table() - - for i in range(0, 300): - prefix = f"geotrellis/results/firealerts_gadm_2020-03-23/2020-03-23/fireAlerts_20200323_1934/all/part-00{str(i).zfill(3)}-fda1eafb-2d6e-42df-8b12-eb71454eba0f-c000.csv" - output = f"viirs_{i}.csv" - s3.download_file("gfw-pipelines-dev", prefix, output) - print(f"Download and load {prefix}") - load(output) - os.remove(output) - - print("DONE loading") - print("Start postprocessing") - post_processing() - - -if __name__ == "__main__": - cli() diff --git a/batch/python/fire/sql/cluster_partitions.sql.tmpl b/batch/python/fire/sql/cluster_partitions.sql.tmpl deleted file mode 100644 index 2a64da8e3..000000000 --- a/batch/python/fire/sql/cluster_partitions.sql.tmpl +++ /dev/null @@ -1,2 +0,0 @@ -CLUSTER {schema}.{table}_y{year}w{week} - USING {table}_y{year}w{week}_geom_wm_idx; \ No newline at end of file diff --git a/batch/python/fire/sql/create_indicies.sql.tmpl b/batch/python/fire/sql/create_indicies.sql.tmpl deleted file mode 100644 index ee9281380..000000000 --- a/batch/python/fire/sql/create_indicies.sql.tmpl +++ /dev/null @@ -1,3 +0,0 @@ -CREATE INDEX IF NOT EXISTS {table}_{column}_idx - ON {schema}.{table} USING {index} - ({column}); \ No newline at end of file diff --git a/batch/python/fire/sql/create_partition_indicies.sql.tmpl b/batch/python/fire/sql/create_partition_indicies.sql.tmpl deleted file mode 100644 index 66490131a..000000000 --- a/batch/python/fire/sql/create_partition_indicies.sql.tmpl +++ /dev/null @@ -1,5 +0,0 @@ -CREATE INDEX IF NOT EXISTS {table}_y{year}w{week}_{column}_idx - ON {schema}.{table}_y{year}w{week} USING {index} 
- ({column}); -ALTER INDEX {schema}.{table}_{column}_idx - ATTACH PARTITION {schema}.{table}_y{year}w{week}_{column}_idx; \ No newline at end of file diff --git a/batch/python/fire/sql/create_partitions.sql.tmpl b/batch/python/fire/sql/create_partitions.sql.tmpl deleted file mode 100644 index 7cd63bcc9..000000000 --- a/batch/python/fire/sql/create_partitions.sql.tmpl +++ /dev/null @@ -1,2 +0,0 @@ -CREATE TABLE {schema}.{table}_y{year}w{week} PARTITION OF {schema}.{table} - FOR VALUES FROM ('{start}') TO ('{end}'); \ No newline at end of file diff --git a/batch/python/fire/sql/create_table.sql.tmpl b/batch/python/fire/sql/create_table.sql.tmpl deleted file mode 100644 index dbcd95288..000000000 --- a/batch/python/fire/sql/create_table.sql.tmpl +++ /dev/null @@ -1,40 +0,0 @@ -CREATE EXTENSION IF NOT EXISTS postgis; -CREATE SCHEMA IF NOT EXISTS {schema}; --- DROP TABLE IF EXISTS {schema}.{table} CASCADE; -CREATE TABLE {schema}.{table} -( - iso character(3), - adm1 text, - adm2 text, - longitude numeric NOT NULL, - latitude numeric NOT NULL, - alert__date date, - alert__time_utc text, - confidence__cat text, - bright_ti4__K numeric, - bright_ti5__K numeric, - frp__MW numeric, - wdpa_protected_area__iucn_cat text, - is__regional_primary_forest boolean, - is__alliance_for_zero_extinction_site boolean, - is__key_biodiversity_area boolean, - is__landmark boolean, - gfw_plantation__type text, - is__gfw_mining boolean, - is__gfw_logging boolean, - rspo_oil_palm__certification_status text, - is__gfw_wood_fiber boolean, - is__peat_land boolean, - is__idn_forest_moratorium boolean, - is__gfw_oil_palm boolean, - idn_forest_area__type text, - per_forest_concession__type text, - is__gfw_oil_gas boolean, - is__mangroves_2016 boolean, - is__intact_forest_landscapes_2016 boolean, - bra_biome__name text, - alert__count smallint, - geom geometry(POINT,4326), - geom_wm geometry(POINT, 3857) -) -PARTITION BY RANGE (alert__date); diff --git a/batch/python/fire/sql/update_geometry.sql.tmpl b/batch/python/fire/sql/update_geometry.sql.tmpl deleted file mode 100644 index 2f89f5641..000000000 --- a/batch/python/fire/sql/update_geometry.sql.tmpl +++ /dev/null @@ -1,3 +0,0 @@ -UPDATE {schema}.{table} -SET geom = ST_SetSRID(ST_MakePoint(longitude, latitude),4326), - geom_wm = ST_Transform(ST_SetSRID(ST_MakePoint(longitude, latitude),4326), 3857); \ No newline at end of file diff --git a/wait_for_postgres.sh b/wait_for_postgres.sh index f3e9845c5..30a0effbe 100644 --- a/wait_for_postgres.sh +++ b/wait_for_postgres.sh @@ -3,12 +3,10 @@ set -e -cmd="$@" - until PGPASSWORD=$DB_PASSWORD psql -h "$DB_HOST" -U "$DB_USER" -d "$DATABASE" -c '\q'; do >&2 echo "Postgres is unavailable - sleeping" sleep 1 done >&2 echo "Postgres is up - executing command" -exec $cmd \ No newline at end of file +exec "$@" \ No newline at end of file
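Returning to the AWS Batch scheduling code in `app/tasks/batch.py`: `submit_batch_job` passes `depends_on` through to boto3, which is how dependent jobs wait for their parents. A hedged sketch of the underlying call; the region, queue, job definition, and command values are placeholders:

```python
import boto3

batch = boto3.client("batch", region_name="us-east-1")  # region is a placeholder

parent_job_id = batch.submit_job(
    jobName="load-vector-source",
    jobQueue="gfw-data-api-job-queue",
    jobDefinition="postgresql-client",
    containerOverrides={"command": ["echo", "load data"]},
)["jobId"]

# The child job stays in PENDING until the parent job above succeeds.
child_job = batch.submit_job(
    jobName="cluster-partitions",
    jobQueue="gfw-data-api-job-queue",
    jobDefinition="postgresql-client",
    containerOverrides={"command": ["echo", "post-processing"]},
    dependsOn=[{"jobId": parent_job_id}],
)
```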