Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cif 303 #102

Merged
merged 12 commits into from
Jan 10, 2025
203 changes: 203 additions & 0 deletions scripts/calculate_indicators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from datetime import datetime
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool

import geopandas as gpd
import pandas as pd
import requests
from cartoframes import read_carto, to_carto
from cartoframes.auth import set_default_credentials

from city_metrix.metrics import *

# os.getcwd()
# os.chdir('..')
# print(os.getcwd())


# os.environ["GCS_BUCKET"] = "gee-exports"
# os.environ["GOOGLE_APPLICATION_USER"] = (
# "[email protected]"
# )
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
# "/home/arsh/wri/code/cities-cif/citiesindicators-fe8fd6514c70.json"
# )


# base_url = "http://127.0.0.1:8000"
base_url = "https://fotomei.com"
city_id_list = list(set(["ARG-Buenos_Aires"]))
indicator_overwrite_tuple_list = [
("ACC_2_percentOpenSpaceinBuiltup2022", False),
] # (indicator, should replace)
indicator_list = list(set([i[0] for i in indicator_overwrite_tuple_list]))


set_default_credentials(
username="", # use your carto username for uploading
base_url="https://wri-cities.carto.com/",
api_key="", # use your carto aki key for uploading
)
from cartoframes.io.managers.context_manager import ContextManager

a = ContextManager(None)
sql_client = a.sql_client


def get_from_carto(city_id_list, indicator_list):
c = ", ".join([f"'{y}'" for y in city_id_list])
i = ", ".join([f"'{z}'" for z in indicator_list])
cities = f"({c})"
indicators = f"({i})"
return read_carto(
f"SELECT * from indicators_dev where geo_parent_name in {cities} AND indicator in {indicators}"
)


indicators = get_from_carto(city_id_list, indicator_list)


def get_geojson_df(geojson: str):
gdf = gpd.GeoDataFrame.from_features(geojson)
gdf.set_crs(epsg=4326, inplace=True)
return gdf


def get_city_boundary(city_id: str, admin_level: str):
city_boundary = requests.get(f"{base_url}/cities/{city_id}/{admin_level}/geojson")
if city_boundary.status_code in range(200, 206):
return city_boundary.json()
raise Exception("City boundary not found")


def get_city(city_id: str):
city = requests.get(f"{base_url}/cities/{city_id}")
if city.status_code in range(200, 206):
return city.json()
raise Exception("City not found")


def check_indicator_exists(city_id: str, indicator_id: str):
indicator = requests.get(f"{base_url}/indicators/{indicator_id}")
if indicator.status_code in range(200, 206):
return True
raise Exception("Indicator not found")


def get_indicator_metric(indicator_id: str):
indicator = requests.get(f"{base_url}/indicators/metadata/{indicator_id}")
if indicator.status_code in range(200, 206):
data = indicator.json()
return data.get("cif_metric_name")
raise Exception("Indicator not found")


def write_to_carto(result):
return to_carto(pd.DataFrame(result), "indicators_dev", if_exists="append")


def update_carto_data(cartodb_id, value):
date = datetime.today().strftime("%Y-%m-%d")
return sql_client.send(
f"UPDATE indicators_dev SET value={value}, creation_date='{date}' where cartodb_id={cartodb_id}"
)


errors = []


def process(city_id: str, indicator_id: str):
city = get_city(city_id)
indicator_exists = check_indicator_exists(city_id, indicator_id)
data_list = []
if indicator_exists:
metric = get_indicator_metric(indicator_id)
admin_level = city.get("admin_levels", None)
for a in admin_level:
try:
city_boundary = get_city_boundary(city_id, a)
df = get_geojson_df(city_boundary).reset_index()
output_df = eval(f"{metric}(df)")
new_df = df.join(output_df)
for _, row in new_df.iterrows():
data_dict = {
"geo_id": row["geo_id"],
"geo_level": row["geo_level"],
"geo_name": row["geo_name"],
"geo_parent_name": row["geo_parent_name"],
"value": row["count"],
"indicator": indicator_id,
"indicator_version": 0,
"creation_date": datetime.today().strftime("%Y-%m-%d"),
}
data_list.append(data_dict)
except Exception as e:
errors.append(f"{city_id}|{indicator_id}|{a} : {e}")
return data_list


def process_indicator(city_id, indicator):
create_list = []
update_list = []

indicator_name = indicator[0]
overwrite = indicator[1]
carto_indicator_list = indicators[
(indicators["indicator"] == indicator_name)
& (indicators["geo_parent_name"] == city_id)
]
data = process(city_id, indicator_name)
for d in data:
_indicator = carto_indicator_list[
(carto_indicator_list["geo_id"] == d["geo_id"])
]
if _indicator.empty:
create_list.append(d)
else:
if overwrite:
cartodb_id = _indicator["cartodb_id"].max()
update_list.append(
{
"cartodb_id": cartodb_id,
"value": d["value"],
}
)
pass
else:
max_version = _indicator["indicator_version"].max()
d["indicator_version"] = max_version + 1
create_list.append(d)
return create_list, update_list


def calculate_indicator_for_city(city_id):
create_list, update_list = [], []
pool = ThreadPool(5)
results = []
for j in indicator_overwrite_tuple_list:
results.append(pool.apply_async(process_indicator, (city_id, j)))

pool.close()
pool.join()
for r in results:
output = r.get()
create_list.extend(output[0])
update_list.extend(output[1])
return create_list, update_list


if __name__ == "__main__":
create_list = []
update_list = []

