923 ej classification processing and ingest scripts #936

Merged
10 commits merged on Aug 16, 2024
98 changes: 98 additions & 0 deletions scripts/ej/cmr_to_models.py
@@ -0,0 +1,98 @@
"""
the ej_dump is generated by running create_ej_dump.py and is scp'd to the COSMOS server
this script is then run via the dm shell on the COSMOS server to populate the database
"""

import json
import urllib.parse

from environmental_justice.models import EnvironmentalJusticeRow


def generate_source_link(doi_field):
    """Build a DOI landing-page link from a CMR DOI field, or return "" if either part is missing."""
    authority = doi_field.get("Authority")
    doi = doi_field.get("DOI")
    if authority and doi:
        return urllib.parse.urljoin(authority, doi)
    return ""


def concept_id_to_sinequa_id(concept_id: str) -> str:
return f"/SDE/CMR_API/|{concept_id}"


def sinequa_id_to_url(sinequa_id: str) -> str:
base_url = "https://sciencediscoveryengine.nasa.gov/app/nasa-sba-smd/#/preview"
query = '{"name":"query-smd-primary","scope":"All","text":""}'

encoded_id = urllib.parse.quote(sinequa_id, safe="")
encoded_query = urllib.parse.quote(query, safe="")

return f"{base_url}?id={encoded_id}&query={encoded_query}"


def categorize_processing_level(level):
    """Map a CMR processing level ID to an intended-use category."""

    advanced_analysis_levels = {"0", "Level 0", "NA", "Not Provided", "Not provided"}

    basic_analysis_levels = {
        "1",
        "1A",
        "1B",
        "1C",
        "1T",
        "2",
        "2A",
        "2B",
        "2G",
        "2P",
        "Level 1",
        "Level 1A",
        "Level 1B",
        "Level 1C",
        "Level 2",
        "Level 2A",
        "Level 2B",
    }

    exploration_levels = {"3", "4", "Level 3", "Level 4", "L2"}

    if level in exploration_levels:
        return "exploration"
    if level in basic_analysis_levels:
        return "basic analysis"
    # advanced_analysis_levels documents the known values that land here, but any
    # unrecognized level also defaults to "advanced analysis".
    return "advanced analysis"


# remove existing data
EnvironmentalJusticeRow.objects.filter(destination_server=EnvironmentalJusticeRow.DestinationServerChoices.DEV).delete()

with open("backups/ej_dump_20240815_112916.json") as f:
    ej_dump = json.load(f)

for dataset in ej_dump:
ej_row = EnvironmentalJusticeRow(
destination_server=EnvironmentalJusticeRow.DestinationServerChoices.DEV,
sde_link=sinequa_id_to_url(concept_id_to_sinequa_id(dataset.get("meta", {}).get("concept-id", ""))),
dataset=dataset.get("umm", {}).get("ShortName", ""),
description=dataset.get("umm", {}).get("Abstract", ""),
limitations=dataset.get("umm", {}).get("AccessConstraints", {}).get("Description", ""),
format=dataset.get("meta", {}).get("format", ""),
temporal_extent=", ".join(dataset.get("umm", {}).get("TemporalExtents", [{}])[0].get("SingleDateTimes", [])),
intended_use=categorize_processing_level(
dataset.get("umm", {}).get("ProcessingLevel", {}).get("Id", "advanced analysis")
),
source_link=generate_source_link(dataset.get("umm", {}).get("DOI", {})),
indicators=dataset["indicators"],
geographic_coverage="", # Not provided in the data
data_visualization="", # dataset.get("umm", {}).get("RelatedUrls", [{}])[0].get("URL", ""),
latency="", # Not provided in the data
spatial_resolution="", # Not provided in the data
temporal_resolution="", # Not provided in the data
description_simplified="", # Not provided in the data
project="", # Not provided in the data
strengths="", # Not provided in the data
)
ej_row.save()
100 changes: 100 additions & 0 deletions scripts/ej/create_ej_dump.py
@@ -0,0 +1,100 @@
"""
inferences are supplied by the classification model. the contact point is Bishwas
cmr is supplied by running https://github.com/NASA-IMPACT/llm-app-EJ-classifier/blob/develop/scripts/data_processing/download_cmr.py
move to the serve like this: scp ej_dump_20240814_143036.json sde:/home/ec2-user/sde_indexing_helper/backups/
"""

import json
from datetime import datetime


def load_json_file(file_path: str) -> dict | list:
    with open(file_path, "r") as file:
        return json.load(file)


def save_to_json(data: dict | list, file_path: str) -> None:
with open(file_path, "w") as file:
json.dump(data, file, indent=2)


def process_classifications(predictions: list[dict[str, float]], threshold: float = 0.5) -> list[str]:
"""
Process the predictions and classify as follows:
1. If 'Not EJ' is the highest scoring prediction, return 'Not EJ' as the only classification
2. Filter classifications based on the threshold, excluding 'Not EJ'
3. Default to 'Not EJ' if no classifications meet the threshold
"""
highest_prediction = max(predictions, key=lambda x: x["score"])

if highest_prediction["label"] == "Not EJ":
return ["Not EJ"]

classifications = [
pred["label"] for pred in predictions if pred["score"] >= threshold and pred["label"] != "Not EJ"
]

return classifications if classifications else ["Not EJ"]
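
# For example (hypothetical model output):
#   process_classifications(
#       [{"label": "Climate Change", "score": 0.72}, {"label": "Not EJ", "score": 0.20}],
#       threshold=0.5,
#   )
#   -> ["Climate Change"]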


def create_cmr_dict(cmr_data: list[dict[str, dict[str, str]]]) -> dict[str, dict[str, dict[str, str]]]:
"""Restructure CMR data into a dictionary with 'concept-id' as the key."""
return {dataset["meta"]["concept-id"]: dataset for dataset in cmr_data}
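
# e.g. create_cmr_dict([{"meta": {"concept-id": "C1-PROV"}, "umm": {...}}])
#      -> {"C1-PROV": {"meta": {"concept-id": "C1-PROV"}, "umm": {...}}}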


def remove_unauthorized_classifications(classifications: list[str]) -> list[str]:
"""Filter classifications to keep only those in the authorized list."""

authorized_classifications = [
"Climate Change",
"Disasters",
"Extreme Heat",
"Food Availability",
"Health & Air Quality",
"Human Dimensions",
"Urban Flooding",
"Water Availability",
]

return [cls for cls in classifications if cls in authorized_classifications]


def update_cmr_with_classifications(
inferences: list[dict[str, dict]],
cmr_dict: dict[str, dict[str, dict]],
threshold: float = 0.5,
) -> list[dict[str, dict]]:
"""Update CMR data with valid classifications based on inferences."""

predicted_cmr = []

for inference in inferences:
classifications = process_classifications(predictions=inference["predictions"], threshold=threshold)
classifications = remove_unauthorized_classifications(classifications)

if classifications:
cmr_dataset = cmr_dict.get(inference["concept-id"])

if cmr_dataset:
cmr_dataset["indicators"] = ";".join(classifications)
predicted_cmr.append(cmr_dataset)

return predicted_cmr
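
# Indicators end up as a semicolon-delimited string on each dataset, e.g.
# ["Climate Change", "Disasters"] -> "Climate Change;Disasters"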


def main():
inferences = load_json_file("cmr-inference.json")
cmr = load_json_file("cmr_collections_umm_20240807_142146.json")

cmr_dict = create_cmr_dict(cmr)

predicted_cmr = update_cmr_with_classifications(inferences=inferences, cmr_dict=cmr_dict, threshold=0.8)

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
file_name = f"ej_dump_{timestamp}.json"

save_to_json(predicted_cmr, file_name)


if __name__ == "__main__":
main()
10 changes: 10 additions & 0 deletions scripts/ej/thresholding.py
@@ -0,0 +1,10 @@
"""Sweep the classification threshold to see how many datasets qualify as EJ at each cutoff."""

from create_ej_dump import create_cmr_dict, load_json_file, update_cmr_with_classifications

inferences = load_json_file("cmr-inference.json")
cmr = load_json_file("cmr_collections_umm_20240807_142146.json")

cmr_dict = create_cmr_dict(cmr)

for threshold in [0.5, 0.6, 0.7, 0.8, 0.9]:
predicted_cmr = update_cmr_with_classifications(inferences=inferences, cmr_dict=cmr_dict, threshold=threshold)
print(f"Threshold: {int(threshold*100)}%, EJ datasets: {len(predicted_cmr)}")