Merge pull request #129 from CIROH-UA/implement_weights_parquet
Implement weights parquet
JordanLaserGit authored Oct 15, 2024
2 parents 0dea3a0 + b905c17 commit 201a180
Showing 30 changed files with 686 additions and 382 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_test_docker_arm.yml
@@ -22,8 +22,8 @@ jobs:
uses: hashicorp/setup-terraform@v1
- name: Configure AWS
run: |
aws configure set aws_access_key_id ${{ secrets.aws_access_key_id }}
aws configure set aws_secret_access_key ${{ secrets.aws_secret_access_key }}
aws configure set aws_access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
aws configure set aws_secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws configure set region us-east-1
- name: Build AWS Infra
run: |
8 changes: 4 additions & 4 deletions .github/workflows/build_test_docker_x86.yaml
@@ -33,8 +33,8 @@ jobs:

- name: Configure AWS
run: |
aws configure set aws_access_key_id ${{ secrets.aws_access_key_id }}
aws configure set aws_secret_access_key ${{ secrets.aws_secret_access_key }}
aws configure set aws_access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
aws configure set aws_secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws configure set region us-east-1
- name: Install packages for datastream
@@ -43,9 +43,9 @@
- name: Build docker containers
run : |
./scripts/docker_builds.sh -b
./scripts/docker_builds.sh -f -d
- name: Test docker containers
run : |
curl -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/palisade.gpkg
./scripts/stream.sh -s 202006200100 -e 202006210000 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json -n 4
./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
8 changes: 4 additions & 4 deletions .github/workflows/build_test_push_docker_x86.yaml
@@ -27,8 +27,8 @@ jobs:

- name: Configure AWS
run: |
aws configure set aws_access_key_id ${{ secrets.aws_access_key_id }}
aws configure set aws_secret_access_key ${{ secrets.aws_secret_access_key }}
aws configure set aws_access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
aws configure set aws_secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws configure set region us-east-1
- name: Install packages for datastream
@@ -37,12 +37,12 @@
- name: Build docker containers
run : |
./scripts/docker_builds.sh -b
./scripts/docker_builds.sh -f -d
- name: Test docker containers
run : |
curl -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/palisade.gpkg
./scripts/stream.sh -s 202006200100 -e 202006210000 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json -n 4
./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json -n 4
- name: Login to Docker Hub
uses: docker/login-action@v3
10 changes: 9 additions & 1 deletion .github/workflows/forcingprocessor.yml
@@ -27,6 +27,12 @@ jobs:
uses: actions/setup-python@v3
with:
python-version: "3.9"

- name: Configure AWS
run: |
aws configure set aws_access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
aws configure set aws_secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws configure set region us-east-1
- name: Install dependencies
run: |
@@ -37,7 +43,9 @@
- name: Test with pytest
run: |
cd forcingprocessor
python -m pytest -vv --deselect="tests/test_forcingprocessor.py::test_google_cloud_storage" --deselect="tests/test_forcingprocessor.py::test_gcs" --deselect="tests/test_forcingprocessor.py::test_gs" --deselect="tests/test_forcingprocessor.py::test_ciroh_zarr" --deselect="tests/test_forcingprocessor.py::test_nomads_post_processed" --deselect="tests/test_forcingprocessor.py::test_retro_ciroh_zarr"
python -m pytest -vv --deselect="tests/test_forcingprocessor.py::test_google_cloud_storage" --deselect="tests/test_forcingprocessor.py::test_gcs" --deselect="tests/test_forcingprocessor.py::test_gs" --deselect="tests/test_forcingprocessor.py::test_ciroh_zarr" --deselect="tests/test_forcingprocessor.py::test_nomads_post_processed" --deselect="tests/test_forcingprocessor.py::test_retro_ciroh_zarr" --deselect="tests/test_forcingprocessor.py::test_noaa_nwm_pds_https_analysis_assim" --deselect="tests/test_hf2ds.py" --deselect="tests/test_plotter.py"
python -m pytest -vv tests/test_hf2ds.py
python -m pytest -vv tests/test_plotter.py
python -m pytest -vv -k test_google_cloud_storage
python -m pytest -vv -k test_gs
python -m pytest -vv -k test_gcs
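Note that this workflow now configures AWS credentials before the pytest step, so some of the re-enabled tests presumably expect valid credentials. A rough local reproduction of the new test selection, assuming forcingprocessor and its test dependencies are already installed and credentials are exported in the environment (the install step is collapsed in this excerpt), might look like:
  aws configure set aws_access_key_id "$AWS_ACCESS_KEY_ID"
  aws configure set aws_secret_access_key "$AWS_SECRET_ACCESS_KEY"
  aws configure set region us-east-1
  cd forcingprocessor
  python -m pytest -vv tests/test_hf2ds.py tests/test_plotter.py
  python -m pytest -vv -k "test_google_cloud_storage or test_gs or test_gcs"
The combined -k expression is a convenience for local runs; the workflow itself runs each invocation separately as shown above.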
8 changes: 4 additions & 4 deletions .github/workflows/research_datastream_terraform.yml
@@ -24,24 +24,24 @@ jobs:

- name: Configure AWS
run: |
aws configure set aws_access_key_id ${{ secrets.aws_access_key_id }}
aws configure set aws_secret_access_key ${{ secrets.aws_secret_access_key }}
aws configure set aws_access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
aws configure set aws_secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws configure set region us-east-1
- name: Validate Terraform
run: |
cd research_datastream/terraform
terraform init
terraform validate
./test/import_resources.sh ./test/variables_gitactions.tfvars
../scripts/import_resources.sh ./test/variables_gitactions.tfvars
terraform apply -var-file=./test/variables_gitactions.tfvars -auto-approve
sleep 60
- name: Test execution
run: |
cd research_datastream/terraform
if ! aws ec2 describe-key-pairs --key-names "actions_key" --query 'KeyPairs[0].KeyName' --output text 2>/dev/null; then aws ec2 create-key-pair --key-name "actions_key" --query 'KeyName' --output text && echo "Key pair 'actions_key' created in AWS"; else echo "Key pair 'actions_key' already exists"; fi
execution_arn=$(aws stepfunctions start-execution --state-machine-arn $(cat ./sm_ARN.txt) --name docker_builder_$(env TZ=US/Eastern date +'%Y%m%d%H%M%S') --input "file://test/execution_gp_test.json" --region us-east-1 --query 'executionArn' --output text); echo "Execution ARN: $execution_arn"; status="RUNNING"; while [ "$status" != "SUCCEEDED" ]; do status=$(aws stepfunctions describe-execution --execution-arn "$execution_arn" --region us-east-1 --query 'status' --output text); echo "Current status: $status"; if [ "$status" == "FAILED" ]; then echo "State machine execution failed!"; exit 1; fi; sleep 5; done; echo "State machine execution succeeded!"
execution_arn=$(aws stepfunctions start-execution --state-machine-arn $(cat ./sm_ARN.txt) --name gp_test_$(env TZ=US/Eastern date +'%Y%m%d%H%M%S') --input "file://test/execution_gp_test.json" --region us-east-1 --query 'executionArn' --output text); echo "Execution ARN: $execution_arn"; status="RUNNING"; while [ "$status" != "SUCCEEDED" ]; do status=$(aws stepfunctions describe-execution --execution-arn "$execution_arn" --region us-east-1 --query 'status' --output text); echo "Current status: $status"; if [ "$status" == "FAILED" ]; then echo "State machine execution failed!"; exit 1; fi; sleep 5; done; echo "State machine execution succeeded!"
- name: Tear down infra
if: always()
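For readability, the Step Functions polling one-liner in the "Test execution" step above unpacks roughly as follows. This is only a reformatted sketch of the same AWS CLI calls, run from research_datastream/terraform, not an additional change in this commit:
  execution_arn=$(aws stepfunctions start-execution \
    --state-machine-arn "$(cat ./sm_ARN.txt)" \
    --name gp_test_$(env TZ=US/Eastern date +'%Y%m%d%H%M%S') \
    --input "file://test/execution_gp_test.json" \
    --region us-east-1 --query 'executionArn' --output text)
  echo "Execution ARN: $execution_arn"
  status="RUNNING"
  while [ "$status" != "SUCCEEDED" ]; do
    status=$(aws stepfunctions describe-execution --execution-arn "$execution_arn" \
      --region us-east-1 --query 'status' --output text)
    echo "Current status: $status"
    if [ "$status" == "FAILED" ]; then echo "State machine execution failed!"; exit 1; fi
    sleep 5
  done
  echo "State machine execution succeeded!"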
69 changes: 50 additions & 19 deletions .github/workflows/test_datastream_options.yml
@@ -24,21 +24,28 @@ jobs:

- name: Configure AWS
run: |
aws configure set aws_access_key_id ${{ secrets.aws_access_key_id }}
aws configure set aws_secret_access_key ${{ secrets.aws_secret_access_key }}
aws configure set aws_access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
aws configure set aws_secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws configure set region us-east-1
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
echo $AWS_ACCESS_KEY_ID
echo $AWS_SECRET_ACCESS_KEY
- name: Build docker containers
run : |
./scripts/docker_builds.sh -b
./scripts/docker_builds.sh -f -d
- name: Install packages for datastream
run: |
curl -L -O https://github.com/lynker-spatial/hfsubsetCLI/releases/download/v1.1.0/hfsubset-v1.1.0-linux_amd64.tar.gz && tar -xzvf hfsubset-v1.1.0-linux_amd64.tar.gz && sudo mv ./hfsubset /usr/bin/hfsubset && sudo apt-get install git pip pigz awscli python3.9 -y
- name: Get geopackage from hfsubset
run: |
hfsubset -w medium_range -s nextgen -v 2.1.1 -l divides,flowlines,network,nexus,forcing-weights,flowpath-attributes,model-attributes -o palisade.gpkg -t hl "Gages-09106150"
# HFSUBSET IS BROKEN
# hfsubset -w medium_range -s nextgen -v 2.1.1 -l divides,flowlines,network,nexus,forcing-weights,flowpath-attributes,model-attributes -o palisade.gpkg -t hl "Gages-09106150"
curl -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/palisade.gpkg
- name: Base test and NWM_RETRO_V3
run: |
@@ -73,46 +80,70 @@
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -r ./data/cache/datastream-resources-no-forcings -s 201906200100 -e 201906200200 -C NWM_RETRO_V2 -d $(pwd)/data/datastream_test
- name: Forcings sources option test NWM_RETRO_V3
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -r ./data/cache/datastream-resources-no-forcings -s 201906200100 -e 201906200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test
- name: Forcings sources option test NWM_OPERATIONAL_V3
- name: Forcings sources option test NWM_V3
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
TODAY=$(env TZ=US/Eastern date +'%Y%m%d')
./scripts/stream.sh -r ./data/cache/datastream-resources-no-forcings -s $TODAY"0100" -e $TODAY"0200" -C NWM_OPERATIONAL_V3 -d $(pwd)/data/datastream_test
./scripts/stream.sh -r ./data/cache/datastream-resources-no-forcings -s $TODAY"0100" -e $TODAY"0200" -C NWM_V3 -d $(pwd)/data/datastream_test
- name: Forcings sources option test NOMADS_OPERATIONAL
- name: Forcings sources option test NOMADS
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
TODAY=$(env TZ=US/Eastern date +'%Y%m%d')
./scripts/stream.sh -r ./data/cache/datastream-resources-no-forcings -s $TODAY"0100" -e $TODAY"0200" -C NOMADS_OPERATIONAL -d $(pwd)/data/datastream_test
./scripts/stream.sh -r ./data/cache/datastream-resources-no-forcings -s $TODAY"0100" -e $TODAY"0200" -C NOMADS -d $(pwd)/data/datastream_test
- name: Test hfsubset options
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -I "Gages-09106150" -i hl -v 2.1.1 -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
# - name: Test hfsubset options
# if: always()
# run: |
# sudo rm -rf $(pwd)/data/datastream_test
# ./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -I "Gages-09106150" -i hl -v 2.1.1 -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json

- name: S3 write out test
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json -S ngen-datastream -o git_actions_test/
aws s3api wait object-exists --bucket ngen-datastream --key git_actions_test/ngen-run.tar.gz
aws s3api delete-object --bucket ngen-datastream --key git_actions_test/ngen-run.tar.gz
./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json -S ciroh-community-ngen-datastream -o git_actions_test
aws s3api wait object-exists --bucket ciroh-community-ngen-datastream --key git_actions_test/ngen-run.tar.gz
aws s3 rm s3://ciroh-community-ngen-datastream/git_actions_test --recursive
- name: DAILY today test
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -s DAILY -C NWM_OPERATIONAL_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
./scripts/stream.sh -s DAILY -C NWM_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
- name: DAILY pick day test
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -s DAILY -e 202006200000 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
./scripts/stream.sh -s DAILY -C NWM_V3 -e 202410030000 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
- name: DAILY short_range today test
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -s DAILY -C NWM_SHORT_RANGE -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
- name: DAILY medium_range today test
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -s DAILY -C NWM_MEDIUM_RANGE -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
# - name: DAILY analysis assim extend today test
# if: always()
# run: |
# sudo rm -rf $(pwd)/data/datastream_test
# ./scripts/stream.sh -s DAILY -C NWM_ANALYSIS_ASSIM_EXTEND -e $(date -d '-2 day' '+%Y%m%d0000') -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json


2 changes: 1 addition & 1 deletion docs/DATASTREAM_OPTIONS.md
@@ -30,7 +30,7 @@ or run with cli args
|---------------------|------|--------------------|------|
| START_DATE | `-s` |Start simulation time (YYYYMMDDHHMM) or "DAILY" | :white_check_mark: |
| END_DATE | `-e` |End simulation time (YYYYMMDDHHMM) | :white_check_mark: |
| FORCING_SOURCE | `-C` |Select the forcings data provider. Options include NWM_RETRO_V2, NWM_RETRO_V3, NWM_OPERATIONAL_V3, NOMADS_OPERATIONAL| :white_check_mark: |
| FORCING_SOURCE | `-C` |Select the forcings data provider. Options include NWM_RETRO_V2, NWM_RETRO_V3, NWM_V3, NWM_V3_SHORT_RANGE, NWM_V3_MEDIUM_RANGE, NWM_V3_ANALYSIS_ASSIM, NWM_V3_ANALYSIS_ASSIM_EXTEND, NOMADS, NOMADS_POSTPROCESSED| :white_check_mark: |
| DATA_DIR | `-d` |Absolute local path to construct the datastream run. | :white_check_mark: |
| REALIZATION | `-R` |Path to NextGen realization file | Required here or file exists in `RESOURCE_DIR/config` |
| GEOPACKAGE | `-g` | Path to hydrofabric, can be s3URI, URL, or local file. Generate file with [hfsubset](https://github.com/lynker-spatial/hfsubsetCLI) or use SUBSET args. | Required here or file exists in `RESOURCE_DIR/config` |
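Taken together with the CI tests in this commit, a typical invocation using these options looks like the following. The dates, forcing source, and paths are copied from the test workflow above and are placeholders for a real run, not additional requirements:
  ./scripts/stream.sh \
    -s 202006200100 \
    -e 202006200200 \
    -C NWM_RETRO_V3 \
    -d "$(pwd)/data/datastream_test" \
    -g "$(pwd)/palisade.gpkg" \
    -R "$(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json"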
4 changes: 2 additions & 2 deletions forcingprocessor/src/forcingprocessor/plot_forcings.py
@@ -8,7 +8,7 @@
import geopandas as gpd
from pathlib import Path
from datetime import datetime
from forcingprocessor.weights_hf2ds import gpkgs2weightsjson
from forcingprocessor.weights_hf2ds import hf2ds
from forcingprocessor.utils import get_window, nwm_variables, ngen_variables
from forcingprocessor.utils import nwm_variables
plt.style.use('dark_background')
@@ -137,7 +137,7 @@ def get_nwm_data_array(
Outputs a windowed array of national water model data for the domain and forcing variables specified.
nwm_data : 4d array (time x nwm_forcing_variable x west_east x south_north)
"""
weights_json, _ = gpkgs2weightsjson([geopackage])
weights_json, _ = hf2ds([geopackage])
x_min, x_max, y_min, y_max = get_window(weights_json)

for path, _, files in os.walk(nwm_folder):
21 changes: 6 additions & 15 deletions forcingprocessor/src/forcingprocessor/processor.py
@@ -13,7 +13,7 @@
from datetime import datetime
import gzip
import tarfile, tempfile
from forcingprocessor.weights_hf2ds import gpkgs2weightsjson
from forcingprocessor.weights_hf2ds import hf2ds
from forcingprocessor.plot_forcings import plot_ngen_forcings
from forcingprocessor.utils import get_window, log_time, convert_url2key, report_usage, nwm_variables, ngen_variables

@@ -467,7 +467,7 @@ def write_data(
bandwidth_Mbps = rate * file_size_MB * ntasked * bytes2bits
estimate_total_time = nfiles * ntasked / rate
report_usage()
msg = f"\n{(j+1)*ntasked} files written out of {nfiles*ntasked}\n"
msg = f"\n{(j+1)*ntasked} dataframes converted out of {nfiles*ntasked}\n"
msg += f"rate {rate:.2f} files/s\n"
msg += f"df conversion {t_df:.2f}s\n"
if storage_type == "s3": msg += f"buff {t_buff:.2f}s\n"
@@ -731,7 +731,7 @@ def prep_ngen_data(conf):
jtype in file_types
), f"{jtype} for output_file_type is not accepted! Accepted: {file_types}"
global storage_type
if "s3" in output_path:
if "s3://" in output_path:
storage_type = "s3"
elif "google" in output_path:
storage_type = "google"
@@ -755,9 +755,6 @@ def prep_ngen_data(conf):
json.dump(conf, f)
cp_cmd = f'cp {nwm_file} {metaf_path}'
os.system(cp_cmd)
for jgpkg in gpkg_files:
cp_cmd = f'cp {jgpkg} {metaf_path}'
os.system(cp_cmd)

elif storage_type == "s3":
bucket_path = output_path
@@ -778,20 +775,13 @@ def prep_ngen_data(conf):
bucket,
filenamelist_path
)
for jgpkg in gpkg_files:
gpkg_path = f"{key}/{os.path.basename(jgpkg)}"
s3.upload_file(
jgpkg,
bucket,
gpkg_path
)

log_time("CONFIGURATION_END", log_file)

log_time("READWEIGHTS_START", log_file)
if ii_verbose: print(f'Obtaining weights from geopackage(s)\n',flush=True)
global weights_json
weights_json, jcatchment_dict = gpkgs2weightsjson(gpkg_files)
weights_json, jcatchment_dict = hf2ds(gpkg_files)
ncatchments = len(weights_json)
global x_min, x_max, y_min, y_max
x_min, x_max, y_min, y_max = get_window(weights_json)
@@ -864,7 +854,8 @@ def prep_ngen_data(conf):
runtime = time.perf_counter() - t_start

if ii_plot:
if len(gpkg_files) > 1: raise Warning(f'Plotting currently not implemented for more than one geopackage')
if len(gpkg_files) > 1: raise Warning(f'Plotting currently not implemented for more than one file')
if gpkg_files[0].endswith('.parquet'): raise Warning(f'Plotting currently not implemented for parquet, need geopackage')
cat_ids = ['cat-' + x for x in forcing_cat_ids]
jplot_vars = np.array([x for x in range(len(ngen_variables)) if ngen_variables[x] in ngen_vars_plot])
if storage_type == "s3":
17 changes: 9 additions & 8 deletions forcingprocessor/src/forcingprocessor/utils.py
@@ -69,12 +69,13 @@ def convert_url2key(nwm_file,fs_type):
_nc_file_parts = nwm_file.split('/')
layers = _nc_file_parts[3:]
for jlay in layers:
bucket_key += "/" + jlay

if fs_type == 's3':
bucket_key = _nc_file_parts[2][:-17] + bucket_key
elif fs_type == 'google':
bucket_key = "gs:/" + bucket_key
if "s3://" in nwm_file: bucket_key = bucket_key[1:]
bucket = _nc_file_parts[2]
if jlay == layers[-1]:
bucket_key += jlay
else:
bucket_key += jlay + "/"
if fs_type == "google":
bucket = _nc_file_parts[3]
elif fs_type == 's3':
bucket = _nc_file_parts[2]

return bucket, bucket_key
(Diffs for the remaining changed files were not loaded on this page.)