with Pool(5) as p:
output = p.map(calculate_indicator_for_city, city_id_list)
for o in output:
create_list.extend(o[0])
update_list.extend(o[1])
print("Create List: ", create_list)
print("Update List: ", update_list)
if create_list:
write_to_carto(create_list)
for u in update_list:
update_carto_data(u["cartodb_id"], u["value"])
172 changes: 172 additions & 0 deletions scripts/calculate_layers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import os
from datetime import datetime
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from pathlib import Path
from typing import Literal

import boto3
import botocore
import geopandas as gpd
import requests
from dotenv import load_dotenv

from city_metrix.layers import *
from city_metrix.layers.open_street_map import OpenStreetMapClass

load_dotenv()


# Parameters

osm_layer = OpenStreetMapClass.OPEN_SPACE # Required for open_space layer


city_id_list = list(set(["ARG-Buenos_Aires"]))

layer_overwrite_list = {"esa_world_cover_2020": [2020, 2021], "open_space": []}

############ for widnows user change / to \
# local_storage_path = f"{os.getcwd()}\cities-cif\scripts\output"

local_storage_path = "scripts/output"
should_push_to_s3 = True

# Code

# base_url = "http://127.0.0.1:8000"
base_url = "https://fotomei.com"
se_layer = ["albedo", "land_surface_temperature"]
year_layer = ["esa_world_cover", "esa_world_cover_2020", "world_pop", "ndvi"]

layer_list = list(set([i[0] for i in layer_overwrite_list]))

aws_bucket = os.getenv("AWS_BUCKET")

s3_client = boto3.client(
"s3",
aws_session_token=os.getenv("AWS_SESSION_TOKEN"),
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)


def get_geojson_df_bounds(geojson: str):
gdf = gpd.GeoDataFrame.from_features(geojson)
gdf.set_crs(epsg=4326, inplace=True)
return gdf.total_bounds


# Will implement if requied.
def check_if_layer_exist_on_s3(file_path: str):
try:
s3.Object(aws_bucket, file_path).load()
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "404":
return False
else:
raise
else:
return True


def get_city_boundary(city_id: str, admin_level: str):
city_boundary = requests.get(f"{base_url}/cities/{city_id}/{admin_level}/geojson")
if city_boundary.status_code in range(200, 206):
return city_boundary.json()
raise Exception("City boundary not found")


def get_city(city_id: str):
city = requests.get(f"{base_url}/cities/{city_id}")
if city.status_code in range(200, 206):
return city.json()
raise Exception("City not found")


def get_layer(city_id: str, layer_id: str) -> Literal[True]:
layer = requests.get(f"{base_url}/layers/{layer_id}/{city_id}")
if layer.status_code in range(200, 206):
return layer.json()
raise Exception("Indicator not found")


def export_data(data: object, city_id: str, layer_id: str, file_format: str, year: str):
file_name = f"{city_id}__{layer_id}__{year}.{file_format}"
local_path = os.path.join(local_storage_path, file_name)
(
data.rio.to_raster(raster_path=local_path, driver="COG")
if file_format == "tif"
else data.to_file(local_path, driver="GeoJSON")
)
if should_push_to_s3:
s3_path = f"cid/dev/{city_id}/{file_format}/{file_name}"
s3_client.upload_file(
local_path, aws_bucket, s3_path, ExtraArgs={"ACL": "public-read"}
)


errors = []


def process_layer(city_id, layer_id, year):
print(f"Starting processing for {layer_id} | {city_id} ..... ")
city = get_city(city_id)
layer = get_layer(city_id, layer_id)
script = layer["class_name"]
file_type = layer["file_type"]
admin_level = city.get("city_admin_level", None)
if layer and script and admin_level and file_type:
file = Path(script).stem
try:
city_boundary = get_city_boundary(city_id, admin_level)
bbox = get_geojson_df_bounds(city_boundary)
params = ""
if layer_id in year_layer:
params = f"year={year}"
elif layer_id in se_layer:
params = f"start_date='{year}-01-01', end_date = '{year}-12-31'"
elif layer_id == "open_space":
params = f"osm_class={osm_layer}"
output = eval(f"{file}({params}).get_data(bbox)")
export_data(
data=output,
city_id=city_id,
layer_id=layer_id,
file_format=file_type,
year=year,
)
except Exception as e:
errors.append(f"{city_id}|{layer_id}|{e}")
print(f"Processing completed for {layer_id} | {city_id} ..... ")


def calculate_layer_for_city(city_id):
print(f"************ Processing layers for {city_id} **************")
pool = ThreadPool(5)
results = []
if not local_storage_path:
raise Exception("Please specify the local path.")
if not os.path.exists(local_storage_path):
os.makedirs(local_storage_path)
for l, year_list in layer_overwrite_list.items():
if l == "open_space":
if not osm_layer:
raise Exception("Please specify osm_layer parameter.")
curr_year = datetime.now().year
results.append(pool.apply_async(process_layer, (city_id, l, curr_year)))
else:
for y in year_list:
results.append(pool.apply_async(process_layer, (city_id, l, y)))

pool.close()
pool.join()
for r in results:
output = r.get()


if __name__ == "__main__":

with Pool(5) as p:
output = p.map(calculate_layer_for_city, city_id_list)
print(errors)
4 changes: 4 additions & 0 deletions scripts/env.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
AWS_BUCKET = "wri-cities-data-api"
AWS_ACCESS_KEY_ID = ""
AWS_SECRET_ACCESS_KEY = ""
AWS_SESSION_TOKEN = ""
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from setuptools import setup, find_packages
from setuptools import find_packages, setup

setup(
name="city_metrix",
Expand Down Expand Up @@ -27,6 +27,6 @@
"cdsapi",
"timezonefinder",
"scikit-image>=0.24.0",
"exactextract>=0.2.0"
"exactextract>=0.2.0",
],
)
Loading