Merge pull request #292 from Vizzuality/feature/protected_seas_pipe
Feature/protected seas pipe
aagm authored Aug 21, 2024
2 parents 69b91fb + 527deca commit 67d6456
Showing 6 changed files with 235 additions and 37 deletions.
1 change: 1 addition & 0 deletions data/environment.yml
@@ -37,6 +37,7 @@ dependencies:
- pandera=>0.18.0
- pandera-geopandas=>0.18.0
- libgdal-arrow-parquet=>0.1.0
- openpyxl=>3.1.0
- pip:
- jupyterlab-code-formatter==2.2.1
- jupyter_collaboration
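The new openpyxl pin is what lets the precalc notebook below read the Protected Seas stats workbook with pandas, which delegates .xlsx parsing to openpyxl. A minimal sketch (the file name mirrors the one used later in the notebook):

    import pandas as pd

    # pandas uses openpyxl for .xlsx files; passing engine="openpyxl" makes the
    # dependency added above explicit.
    stats = pd.read_excel("protectedseas_stats.xlsx", engine="openpyxl")
    print(stats.head())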
105 changes: 68 additions & 37 deletions data/notebooks/pipes_mock/precalc.ipynb
@@ -12,7 +12,7 @@
},
{
"cell_type": "code",
"execution_count": 70,
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
@@ -30,9 +30,9 @@
" sys.path.insert(0, scripts_dir.resolve().as_posix())\n",
"\n",
"from helpers.strapi import Strapi\n",
"from helpers.settings import get_settings\n",
"from helpers.settings import get_settings, Settings\n",
"from helpers.file_handler import FileConventionHandler\n",
"from helpers.utils import download_and_unzip_if_needed\n",
"from helpers.utils import download_and_unzip_if_needed, writeReadGCP\n",
"\n",
"from pipelines.output_schemas import (\n",
" FPLSchema,\n",
@@ -793,67 +793,103 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 15,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"DEBUG:google.auth.transport.requests:Making request: POST https://oauth2.googleapis.com/token\n"
]
}
],
"source": [
"pipe = \"protectedseas\"\n",
"strapi_collection = \"\"\n",
"\n",
"pipe_dir = FileConventionHandler(pipe)\n",
"input_file = pipe_dir.get_processed_step_path(prev_step).joinpath(\"protectedseas_stats.xlsx\")\n",
"output_file = pipe_dir.get_processed_step_path(current_step).joinpath(\"lfp.csv\")\n",
"\n",
"# Download the protected seas file && unzip it\n",
"download_and_unzip_if_needed(pipe_dir, prev_step, mysettings)\n",
"writeReadGCP(\n",
" credentials=mysettings.GCS_KEYFILE_JSON,\n",
" bucket_name=mysettings.GCS_BUCKET,\n",
" blob_name=\"vizzuality_processed_data/protectedseas/preprocess/protectedseas_stats.xlsx\",\n",
" file=input_file,\n",
" operation=\"r\",\n",
")\n",
"\n",
"# Load the data\n",
"protectedseas_intermediate = gpd.read_file(pipe_dir.get_step_fmt_file_path(prev_step, \"shp\")).pipe(\n",
" clean_geometries\n",
")"
"protectedseas_intermediate = pd.read_excel(input_file)"
]
},
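The writeReadGCP helper itself is not part of this diff. As a rough sketch, the read ("r") call in the cell above amounts to downloading one blob from a GCS bucket with google-cloud-storage; the client setup below is an assumption (it presumes GCS_KEYFILE_JSON points at a service-account JSON file) and the function name is a hypothetical stand-in:

    from google.cloud import storage

    def download_blob(keyfile_json: str, bucket_name: str, blob_name: str, dest: str) -> None:
        # Hypothetical equivalent of writeReadGCP(..., operation="r"):
        # fetch a single object from the bucket to a local path.
        client = storage.Client.from_service_account_json(keyfile_json)
        client.bucket(bucket_name).blob(blob_name).download_to_filename(dest)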
{
"cell_type": "code",
"execution_count": null,
"execution_count": 16,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/mambauser/src/pipelines/processors.py:685: FutureWarning: Downcasting behavior in `replace` is deprecated and will be removed in a future version. To retain the old behavior, explicitly call `result.infer_objects(copy=False)`. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`\n",
" df.replace(rep_d)\n"
]
}
],
"source": [
"final = (\n",
" protectedseas_intermediate.pipe(calculate_area)\n",
" .pipe(calculate_global_area, gby_col=[\"FPS_cat\"], iso_column=\"iso\")\n",
" .pipe(separate_parent_iso, iso_column=\"iso\")\n",
" .replace(\n",
" protectedseas_intermediate.replace(\n",
" {\n",
" \"iso\": {\n",
" \"iso_sov\": {\n",
" \"COK\": \"NZL\",\n",
" \"IOT\": \"GBR\",\n",
" \"NIU\": \"NZL\",\n",
" \"SHN\": \"GBR\",\n",
" \"SJM\": \"NOR\",\n",
" \"UMI\": \"USA\",\n",
" \"NCL\": \"FRA\",\n",
" }\n",
" },\n",
" \"lfp\": {\n",
" 5: \"highly\",\n",
" 4: \"highly\",\n",
" 3: \"moderately\",\n",
" 2: \"less\",\n",
" 1: \"less\",\n",
" },\n",
" }\n",
" )\n",
" .pipe(add_region_iso, iso_column=\"iso\")\n",
" .pipe(calculate_stats, gby_col=[\"FPS_cat\"], iso_column=\"location_i\")\n",
" .pipe(fix_monaco, iso_column=\"iso\", area_column=\"area_km2\")\n",
" .pipe(\n",
" calculate_global_area,\n",
" gby_col=[\"lfp\"],\n",
" iso_column=\"iso_sov\",\n",
" agg_ops={\"area_sqkm\": \"sum\", \"total_area\": \"max\"},\n",
" )\n",
" .pipe(add_region_iso, iso_column=\"iso_sov\")\n",
" .pipe(\n",
" calculate_stats,\n",
" gby_col=[\"lfp\"],\n",
" ops={\"area_sqkm\": \"sum\", \"total_area\": \"max\"},\n",
" iso_column=\"iso_sov\",\n",
" )\n",
" .pipe(lambda x: x.assign(pct=round(x.area_sqkm / x.total_area, 2)))\n",
" .pipe(\n",
" output,\n",
" iso_column=\"iso\",\n",
" iso_column=\"iso_sov\",\n",
" rep_d={\n",
" \"FPS_cat\": {\n",
" \"lfp\": {\n",
" \"highly\": 1,\n",
" \"moderately\": 2,\n",
" \"less\": 3,\n",
" }\n",
" },\n",
" rename={\"FPS_cat\": \"fishing_protection_level\", \"area_km2\": \"area\"},\n",
" drop_cols=[\"iso\"],\n",
" rename={\"lfp\": \"fishing_protection_level\", \"area_sqkm\": \"area\"},\n",
" drop_cols=[\"iso_sov\", \"total_area\"],\n",
" )\n",
")\n",
"\n",
"FPLSchema(final).to_csv(output_file, index=True)"
"FPLSchema(final[final.location.notna()]).to_csv(output_file, index=True)"
]
},
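The cell above rolls dependent territories up to their sovereign ISO code, collapses the 1-5 LFP score into three buckets, and computes a protected percentage before the output step re-encodes the buckets as 1/2/3. A toy sketch of that recoding, with made-up values and the column names taken from the cell:

    import pandas as pd

    # Toy frame with the columns the pipeline manipulates (values are invented).
    toy = pd.DataFrame(
        {
            "iso_sov": ["COK", "USA"],
            "lfp": [5, 2],
            "area_sqkm": [120.0, 300.0],
            "total_area": [400.0, 400.0],
        }
    )

    # Territories roll up to their sovereign, and the 1-5 LFP score collapses to
    # three buckets; pct is the share of the sovereign's total area.
    recoded = toy.replace(
        {
            "iso_sov": {"COK": "NZL"},
            "lfp": {5: "highly", 4: "highly", 3: "moderately", 2: "less", 1: "less"},
        }
    ).assign(pct=lambda x: round(x.area_sqkm / x.total_area, 2))
    print(recoded)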
{
@@ -1028,16 +1064,11 @@
"# {\"slug\": init_table.iucn_cat.dropna().unique(), \"name\": init_table.iucn_cat.dropna().unique()},\n",
"# index=pd.Index(np.arange(1, len(init_table.iucn_cat.dropna().unique()) + 1)),\n",
"# )\n",
"# iucn_cat.to_csv(pipe_dir.get_processed_step_path(current_step).joinpath(\"iucn_categories.csv\"), index=True)\n"
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [],
"source": [
"iucn_cat = pd.read_csv(pipe_dir.get_processed_step_path(current_step).joinpath(\"iucn_categories.csv\"), index_col=0)"
"# iucn_cat.to_csv(pipe_dir.get_processed_step_path(current_step).joinpath(\"iucn_categories.csv\"), index=True)\n",
"\n",
"iucn_cat = pd.read_csv(\n",
" pipe_dir.get_processed_step_path(current_step).joinpath(\"iucn_categories.csv\"), index_col=0\n",
")"
]
},
{
@@ -1241,7 +1272,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.4"
"version": "3.12.5"
}
},
"nbformat": 4,
1 change: 1 addition & 0 deletions data/src/pipelines/output_schemas.py
@@ -40,6 +40,7 @@ class FPLSchema(pa.DataFrameModel):
location: Series[int] = pa.Field(gt=0, coerce=True)
fishing_protection_level: Series[int] = pa.Field(ge=0, coerce=True)
area: Series[float] = pa.Field(ge=0, coerce=True)
pct: Series[float] = pa.Field(ge=0, coerce=True)


class HabitatsSchema(pa.DataFrameModel):
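With the new pct column, the fishing-protection-level output is validated on four fields. A minimal sketch of what FPLSchema accepts, restating the schema from the hunk above so the example is self-contained and using a made-up two-row frame:

    import pandas as pd
    import pandera as pa
    from pandera.typing import Series

    # Same fields as the FPLSchema hunk above.
    class FPLSchema(pa.DataFrameModel):
        location: Series[int] = pa.Field(gt=0, coerce=True)
        fishing_protection_level: Series[int] = pa.Field(ge=0, coerce=True)
        area: Series[float] = pa.Field(ge=0, coerce=True)
        pct: Series[float] = pa.Field(ge=0, coerce=True)

    # Made-up rows; validate() coerces dtypes and raises a SchemaError on violations.
    df = pd.DataFrame(
        {"location": [1, 2], "fishing_protection_level": [1, 3], "area": [10.5, 0.0], "pct": [0.25, 0.0]}
    )
    validated = FPLSchema.validate(df)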
Empty file.
152 changes: 152 additions & 0 deletions data/src/pipelines/pipes/eez_intermediate.py
@@ -0,0 +1,152 @@
import pandas as pd
import shutil

from data.src.helpers.utils import downloadFile, rm_tree
from data.src.pipelines.pipes.utils import define_paths_intermediate


def eez_intermediate(force_clean: bool = True) -> None:
# Pipe params
step = "intermediate"
pipe = "eez"
# Data sources
## EEZ
EEZ_url = "https://www.marineregions.org/download_file.php"
EEZ_file_name = "eez_v11.shp"
EEZ_params = {"name": "World_EEZ_v11_20191118.zip"}
EEZ_headers = {
"content-type": "application/x-www-form-urlencoded",
"cookie": "PHPSESSID=29190501b4503e4b33725cd6bd01e2c6; vliz_webc=vliz_webc2; jwplayer.captionLabel=Off",
"dnt": "1",
"origin": "https://www.marineregions.org",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "same-origin",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
}

EEZ_body = {
"name": "Jason",
"organisation": "skytruth",
"email": "[email protected]",
"country": "Spain",
"user_category": "academia",
"purpose_category": "Conservation",
"agree": "1",
}

## High seas
hs_url = "https://www.marineregions.org/download_file.php"
hs_file_name = "High_seas_v1.shp"
hs_params = {"name": "World_High_Seas_v1_20200826.zip"}
hs_headers = {
"content-type": "application/x-www-form-urlencoded",
"cookie": "PHPSESSID=29190501b4503e4b33725cd6bd01e2c6; vliz_webc=vliz_webc2; jwplayer.captionLabel=Off",
"dnt": "1",
"origin": "https://www.marineregions.org",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "same-origin",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
}
hs_body = {
"name": "Jason",
"organisation": "skytruth",
"email": "[email protected]",
"country": "Spain",
"user_category": "academia",
"purpose_category": "Conservation",
"agree": "1",
}
input_path, temp_working_path, output_path, output_file, zipped_output_file, remote_path = (
define_paths_intermediate(pipe, step)
)

# Extract data
## download files EEZ & High seas
downloadFile(
EEZ_url,
input_path,
EEZ_body,
EEZ_params,
EEZ_headers,
overwrite=force_clean,
)
downloadFile(hs_url, input_path, hs_body, hs_params, hs_headers, overwrite=force_clean)
## unzip file if needed & load data
unziped_folders = []
for idx, path in enumerate(input_path.glob("*.zip")):
unziped_folder = temp_working_path.joinpath(path.stem)
print(unziped_folder)

if unziped_folder.exists() and force_clean:
rm_tree(unziped_folder)

shutil.unpack_archive(path, unziped_folder.parent if idx == 0 else unziped_folder)

files = [
gpd.read_file(file)
for file in unziped_folder.rglob("*.shp")
if "boundaries" not in file.stem
]
unziped_folders.append(pd.concat(files))

# Transform data
## set the same structure for both datasets updating the high seas one
unziped_folders[1] = (
unziped_folders[1]
.rename(
columns={"name": "GEONAME", "area_km2": "AREA_KM2", "mrgid": "MRGID"},
)
.assign(
POL_TYPE="High Seas",
ISO_SOV1="ABNJ",
)
)

# merge datasets
df = pd.concat(unziped_folders, ignore_index=True)

df.drop(
columns=list(
set(df.columns)
- set(
[
"MRGID",
"GEONAME",
"POL_TYPE",
"ISO_SOV1",
"ISO_SOV2",
"ISO_SOV3",
"AREA_KM2",
"geometry",
]
)
),
inplace=True,
)

# save data
gpd.GeoDataFrame(
df,
crs=unziped_folders[0].crs,
).to_file(filename=output_file.as_posix(), driver="ESRI Shapefile")

# zip data
make_archive(output_path, zipped_output_file)

# clean unzipped files
rm_tree(temp_working_path) if temp_working_path.exists() else None
rm_tree(output_path) if output_path.exists() else None

# LOAD
## load zipped file to GCS
writeReadGCP(
credentials=mysettings.GCS_KEYFILE_JSON,
bucket_name=mysettings.GCS_BUCKET,
blob_name=remote_path,
file=zipped_output_file,
operation="w",
)
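As committed, this module references several names its own import block does not bring in: gpd, make_archive, writeReadGCP, and mysettings. Judging from the notebook above (which imports writeReadGCP from helpers.utils and get_settings from helpers.settings), it presumably relies on a header along these lines; the exact module paths for make_archive and the settings call are assumptions, not confirmed by this diff:

    import shutil

    import geopandas as gpd
    import pandas as pd

    # Assumed locations, mirroring the notebook's imports; make_archive is presumably a
    # project helper (its two-path call does not match shutil.make_archive's signature).
    from data.src.helpers.settings import get_settings
    from data.src.helpers.utils import downloadFile, make_archive, rm_tree, writeReadGCP
    from data.src.pipelines.pipes.utils import define_paths_intermediate

    mysettings = get_settings()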
13 changes: 13 additions & 0 deletions data/src/pipelines/pipes/utils.py
@@ -0,0 +1,13 @@
from data.src.helpers.file_handler import FileConventionHandler


def define_paths_intermediate(pipe: str, step: str):
working_folder = FileConventionHandler(pipe)
input_path = working_folder.pipe_raw_path
temp_working_path = working_folder.get_temp_file_path(step)

output_path = working_folder.get_processed_step_path(step)
output_file = working_folder.get_step_fmt_file_path(step, "shp")
zipped_output_file = working_folder.get_step_fmt_file_path(step, "zip", True)
remote_path = working_folder.get_remote_path(step)
return input_path, temp_working_path, output_path, output_file, zipped_output_file, remote_path
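For reference, this is how eez_intermediate above consumes the six paths the helper returns; the pipe and step values match that pipeline:

    from data.src.pipelines.pipes.utils import define_paths_intermediate

    # Six conventional locations for the "eez" pipe's intermediate step.
    (
        input_path,
        temp_working_path,
        output_path,
        output_file,
        zipped_output_file,
        remote_path,
    ) = define_paths_intermediate("eez", "intermediate")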
