
Commit

Merge branch 'develop' into uv_at_last
dmannarino committed Dec 1, 2024
2 parents 85b4730 + 7be61b9 commit 3ab3b37
Showing 18 changed files with 296 additions and 16 deletions.
29 changes: 29 additions & 0 deletions app/models/orm/migrations/versions/604bf4e66c2b_.py
@@ -0,0 +1,29 @@
"""Add content_date_description to version_metadata
Revision ID: 604bf4e66c2b
Revises: ef3392e8e054
Create Date: 2024-10-31 16:52:56.571782
"""
from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '604bf4e66c2b'
down_revision = 'ef3392e8e054'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('version_metadata', sa.Column('content_date_description', sa.String(), nullable=True))
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column('version_metadata', 'content_date_description')
    # ### end Alembic commands ###
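
For reference, a minimal sketch of applying or rolling back this revision with Alembic's Python API; the alembic.ini path is an assumption about the repo layout, not taken from this commit:

# Minimal sketch: run this migration via Alembic's Python API.
# The "alembic.ini" path is an assumed location; adjust to the repo layout.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # assumed config path

# Apply this revision (adds the nullable content_date_description column)
command.upgrade(cfg, "604bf4e66c2b")

# Roll back to the previous revision if needed
command.downgrade(cfg, "ef3392e8e054")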
1 change: 1 addition & 0 deletions app/models/orm/version_metadata.py
@@ -10,6 +10,7 @@ class VersionMetadata(Base, MetadataMixin):
    version = db.Column(db.String, nullable=False)
    content_date = db.Column(db.Date)
    content_start_date = db.Column(db.Date)
    content_date_description = db.Column(db.String)
    content_end_date = db.Column(db.Date)
    last_update = db.Column(db.Date)
    description = db.Column(db.String)
7 changes: 7 additions & 0 deletions app/models/pydantic/creation_options.py
@@ -115,6 +115,13 @@ class FieldType(StrictBaseModel):


class RasterTileSetAssetCreationOptions(StrictBaseModel):
    unify_projection: bool = Field(
        False,
        description=(
            "First re-project to a common projection (EPSG:4326). Necessary "
            "when input files are in different projections from each other."
        )
    )
    pixel_meaning: str
    data_type: DataType
    nbits: Optional[int]
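
For illustration, a hedged sketch of creation options that exercise the new flag; the values below are made up, and only unify_projection, pixel_meaning, data_type, and source_uri come from the models touched in this commit:

# Hypothetical raster tile set creation options; all values are illustrative only.
creation_options = {
    "pixel_meaning": "example_band",   # placeholder value
    "data_type": "uint16",             # placeholder value
    "source_uri": [
        "s3://example-bucket/inputs_epsg_4326/tiles.geojson",
        "s3://example-bucket/inputs_epsg_3857/tiles.geojson",
    ],
    # Inputs above are in different projections, so re-project to EPSG:4326 first.
    "unify_projection": True,
}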
10 changes: 9 additions & 1 deletion app/models/pydantic/metadata.py
@@ -112,7 +112,10 @@ class VersionMetadata(CommonMetadata):
        None,
        description="Date range covered by the content",
    )

    content_date_description: Optional[str] = Field(
        None,
        description="Date of content to display",
    )
    last_update: Optional[date] = Field(
        None,
        description="Date the data were last updated",
@@ -130,6 +133,7 @@ class Config:
"start_date": "2000-01-01", # TODO fix date
"end_date": "2021-04-06",
},
"content_date_description": "2000 - present",
}
]
}
@@ -159,6 +163,10 @@ class VersionMetadataUpdate(VersionMetadataIn):
        None,
        description="Date range covered by the content",
    )
    content_date_description: Optional[str] = Field(
        None,
        description="Date of content to display",
    )

    last_update: Optional[date] = Field(
        None,
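
As a usage sketch for the new metadata field, an update request might look like the following; the endpoint URL shape, dataset/version names, and the "metadata" payload wrapper are assumptions for illustration, not taken from this commit:

# Hypothetical metadata update including content_date_description.
# URL, dataset/version names, and payload wrapper are assumed.
import requests

payload = {"metadata": {"content_date_description": "2000 - present"}}
resp = requests.patch(
    "https://data-api.example.org/dataset/example_dataset/v1.0",  # assumed endpoint
    json=payload,
    headers={"Authorization": "Bearer <token>"},
)
resp.raise_for_status()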
2 changes: 1 addition & 1 deletion app/routes/datasets/queries.py
@@ -507,7 +507,7 @@ async def _query_table(
    sql: str,
    geometry: Optional[Geometry],
) -> List[Dict[str, Any]]:
    # parse and validate SQL statement
    # Parse and validate SQL statement
    try:
        parsed = parse_sql(unquote(sql))
    except ParseError as e:
1 change: 1 addition & 0 deletions app/routes/datasets/versions.py
@@ -177,6 +177,7 @@ async def update_version(
            AssetType.dynamic_vector_tile_cache,
            AssetType.static_vector_tile_cache,
            AssetType.raster_tile_cache,
            AssetType.cog,
        ],
    )

38 changes: 34 additions & 4 deletions app/tasks/raster_tile_set_assets/raster_tile_set_assets.py
@@ -18,9 +18,10 @@
from app.models.pydantic.geostore import FeatureCollection
from app.models.pydantic.jobs import Job
from app.models.pydantic.statistics import BandStats, Histogram, RasterStats
from app.settings.globals import DATA_LAKE_BUCKET
from app.tasks import Callback, callback_constructor
from app.tasks.batch import execute
from app.tasks.raster_tile_set_assets.utils import create_pixetl_job
from app.tasks.raster_tile_set_assets.utils import create_pixetl_job, create_unify_projection_job
from app.utils.aws import get_s3_client
from app.utils.path import (
    get_asset_uri,
@@ -67,13 +68,42 @@ async def raster_tile_set_asset(

    creation_options = PixETLCreationOptions(**co)

    jobs: List[Job] = list()
    callback: Callback = callback_constructor(asset_id)

    create_raster_tile_set_job: Job = await create_pixetl_job(
        dataset, version, creation_options, "create_raster_tile_set", callback
    unify_job: Optional[Job] = None
    if creation_options.unify_projection:
        target_crs = "epsg:4326"
        new_src_uris = list()
        for i, _ in enumerate(creation_options.source_uri):
            new_src_uris.append(
                f"s3://{DATA_LAKE_BUCKET}/{dataset}/{version}/raster/"
                f"{target_crs.replace(':', '-')}/reprojected/SRC_{i}"
            )
        target_prefix = new_src_uris[0].rsplit("/", 1)[0]
        unify_job = await create_unify_projection_job(
            dataset,
            creation_options.source_uri,
            target_prefix,
            target_crs,
            "unify_projection",
            callback
        )
        jobs.append(unify_job)
        creation_options.source_uri = new_src_uris

    jobs.append(
        await create_pixetl_job(
            dataset,
            version,
            creation_options,
            "create_raster_tile_set",
            callback,
            [unify_job] if unify_job is not None else None,
        )
    )

    log: ChangeLog = await execute([create_raster_tile_set_job])
    log: ChangeLog = await execute(jobs)

    return log

37 changes: 36 additions & 1 deletion app/tasks/raster_tile_set_assets/utils.py
@@ -7,7 +7,7 @@
from app.models.enum.assets import AssetType
from app.models.enum.pixetl import ResamplingMethod
from app.models.pydantic.creation_options import PixETLCreationOptions
from app.models.pydantic.jobs import GDALDEMJob, Job, PixETLJob
from app.models.pydantic.jobs import GDALDEMJob, Job, PixETLJob, GDAL2TilesJob
from app.settings.globals import (
    AWS_GCS_KEY_SECRET_ARN,
    DEFAULT_JOB_DURATION,
@@ -225,3 +225,38 @@ async def create_resample_job(
        parents=[parent.job_name for parent in parents] if parents else None,
        **kwargs,
    )

async def create_unify_projection_job(
    dataset: str,
    old_source_uris: List[str],
    target_prefix: str,
    target_crs: str,
    job_name: str,
    callback: Callback
) -> GDAL2TilesJob:
    """Creates a Batch job that takes all files indicated in old_source_uris
    and re-projects each to a common CRS, then places them in a mirror of the
    original directory structure under the target_prefix, divided by source
    number. More specifically, the files from the first source URI will be
    put at <target_prefix>/SRC_0, the files from the second under
    <target_prefix>/SRC_1, and so on.
    """

    command = [
        "unify_projection.sh",
        "--target_crs",
        target_crs,
    ]

    for s in old_source_uris:
        command.extend(["--source", s])

    command.extend(["--target", target_prefix])

    return GDAL2TilesJob(
        dataset=dataset,
        job_name=job_name,
        command=command,
        environment=JOB_ENV,
        callback=callback,
    )
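
To make the SRC_<i> layout described in the docstring concrete, here is a small standalone sketch that reproduces the prefix construction used in raster_tile_set_assets.py above; the bucket, dataset, and version values are placeholders:

# Illustrative reconstruction of the reprojected-source layout; placeholders only.
DATA_LAKE_BUCKET = "example-data-lake"
dataset, version, target_crs = "example_dataset", "v1", "epsg:4326"
source_uri = ["s3://bucket-a/first/tiles.geojson", "s3://bucket-b/second/tiles.geojson"]

new_src_uris = [
    f"s3://{DATA_LAKE_BUCKET}/{dataset}/{version}/raster/"
    f"{target_crs.replace(':', '-')}/reprojected/SRC_{i}"
    for i, _ in enumerate(source_uri)
]
target_prefix = new_src_uris[0].rsplit("/", 1)[0]

# new_src_uris[0] -> "s3://example-data-lake/example_dataset/v1/raster/epsg-4326/reprojected/SRC_0"
# new_src_uris[1] -> "s3://example-data-lake/example_dataset/v1/raster/epsg-4326/reprojected/SRC_1"
# target_prefix   -> "s3://example-data-lake/example_dataset/v1/raster/epsg-4326/reprojected"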
2 changes: 1 addition & 1 deletion batch/gdal-python.dockerfile
@@ -1,4 +1,4 @@
FROM globalforestwatch/data-api-gdal:v1.2.1
FROM globalforestwatch/data-api-gdal:v1.2.2

# Copy scripts
COPY ./batch/scripts/ /opt/scripts/
4 changes: 2 additions & 2 deletions batch/pixetl.dockerfile
@@ -1,4 +1,4 @@
FROM globalforestwatch/pixetl:v1.7.7
FROM globalforestwatch/pixetl:v1.7.7_test_parallel

# Copy scripts
COPY ./batch/scripts/ /opt/scripts/
@@ -18,4 +18,4 @@ WORKDIR /tmp
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8

ENTRYPOINT ["/opt/scripts/report_status.sh"]
ENTRYPOINT ["/opt/scripts/report_status.sh"]
44 changes: 39 additions & 5 deletions batch/python/export_to_gee.py
@@ -48,10 +48,7 @@ def upload_cog_to_gcs(dataset, implementation):
return f"gs://{GCS_BUCKET}/{dataset}/{implementation}.tif"


def create_cog_backed_asset(dataset, implementation, gcs_path, service_account):
credentials = ee.ServiceAccountCredentials(service_account, GCS_CREDENTIALS_FILE)
ee.Initialize(credentials)

def create_cog_backed_asset(dataset, implementation, gcs_path, credentials):
# delete any existing asset with the same dataset/implementation
try:
ee.data.deleteAsset(f"projects/{EE_PROJECT}/assets/{dataset}/{implementation}")
@@ -84,6 +81,38 @@ def create_cog_backed_asset(dataset, implementation, gcs_path, service_account):
f"GEE returned unexpected status code {response.status_code} with payload {response.content}"
)

return asset_id


def ingest_in_gee(dataset, implementation, gcs_path):
"""Ingest directly into GEE as a best effort task."""
# delete any existing asset with the same dataset/implementation
try:
ee.data.deleteAsset(f"projects/{EE_PROJECT}/assets/{dataset}/{implementation}")
except ee.EEException:
# asset doesn't exist
pass

# create dataset folder if it doesn't exist
try:
ee.data.createAsset(
{"type": "Folder"}, f"projects/{EE_PROJECT}/assets/{dataset}"
)
except ee.EEException:
# folder already exists
pass

asset_id = f"{dataset}/{implementation}"
request_id = ee.data.newTaskId()[0]
params = {
"name": f"projects/{EE_PROJECT}/assets/{asset_id}",
"tilesets": [{"sources": [{"uris": [gcs_path]}]}],
}
ee.data.startIngestion(request_id=request_id, params=params)
return asset_id


def set_acl_to_anyone_read(asset_id):
# update ACL to be public
full_asset_id = f"projects/{EE_PROJECT}/assets/{asset_id}"
acl = ee.data.getAssetAcl(full_asset_id)
@@ -96,8 +125,13 @@ def export_to_gee(
    implementation: str = Option(..., help="Implementation name."),
):
    service_account = set_google_application_credentials()

    # initialize GEE
    credentials = ee.ServiceAccountCredentials(service_account, GCS_CREDENTIALS_FILE)
    ee.Initialize(credentials)

    gcs_path = upload_cog_to_gcs(dataset, implementation)
    create_cog_backed_asset(dataset, implementation, gcs_path, service_account)
    ingest_in_gee(dataset, implementation, gcs_path)


if __name__ == "__main__":
40 changes: 40 additions & 0 deletions batch/scripts/_tiff_crosses_dateline.sh
@@ -0,0 +1,40 @@
#!/bin/bash
#
# USAGE: _tiff_crosses_dateline.sh raster_file
#
# Prints the string "true" if the input raster will cross the dateline
# when converting to EPSG:4326, "false" otherwise
#
# Needs GDAL 2.0+ and Python
#
# Credit: Slightly modified from https://gis.stackexchange.com/a/222341


if [ -z "${1}" ]; then
echo -e "Error: No input raster file given.\n> USAGE: _tiff_crosses_dateline.sh raster_file"
exit 1
fi

# Get raster info, save it to a variable as we need it several times
gdalinfo=$(gdalinfo "${1}" -json)

# Exit if -json switch is not available
if [ -n "$(echo "${gdalinfo}" | grep '^Usage:')" ]; then
  echo -e "Error: GDAL command failed, Version 2.0+ is needed"
  exit 1
fi

function jsonq {
  echo "${1}" | python -c "import json,sys; jdata = sys.stdin.read(); data = json.loads(jdata); print(data${2});"
}

ulx=$(jsonq "$gdalinfo" "['wgs84Extent']['coordinates'][0][0][0]")
llx=$(jsonq "$gdalinfo" "['wgs84Extent']['coordinates'][0][1][0]")
lrx=$(jsonq "$gdalinfo" "['wgs84Extent']['coordinates'][0][3][0]")
urx=$(jsonq "$gdalinfo" "['wgs84Extent']['coordinates'][0][2][0]")

crossing_dateline=false
test $(python -c "print(${ulx}>${lrx})") = True && crossing_dateline=true
test $(python -c "print(${llx}>${urx})") = True && crossing_dateline=true

echo -n "${crossing_dateline}"
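
The same check can be sketched in Python with GDAL's bindings (assuming the osgeo package is available); the corner comparison mirrors the shell logic above:

# Sketch of the dateline check using GDAL's Python bindings; mirrors the shell
# script: the raster crosses the dateline if a "left" corner longitude ends up
# greater than the corresponding "right" corner longitude in the WGS84 extent.
from osgeo import gdal

def tiff_crosses_dateline(path: str) -> bool:
    info = gdal.Info(path, format="json")
    ring = info["wgs84Extent"]["coordinates"][0]
    ulx = ring[0][0]  # upper-left longitude
    llx = ring[1][0]  # lower-left longitude
    urx = ring[2][0]  # upper-right longitude
    lrx = ring[3][0]  # lower-right longitude
    return ulx > lrx or llx > urx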
38 changes: 38 additions & 0 deletions batch/scripts/_warp_and_upload.sh
@@ -0,0 +1,38 @@
#!/bin/bash

set -e

# arguments:
# $0 - The name of this script
# $1 - local_src_file
# $2 - local_warped_file
# $3 - target_crs
# $4 - remote target file

if aws s3 ls "$4"; then
echo "Remote target file $4 already exists, skipping..."
exit 0
fi

warp_options=("-co" "COMPRESS=DEFLATE" "-co" "TILED=yes")

echo "Seeing if TIFF crosses the dateline"
crosses="$(_tiff_crosses_dateline.sh $1)"
if [ "${crosses}" = "true" ]; then
echo "$1 crosses the dateline"
warp_options+=("--config" "CENTER_LONG" "180")
else
echo "$1 does not cross the dateline"
fi

echo "Now warping $1 to $2"
gdalwarp "$1" "$2" -t_srs "$3" "${warp_options[@]}"
echo "Done warping $1 to $2"

echo "Now uploading $2 to $4"
aws s3 cp --no-progress "$2" "$4"
echo "Done uploading $2 to $4"

echo "Finally, deleting local files $1 and $2"
rm "$1" "$2"
echo "Done deleting local files $1 and $2"
5 changes: 5 additions & 0 deletions batch/scripts/get_arguments.sh
@@ -201,6 +201,11 @@
shift # past argument
shift # past value
;;
--target_crs)
TARGET_CRS="$2"
shift # past argument
shift # past value
;;
--target_bucket)
TARGET_BUCKET="$2"
shift # past argument
2 changes: 1 addition & 1 deletion batch/scripts/resample.sh
@@ -6,7 +6,7 @@ set -e
# -d | --dataset
# -v | --version
# -s | --source
# -r | --resampling_method)
# -r | --resampling_method
# --zoom_level
# -T | --target

