Remove unwanted lake geometries #5

Open · wants to merge 16 commits into base: main

Conversation

@RickytheGuy (Contributor) commented Sep 18, 2024

Summary by CodeRabbit

  • New Features

    • Introduced configurable parameters for handling lakes, islands, ocean watersheds, and low-flow conditions in data processing.
    • Enhanced spatial data handling with a new overlay function for efficient intersection operations using Dask.
    • Added functionality to compute and manage stream network data more effectively, including new processing options for lake geometries.
    • Updated dependency specifications for Python and geopandas to ensure compatibility with newer versions.
    • Introduced new scripts for processing catchments and lakes, improving hydrological data organization.
    • Enhanced directory management and streamlined output directory creation in data processing scripts.
  • Bug Fixes

    • Improved handling of geometries related to lake streams, ensuring accurate representation in the basin data.

coderabbitai bot commented Sep 18, 2024

Important

Review skipped

Review was skipped due to path filters

⛔ Files ignored due to path filters (1)
  • rivids_lt_5.csv is excluded by !**/*.csv

CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters. For example, including **/dist/** will override the default block on the dist directory by removing the pattern from both lists.

You can disable this status message by setting reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The changes involve updates to several scripts and configuration files to enhance the handling of hydrological data. The environment.yml file specifies minimum version requirements for Python and geopandas. The run_1_tdx_gpkg_to_geoparquet.py script modifies how LINKNO is processed based on filename conditions. The run_3_tdxhydro_rapid_masters.py script introduces new configuration parameters for processing lakes and other water bodies. The run_4_make_vpu_files.py script updates directory paths and introduces a new variable for parquet files. Additionally, various functions in tdxhydrorapid/inputs.py, tdxhydrorapid/network.py, and tdxhydrorapid/weights.py are enhanced for improved data handling and processing capabilities.

Changes

File — Change Summary
environment.yml — Updated Python version to python>=3.11 and geopandas to geopandas>=1.0. Added new dependencies: dask-geopandas, networkx, and pyogrio.
run_1_tdx_gpkg_to_geoparquet.py — Modified LINKNO processing logic based on filename conditions; streamlined directory creation; added TDXHydroRegion field.
run_3_tdxhydro_rapid_masters.py — Added new configuration parameters for lakes and other water bodies; updated function call to include new parameters.
run_4_make_vpu_files.py — Changed input/output paths to relative; introduced new variable og_pqs for parquet files and updated function call to concat_tdxregions.
tdxhydrorapid/inputs.py — Enhanced rapid_master_files function with new parameters for lake processing; added create_nexus_points helper function.
tdxhydrorapid/network.py — Added dissolve_catchments function and refactored graph creation logic; improved efficiency in handling stream orders and branches.
tdxhydrorapid/weights.py — Introduced overlay function for spatial intersections using Dask; updated existing functions to utilize this new overlay method.
run_1.1_lakes.py — New script for processing lake geometries and their interactions with stream networks; includes functions for creating directed graphs and filling geometry holes.
run_5_global_geometry.py — Replaced geopandas and pandas with dask_geopandas for reading parquet files; simplified geometry processing.
run_7_catchments.py — New script for processing hydrological data related to catchments and lakes; utilizes dissolve_catchments for data management.
tdxhydrorapid/__init__.py — Updated import statements with aliases for clarity in the module's public interface.
tdxhydrorapid/_validate.py — Updated check_outputs_are_valid function to include a use_rapid parameter for flexible validation paths.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant run_1_tdx_gpkg_to_geoparquet
    participant run_3_tdxhydro_rapid_masters
    participant rapid_master_files
    participant network

    User->>run_1_tdx_gpkg_to_geoparquet: Process GeoPackage
    run_1_tdx_gpkg_to_geoparquet->>run_3_tdxhydro_rapid_masters: Call with lake parameters
    run_3_tdxhydro_rapid_masters->>rapid_master_files: Pass lake processing parameters
    rapid_master_files->>network: Handle lakes and streams
    network->>User: Return processed data

Poem

🐰 In the meadow where rivers flow,
New paths for lakes and streams to grow.
With each hop and every change,
Our waters dance, no longer strange!
Celebrate the code, let it gleam,
A hydrological wonder, a coder's dream! 🌊✨



@coderabbitai bot left a comment


Actionable comments posted: 1

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 43b00d8 and 2eae5b0.

Files ignored due to path filters (2)
  • tdxhydrorapid/network_data/processing_options.xlsx is excluded by !**/*.xlsx
  • tdxhydrorapid/network_data/~$processing_options.xlsx is excluded by !**/*.xlsx
Files selected for processing (3)
  • run_3_tdxhydro_rapid_masters.py (3 hunks)
  • tdxhydrorapid/inputs.py (5 hunks)
  • tdxhydrorapid/network.py (2 hunks)
Additional context used
Ruff
tdxhydrorapid/network.py

4-4: json imported but unused

Remove unused import: json

(F401)

Additional comments not posted (11)
run_3_tdxhydro_rapid_masters.py (3)

66-66: LGTM!

The code for extracting the DISSOLVE_LAKES configuration parameter is consistent with the existing pattern and looks good.


76-76: LGTM!

The code for casting the DISSOLVE_LAKES parameter to a boolean type is consistent with the existing pattern and looks good.


91-91: Verify the function signature change in the codebase.

The code change looks good. However, ensure that all calls to the rapid_master_files function have been updated to pass the dissolve_lakes argument.

Run the following script to verify the function usage:

Verification successful

Changes to rapid_master_files function are correctly implemented.

The dissolve_lakes argument is properly passed in the function call within run_3_tdxhydro_rapid_masters.py, and the function definition in tdxhydrorapid/inputs.py has been updated to include this parameter. No other calls to this function were found in the codebase, so no further updates are necessary.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify all calls to `rapid_master_files` pass the `dissolve_lakes` argument.

# Test: Search for the function usage. Expect: Only occurrences with the `dissolve_lakes` argument.
rg --type python -A 5 $'rapid_master_files'

Length of output: 73


Script:

#!/bin/bash
# Description: Find Python files and search for `rapid_master_files` function usage

# Find all Python files
echo "Python files in the repository:"
fd -e py

# Search for rapid_master_files function usage in Python files
echo -e "\nFunction usage:"
fd -e py -x grep -n "rapid_master_files" {} +

Length of output: 1010


Script:

#!/bin/bash
# Description: Examine rapid_master_files usage and definition

# Check function call in run_3_tdxhydro_rapid_masters.py
echo "Function call in run_3_tdxhydro_rapid_masters.py:"
sed -n '/rapid_master_files/,/)/p' run_3_tdxhydro_rapid_masters.py

# Check function definition in tdxhydrorapid/inputs.py
echo -e "\nFunction definition in tdxhydrorapid/inputs.py:"
sed -n '/def rapid_master_files/,/)/p' tdxhydrorapid/inputs.py

# Fixed grep command for cleaner output
echo -e "\nAll occurrences of rapid_master_files:"
grep -n "rapid_master_files" $(fd -e py)

Length of output: 3126

tdxhydrorapid/network.py (1)

313-325: LGTM!

The new code section for aggregating lake streams catchments looks good. It follows a clear and logical approach, handles the case when the JSON file doesn't exist, and uses appropriate data structures and operations for efficient processing. The variable names are also descriptive, enhancing code readability.

tdxhydrorapid/inputs.py (7)

121-183: Approve the addition of lake dissolution logic.

The new feature to handle lake geometries is a significant addition to the rapid_master_files function. The logic for constructing the directed graph of lakes, identifying inlets and streams that feed into the lakes, and updating the geometries in the GeoDataFrame by removing streams that are now part of the lakes is complex but well-structured.

The generated JSON output file mod_dissolve_lakes.json captures the relationships between lakes and their inlets, which could be useful for further analysis or visualization.

Overall, the changes enhance the functionality of the module by enabling the handling of lake geometries in the hydrological data processing workflow.


160-173: Review the logic for identifying initial inlet streams and streams that feed into the lake.

The logic for identifying initial inlet streams and streams that feed into the lake appears to be sound. Here's a breakdown of the steps:

  1. Initial inlet streams are identified as the leaf nodes (nodes with no incoming edges) in the lake ancestors subgraph. Among these leaf nodes, the ones with a stream order of 1 are considered as part of the lake, while the rest are classified as inlets.

  2. Streams that feed into the lake but are not part of the lake are identified by finding the predecessors of the inside streams (streams within the lake) that are not already part of the lake ancestors.

This approach effectively distinguishes between the initial inlet streams, streams that are part of the lake, and streams that feed into the lake from outside.

The logic is clear and concise, making it easy to understand and maintain.
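A minimal sketch of that traversal, with a toy graph and hypothetical names (G, lake_streams, stream_order) standing in for the structures the function builds from the GeoDataFrame:

import networkx as nx

# Toy data; the real function derives these from the streams GeoDataFrame.
G = nx.DiGraph()
G.add_edges_from([(1, 3), (2, 3), (3, 4), (5, 4), (4, 6)])
lake_streams = {3, 4}                                # streams considered inside the lake
stream_order = {1: 1, 2: 1, 3: 2, 5: 2, 4: 3, 6: 3}

lake_subgraph = G.subgraph(lake_streams)
leaves = [n for n in lake_subgraph.nodes if lake_subgraph.in_degree(n) == 0]
inlets = [n for n in leaves if stream_order[n] > 1]  # order-1 leaves are absorbed into the lake
feeders = {p for n in lake_streams for p in G.predecessors(n) if p not in lake_streams}

print(inlets)   # [3]
print(feeders)  # {1, 2, 5}: streams outside the lake that drain into it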


178-179: Approve the generation of the JSON output file.

The generation of the JSON output file mod_dissolve_lakes.json is a valuable addition to the functionality. It captures the relationships between lakes and their inlets, providing a structured representation of the lake dissolution results.

This JSON file can be used for further analysis, visualization, or integration with other tools and workflows.

The code for writing the JSON file is straightforward and follows the standard json.dump() approach, ensuring compatibility and ease of use.


180-180: Approve the update of the GeoDataFrame by removing lake streams.

The update of the GeoDataFrame by removing streams that are now part of the lakes is a necessary step to maintain the integrity and consistency of the stream network.

By removing the lake streams from the GeoDataFrame, the function ensures that the subsequent processing steps operate on a clean and updated representation of the stream network.

The code for updating the GeoDataFrame is concise and uses boolean indexing to efficiently filter out the lake streams.
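For illustration only, the boolean-indexing pattern being described looks roughly like this (the IDs are hypothetical):

import pandas as pd

sgdf = pd.DataFrame({'LINKNO': [1, 2, 3, 4]})
lake_stream_ids = {2, 3}                    # IDs absorbed into lakes
sgdf = sgdf[~sgdf['LINKNO'].isin(lake_stream_ids)]
print(sgdf['LINKNO'].tolist())              # [1, 4]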


182-182: Approve the recreation of the directed graph after removing lake streams.

The recreation of the directed graph after removing the lake streams is crucial to maintain the correct topology and connectivity of the stream network.

By recreating the directed graph based on the updated GeoDataFrame, the function ensures that the graph accurately represents the modified stream network, taking into account the removal of lake streams.

This step is essential for subsequent processing tasks that rely on the graph structure, such as identifying downstream nodes or performing network traversals.

The code for recreating the directed graph is clear and reuses the existing create_directed_graphs() function, promoting code reuse and maintainability.
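As a point of reference, here is a minimal sketch of building such a graph from a streams table, assuming LINKNO/DSLINKNO columns and -1 marking terminal outlets (this is not necessarily the body of create_directed_graphs()):

import networkx as nx
import pandas as pd

streams = pd.DataFrame({'LINKNO': [1, 2, 3], 'DSLINKNO': [3, 3, -1]})
G = nx.from_pandas_edgelist(
    streams[streams['DSLINKNO'] != -1],  # skip terminal outlets
    source='LINKNO',
    target='DSLINKNO',
    create_using=nx.DiGraph,
)
print(list(G.edges()))  # [(1, 3), (2, 3)]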


304-313: Approve the implementation of the is_stream_a_straight_line function.

The is_stream_a_straight_line function is a helper function used in the lake dissolution logic to determine if a stream is a straight line based on its geometry. The function takes the stream ID, ID field, and the GeoDataFrame as input parameters and returns a boolean value indicating whether the stream is a straight line or not.

The logic of the function is as follows:

  1. If the number of coordinates in the stream geometry is less than or equal to 2, the function immediately considers the stream as a straight line since a line with two points is always straight.

  2. For geometries with more than 2 coordinates, the function calculates the straight-line distance between the first and last coordinates using the Euclidean distance formula.

  3. It then compares the squared straight-line distance with the squared actual length of the stream geometry.

  4. If the absolute difference between the squared distances is less than a small threshold (1e-7), the function considers the stream as a straight line. The threshold value is used to account for minor variations or numerical precision issues when comparing the distances.

The function is well-documented with clear parameter names and return values. It efficiently determines if a stream is a straight line based on its geometry, which is a crucial aspect of the lake dissolution logic.

The implementation is concise and easy to understand, making it maintainable and reusable in other parts of the codebase if needed.
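For reference, a sketch that mirrors the described logic, simplified to operate on the geometry directly (the actual helper takes the stream ID, the ID field, and the GeoDataFrame and looks up the geometry itself):

import numpy as np
from shapely.geometry import LineString

def is_stream_a_straight_line(geom: LineString) -> bool:
    if len(geom.coords) <= 2:
        return True  # two points always form a straight line
    # squared endpoint-to-endpoint distance vs squared actual length
    straight_line_len = ((np.asarray(geom.coords[-1]) - np.asarray(geom.coords[0])) ** 2).sum()
    true_len = geom.length ** 2
    return abs(straight_line_len - true_len) < 1e-7

print(is_stream_a_straight_line(LineString([(0, 0), (1, 1)])))          # True
print(is_stream_a_straight_line(LineString([(0, 0), (1, 0), (1, 1)])))  # False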


132-158: Verify the logic for identifying the true outlet of a lake.

The logic for identifying the true outlet of a lake involves traversing the downstream nodes until a non-straight stream is encountered. This approach assumes that lake streams are typically straight lines, and the first non-straight stream encountered is considered the true outlet of the lake.

While this assumption may hold true in most cases, there could be scenarios where a lake stream is not a perfect straight line due to geographical variations or data granularity. It's important to verify that this assumption is valid for the specific dataset being processed.

Consider adding some tolerance or threshold when comparing the straight-line distance with the actual stream length to account for minor variations.

Verification successful

Existing implementation verified, but improvements suggested for robustness

The is_stream_a_straight_line function has been located and analyzed. While it does implement a tolerance check, the current approach may be too strict for real-world scenarios:

  1. The function uses a very small tolerance (1e-7) on squared lengths, which may not account for minor variations in stream shapes.
  2. Comparing squared lengths amplifies differences, potentially leading to false negatives for nearly straight streams.

Suggested improvements:

  • Consider using a relative tolerance instead of an absolute one, e.g., abs(straight_line_len - true_len) / true_len < tolerance.
  • Adjust the tolerance value based on the dataset's characteristics and required accuracy.
  • Add a maximum absolute difference threshold to handle very short streams.

These changes would make the function more robust while maintaining the original intent of identifying straight lake streams.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the assumption that lake streams are typically straight lines.

# Test: Check if there are any lake streams that are not perfectly straight lines.
# Expect: No results or only a few edge cases.
rg --type python -A 5 $'if len\\(geom.coords\\) <= 2:\n.*return True\n.*straight_line_len = \\(\\(np\\.asarray\\(geom\\.coords\\[-1\\]\\) - np\\.asarray\\(geom\\.coords\\[0\\]\\)\\) \\*\\* 2\\)\\.sum\\(\\)\n.*true_len = geom\\.length \\*\\* 2\n.*return abs\\(straight_line_len - true_len\\) < 1e-7'

Length of output: 459


Script:

#!/bin/bash
# Description: Search for the implementation of is_stream_a_straight_line function

# Use ripgrep with multiline mode to find the function definition
rg --type python -U "def is_stream_a_straight_line\(" -A 15

# If the above doesn't yield results, try a more general search
rg --type python -U "straight_line_len.*true_len" -A 5 -B 5

Length of output: 185


Script:

#!/bin/bash
# Description: Search for the implementation of is_stream_a_straight_line function without relying on file type recognition

# Search for the function definition in .py files
rg -U "def is_stream_a_straight_line\(" -g "*.py" -A 15

# If the above doesn't yield results, try a more general search
rg -U "straight_line_len.*true_len" -g "*.py" -A 5 -B 5

# If still no results, search for any mention of the function name
rg "is_stream_a_straight_line" -g "*.py" -A 5 -B 5

Length of output: 3479

@@ -1,6 +1,7 @@
import glob
import logging
import os
import json

Remove unused import.

The json module is imported but not used in the code. Please remove this unused import to keep the code clean.

-import json
Tools
Ruff

4-4: json imported but unused

Remove unused import: json

(F401)

@RickytheGuy (Contributor, Author)
@rileyhales Here is a pull request with a correct, functioning version. I have also improved performance significantly, with about a 10x speed-up. To accomplish this, I use dask-geopandas for intersecting with grids, along with lots of optimizations to some slow functions. Attached are some profiles. I've verified that my changes do not change the output files from what they were before (unless removing lakes, islands, etc.).

profile_new_lakes.txt
profile_new_nolakes.txt
profile_old.txt

@coderabbitai bot left a comment


Actionable comments posted: 8

🧹 Outside diff range and nitpick comments (16)
environment.yml (1)

6-7: Document version requirement rationale.

Consider adding comments to explain why these specific minimum versions are required. This helps future maintainers understand the constraints.

 dependencies:
-  - python>=3.11
-  - geopandas>=1.0
+  # Python 3.11+ required for improved error handling and performance
+  - python>=3.11
+  # geopandas 1.0+ needed for enhanced geospatial operations
+  - geopandas>=1.0
🧰 Tools
🪛 yamllint

[warning] 6-6: wrong indentation: expected 0 but found 2

(indentation)

run_4_make_vpu_files.py (1)

Line range hint 1-13: Add documentation and separate test configurations.

The script lacks:

  1. A module-level docstring explaining its purpose and usage
  2. Clear separation between test and production configurations

Consider:

  1. Adding comprehensive documentation
  2. Moving test configurations to a separate test configuration file
  3. Adding type hints for better code maintainability
+"""
+Create VPU (Vector Processing Unit) files from TDXHydro data.
+
+This script processes hydrological data and creates VPU files by:
+1. Creating a master table from TDXHydro regions
+2. Processing each VPU separately
+3. Generating necessary input files for each VPU
+
+Environment Variables:
+    TDX_INPUTS_DIR: Directory containing TDXHydro input files
+    PARQUETS_DIR: Directory containing parquet files
+    FINAL_OUTPUT_DIR: Directory for output files
+"""
run_1_tdx_gpkg_to_geoparquet.py (2)

Line range hint 57-75: Remove redundant LINKNO assignment and add data validation.

There are several concerns in this block:

  1. The LINKNO assignment on line 57 is redundant as it's already processed on line 54
  2. There's no validation of input data ranges before applying the header number multiplication

Apply this diff to fix the issues:

-            gdf['LINKNO'] = gdf['LINKNO'].astype(int) + (tdx_header_number * 10_000_000)
+            # Validate input ranges to prevent integer overflow
+            if (gdf['LINKNO'].max() + (tdx_header_number * 10_000_000) > sys.maxsize or
+                gdf['DSLINKNO'].max() + (tdx_header_number * 10_000_000) > sys.maxsize):
+                raise ValueError("Integer overflow detected in ID generation")
+
             gdf['DSLINKNO'] = gdf['DSLINKNO'].astype(int)

Line range hint 6-11: Consider architectural improvements for better maintainability.

Several architectural improvements could enhance the script:

  1. Replace hardcoded paths with environment variables or configuration files
  2. Add proper error handling around file operations
  3. Add progress tracking for large datasets

Consider these improvements:

+import os
+from pathlib import Path
+from tqdm import tqdm
+
-gpkg_dir = '/Volumes/T9Hales4TB/TDXHydro'
-gpq_dir = '/Volumes/T9Hales4TB/TDXHydroGeoParquet'
+gpkg_dir = os.getenv('TDX_HYDRO_DIR', '/Volumes/T9Hales4TB/TDXHydro')
+gpq_dir = os.getenv('TDX_PARQUET_DIR', '/Volumes/T9Hales4TB/TDXHydroGeoParquet')

And wrap the main processing loop with error handling and progress tracking:

-    for gpkg in sorted(glob.glob(os.path.join(gpkg_dir, 'TDX*.gpkg'))):
+    gpkg_files = sorted(glob.glob(os.path.join(gpkg_dir, 'TDX*.gpkg')))
+    for gpkg in tqdm(gpkg_files, desc="Processing GeoPackage files"):
+        try:
             # ... existing processing code ...
+        except Exception as e:
+            logging.error(f"Error processing {gpkg}: {str(e)}")
+            continue
run_3_tdxhydro_rapid_masters.py (1)

91-108: Consider refactoring to reduce parameter count.

While the implementation is functional, the high number of parameters (15+) suggests that the rapid_master_files function might be handling too many responsibilities. Consider refactoring to use a configuration object pattern.

Example approach:

class RapidConfig:
    def __init__(self):
        self.id_field = None
        self.ds_id_field = None
        # ... other config fields

    @classmethod
    def from_excel(cls, excel_df, region_num):
        config = cls()
        config.dissolve_lakes = bool(excel_df.loc[excel_df['region_number'] == region_num, 'dissolve_lakes'].values[0])
        # ... other fields
        return config

# Usage
config = RapidConfig.from_excel(net_df, region_num)
rp.inputs.rapid_master_files(streams_gpq, save_dir=save_dir, config=config)
tdxhydrorapid/weights.py (3)

83-83: Typo in comment: Correct 'calcualting' to 'calculating'

There's a minor typo in the comment. The word 'calcualting' should be corrected to 'calculating' for clarity.

Apply this diff to fix the typo:

-).sort_index()  # Sort index is needed (used when calcualting area_sqm)
+).sort_index()  # Sort index is needed (used when calculating area_sqm)

257-258: Type consistency when checking for outlet in wt[id_field].values

The outlet variable is converted to an integer, but wt[id_field].values may contain values of a different type (e.g., numpy.int64). This could cause the membership test to fail even if the values are numerically equal.

Ensure type consistency by converting wt[id_field] to integers before the check:

if outlet in wt[id_field].astype(int).values:

262-264: Consider regrouping after deleting streams

After modifying wt by deleting streams, it may be beneficial to regroup and sum the area_sqm to ensure data consistency, similar to previous steps.

Apply this diff to regroup the DataFrame:

wt = wt[~wt[id_field].isin(streams_to_delete)]
+ wt = wt.groupby(wt.columns.drop('area_sqm').tolist()).sum().reset_index()
tdxhydrorapid/network.py (5)

57-57: Simplify conditional expression for clarity

The condition not len([x for x in pred_orders if x == (min_order_to_keep - 1)]) == 2 can be simplified using the != operator for better readability.

Apply this change:

-if len(pred_orders) == 2 and not len([x for x in pred_orders if x == (min_order_to_keep - 1)]) == 2:
+if len(pred_orders) == 2 and len([x for x in pred_orders if x == (min_order_to_keep - 1)]) != 2:
🧰 Tools
🪛 Ruff

57-57: Use len([x for x in pred_orders if x == min_order_to_keep - 1]) != 2 instead of not len([x for x in pred_orders if x == min_order_to_keep - 1]) == 2

Replace with != operator

(SIM201)


61-61: Simplify conditional expression for clarity

Similarly, the condition not max(pred_orders) == min_order_to_keep - 1 can be simplified using the != operator.

Apply this change:

-if (len(pred_orders) > 2) and \
-        (not max(pred_orders) == min_order_to_keep - 1) and \
-        (len([x for x in pred_orders if x == (min_order_to_keep - 1)]) >= 2):
+if (len(pred_orders) > 2) and \
+        (max(pred_orders) != min_order_to_keep - 1) and \
+        (len([x for x in pred_orders if x == (min_order_to_keep - 1)]) >= 2):
🧰 Tools
🪛 Ruff

61-61: Use max(pred_orders) != min_order_to_keep - 1 instead of not max(pred_orders) == min_order_to_keep - 1

Replace with != operator

(SIM201)


88-88: Rename unused variable index to _

The loop variable index is not used within the loop body. Renaming it to _ indicates that it is intentionally unused.

Apply this change:

-for index, row in order1s.iterrows():
+for _, row in order1s.iterrows():
🧰 Tools
🪛 Ruff

88-88: Loop control variable index not used within loop body

Rename unused index to _index

(B007)


127-129: Use logging instead of print statements for error handling

Using print statements for error messages is not recommended. Replace them with appropriate logging calls to maintain consistent and configurable logging.

Apply this change:

-    except Exception as e:
-        print(e)
-        print(siblings)
-        print(row)
+    except Exception as e:
+        logger.error(f"An error occurred: {e}")
+        logger.debug(f"Siblings: {siblings}")
+        logger.debug(f"Row data: {row}")

351-380: Ensure consistent use of filesystem operations and add type annotations

In the dissolve_catchments function, the filesystem object fs is used alongside os.path functions. For consistency and to support various filesystem backends, consider using fs methods for all filesystem operations. Also, add a type annotation for fs for clarity.

Apply these changes:

  • Add type annotation for fs:

    -def dissolve_catchments(save_dir: str, gdf: gpd.GeoDataFrame, fs, id_field: str = 'LINKNO') -> gpd.GeoDataFrame:
    +def dissolve_catchments(save_dir: str, gdf: gpd.GeoDataFrame, fs: fsspec.AbstractFileSystem, id_field: str = 'LINKNO') -> gpd.GeoDataFrame:
  • Use fs.path.join instead of os.path.join:

    -    headwater_dissolve_path = os.path.join(save_dir, 'mod_dissolve_headwater.csv')
    +    headwater_dissolve_path = fs.path.join(save_dir, 'mod_dissolve_headwater.csv')
  • Replace os.path.exists with fs.exists where appropriate.

Ensure that all path operations and file manipulations consistently use the same filesystem interface.

tdxhydrorapid/inputs.py (3)

239-242: Simplify if-else block using a ternary operator

You can make the code more concise by replacing the if-else block with a ternary operator.

Apply this diff:

-if downstream:
-    downstream = downstream[0]
-else:
-    downstream = -1
+downstream = downstream[0] if downstream else -1
🧰 Tools
🪛 Ruff

239-242: Use ternary operator downstream = downstream[0] if downstream else -1 instead of if-else-block

Replace if-else-block with downstream = downstream[0] if downstream else -1

(SIM108)


460-461: Typographical error in comment

There's a typo in the comment. It should be "Check if the stream is a straight line."

Apply this diff:

-    # heck if the stream is a straight line
+    # Check if the stream is a straight line

527-527: Unused loop variable idx

The variable idx is not used within the loop, which is unnecessary.

Rename idx to _ to indicate it's intentionally unused.

Apply this diff:

-for idx, (river_id_to_keep, river_id_to_drop) in short_streams.iterrows():
+for _, (river_id_to_keep, river_id_to_drop) in short_streams.iterrows():
🧰 Tools
🪛 Ruff

527-527: Loop control variable idx not used within loop body

Rename unused idx to _idx

(B007)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 2eae5b0 and d11957f.

⛔ Files ignored due to path filters (3)
  • tdxhydrorapid/network_data/bad_streams.csv is excluded by !**/*.csv
  • tdxhydrorapid/network_data/processing_options.xlsx is excluded by !**/*.xlsx
  • tdxhydrorapid/network_data/rivids_lt_5_cms.csv is excluded by !**/*.csv
📒 Files selected for processing (7)
  • environment.yml (1 hunks)
  • run_1_tdx_gpkg_to_geoparquet.py (2 hunks)
  • run_3_tdxhydro_rapid_masters.py (3 hunks)
  • run_4_make_vpu_files.py (2 hunks)
  • tdxhydrorapid/inputs.py (8 hunks)
  • tdxhydrorapid/network.py (8 hunks)
  • tdxhydrorapid/weights.py (5 hunks)
🧰 Additional context used
🪛 yamllint
environment.yml

[warning] 6-6: wrong indentation: expected 0 but found 2

(indentation)

🪛 Ruff
tdxhydrorapid/inputs.py

195-195: Loop control variable key not used within loop body

(B007)


239-242: Use ternary operator downstream = downstream[0] if downstream else -1 instead of if-else-block

Replace if-else-block with downstream = downstream[0] if downstream else -1

(SIM108)


329-329: Loop control variable key not used within loop body

(B007)


527-527: Loop control variable idx not used within loop body

Rename unused idx to _idx

(B007)

tdxhydrorapid/network.py

57-57: Use len([x for x in pred_orders if x == min_order_to_keep - 1]) != 2 instead of not len([x for x in pred_orders if x == min_order_to_keep - 1]) == 2

Replace with != operator

(SIM201)


61-61: Use max(pred_orders) != min_order_to_keep - 1 instead of not max(pred_orders) == min_order_to_keep - 1

Replace with != operator

(SIM201)


88-88: Loop control variable index not used within loop body

Rename unused index to _index

(B007)

🔇 Additional comments (13)
environment.yml (1)

6-7: Consider specifying upper version bounds.

While using minimum version constraints (>=) provides flexibility, it might lead to compatibility issues if future major versions introduce breaking changes. Consider using version ranges (e.g., python>=3.11,<3.12 and geopandas>=1.0,<2.0) to ensure reproducible environments.

🧰 Tools
🪛 yamllint

[warning] 6-6: wrong indentation: expected 0 but found 2

(indentation)

run_4_make_vpu_files.py (1)

31-31: Verify the impact of the new og_pqs parameter.

The addition of the og_pqs parameter to concat_tdxregions could affect the master table creation process. Please ensure:

  1. The function handles empty or invalid parquet file lists gracefully
  2. The behavior is documented in the function's docstring

Let's verify the function implementation and its other callers:

✅ Verification successful

The og_pqs parameter is properly handled and documented

The function implementation shows:

  • The original_streams parameter (aliased as og_pqs) is optional with a default value of None
  • The docstring clearly explains the parameter's purpose for VPU code assignment using downstream terminal nodes
  • The implementation gracefully handles both cases (with and without the parameter) through conditional logic
  • The function is only called from one location, which has been properly updated
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check the function implementation and documentation
ast-grep --pattern 'def concat_tdxregions($$$)'

# Find other callers of this function
rg 'concat_tdxregions\(' --type py

Length of output: 369


Script:

#!/bin/bash
# Let's check the implementation details of concat_tdxregions
rg -A 20 'def concat_tdxregions' tdxhydrorapid/inputs.py

Length of output: 1498

run_1_tdx_gpkg_to_geoparquet.py (1)

77-78: 🛠️ Refactor suggestion

Verify the different processing logic for non-streamnet files.

The non-streamnet files are processed differently with minimal transformations:

  1. No validation of streamID values before processing
  2. Missing fields that are present in streamnet files (strmOrder, Magnitude, etc.)
  3. Dropping streamID might lose valuable debugging/tracking information

Let's verify if this difference is intentional:

Consider adding validation and preserving more information:

+            # Validate streamID values
+            if gdf['streamID'].isnull().any():
+                raise ValueError("Found null streamID values")
+
             gdf['LINKNO'] = gdf['streamID'].astype(int) + (tdx_header_number * 10_000_000)
-            gdf = gdf.drop(columns=['streamID'])
+            # Add common fields for consistency
+            gdf['TDXHydroRegion'] = region_number
+            gdf['LengthGeodesicMeters'] = gdf['geometry'].apply(_calculate_geodesic_length)
run_3_tdxhydro_rapid_masters.py (4)

66-70: LGTM: New configuration parameters are well-structured.

The new configuration parameters for lakes, islands, and ocean watersheds follow consistent naming and implementation patterns.


80-84: LGTM: Type casting is properly implemented.

Explicit boolean type casting ensures type safety and consistency with other configuration parameters.


126-126: LGTM: Improved weight table existence check.

The simplified check using list comprehension improves code readability while maintaining the same functionality.


Line range hint 66-108: Verify performance impact of new filtering options.

The addition of multiple new filtering options (lakes, islands, ocean watersheds) could impact processing time. Consider measuring and documenting the performance impact, especially for large regions.

tdxhydrorapid/weights.py (4)

270-277: Potential issue with outlet value -1 in island streams processing

In the loop for processing island streams, if outlet == -1, all streams in island_streams are added to streams_to_delete. Ensure that this logic aligns with the intended behavior. Also, verify that the wt[id_field] contains the correct data types for comparison.

Double-check the handling of -1 as an outlet and confirm that streams are being merged or deleted as expected.


290-292: Possible data inconsistency due to repeated grouping

Repeated grouping and summing without careful handling can sometimes lead to data inconsistencies. Review the grouping operations to ensure that the area_sqm and other aggregated fields correctly represent the data.

Ensure that the grouping fields are appropriate and that no critical information is lost during aggregation.
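For reference, the aggregation pattern in question, illustrated on toy data (the grid-index column names here are hypothetical):

import pandas as pd

wt = pd.DataFrame({
    'LINKNO': [10, 10, 20],
    'lon_index': [1, 1, 2],
    'lat_index': [3, 3, 4],
    'area_sqm': [100.0, 50.0, 75.0],
})
# group on every column except area_sqm and sum the overlapping areas
wt = wt.groupby(wt.columns.drop('area_sqm').tolist()).sum().reset_index()
print(wt)  # the two LINKNO 10 rows collapse into one row with area_sqm = 150.0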


106-109: ⚠️ Potential issue

Ensure consistent projection before area calculation

In the make_weight_table_from_thiessen_grid function, the area is computed without ensuring the geometries are in a projected CRS. Since area calculations are inaccurate in geographic coordinates, please confirm that the geometries are projected appropriately before calculating the area.

Consider adding a projection step or verify that the overlay function outputs geometries in the desired CRS.
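A minimal sketch of the projection step being suggested, assuming the geometries start in a geographic CRS such as EPSG:4326:

import geopandas as gpd
from shapely.geometry import box

gdf = gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1)], crs='EPSG:4326')
area_sqm = gdf.to_crs({'proj': 'cea'}).area  # areas in square metres after the equal-area projection
print(area_sqm)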


225-226: Verify the mapping keys and values for pruning streams

When creating ids_to_prune, the mapping from LINKTODROP to LINKNO assumes that these columns exist and are correctly set as integers. Verify that the CSV file has the appropriate columns and that the mapping does not unintentionally alter id_field.

Run the following script to check for potential issues:

tdxhydrorapid/inputs.py (2)

462-464: Potential IndexError when accessing geometry

When accessing sgdf[sgdf[id_field]==stream_id].geometry.values[0], if stream_id does not exist in sgdf, this will raise an IndexError.

Please verify that stream_id always exists in sgdf. If not, consider adding a check before accessing .values[0]:

if sgdf[sgdf[id_field] == stream_id].empty:
    # Handle the case where stream_id is not found
else:
    geom = sgdf[sgdf[id_field] == stream_id].geometry.values[0]

467-469: ⚠️ Potential issue

Logical error in straight-line detection algorithm

The method for determining if a stream is a straight line by comparing squared lengths may not be reliable due to floating-point precision issues.

Consider using a geometric approach to determine if the stream is a straight line, such as checking if all intermediate points lie on the line defined by the start and end points.
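One possible form of that geometric check, sketched with shapely (the tolerance value is illustrative, not tuned):

from shapely.geometry import LineString, Point

def is_straight_by_vertices(geom: LineString, tol: float = 1e-9) -> bool:
    coords = list(geom.coords)
    if len(coords) <= 2:
        return True
    chord = LineString([coords[0], coords[-1]])  # segment joining the endpoints
    return all(chord.distance(Point(c)) <= tol for c in coords[1:-1])

print(is_straight_by_vertices(LineString([(0, 0), (0.5, 0.5), (1, 1)])))  # True
print(is_straight_by_vertices(LineString([(0, 0), (1, 0), (1, 1)])))      # False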

Comment on lines 11 to 13
tdx_inputs_dir = 'test/rapid_inputs'
og_pqs = glob.glob("/Volumes/EB406_T7_3/geoglows_v3/parquets/TDX_streamnet_*_01.parquet")
final_output_dir = 'test/outputs'

⚠️ Potential issue

Remove hardcoded absolute paths and standardize path handling.

The current implementation has several issues:

  1. Hardcoded absolute path /Volumes/EB406_T7_3/... makes the code non-portable and environment-dependent
  2. Inconsistent path style (mixing relative and absolute paths)

Consider:

  1. Using environment variables or configuration files for path management
  2. Making all paths relative to the project root
  3. Adding path validation
-tdx_inputs_dir = 'test/rapid_inputs'
-og_pqs = glob.glob("/Volumes/EB406_T7_3/geoglows_v3/parquets/TDX_streamnet_*_01.parquet")
-final_output_dir = 'test/outputs'
+# Load paths from environment or config
+tdx_inputs_dir = os.getenv('TDX_INPUTS_DIR', 'test/rapid_inputs')
+parquets_dir = os.getenv('PARQUETS_DIR', 'data/parquets')
+og_pqs = glob.glob(os.path.join(parquets_dir, "TDX_streamnet_*_01.parquet"))
+final_output_dir = os.getenv('FINAL_OUTPUT_DIR', 'test/outputs')

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +220 to +221
merge_dict = {stream: streams_to_merge[0] for streams_to_merge in o2_to_dissolve.values for stream in streams_to_merge}
wt[id_field] = wt[id_field].map(merge_dict).fillna(wt[id_field]).astype(int)

⚠️ Potential issue

Possible issue with merge dictionary creation

When creating merge_dict, the comprehension uses o2_to_dissolve.values, which may not produce the expected structure if o2_to_dissolve is a DataFrame. This could lead to incorrect mapping.

Ensure that o2_to_dissolve.values yields the correct lists for streams_to_merge. You might need to adjust the code to iterate over the DataFrame rows properly.
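A hedged illustration of iterating the rows explicitly instead of relying on .values; the column layout here is assumed, not taken from the PR:

import pandas as pd

o2_to_dissolve = pd.DataFrame({'keep': [10, 20], 'drop1': [11, 21], 'drop2': [12, 22]})
merge_dict = {
    stream: row[0]  # map each stream in the row to the kept ID (first column)
    for row in o2_to_dissolve.itertuples(index=False)
    for stream in row
}
wt_ids = pd.Series([11, 22, 30])
print(wt_ids.map(merge_dict).fillna(wt_ids).astype(int).tolist())  # [10, 20, 30]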

Comment on lines +286 to 288
if merge_stream[0] not in wt[id_field].values or merge_stream[1] not in wt[id_field].values:
continue
wt[id_field] = wt[id_field].replace(merge_stream[1], merge_stream[0])

⚠️ Potential issue

Ensure proper handling when merging short streams

In the loop for merging short streams, the condition checks if either stream ID is not present and continues. This may skip necessary merges if one of the IDs is missing due to previous operations.

Consider adjusting the logic to handle cases where one ID is missing, or log the occurrence for further inspection.

tg_ddf: dgpd.GeoDataFrame = dgpd.from_geopandas(tg_gdf, npartitions=1)
basins_ddf: dgpd.GeoDataFrame = dgpd.from_geopandas(basins_gdf, npartitions=80)
intersections = overlay(basins_ddf, tg_ddf)
intersections['area_sqm'] = dgpd.from_geopandas(intersections, npartitions=80).to_crs({'proj':'cea'}).area.compute()

🛠️ Refactor suggestion

Redundant reprojection before area computation

The intersections GeoDataFrame is already projected to Cylindrical Equal Area in the overlay function. Reprojecting again in line 195 is unnecessary and may impact performance.

Apply this diff to remove the redundant to_crs call:

intersections['area_sqm'] = dgpd.from_geopandas(intersections, npartitions=80)\
-    .to_crs({'proj': 'cea'}).area.compute()
+    .area.compute()

Comment on lines +239 to +245
for outlet, _, lake_streams in lake_streams_df.itertuples():
outlet = int(outlet)
if outlet in wt[id_field].values:
wt.loc[wt[id_field].isin(lake_streams), id_field] = outlet
else:
streams_to_delete.update(lake_streams)


⚠️ Potential issue

Incorrect unpacking in loop over lake_streams_df.itertuples()

When iterating over lake_streams_df.itertuples(), the unpacking should consider that the first element is the Index. The current code may raise a ValueError due to mismatched unpacking.

Adjust the loop to unpack correctly. Here's a corrected version:

- for outlet, _, lake_streams in lake_streams_df.itertuples():
+ for idx, outlet, _, lake_streams in lake_streams_df.itertuples():

Or, if the DataFrame columns are known, consider using itertuples(index=False):

- for outlet, _, lake_streams in lake_streams_df.itertuples():
+ for outlet, _, lake_streams in lake_streams_df.itertuples(index=False):

Ensure the unpacking matches the structure of the tuples returned by itertuples().


Comment on lines 195 to 198
for key, items in dissolve_lake_dict.items():
if outlet in items:
break
real_outlet = key

⚠️ Potential issue

Potential use of loop variable key outside its scope

In the loop where you iterate over dissolve_lake_dict.items(), the variable key may not be defined if the loop completes without finding a matching outlet. Using key outside the loop without ensuring it was set can lead to an UnboundLocalError.

Apply this diff to ensure real_outlet is properly assigned:

 for key, items in dissolve_lake_dict.items():
     if outlet in items:
         break
+else:
+    raise ValueError(f"Outlet {outlet} not found in dissolve_lake_dict")
-real_outlet = key
+real_outlet = key
🧰 Tools
🪛 Ruff

195-195: Loop control variable key not used within loop body

(B007)

Comment on lines +329 to +332
for key, items in islands_dict.items():
if outlet in items:
break
real_outlet = key

⚠️ Potential issue

Potential use of loop variable key outside its scope

Similar to lines 195-198, the variable key may be undefined outside the loop if the loop doesn't break. This can cause an UnboundLocalError when real_outlet = key is executed.

Apply this diff to handle the case properly:

 for key, items in islands_dict.items():
     if outlet in items:
         break
+else:
+    raise ValueError(f"Outlet {outlet} not found in islands_dict")
-real_outlet = key
+real_outlet = key
🧰 Tools
🪛 Ruff

329-329: Loop control variable key not used within loop body

(B007)

@coderabbitai bot left a comment


Actionable comments posted: 4

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between d11957f and 4260eff.

📒 Files selected for processing (1)
  • tdxhydrorapid/weights.py (5 hunks)
🔇 Additional comments (2)
tdxhydrorapid/weights.py (2)

106-109: LGTM! Efficient spatial operations setup

The conversion to Dask GeoDataFrames with appropriate partition sizes looks good. Using 1 partition for the thiessen grid (smaller dataset) and 80 for the basins (larger dataset) is a good optimization.

Also applies to: 192-195


286-288: LGTM! Correct handling of missing stream IDs

The check for missing stream IDs before merging is appropriate and matches the PR's objective of removing unwanted geometries.

Comment on lines +74 to +83
def overlay(left: dgpd.GeoDataFrame, right: dgpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Performs a spatial intersection overlay of two GeoDataFrames using Dask.
Reproject geometries to a Cylindrical Equal Area projection.
"""
return gpd.GeoDataFrame(
left.sjoin(right.assign(right_geometry=right.geometry))
.assign(geometry=lambda x: x.geometry.intersection(x.right_geometry).to_crs({'proj':'cea'}))
.drop(columns="right_geometry")
.compute()).sort_index() # Sort index is needed (used when calcualting area_sqm)

🛠️ Refactor suggestion

Optimize projection timing in overlay function

The projection to CEA should occur before the intersection operation to ensure geometric accuracy. Currently, it is done after the intersection, which could lead to less accurate results.

Apply this diff to fix the projection timing:

def overlay(left: dgpd.GeoDataFrame, right: dgpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Performs a spatial intersection overlay of two GeoDataFrames using Dask.
    Reproject geometries to a Cylindrical Equal Area projection.
    """
+   left = left.to_crs({'proj': 'cea'})
+   right = right.to_crs({'proj': 'cea'})
    return gpd.GeoDataFrame(
        left.sjoin(right.assign(right_geometry=right.geometry))
-       .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry).to_crs({'proj':'cea'}))
+       .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry))
        .drop(columns="right_geometry")
        .compute()).sort_index()

Comment on lines +235 to +247
lake_streams_path = os.path.join(save_dir, 'mod_dissolve_lakes.json')
if os.path.exists(lake_streams_path):
lake_streams_df = pd.read_json(lake_streams_path, orient='index', convert_axes=False, convert_dates=False)
streams_to_delete = set()
for outlet, _, lake_streams in lake_streams_df.itertuples():
outlet = int(outlet)
if outlet in wt[id_field].values:
wt.loc[wt[id_field].isin(lake_streams), id_field] = outlet
else:
streams_to_delete.update(lake_streams)

wt = wt[~wt[id_field].isin(streams_to_delete)]
wt = wt.groupby(wt.columns.drop('area_sqm').tolist()).sum().reset_index()

🛠️ Refactor suggestion

Add error handling for lake streams processing

The lake streams processing lacks validation of JSON data structure and type conversion safety checks.

Add error handling:

 lake_streams_path = os.path.join(save_dir, 'mod_dissolve_lakes.json')
 if os.path.exists(lake_streams_path):
+    try:
         lake_streams_df = pd.read_json(lake_streams_path, orient='index', convert_axes=False, convert_dates=False)
         streams_to_delete = set()
         for outlet, _, lake_streams in lake_streams_df.itertuples():
-            outlet = int(outlet)
+            try:
+                outlet = int(outlet)
+            except ValueError:
+                logger.warning(f"Invalid outlet ID format in lake streams: {outlet}")
+                continue
             if outlet in wt[id_field].values:
                 wt.loc[wt[id_field].isin(lake_streams), id_field] = outlet
             else:
                 streams_to_delete.update(lake_streams)
+    except ValueError as e:
+        logger.error(f"Invalid JSON format in lake streams file: {e}")
+        raise

Comment on lines +265 to +279
island_streams_path = os.path.join(save_dir, 'mod_dissolve_islands.json')
if os.path.exists(island_streams_path):
with open(island_streams_path, 'r') as f:
island_streams_dict: dict = json.load(f)

streams_to_delete = set()
for outlet, island_streams in island_streams_dict.items():
outlet = int(outlet)
if outlet == -1 or outlet not in wt[id_field].values:
streams_to_delete.update(island_streams)
else:
wt.loc[wt[id_field].isin(island_streams), id_field] = outlet

wt = wt[~wt[id_field].isin(streams_to_delete)]


🛠️ Refactor suggestion

Add error handling for island streams processing

The island streams processing also needs validation and error handling.

Add error handling:

 if os.path.exists(island_streams_path):
+    try:
         with open(island_streams_path, 'r') as f:
-            island_streams_dict: dict = json.load(f)
+            island_streams_dict: dict = json.load(f)
+            if not isinstance(island_streams_dict, dict):
+                raise ValueError("Expected dictionary in island streams file")

         streams_to_delete = set()
         for outlet, island_streams in island_streams_dict.items():
-            outlet = int(outlet)
+            try:
+                outlet = int(outlet)
+            except ValueError:
+                logger.warning(f"Invalid outlet ID format in island streams: {outlet}")
+                continue
             if outlet == -1 or outlet not in wt[id_field].values:
                 streams_to_delete.update(island_streams)
             else:
                 wt.loc[wt[id_field].isin(island_streams), id_field] = outlet
+    except (json.JSONDecodeError, ValueError) as e:
+        logger.error(f"Error processing island streams file: {e}")
+        raise

Comment on lines +249 to +264
low_flow_streams_path = os.path.join(save_dir, 'mod_drop_low_flow.json')
if os.path.exists(low_flow_streams_path):
with open(low_flow_streams_path, 'r') as f:
low_flow_streams_dict: dict = json.load(f)

streams_to_delete = set()
for outlet, low_flow_streams in low_flow_streams_dict.items():
outlet = int(outlet)
if outlet in wt[id_field].values:
wt.loc[wt[id_field].isin(low_flow_streams), id_field] = outlet
else:
streams_to_delete.update(low_flow_streams)

wt = wt[~wt[id_field].isin(streams_to_delete)]
wt = wt.groupby(wt.columns.drop('area_sqm').tolist()).sum().reset_index()


🛠️ Refactor suggestion

Add error handling for low flow streams processing

Similar to lake streams, the low flow streams processing needs validation and error handling.

Add error handling:

 if os.path.exists(low_flow_streams_path):
+    try:
         with open(low_flow_streams_path, 'r') as f:
-            low_flow_streams_dict: dict = json.load(f)
+            low_flow_streams_dict: dict = json.load(f)
+            if not isinstance(low_flow_streams_dict, dict):
+                raise ValueError("Expected dictionary in low flow streams file")

         streams_to_delete = set()
         for outlet, low_flow_streams in low_flow_streams_dict.items():
-            outlet = int(outlet)
+            try:
+                outlet = int(outlet)
+            except ValueError:
+                logger.warning(f"Invalid outlet ID format in low flow streams: {outlet}")
+                continue
             if outlet in wt[id_field].values:
                 wt.loc[wt[id_field].isin(low_flow_streams), id_field] = outlet
             else:
                 streams_to_delete.update(low_flow_streams)
+    except (json.JSONDecodeError, ValueError) as e:
+        logger.error(f"Error processing low flow streams file: {e}")
+        raise

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 10

🧹 Outside diff range and nitpick comments (10)
run_1.1_lakes.py (2)

51-185: Refactor the Main Execution Block into Smaller Functions

The main execution block is lengthy and contains nested loops and conditional statements, which can be difficult to read and maintain. Refactor the code into smaller, well-named functions that perform specific tasks. This will improve readability and make the code easier to debug and test.
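
For illustration, one possible decomposition; the function names and signatures below are hypothetical and would need to be matched to the variables actually used in run_1.1_lakes.py:

import glob
import logging

import geopandas as gpd


def load_lakes(lakes_parquet: str, min_area: float = 1.0) -> gpd.GeoDataFrame:
    """Read lake polygons and apply the area filter (hypothetical helper)."""
    lakes = gpd.read_parquet(lakes_parquet)
    return lakes[lakes['Area_PW'] > min_area]


def process_region(streams_gpq: str, lakes: gpd.GeoDataFrame) -> dict:
    """Intersect one region's streams with the lakes and return outlet mappings."""
    ...  # spatial join, graph construction, outlet detection


def main(lakes_parquet: str, gpq_dir: str, save_dir: str) -> None:
    lakes = load_lakes(lakes_parquet)
    for streams_gpq in sorted(glob.glob(gpq_dir)):
        logging.info('Processing %s', streams_gpq)
        results = process_region(streams_gpq, lakes)
        ...  # write per-region outputs to save_dir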


64-64: Optimize Spatial Join Performance

Performing a spatial join on large datasets can be resource-intensive. Ensure that spatial indexing is utilized to optimize performance. In Dask GeoDataFrame, spatial partitions and indexing can significantly improve the efficiency of spatial joins.

Consider verifying that spatial partitions are set up correctly:

dgdf = dgdf.spatial_shuffle()  # repartition by spatial locality (dask-geopandas >= 0.2)
lakes_dgdf = dgpd.from_geopandas(lakes_subset, npartitions=dgdf.npartitions).spatial_shuffle()
intersect = dgpd.sjoin(dgdf, lakes_dgdf, how='inner', predicate='intersects').compute()
run_7_catchments.py (1)

6-6: Remove Unused Import

The tqdm module is imported but not used in the script. Removing unused imports improves code clarity and reduces potential confusion.

Apply this diff to remove the unused import:

-import tqdm
🧰 Tools
🪛 Ruff (0.8.0)

6-6: tqdm imported but unused

Remove unused import: tqdm

(F401)

run_1_tdx_gpkg_to_geoparquet.py (1)

9-9: Remove Unused Import

The Point class from shapely.geometry is imported but not used in the script. Removing unused imports helps keep the code clean and maintainable.

Apply this diff to remove the unused import:

-from shapely.geometry import Point
🧰 Tools
🪛 Ruff (0.8.0)

9-9: shapely.geometry.Point imported but unused

Remove unused import: shapely.geometry.Point

(F401)

run_4_make_vpu_files.py (2)

65-67: Remove unused exception variable.

The error handling is improved with the use of traceback.format_exc(), but the exception variable e is unused.

-    except Exception as e:
+    except Exception:
         logging.error(vpu)
         logging.error(tdx_region)
         logging.error(traceback.format_exc())

50-50: Improve readability of the conditional check.

The current condition combines multiple checks in a way that's hard to read. Consider splitting it for better clarity.

-    if os.path.exists(vpu_dir) and (not MAKE_GPKG or os.path.exists(os.path.join(gpkg_dir, f'streams_{vpu}.gpkg'))):
+    gpkg_path = os.path.join(gpkg_dir, f'streams_{vpu}.gpkg')
+    if os.path.exists(vpu_dir) and (not MAKE_GPKG or os.path.exists(gpkg_path)):
tdxhydrorapid/_validate.py (1)

Line range hint 40-77: Consider extracting validation logic into separate functions.

While the changes effectively support both rapid and non-rapid validation paths, the function could be more maintainable if split into smaller, focused functions.

Consider refactoring like this:

 def check_outputs_are_valid(input_dir: str, use_rapid: bool = False) -> bool:
+    def _validate_rapid_files():
+        return {
+            'comid_lat_lon_z': pd.read_csv(os.path.join(input_dir, 'comid_lat_lon_z.csv')).shape[0],
+            'rapid_connect': pd.read_csv(os.path.join(input_dir, 'rapid_connect.csv'), header=None).shape[0],
+            'riv_bas_id': pd.read_csv(os.path.join(input_dir, 'riv_bas_id.csv'), header=None).shape[0],
+            'k': pd.read_csv(os.path.join(input_dir, 'k.csv'), header=None).shape[0],
+            'x': pd.read_csv(os.path.join(input_dir, 'x.csv'), header=None).shape[0],
+        }

+    def _validate_parquet_files():
+        return {
+            'connectivity': pd.read_parquet(os.path.join(input_dir, 'connectivity.parquet')).shape[0],
+            'routing_parameters': pd.read_parquet(os.path.join(input_dir, 'routing_parameters.parquet')).shape[0],
+        }

     file_counts = _validate_rapid_files() if use_rapid else _validate_parquet_files()
     n_weights = []
     for f in sorted(glob.glob(os.path.join(input_dir, 'weight_*.csv'))):
         df = pd.read_csv(f)
         n_weights.append((os.path.basename(f), df.iloc[:, 0].unique().shape[0]))

     logger.info('Checking for consistent numbers of basins in generated files')
-    if use_rapid:
-        all_nums = [n_comid_lat_lon_z, n_rapidconnect, n_rivbasid, n_k, n_x] + [n for f, n in n_weights if 'full' not in f]
-    else:
-        all_nums = [n_connectivity, n_routing_params] + [n for f, n in n_weights if 'full' not in f]
+    all_nums = list(file_counts.values()) + [n for f, n in n_weights if 'full' not in f]
run_3_tdxhydro_rapid_masters.py (1)

92-93: Simplify the complex file existence check.

The condition for checking file existence is hard to read with multiple conditions and line breaks.

-        if not os.path.exists(os.path.join(save_dir, 'rapid_inputs_master.parquet')) or \
-                (CACHE_GEOMETRY and not len(list(glob.glob(os.path.join(save_dir, '*.geoparquet'))))) or \
-                not os.path.exists(os.path.join(save_dir, 'rapid_inputs_master.parquet')):
+        master_file = os.path.join(save_dir, 'rapid_inputs_master.parquet')
+        geoparquet_files = list(glob.glob(os.path.join(save_dir, '*.geoparquet')))
+        needs_processing = (
+            not os.path.exists(master_file) or
+            (CACHE_GEOMETRY and not len(geoparquet_files))
+        )
+        if needs_processing:
tdxhydrorapid/network.py (1)

59-65: Simplify complex conditional logic

The nested conditions are hard to read and maintain. Consider simplifying the logic using clearer variable names and separate conditions.

Apply this diff to improve readability:

-    if len(pred_orders) == 2 and not len([x for x in pred_orders if x == (min_order_to_keep - 1)]) == 2:
+    has_two_predecessors = len(pred_orders) == 2
+    both_are_lower_order = len([x for x in pred_orders if x == (min_order_to_keep - 1)]) == 2
+    if has_two_predecessors and not both_are_lower_order:
         continue

-    if (len(pred_orders) > 2) and \
-            (not max(pred_orders) == min_order_to_keep - 1) and \
-            (len([x for x in pred_orders if x == (min_order_to_keep - 1)]) >= 2):
+    has_multiple_predecessors = len(pred_orders) > 2
+    highest_order_matches = max(pred_orders) == min_order_to_keep - 1
+    has_two_lower_order = len([x for x in pred_orders if x == (min_order_to_keep - 1)]) >= 2
+    if has_multiple_predecessors and not highest_order_matches and has_two_lower_order:
         continue
🧰 Tools
🪛 Ruff (0.8.0)

59-59: Use len([x for x in pred_orders if x == min_order_to_keep - 1]) != 2 instead of not len([x for x in pred_orders if x == min_order_to_keep - 1]) == 2

Replace with != operator

(SIM201)


63-63: Use max(pred_orders) != min_order_to_keep - 1 instead of not max(pred_orders) == min_order_to_keep - 1

Replace with != operator

(SIM201)

tdxhydrorapid/inputs.py (1)

631-666: Add type hints to river_route_inputs

Adding type hints would improve code maintainability and IDE support.

Apply this diff to add type hints:

def river_route_inputs(sdf: pd.DataFrame,
                      save_dir: str,
                      id_field: str = 'LINKNO',
                      ds_id_field: str = 'DSLINKNO', ) -> None:
    """
    Create river route input files from a stream network dataframe

    Produces the following files:
+       - connectivity.parquet: Contains river connectivity information
+       - routing_parameters.parquet: Contains routing parameters

    Args:
        sdf: Stream network dataframe
        save_dir: Directory to save outputs
        id_field: Field containing unique stream IDs
        ds_id_field: Field containing downstream stream IDs

    Returns:
        None
+
+    Raises:
+        ValueError: If required columns are missing
    """
+    required_columns = {id_field, ds_id_field, 'musk_x', 'musk_k'}
+    missing_columns = required_columns - set(sdf.columns)
+    if missing_columns:
+        raise ValueError(f"Missing required columns: {missing_columns}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 4260eff and eaa4bbc.

⛔ Files ignored due to path filters (2)
  • tdxhydrorapid/network_data/lake_table.csv is excluded by !**/*.csv
  • tdxhydrorapid/network_data/processing_options.xlsx is excluded by !**/*.xlsx
📒 Files selected for processing (11)
  • run_1.1_lakes.py (1 hunks)
  • run_1_tdx_gpkg_to_geoparquet.py (4 hunks)
  • run_3_tdxhydro_rapid_masters.py (5 hunks)
  • run_4_make_vpu_files.py (3 hunks)
  • run_5_global_geometry.py (1 hunks)
  • run_7_catchments.py (1 hunks)
  • tdxhydrorapid/__init__.py (1 hunks)
  • tdxhydrorapid/_validate.py (2 hunks)
  • tdxhydrorapid/inputs.py (16 hunks)
  • tdxhydrorapid/network.py (9 hunks)
  • tdxhydrorapid/weights.py (5 hunks)
✅ Files skipped from review due to trivial changes (1)
  • tdxhydrorapid/init.py
🧰 Additional context used
🪛 Ruff (0.8.0)
run_7_catchments.py

6-6: tqdm imported but unused

Remove unused import: tqdm

(F401)

tdxhydrorapid/weights.py

302-305: Use ternary operator gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid instead of if-else-block

Replace if-else-block with gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid

(SIM108)


308-308: Multiple statements on one line (colon)

(E701)


309-309: Undefined name dys

(F821)


315-315: Multiple statements on one line (colon)

(E701)

run_4_make_vpu_files.py

19-19: Use '/Users/ricky/Downloads/vpu-boundaries.gpkg' instead of '/Users/ricky/Downloads/vpu-boundaries.gpkg' or ...

Replace with '/Users/ricky/Downloads/vpu-boundaries.gpkg'

(SIM222)


64-64: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)

tdxhydrorapid/network.py

59-59: Use len([x for x in pred_orders if x == min_order_to_keep - 1]) != 2 instead of not len([x for x in pred_orders if x == min_order_to_keep - 1]) == 2

Replace with != operator

(SIM201)


63-63: Use max(pred_orders) != min_order_to_keep - 1 instead of not max(pred_orders) == min_order_to_keep - 1

Replace with != operator

(SIM201)


90-90: Loop control variable index not used within loop body

Rename unused index to _index

(B007)

tdxhydrorapid/inputs.py

32-33: Undefined name riverconcat_tdxregions in __all__

(F822)


212-215: Use ternary operator downstream = downstream[0] if downstream else -1 instead of if-else-block

Replace if-else-block with downstream = downstream[0] if downstream else -1

(SIM108)


302-302: Loop control variable key not used within loop body

(B007)


465-465: Use feat[ds_id_field].values == -1 instead of not feat[ds_id_field].values != -1

Replace with == operator

(SIM202)


544-544: Loop control variable idx not used within loop body

Rename unused idx to _idx

(B007)

run_1_tdx_gpkg_to_geoparquet.py

9-9: shapely.geometry.Point imported but unused

Remove unused import: shapely.geometry.Point

(F401)


56-56: Undefined name gdf

(F821)


56-56: Undefined name gdf

(F821)

🔇 Additional comments (10)
run_1.1_lakes.py (2)

25-75: Ensure Type Hint Compatibility with Python Version

The type hints using dict[int, int] require Python 3.9 or later. If the project needs to support earlier Python versions, use Dict[int, int] from the typing module instead.

Apply this diff to update type hints for compatibility:

+from typing import Dict

 def create_directed_graphs(df: gpd.GeoDataFrame,
                            id_field='LINKNO',
                            ds_id_field='DSLINKNO', ) -> nx.DiGraph:
     G: nx.DiGraph = nx.from_pandas_edgelist(df[df[ds_id_field] != -1],
                                             source=id_field,
                                             target=ds_id_field,
                                             create_using=nx.DiGraph)
     G.add_nodes_from(df[id_field].values)
     return G

 # ...

-    lake_id_dict: dict[int, int] = intersect['Lake_id'].to_dict()
-    lake_polygon_dict: dict[int, Polygon] = lakes_subset.set_index('Lake_id')['geometry'].to_dict() 
-    geom_dict: dict[int, Polygon] = gdf.set_index('LINKNO')['geometry'].to_dict()
+    lake_id_dict: Dict[int, int] = intersect['Lake_id'].to_dict()
+    lake_polygon_dict: Dict[int, Polygon] = lakes_subset.set_index('Lake_id')['geometry'].to_dict() 
+    geom_dict: Dict[int, Polygon] = gdf.set_index('LINKNO')['geometry'].to_dict()

83-84: 🛠️ Refactor suggestion

Handle Lakes with Multiple Outlets Gracefully

Currently, the script raises a RuntimeError if a lake has multiple outlets:

if len(outlets) != 1:
    raise RuntimeError(f'Lake has {len(outlets)} outlets')

Consider handling lakes with multiple outlets more gracefully, such as logging a warning and continuing processing, or implementing the logic to support multiple outlets if applicable.

Would you like assistance in modifying the code to handle multiple outlets?

run_1_tdx_gpkg_to_geoparquet.py (1)

43-44: Ensure Output Directories Are Correctly Named

The os.makedirs calls are creating directories with potentially incorrect paths:

os.makedirs(gpq_dir, exist_ok=True)
os.makedirs(save_dir, exist_ok=True)

Verify that gpq_dir and save_dir are correctly set and that the directories are intended to be created at these locations. Additionally, consider whether save_dir is necessary if it's not used elsewhere in the script.

run_4_make_vpu_files.py (1)

13-15: ⚠️ Potential issue

Replace hardcoded absolute paths with environment variables or configuration.

The hardcoded absolute paths make the code environment-dependent and non-portable. This is particularly problematic for paths that reference user-specific directories.

Consider using environment variables or a configuration file:

-tdx_inputs_dir = '/Users/ricky/tdxhydro-postprocessing/test/rapid_inputs'
-og_pqs = glob.glob("/Users/ricky/tdxhydro-postprocessing/test/pqs/TDX_streamnet_*_01.parquet")
-final_output_dir = '/Users/ricky/tdxhydro-postprocessing/test/vpus'
-vpu_boundaries = '/Users/ricky/Downloads/vpu-boundaries.gpkg' or None
+tdx_inputs_dir = os.getenv('TDX_INPUTS_DIR', 'test/rapid_inputs')
+og_pqs = glob.glob(os.path.join(os.getenv('TDX_PQS_DIR', 'test/pqs'), "TDX_streamnet_*_01.parquet"))
+final_output_dir = os.getenv('FINAL_OUTPUT_DIR', 'test/vpus')
+vpu_boundaries = os.getenv('VPU_BOUNDARIES_PATH')

Also applies to: 19-19

run_3_tdxhydro_rapid_masters.py (2)

Line range hint 21-22: Replace hardcoded paths with environment variables or configuration.

Similar to other files, the hardcoded paths make the code environment-dependent.

-inputs_path = '/Volumes/T9Hales4TB/TDXHydroGeoParquet'
-outputs_path = '/Volumes/T9Hales4TB/geoglows2/tdxhydro-inputs'
+inputs_path = os.getenv('TDX_HYDRO_INPUTS', 'data/TDXHydroGeoParquet')
+outputs_path = os.getenv('TDX_HYDRO_OUTPUTS', 'data/tdxhydro-inputs')

Also applies to: 46-46


69-73: LGTM! Well-structured lake processing configuration.

The new configuration options for lake processing are well-organized and properly typed. The changes align with the PR objective of removing unwanted lake geometries.

Also applies to: 83-87, 103-107

tdxhydrorapid/weights.py (3)

4-13: LGTM! The new imports align with the performance objectives.

The addition of dask_geopandas and related imports supports the goal of improving performance through parallel processing.

Also applies to: 22-22


325-328: LGTM! Clean and efficient implementation.

The generator function is well-designed for memory efficiency when dealing with multiple grids.


79-88: 🛠️ Refactor suggestion

Optimize projection timing in overlay function

The projection to CEA should occur before the intersection operation to ensure geometric accuracy. Currently, it's done after the intersection which could lead to less accurate results.

Apply this diff to fix the projection timing:

def overlay(left: dgpd.GeoDataFrame, right: dgpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Performs a spatial intersection overlay of two GeoDataFrames using Dask.
    Reproject geometries to a Cylindrical Equal Area projection.
    """
+   left = left.to_crs({'proj': 'cea'})
+   right = right.to_crs({'proj': 'cea'})
    return gpd.GeoDataFrame(
        left.sjoin(right.assign(right_geometry=right.geometry))
-       .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry).to_crs({'proj':'cea'}))
+       .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry))
        .drop(columns="right_geometry")
        .compute()).sort_index()

Likely invalid or redundant comment.

tdxhydrorapid/network.py (1)

6-7: LGTM! Consistent with performance improvements.

The addition of Dask-related imports aligns with the performance optimization goals stated in the PR.

Also applies to: 22-22

run_1.1_lakes.py Outdated
Comment on lines 21 to 23
lakes_parquet = '/Users/ricky/Downloads/GLAKES/all_lakes_filtered.parquet' # https://garslab.com/?p=234 August 24, 2022
gpq_dir = '/Volumes/EB406_T7_3/geoglows_v3/parquets*'
save_dir = './tdxhydrorapid/network_data/'

🛠️ Refactor suggestion

Replace Hardcoded Paths with Configurable Parameters

The paths lakes_parquet, gpq_dir, and save_dir are hardcoded with absolute paths specific to your local file system. This can cause issues when running the script in different environments or when sharing the code with others. Consider using command-line arguments, environment variables, or a configuration file to make these paths configurable.

run_1.1_lakes.py Outdated
Comment on lines 47 to 49
lakes_gdf = gpd.read_parquet(lakes_parquet)
lakes_gdf = lakes_gdf[lakes_gdf['Area_PW'] > 1]
lakes_gdf['geometry'] = lakes_gdf['geometry'].apply(fill_holes) # Remove holes from lakes, could lead to issues...

⚠️ Potential issue

Add Error Handling for Missing or Incorrect Data Columns

The script assumes that the Area_PW column exists in the lakes_gdf DataFrame. If this column is missing or contains unexpected data types, the script will raise a KeyError or other exceptions. Add error handling to check for the existence and validity of the Area_PW column before performing operations.

Apply this diff to add error handling:

 if __name__ == "__main__":
     logging.info('Getting Lake Polygons')
     
     lakes_gdf = gpd.read_parquet(lakes_parquet)
+    if 'Area_PW' not in lakes_gdf.columns:
+        raise KeyError("Column 'Area_PW' not found in lakes data.")
     lakes_gdf = lakes_gdf[lakes_gdf['Area_PW'] > 1]
     lakes_gdf['geometry'] = lakes_gdf['geometry'].apply(fill_holes) # Remove holes from lakes, could lead to issues...

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +3 to +5
gdf = dgpd.read_parquet('/Users/ricky/tdxhydro-postprocessing/test/rapid_inputs/*/*geoparquet', filesystem='arrow', columns=['LINKNO', 'geometry'])
gdf['geometry'] = gdf['geometry'].simplify(0.001, preserve_topology=False)
gdf.compute().to_file('test_global_streams_simplified_dask.gpkg', driver='GPKG')

🛠️ Refactor suggestion

⚠️ Potential issue

Ensure File Path Patterns Are Correct and Configurable

The file path pattern used in dgpd.read_parquet is hardcoded and may not correctly match all intended files:

gdf = dgpd.read_parquet('/Users/ricky/tdxhydro-postprocessing/test/rapid_inputs/*/*geoparquet', filesystem='arrow', columns=['LINKNO', 'geometry'])

Consider making the file path configurable and verify that the pattern matches all required files. Also, ensure that the use of wildcards (*) is appropriate for the directory structure.

Apply this diff to use a configurable file path:

+import sys
+import argparse

+parser = argparse.ArgumentParser(description='Process global geometries.')
+parser.add_argument('--input_path', type=str, required=True, help='Path to input geoparquet files.')
+parser.add_argument('--output_file', type=str, default='global_streams_simplified.gpkg', help='Output file name.')
+args = parser.parse_args()

-gdf = dgpd.read_parquet('/Users/ricky/tdxhydro-postprocessing/test/rapid_inputs/*/*geoparquet', filesystem='arrow', columns=['LINKNO', 'geometry'])
+gdf = dgpd.read_parquet(args.input_path, filesystem='arrow', columns=['LINKNO', 'geometry'])
 gdf['geometry'] = gdf['geometry'].simplify(0.001, preserve_topology=False)
-gdf.compute().to_file('test_global_streams_simplified_dask.gpkg', driver='GPKG')
+gdf.compute().to_file(args.output_file, driver='GPKG')

Comment on lines +31 to +36
gdf = gpd.read_parquet(catchments_gpq)
gdf, lake_gdf = (
rp
.network
.dissolve_catchments(tdx, gdf)
)

⚠️ Potential issue

Handle Missing Catchments Data Gracefully

When reading the catchments GeoDataFrame and processing it with dissolve_catchments, there may be cases where the catchment data is missing or empty. Add error handling to check if gdf is empty before proceeding.

Apply this diff to add a check:

 gdf = gpd.read_parquet(catchments_gpq)
+if gdf.empty:
+    logging.warning(f"No catchments data found for {tdxnumber}. Skipping.")
+    continue
 gdf, lake_gdf = (
     rp
     .network
     .dissolve_catchments(tdx, gdf)
 )

Committable suggestion skipped: line range outside the PR's diff.

@@ -73,6 +73,7 @@ def _calculate_geodesic_length(line) -> float:
]]

else:
gdf['LINKNO'] = gdf['streamID'].astype(int) + (tdx_header_number * 10_000_000)

⚠️ Potential issue

Fix Undefined Variable gdf

Similar to the previous issue, gdf is undefined in this block. Ensure gdf is read correctly.

Apply this diff to read gdf:

             else:
+                gdf = gpd.read_file(gpkg)
                 gdf['LINKNO'] = gdf['streamID'].astype(int) + (tdx_header_number * 10_000_000)
                 gdf = gdf.drop(columns=['streamID'])

Committable suggestion skipped: line range outside the PR's diff.

if 'streamnet' in os.path.basename(gpkg):
gdf['LINKNO'] = gdf['LINKNO'].astype(int) + (tdx_header_number * 10_000_000)

⚠️ Potential issue

Fix Undefined Variable gdf

The variable gdf is used without being defined in this block. This will raise a NameError.

Apply this diff to define gdf before using it:

             if 'streamnet' in os.path.basename(gpkg):
+                gdf = gpd.read_file(gpkg)
                 gdf['LINKNO'] = gdf['LINKNO'].astype(int) + (tdx_header_number * 10_000_000)
                 gdf['DSLINKNO'] = gdf['DSLINKNO'].astype(int)
                 gdf.loc[gdf['DSLINKNO'] != -1, 'DSLINKNO'] = gdf['DSLINKNO'] + (tdx_header_number * 10_000_000)
                 gdf['strmOrder'] = gdf['strmOrder'].astype(int)
                 gdf['LengthGeodesicMeters'] = gdf['geometry'].apply(_calculate_geodesic_length)
                 gdf['TDXHydroRegion'] = region_number

Ensure that gdf is properly read from the GeoPackage file before processing.

🧰 Tools
🪛 Ruff (0.8.0)

56-56: Undefined name gdf

(F821)


56-56: Undefined name gdf

(F821)

Comment on lines 301 to 324
def get_weight_table_name_from_grid(grid: Union[str, gpd.GeoDataFrame], warn=False) -> str:
if isinstance(grid, str):
gpq = gpd.read_parquet(grid)
else:
gpq = grid
dxs = gpq['lon'].round(12).drop_duplicates().sort_values().diff().dropna().unique()
if len(dxs) > 1:
if warn: logger.warning(f'Multiple dx values found in {grid}, using median')
dx = round(np.median(dys), 12)
else:
dx = round(dxs[0], 12)

dys = gpq['lat'].round(12).drop_duplicates().sort_values().diff().dropna().unique()
if len(dys) > 1:
if warn: logger.warning(f'Multiple dy values found in {grid}, using median')
dy = round(np.median(dys), 12)
else:
dy = round(dys[0], 12)

xinit = gpq['lon'].round(12).min()
yinit = gpq['lat'].round(12).min()

return f"weight_xinit={xinit}_yinit={yinit}_dx={dx}_dy={dy}_.csv"


⚠️ Potential issue

Fix undefined variable in get_weight_table_name_from_grid

There's a critical bug in the median calculation where dys is used instead of dxs.

Apply this diff to fix the bug:

    if len(dxs) > 1:
        if warn: logger.warning(f'Multiple dx values found in {grid}, using median')
-       dx = round(np.median(dys), 12)
+       dx = round(np.median(dxs), 12)
    else:
        dx = round(dxs[0], 12)
🧰 Tools
🪛 Ruff (0.8.0)

302-305: Use ternary operator gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid instead of if-else-block

Replace if-else-block with gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid

(SIM108)


308-308: Multiple statements on one line (colon)

(E701)


309-309: Undefined name dys

(F821)


315-315: Multiple statements on one line (colon)

(E701)

Comment on lines 353 to 410
def dissolve_catchments(save_dir: str, gdf: gpd.GeoDataFrame, id_field: str = 'LINKNO') -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
headwater_dissolve_path = os.path.join(save_dir, 'mod_dissolve_headwater.csv')

if os.path.exists(headwater_dissolve_path):
with open(headwater_dissolve_path) as f:
o2_to_dissolve = pd.read_csv(f).fillna(-1).astype(int)
updates = {}
for streams_to_merge in o2_to_dissolve.values:
valid_streams = streams_to_merge[streams_to_merge != -1]
updates.update({stream: valid_streams[0] for stream in valid_streams})

gdf[id_field] = gdf[id_field].map(updates).fillna(gdf[id_field]).astype(int)

streams_to_prune_path = os.path.join(save_dir, 'mod_prune_streams.csv')
if os.path.exists(streams_to_prune_path):
with open(streams_to_prune_path) as f:
ids_to_prune = pd.read_csv(f).astype(int).set_index('LINKTODROP')['LINKNO'].to_dict()
gdf[id_field] = gdf[id_field].map(ids_to_prune).fillna(gdf[id_field]).astype(int)

drop_streams_path = os.path.join(save_dir, 'mod_drop_small_trees.csv')
if os.path.exists(drop_streams_path):
with open(drop_streams_path) as f:
ids_to_drop = pd.read_csv(f).astype(int)
gdf = gdf[~gdf[id_field].isin(ids_to_drop.values.flatten())]


short_streams_path = os.path.join(save_dir, 'mod_merge_short_streams.csv')
if os.path.exists(short_streams_path):
with open(short_streams_path) as f:
short_streams = pd.read_csv(f).astype(int)
for streams_to_merge in short_streams.values:
gdf.loc[gdf[id_field].isin(streams_to_merge), id_field] = streams_to_merge[0]

lake_streams_path = os.path.join(save_dir, 'mod_dissolve_lakes.json')
lake_outlets = set()
if os.path.exists(lake_streams_path):
lake_streams_df = pd.read_json(lake_streams_path, orient='index', convert_axes=False, convert_dates=False)
streams_to_delete = set()
for outlet, _, lake_streams in lake_streams_df.itertuples():
outlet = int(outlet)
lake_outlets.add(outlet)
if outlet in gdf[id_field].values:
gdf.loc[gdf[id_field].isin(lake_streams), id_field] = outlet
else:
streams_to_delete.update(lake_streams)

gdf = gdf[~gdf[id_field].isin(streams_to_delete)]

# dissolve the geometries based on shared value in the id field
logging.info('\tDissolving catchments')
dgdf: dd.DataFrame = dd.from_pandas(gdf, npartitions=os.cpu_count()*2)
dgdf: dgpd.GeoDataFrame = dgpd.from_dask_dataframe(dgdf.shuffle(on=id_field))
gdf = dgdf.dissolve(by=id_field).compute().reset_index()
if lake_outlets:
lake_gdf = gdf[gdf[id_field].isin(lake_outlets)]
else:
lake_gdf = gpd.GeoDataFrame()
return gdf, lake_gdf

🛠️ Refactor suggestion

Add error handling for dissolve_catchments

The function should handle potential errors when reading files and processing data.

Apply this diff to add error handling:

def dissolve_catchments(save_dir: str, gdf: gpd.GeoDataFrame, id_field: str = 'LINKNO') -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
+    if not isinstance(gdf, gpd.GeoDataFrame):
+        raise TypeError("gdf must be a GeoDataFrame")
+    if id_field not in gdf.columns:
+        raise ValueError(f"id_field '{id_field}' not found in GeoDataFrame columns")

     headwater_dissolve_path = os.path.join(save_dir, 'mod_dissolve_headwater.csv')
     if os.path.exists(headwater_dissolve_path):
-        with open(headwater_dissolve_path) as f:
-            o2_to_dissolve = pd.read_csv(f).fillna(-1).astype(int)
+        try:
+            with open(headwater_dissolve_path) as f:
+                o2_to_dissolve = pd.read_csv(f).fillna(-1).astype(int)
+        except (IOError, pd.errors.EmptyDataError) as e:
+            logger.error(f"Error reading headwater dissolve file: {e}")
+            raise

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +50 to +54
dissolve_lakes: bool = True,
drop_islands: bool = True,
drop_ocean_watersheds: bool = True,
drop_within_sea: bool = True,
drop_low_flow: bool = False,

🛠️ Refactor suggestion

Improve documentation and parameter validation

The new parameters need better documentation and validation to ensure correct usage.

Apply this diff to improve the function:

def rapid_master_files(streams_gpq: str,
                      save_dir: str,
                      id_field: str = 'LINKNO',
                      ds_id_field: str = 'DSLINKNO',
                      length_field: str = 'Length',
                      default_velocity_factor: float = None,
                      default_x: float = .25,
                      drop_small_watersheds: bool = True,
                      dissolve_headwaters: bool = True,
                      prune_branches_from_main_stems: bool = True,
                      merge_short_streams: bool = True,
                      cache_geometry: bool = True,
                      dissolve_lakes: bool = True,
                      drop_islands: bool = True,
                      drop_ocean_watersheds: bool = True,
                      drop_within_sea: bool = True,
                      drop_low_flow: bool = False,
                      min_drainage_area_m2: float = 200_000_000,
                      min_headwater_stream_order: int = 3,
                      min_velocity_factor: float = 0.25,
                      min_k_value: int = 900,
                      lake_min_k: int = 3600, ) -> None:
    """
    Create RAPID master files from a stream network

    Args:
        streams_gpq: Path to the streams geoparquet file
        save_dir: Path to save the master files
        id_field: Field name for the link id
        ds_id_field: Field name for the downstream link id
        length_field: Field name for stream segment length
        default_velocity_factor: Default velocity factor for Muskingum routing
        default_x: Default attenuation factor for Muskingum routing
        drop_small_watersheds: Whether to drop small watersheds
        dissolve_headwaters: Whether to dissolve headwater branches
        prune_branches_from_main_stems: Whether to prune branches
        merge_short_streams: Whether to merge short streams
        cache_geometry: Whether to cache geometry
+       dissolve_lakes: Whether to dissolve lake geometries
+       drop_islands: Whether to remove island streams
+       drop_ocean_watersheds: Whether to remove ocean watersheds
+       drop_within_sea: Whether to remove streams within sea
+       drop_low_flow: Whether to remove low flow streams
        min_drainage_area_m2: Minimum drainage area to keep
        min_headwater_stream_order: Minimum stream order for headwaters
        min_velocity_factor: Minimum velocity factor
        min_k_value: Target minimum k value
        lake_min_k: Minimum k value for lakes

    Raises:
        FileNotFoundError: If required files are missing
        ValueError: If input parameters are invalid
    """
+    if not os.path.exists(streams_gpq):
+        raise FileNotFoundError(f"Streams file not found: {streams_gpq}")
+    if not os.path.exists(save_dir):
+        raise FileNotFoundError(f"Save directory not found: {save_dir}")
+    if min_drainage_area_m2 <= 0:
+        raise ValueError("min_drainage_area_m2 must be positive")
+    if min_headwater_stream_order < 1:
+        raise ValueError("min_headwater_stream_order must be at least 1")

Also applies to: 71-71, 94-94

for river_id in gdf[gdf['LengthGeodesicMeters'] <= 0.01][id_field].values:
feat = gdf[gdf[id_field] == river_id]
upstreams = set(gdf[gdf[ds_id_field] == river_id][id_field])
if not feat[ds_id_field].values != -1 and all([x != -1 for x in upstreams]):

⚠️ Potential issue

Fix incorrect condition in create_nexus_points

The condition uses a confusing double negative that could lead to bugs.

Apply this diff to fix the logic:

-        if not feat[ds_id_field].values != -1 and all([x != -1 for x in upstreams]):
+        if feat[ds_id_field].values == -1 and all([x != -1 for x in upstreams]):
🧰 Tools
🪛 Ruff (0.8.0)

465-465: Use feat[ds_id_field].values == -1 instead of not feat[ds_id_field].values != -1

Replace with == operator

(SIM202)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (17)
tdxhydrorapid/inputs.py (6)

51-55: Ensure robust validation for new boolean parameters.

The newly introduced parameters (dissolve_lakes, drop_islands, drop_ocean_watersheds, drop_within_sea, drop_low_flow) provide granular control over stream processing. However, no checks confirm these parameters are used meaningfully (e.g., not mutually exclusive, or required in certain scenarios). Consider validating parameter combinations or providing better error messages for misconfigurations.
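
As a minimal sketch (the rules enforced here are illustrative, not the function's current behaviour), a fail-fast check could look like:

import logging

logger = logging.getLogger(__name__)


def _validate_processing_flags(**flags: bool) -> None:
    """Illustrative type check plus a log of the chosen configuration."""
    for name, value in flags.items():
        if not isinstance(value, bool):
            raise TypeError(f'{name} must be a bool, got {type(value).__name__}')
    enabled = [name for name, value in flags.items() if value]
    logger.info('Stream processing options enabled: %s', ', '.join(enabled) or 'none')


# e.g. near the top of rapid_master_files:
# _validate_processing_flags(dissolve_lakes=dissolve_lakes, drop_islands=drop_islands,
#                            drop_ocean_watersheds=drop_ocean_watersheds,
#                            drop_within_sea=drop_within_sea, drop_low_flow=drop_low_flow)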


252-314: Check edge cases for island removal.

The island removal logic relies on multiple loops and conditions to determine valid outlet replacements. Edge cases could arise if an island merges with multiple outlets or if there are unresolved references to partially removed streams. Consider adding more defensive checks or a fallback scenario to handle unrecognized conditions.

🧰 Tools
🪛 Ruff (0.8.2)

301-301: Loop control variable key not used within loop body

(B007)


388-389: Clarify rationale for musk_x = 0.01 on lakes.

Setting musk_x to a very low value provides substantial attenuation, but any edge-case scenarios where 0.01 is still too high/low? If you rely on hydrological data, a data-driven approach or an override parameter might be safer.


449-472: Expand unit testing for create_nexus_points.

While generating nexus points for extremely short streams, edge cases involving unusual geometry or partial merges are possible. To prevent silent failures, ensure robust unit tests cover:

  1. Streams with multiple upstreams but no valid downstream.
  2. Streams bridging multiple confluences.

Would you like help creating or updating these tests?
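
As a starting point, a fixture-based test sketch; the import path and the assumed signature create_nexus_points(gdf, id_field, ds_id_field) are assumptions to verify against the actual helper:

import geopandas as gpd
from shapely.geometry import LineString

from tdxhydrorapid.inputs import create_nexus_points  # assumed import path


def test_short_outlet_with_multiple_upstreams_and_no_downstream():
    # three upstream reaches converge on a near-zero-length terminal reach (id 4)
    gdf = gpd.GeoDataFrame(
        {
            'LINKNO': [1, 2, 3, 4],
            'DSLINKNO': [4, 4, 4, -1],
            'LengthGeodesicMeters': [1000.0, 1200.0, 900.0, 0.005],
        },
        geometry=[
            LineString([(0, 1), (1, 0)]),
            LineString([(0, -1), (1, 0)]),
            LineString([(0, 0), (1, 0)]),
            LineString([(1, 0), (1.000001, 0)]),
        ],
        crs='EPSG:4326',
    )
    nexus = create_nexus_points(gdf, id_field='LINKNO', ds_id_field='DSLINKNO')
    assert len(nexus) >= 1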


624-662: Consider factoring out repeated connectivity code.

Both rapid_input_csvs and river_route_inputs build connectivity structures from the same graph approach. You might unify the logic in a shared helper function, reducing duplication.
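
A possible shape for such a helper, sketched with networkx under the assumption that both callers only need each reach's downstream id and its list of upstream ids (names are illustrative):

import networkx as nx
import pandas as pd


def build_connectivity(sdf: pd.DataFrame,
                       id_field: str = 'LINKNO',
                       ds_id_field: str = 'DSLINKNO') -> pd.DataFrame:
    """One row per reach with its downstream id and a sorted list of upstream ids."""
    G = nx.from_pandas_edgelist(sdf[sdf[ds_id_field] != -1],
                                source=id_field,
                                target=ds_id_field,
                                create_using=nx.DiGraph)
    G.add_nodes_from(sdf[id_field].values)
    rows = []
    for node in sdf[id_field].values:
        downstream = next(iter(G.successors(node)), -1)
        rows.append({id_field: node,
                     ds_id_field: downstream,
                     'upstreams': sorted(G.predecessors(node))})
    return pd.DataFrame(rows)

rapid_input_csvs and river_route_inputs could then each format this single frame into their respective output files.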


663-699: Gracefully handle missing VPU codes during concatenation.

The fallback mechanism attempts to find a valid VPU for each terminal link. If a link still can’t be resolved, the process raises a RuntimeError. For better usability, consider providing more context (e.g., partial merges, suggested solutions) or partial fallback if feasible.
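
For example, a more descriptive failure; the names resolve_vpu and vpu_lookup are placeholders for whatever concat_tdxregions actually tracks:

def resolve_vpu(terminal_id: int, vpu_lookup: dict) -> int:
    """Return the VPU code for a terminal link or fail with actionable context (sketch)."""
    vpu = vpu_lookup.get(terminal_id)
    if vpu is None:
        raise RuntimeError(
            f'Could not resolve a VPU for terminal link {terminal_id}: '
            f'{len(vpu_lookup)} other links were assigned successfully. '
            f'Check that the vpu_boundaries layer covers this region.'
        )
    return vpu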

environment.yml (2)

7-8: Double-check if geopandas>=1.0 covers known bug fixes.

Ensure there's no need for a higher pinned version to avoid potential geometry or coordinate reference system issues on large datasets.


10-10: Add a comment about networkx usage.

Since networkx is newly added, consider documenting how or why it is used for future maintainers.

run_1.1_lakes.py (3)

33-34: Allow external configuration for hardcoded paths.

Paths such as gpq_dir and save_dir are set in code, making them environment-specific. Providing overrides (e.g., CLI args or environment variables) can boost portability.
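
A minimal sketch using environment variables, keeping the current values as defaults (the variable names are illustrative; argparse would serve equally well):

import os

# fall back to the current hard-coded locations when the variables are not set
lakes_parquet = os.getenv('LAKES_PARQUET', '/Users/ricky/Downloads/GLAKES/all_lakes_filtered.parquet')
gpq_dir = os.getenv('TDX_GPQ_DIR', '/Volumes/EB406_T7_3/geoglows_v3/parquets*')
save_dir = os.getenv('TDX_SAVE_DIR', './tdxhydrorapid/network_data/')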


61-71: Include fallback for empty lake intersections.

You skip post-processing if no intersections are found, but partial or malformed files might produce false empties. Log potential mismatches to help debugging.
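
A small guard along those lines; intersect, gdf and lakes_subset follow the names used in this script, while the helper itself is a sketch:

import logging

import geopandas as gpd


def has_intersections(intersect: gpd.GeoDataFrame, region_name: str,
                      n_streams: int, n_lakes: int) -> bool:
    """Log context before a region is skipped so silent join failures are visible."""
    if intersect.empty:
        logging.warning(
            'No lake/stream intersections for %s (%d streams vs %d candidate lakes); '
            'check that CRS and extents match before skipping.',
            region_name, n_streams, n_lakes,
        )
        return False
    return True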


136-139: Optimize assignment for outlet_list.

A ternary operator or short guard clause can replace the if not isinstance(...) block to make code tidier and avoid potential confusion about outlet being a list.

🧰 Tools
🪛 Ruff (0.8.2)

136-139: Use ternary operator outlet_list = [outlet] if not isinstance(outlet, list) else outlet instead of if-else-block

Replace if-else-block with outlet_list = [outlet] if not isinstance(outlet, list) else outlet

(SIM108)

tdxhydrorapid/weights.py (2)

240-257: Add error handling or logging for lake merges.

If reading mod_dissolve_lakes.json results in unexpected structures, consider logging mismatches or partial merges to help diagnose.


295-295: Safeguard merges when IDs are missing.

The code only continues if both IDs exist. Some merges might remain partially applied, leaving orphan references. Consider logging or storing these partial merges for later inspection.

tdxhydrorapid/network.py (4)

23-24: Elevate or remove dissolve_catchments if widely needed.

If dissolving catchments is a core operation, consider moving it into a more central location or clarifying its usage.


36-37: Validate presence of required columns.

create_directed_graphs uses df[df[ds_id_field] != -1], but no checks confirm ds_id_field indeed exists or that values are valid integers. A safeguard can prevent silent failures.
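
A guard of that kind could sit at the top of create_directed_graphs; this is a sketch rather than the function's current behaviour:

import pandas as pd


def _check_network_columns(df: pd.DataFrame,
                           id_field: str = 'LINKNO',
                           ds_id_field: str = 'DSLINKNO') -> None:
    """Fail fast when the id columns are missing or not integer-typed."""
    missing = {id_field, ds_id_field} - set(df.columns)
    if missing:
        raise KeyError(f'Missing required columns: {sorted(missing)}')
    for col in (id_field, ds_id_field):
        if not pd.api.types.is_integer_dtype(df[col]):
            raise TypeError(f'Column {col!r} must be integer-typed, got {df[col].dtype}')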


85-125: Cascade merges in find_branches_to_prune.

Logic for merging sibling streams can become complex if multiple merges happen in chain. Track merges carefully to avoid losing references or skipping partial merges.

Would you like me to draft a solution that systematically updates merges across repeated passes?

🧰 Tools
🪛 Ruff (0.8.2)

92-92: Loop control variable index not used within loop body

Rename unused index to _index

(B007)


351-409: Collect metrics on dissolved catchments for debugging.

After dissolving catchments, it may help to record how many merges took place or how many streams got dropped. This data can assist in troubleshooting complex merges.
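
For example, a small helper called right after the .dissolve(...).compute() step (illustrative):

import logging


def log_dissolve_metrics(n_before: int, n_after: int, n_lake_outlets: int) -> None:
    """Record how much the dissolve step collapsed the catchment layer."""
    logging.info('\tDissolved %d catchment polygons into %d (%d lake outlets retained)',
                 n_before, n_after, n_lake_outlets)


# e.g. log_dissolve_metrics(n_before=n_rows_before_dissolve, n_after=len(gdf),
#                           n_lake_outlets=len(lake_outlets))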

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eaa4bbc and c7f8d6a.

⛔ Files ignored due to path filters (1)
  • tdxhydrorapid/network_data/lake_table.csv is excluded by !**/*.csv
📒 Files selected for processing (7)
  • environment.yml (1 hunks)
  • run_1.1_lakes.py (1 hunks)
  • run_1_tdx_gpkg_to_geoparquet.py (4 hunks)
  • run_7_catchments.py (1 hunks)
  • tdxhydrorapid/inputs.py (19 hunks)
  • tdxhydrorapid/network.py (8 hunks)
  • tdxhydrorapid/weights.py (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • run_1_tdx_gpkg_to_geoparquet.py
  • run_7_catchments.py
🧰 Additional context used
🪛 Ruff (0.8.2)
run_1.1_lakes.py

136-139: Use ternary operator outlet_list = [outlet] if not isinstance(outlet, list) else outlet instead of if-else-block

Replace if-else-block with outlet_list = [outlet] if not isinstance(outlet, list) else outlet

(SIM108)

tdxhydrorapid/weights.py

306-309: Use ternary operator gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid instead of if-else-block

Replace if-else-block with gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid

(SIM108)


312-312: Multiple statements on one line (colon)

(E701)


319-319: Multiple statements on one line (colon)

(E701)

tdxhydrorapid/network.py

92-92: Loop control variable index not used within loop body

Rename unused index to _index

(B007)

tdxhydrorapid/inputs.py

33-34: Undefined name riverconcat_tdxregions in __all__

(F822)


211-214: Use ternary operator downstream = downstream[0] if downstream else -1 instead of if-else-block

Replace if-else-block with downstream = downstream[0] if downstream else -1

(SIM108)


301-301: Loop control variable key not used within loop body

(B007)


539-539: Loop control variable idx not used within loop body

Rename unused idx to _idx

(B007)

🔇 Additional comments (9)
tdxhydrorapid/inputs.py (3)

354-375: Re-evaluate the outlet distance update logic for lakes.

This section adds the maximum straight-line distance to the outlet’s length. In some cases, merging multiple inlets with varied distances might cause underestimation or overestimation of the lake’s effective length. You might consider averaging or applying a more stable metric if multiple inlets differ significantly.
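
If a more stable statistic is preferred, a sketch of the alternatives; the list of inlet-to-outlet distances is assumed to be computed the same way as today:

import numpy as np


def outlet_length_increment(inlet_distances: list, method: str = 'median') -> float:
    """Representative inlet-to-outlet distance to add to the outlet's length (sketch)."""
    if len(inlet_distances) == 0:
        return 0.0
    if method == 'median':
        return float(np.median(inlet_distances))
    if method == 'mean':
        return float(np.mean(inlet_distances))
    return float(max(inlet_distances))  # current behaviour: the maximum distance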


722-724: Double-check the usage of new parameters use_rapid and vpu_boundaries.

The updated parameters control whether RAPID input files or alternative route files are generated. Validate that the code paths remain consistent if vpu_boundaries is missing or if use_rapid toggles mid-run.


141-182: Potential for partial updates to sgdf when dissolving lakes.

While updating stream connectivity for inlets and removing ancestral streams, complex networks can occur if multiple lakes share partial upstreams. Consider adding comprehensive checks or logging to detect partial merges or edge cases.

Run the following script to scan the codebase for references to potential multi-lake conflicts:

environment.yml (2)

6-7: Confirm that dask-geopandas works with Python≥3.11.

While these version constraints often suffice, confirm that dask-geopandas is fully compatible with Python 3.11 in your environment.


15-15: Confirm pyogrio usage.

pyogrio can improve I/O performance, but ensure you’ve tested it thoroughly. If it’s not used heavily, you might avoid extra dependencies.

run_1.1_lakes.py (3)

49-50: Confirm the threshold for Area_PW > 3.

Filtering lakes by Area_PW > 3 is presumably empirical. Re-verify the threshold if large but critical lakes are accidentally excluded.


81-85: Recheck projection handling in create_directed_graphs.

This function relies on raw geometry but does not forcibly reproject. For large-scale or multi-CRS data, consider ensuring consistent coordinate reference systems before building graphs.


242-244: Ensure partial merges handle multiple inlets.

When multiple inlets converge on the same outlet, you update the stream references. Confirm that partial merges or overlapping references do not leave the data in an inconsistent state.

tdxhydrorapid/weights.py (1)

229-230: Confirm structured iteration for o2_to_dissolve.

You map each streams_to_merge array in a single comprehension, which can misalign if the DataFrame columns aren’t in the expected order. Double-check columns or consider a safer row-wise approach like itertuples().
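
A row-wise sketch of the same merge-map construction, using the mod_dissolve_headwater.csv convention from dissolve_catchments:

import os

import pandas as pd


def read_headwater_updates(save_dir: str) -> dict:
    """Build the stream-id merge map row by row instead of over raw .values."""
    path = os.path.join(save_dir, 'mod_dissolve_headwater.csv')
    o2_to_dissolve = pd.read_csv(path).fillna(-1).astype(int)
    updates = {}
    for row in o2_to_dissolve.itertuples(index=False):
        valid_streams = [v for v in row if v != -1]  # drop the fill value
        if valid_streams:
            updates.update({stream: valid_streams[0] for stream in valid_streams})
    return updates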

Comment on lines +33 to 34
'river'
'concat_tdxregions',

⚠️ Potential issue

Fix missing comma in __all__ list.

There's a missing comma between 'river' and 'concat_tdxregions', which can cause a syntax or name error.

-    'river'
-    'concat_tdxregions',
+    'river',
+    'concat_tdxregions',
🧰 Tools
🪛 Ruff (0.8.2)

33-34: Undefined name riverconcat_tdxregions in __all__

(F822)

Comment on lines +81 to +89
def overlay(left: dgpd.GeoDataFrame, right: dgpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Performs a spatial intersection overlay of two GeoDataFrames using Dask.
Reproject geometries to a Cylindrical Equal Area projection.
"""
return gpd.GeoDataFrame(
left.sjoin(right.assign(right_geometry=right.geometry))
.assign(geometry=lambda x: x.geometry.intersection(x.right_geometry).to_crs({'proj':'cea'}))
.drop(columns="right_geometry")

🛠️ Refactor suggestion

Refine overlay logic to avoid potential re-projection confusion.

Applying .to_crs({'proj': 'cea'}) directly in the lambda might lead to a partial or repeated re-projection if used further. Also consider re-projecting inputs strictly once before intersection for clarity.
