Initial manual L30 reprocessing.
sharkinsspatial committed Dec 24, 2024
1 parent b70d15a commit 2971f74
Showing 6 changed files with 310 additions and 20 deletions.
31 changes: 31 additions & 0 deletions lambda_functions/landsat_scene_lookup.py
@@ -0,0 +1,31 @@
import boto3

s3_client = boto3.client("s3")
inputbucket = "usgs-landsat"
s3_basepath = "collection02/level-1/standard/oli-tirs"


def handler(event, context):
scene_ids = []
date = event["date"]
year = date[:4]
for pathrow in event["pathrows"]:
print(pathrow)
path = pathrow[:3]
row = pathrow[3:]
prefix = f"{s3_basepath}/{year}/{path}/{row}/"
list_result = s3_client.list_objects_v2(
Bucket=inputbucket, Prefix=prefix, RequestPayer="requester", Delimiter="/"
)
common_prefixes = list_result.get("CommonPrefixes")
if common_prefixes:
updated_key = [
prefix["Prefix"]
for prefix in common_prefixes
if prefix["Prefix"].split("_")[3] == date
]
if len(updated_key) > 0:
scene_id = updated_key[0].split("/")[-2]
print(scene_id)
scene_ids.append(scene_id)
return {"scenes": scene_ids, "prefix": prefix, "bucket": inputbucket}
8 changes: 8 additions & 0 deletions lambda_functions/setupdb.py
@@ -1,4 +1,5 @@
"""Lambda function for omnipotent setup and modification of HLS logging database"""

import json
import os

@@ -89,6 +90,13 @@ def execute_statement(sql, sql_parameters=[]):
ALTER TABLE sentinel_log ADD COLUMN IF NOT EXISTS succeeded BOOLEAN;
ALTER TABLE sentinel_log ADD COLUMN IF NOT EXISTS expected_error BOOLEAN;
ALTER TABLE sentinel_log ADD COLUMN IF NOT EXISTS unexpected_error BOOLEAN;
CREATE TABLE IF NOT EXISTS l30_reprocess_log (
id bigserial primary key,
ts timestamptz default now() not null,
date date,
mgrs varchar(5)
);
"""


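For context, a hedged sketch of how a reprocessing run could record a tile in the new l30_reprocess_log table through this module's execute_statement(sql, sql_parameters) helper; the helper is assumed to wrap the RDS Data API, so the parameter dictionaries below follow that API's shape, and the values are placeholders.

# Hypothetical usage, not part of this commit: log one reprocessed MGRS tile.
sql = "INSERT INTO l30_reprocess_log (date, mgrs) VALUES (:date, :mgrs)"
sql_parameters = [
    {"name": "date", "value": {"stringValue": "2024-12-01"}, "typeHint": "DATE"},
    {"name": "mgrs", "value": {"stringValue": "13TDE"}},
]
execute_statement(sql, sql_parameters)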
67 changes: 48 additions & 19 deletions scripts/run_landsat_validation.py
@@ -18,8 +18,10 @@
batch_client = boto3.client("batch")

ac_jobqueue = os.getenv("HLSSTACK_LANDSATACJOBQUEUEEXPORT")

tile_jobqueue = os.getenv("HLSSTACK_LANDSATTILEJOBQUEUEEXPORT")
tile_jobdefinition = os.getenv("HLSSTACK_LANDSATTILEJOBDEFINITION")
# tile_jobdefinition = os.getenv("HLSSTACK_LANDSATTILEJOBDEFINITION")
tile_jobdefinition = "arn:aws:batch:us-west-2:018923174646:job-definition/LandsatTaskBatchJob1274-673e2c2a2411740:6"
inputbucket = "usgs-landsat"
s3_basepath = "collection02/level-1/standard/oli-tirs"

@@ -50,8 +52,9 @@
ac_jobids = {}


def submit_ac_job(scene_id, path, row, year):
def submit_ac_job(scene_id, path, row, year, run_id):
prefix = f"{s3_basepath}/{year}/{path}/{row}/{scene_id}"
print(ac_jobdefinition)
response = batch_client.submit_job(
jobName=str(random.randint(1, 200)),
jobQueue=ac_jobqueue,
@@ -75,7 +78,13 @@ def submit_ac_job(scene_id, path, row, year):


def submit_tile_job(
valid_pathrows, mgrs_tile, mgrs_result, landsat_path, ac_job_dependencies, date
valid_pathrows,
mgrs_tile,
mgrs_result,
landsat_path,
ac_job_dependencies,
date,
run_id,
):
separated_date = f"{date[:4]}-{date[4:6]}-{date[6:8]}"
response = batch_client.submit_job(
@@ -89,6 +98,7 @@ def submit_tile_job(
"environment": [
{"name": "PATHROW_LIST", "value": ",".join(valid_pathrows)},
{"name": "OUTPUT_BUCKET", "value": f"hls-debug-output/{run_id}"},
{"name": "DEBUG_BUCKET", "value": f"hls-debug-output/{run_id}"},
{"name": "INPUT_BUCKET", "value": f"hls-debug-output/{run_id}"},
{"name": "DATE", "value": separated_date},
{"name": "MGRS", "value": mgrs_tile},
@@ -103,7 +113,7 @@ def submit_tile_job(
return response


def process_mgrs(mgrs_tile, date, ignore):
def process_mgrs(mgrs_tile, date, ignore, run_id):
print(mgrs_tile)
event = {"MGRS": mgrs_tile}
mgrs_result = handler.handler(event, {})
@@ -134,7 +144,9 @@ def process_mgrs(mgrs_tile, date, ignore):
valid_pathrows.append(pathrow)
date_pathrow = f"{date}_{pathrow}"
if date_pathrow not in ac_jobids:
batch_response = submit_ac_job(scene_id, path, row, year)
batch_response = submit_ac_job(
scene_id, path, row, year, run_id
)
ac_jobids[date_pathrow] = batch_response.get("jobId")
ac_job_dependencies.append(
{"jobId": ac_jobids[date_pathrow], "type": "N_TO_N"}
@@ -145,24 +157,41 @@ def process_mgrs(mgrs_tile, date, ignore):
print(valid_pathrows)
landsat_path = valid_pathrows[0][:3]
submit_tile_job(
valid_pathrows, mgrs_tile, mgrs_result, landsat_path, ac_job_dependencies, date
valid_pathrows,
mgrs_tile,
mgrs_result,
landsat_path,
ac_job_dependencies,
date,
run_id,
)


if os.path.isfile(granules):
scenes = open(granules, "r").read().splitlines()
for line in scenes:
components = line.split("_")
directory = "/Users/seanharkins/Downloads/missing_S30/missing/"
files = os.listdir(directory)

for file in files[35:40]:
print(file)
file_path = os.path.join(directory, file)

if os.path.isfile(file_path):
scenes = open(file_path, "r").read().splitlines()
for line in scenes:
components = line.split(",")
# path = components[0][0:3]
# row = components[0][3:6]
mgrs = components[0]
date = components[1]
print(mgrs, date)
# result = handler.handler({"path": path, "row": row}, {})
# process_mgrs(result["mgrs"][0], date)
id = file.split(".")[0]
process_mgrs(mgrs, date, ignore, f"hansen/{id}")

else:
components = granules.split("_")
path = components[2][0:3]
row = components[2][3:6]
date = components[3]
result = handler.handler({"path": path, "row": row}, {})
process_mgrs(result["mgrs"][0], date)

else:
components = granules.split("_")
path = components[2][0:3]
row = components[2][3:6]
date = components[3]
result = handler.handler({"path": path, "row": row}, {})
process_mgrs(result["mgrs"][0], date, ignore)
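
For reference, a stripped-down sketch of the submission pattern used above: one atmospheric-correction Batch job per unique date_pathrow, then a tiling job that depends on those job IDs. The queue names, job definitions, and environment values here are placeholders rather than the stack's real exports.

import boto3

batch = boto3.client("batch")

# Hypothetical AC job for a single path/row and date.
ac_job = batch.submit_job(
    jobName="landsat-ac-042034-20241201",
    jobQueue="example-ac-queue",
    jobDefinition="example-ac-jobdef",
    containerOverrides={"command": ["export && landsat.sh"]},
)

# Tile job gated on the AC job, mirroring the N_TO_N dependency list built above.
tile_job = batch.submit_job(
    jobName="landsat-tile-13TDE-20241201",
    jobQueue="example-tile-queue",
    jobDefinition="example-tile-jobdef",
    dependsOn=[{"jobId": ac_job["jobId"], "type": "N_TO_N"}],
    containerOverrides={
        "environment": [
            {"name": "DATE", "value": "2024-12-01"},
            {"name": "MGRS", "value": "13TDE"},
        ]
    },
)
print(tile_job["jobId"])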
183 changes: 183 additions & 0 deletions stack/hlsconstructs/l30_reprocess_step_function.py
@@ -0,0 +1,183 @@
import json
from typing import Union

from aws_cdk import aws_iam, aws_stepfunctions
from constructs import Construct
from hlsconstructs.batch_step_function import BatchStepFunction
from hlsconstructs.lambdafunc import Lambda
from hlsconstructs.state_machine_step_function import StateMachineStepFunction


class L30_Reprocess_StepFunction(BatchStepFunction, StateMachineStepFunction):
def __init__(
self,
scope: Construct,
id: str,
laads_available: Lambda,
intermediate_output_bucket: str,
ac_job_definition: str,
acjobqueue: str,
pr2mgrs: Lambda,
get_landsat_scenes: Lambda,
check_landsat_tiling_exit_code: Lambda,
check_landsat_ac_exit_code: Lambda,
get_random_wait: Lambda,
replace_existing: bool,
tile_job_definition: str,
tilejobqueue: str,
gibs_outputbucket: str,
debug_bucket: Union[bool, str] = False,
**kwargs,
) -> None:
super().__init__(scope, id, **kwargs)
retry = {
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 10,
"MaxAttempts": 3,
"BackoffRate": 2,
}

if replace_existing:
replace = "replace"
else:
replace = None

state_definition = {
"Comment": "L30 Reprocessing",
"StartAt": "GetPathrows",
"States": {
"GetPathrows": {
"Type": "Task",
"Resource": pr2mgrs.function.function_arn,
"ResultPath": "$.pathrows",
"Next": "CheckLaads",
"Retry": [retry],
},
"CheckLaads": {
"Type": "Task",
"Resource": laads_available.function.function_arn,
"Parameters": {"date.$": "$.date"},
"ResultPath": "$.laads_available",
"Next": "LaadsAvailable",
"Retry": [retry],
},
"LaadsAvailable": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.laads_available.available",
"BooleanEquals": True,
"Next": "GetRandomWait",
}
],
"Default": "Wait",
},
"Wait": {"Type": "Wait", "Seconds": 3600, "Next": "CheckLaads"},
"GetLandsatScenes": {
"Type": "Task",
"Resource": get_landsat_scenes.function.function_arn,
"Parameters": {"date.$": "pathrows.$"},
"ResultPath": "$.scenes",
"Next": "LaadsAvailable",
"Retry": [retry],
},
"ProcessLandsatScenes": {
"Type": "Map",
"ItemsPath": "$.scenes.scenes",
"Parameters": {
"scene.$": "$$.Map.Item.Value",
"path.$": "$.path",
"date.$": "$.date",
"bucket.$": "$.scenes.bucket",
"prefix.$": "$.scenes.prefix",
},
"MaxConcurrency": 0,
"Iterator": {
"StartAt": "RunLandsatAc",
"States": {
"RunLandsatAc": {
"Type": "Task",
"Resource": "arn:aws:states:::batch:submitJob.sync",
"ResultPath": "$.jobinfo",
"Parameters": {
"JobName": "LandsatAcJob",
"JobQueue": acjobqueue,
"JobDefinition": ac_job_definition,
"ContainerOverrides": {
"Command": ["export && landsat.sh"],
"Environment": [
{
"Name": "INPUT_BUCKET",
"Value.$": "$.bucket",
},
{"Name": "PREFIX", "Value.$": "$.prefix"},
{"Name": "GRANULE", "Value.$": "$.scene"},
{
"Name": "OUTPUT_BUCKET",
"Value": intermediate_output_bucket,
},
{
"Name": "LASRC_AUX_DIR",
"Value": "/var/lasrc_aux",
},
{
"Name": "VIIRS_AUX_STARTING_DATE",
"Value": "20210101",
},
{
"Name": "REPLACE_EXISTING",
"Value": replace,
},
{"Name": "OMP_NUM_THREADS", "Value": "2"},
],
},
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "CheckAcExitCode",
"ResultPath": "$.jobinfo",
}
],
"Next": "CheckAcExitCode",
},
"CheckAcExitCode": {
"Type": "Task",
"Resource": check_landsat_ac_exit_code.function.function_arn,
"Next": "HadAcFailure",
},
"HadAcFailure": {
"Type": "Choice",
"Choices": [
{
"Variable": "$",
"BooleanEquals": True,
"Next": "Done",
},
{
"Variable": "$",
"BooleanEquals": False,
"Next": "Error",
},
],
"Default": "Done",
},
"Done": {"Type": "Succeed"},
"Error": {"Type": "Fail"},
}
},
},
},
},
}
if debug_bucket:
state_definition["States"]["RunLandsatAc"]["Parameters"][
"ContainerOverrides"
]["Environment"].append({"Name": "DEBUG_BUCKET", "Value": debug_bucket})
self.state_machine = aws_stepfunctions.CfnStateMachine(
self,
"LandsatStateMachine",
definition_string=json.dumps(state_definition),
role_arn=self.steps_role.role_arn,
)

self.add_lambdas_to_role(locals())
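
Once deployed, an execution of this reprocessing state machine might be started as sketched below; the state machine ARN is a placeholder, and the input keys and date format are assumptions inferred from the definition above (GetPathrows and CheckLaads read the execution input).

import json

import boto3

# Hypothetical invocation, not part of the commit; the ARN and input values are placeholders.
sfn = boto3.client("stepfunctions")
response = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-west-2:123456789012:stateMachine:ExampleL30Reprocess",
    name="l30-reprocess-13TDE-20241201",
    input=json.dumps({"MGRS": "13TDE", "date": "20241201"}),
)
print(response["executionArn"])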
