Remove unwanted lake geometries #5
base: main
Conversation
Important: Review skipped. Review was skipped due to path filters.
⛔ Files ignored due to path filters (1)
CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters. You can disable this status message in the CodeRabbit configuration.
Walkthrough
The changes involve updates to several scripts and configuration files to enhance the handling of hydrological data.
Sequence Diagram(s)
sequenceDiagram
participant User
participant run_1_tdx_gpkg_to_geoparquet
participant run_3_tdxhydro_rapid_masters
participant rapid_master_files
participant network
User->>run_1_tdx_gpkg_to_geoparquet: Process GeoPackage
run_1_tdx_gpkg_to_geoparquet->>run_3_tdxhydro_rapid_masters: Call with lake parameters
run_3_tdxhydro_rapid_masters->>rapid_master_files: Pass lake processing parameters
rapid_master_files->>network: Handle lakes and streams
network->>User: Return processed data
Actionable comments posted: 1
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files ignored due to path filters (2)
- tdxhydrorapid/network_data/processing_options.xlsx is excluded by !**/*.xlsx
- tdxhydrorapid/network_data/~$processing_options.xlsx is excluded by !**/*.xlsx
Files selected for processing (3)
- run_3_tdxhydro_rapid_masters.py (3 hunks)
- tdxhydrorapid/inputs.py (5 hunks)
- tdxhydrorapid/network.py (2 hunks)
Additional context used
Ruff
tdxhydrorapid/network.py
4-4: json imported but unused. Remove unused import: json (F401)
Additional comments not posted (11)
run_3_tdxhydro_rapid_masters.py (3)
66-66: LGTM! The code for extracting the DISSOLVE_LAKES configuration parameter is consistent with the existing pattern and looks good.
76-76: LGTM! The code for casting the DISSOLVE_LAKES parameter to a boolean type is consistent with the existing pattern and looks good.
91-91: Verify the function signature change in the codebase. The code change looks good. However, ensure that all calls to the rapid_master_files function have been updated to pass the dissolve_lakes argument. Run the following script to verify the function usage:
Verification successful
Changes to the rapid_master_files function are correctly implemented. The dissolve_lakes argument is properly passed in the function call within run_3_tdxhydro_rapid_masters.py, and the function definition in tdxhydrorapid/inputs.py has been updated to include this parameter. No other calls to this function were found in the codebase, so no further updates are necessary.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify all calls to `rapid_master_files` pass the `dissolve_lakes` argument.
# Test: Search for the function usage. Expect: Only occurrences with the `dissolve_lakes` argument.
rg --type python -A 5 $'rapid_master_files'

Length of output: 73
Script:
#!/bin/bash
# Description: Find Python files and search for `rapid_master_files` function usage

# Find all Python files
echo "Python files in the repository:"
fd -e py

# Search for rapid_master_files function usage in Python files
echo -e "\nFunction usage:"
fd -e py -x grep -n "rapid_master_files" {} +

Length of output: 1010
Script:
#!/bin/bash
# Description: Examine rapid_master_files usage and definition

# Check function call in run_3_tdxhydro_rapid_masters.py
echo "Function call in run_3_tdxhydro_rapid_masters.py:"
sed -n '/rapid_master_files/,/)/p' run_3_tdxhydro_rapid_masters.py

# Check function definition in tdxhydrorapid/inputs.py
echo -e "\nFunction definition in tdxhydrorapid/inputs.py:"
sed -n '/def rapid_master_files/,/)/p' tdxhydrorapid/inputs.py

# Fixed grep command for cleaner output
echo -e "\nAll occurrences of rapid_master_files:"
grep -n "rapid_master_files" $(fd -e py)

Length of output: 3126
tdxhydrorapid/network.py (1)
313-325: LGTM! The new code section for aggregating lake stream catchments looks good. It follows a clear and logical approach, handles the case when the JSON file doesn't exist, and uses appropriate data structures and operations for efficient processing. The variable names are also descriptive, enhancing code readability.
tdxhydrorapid/inputs.py (7)
121-183: Approve the addition of lake dissolution logic. The new feature to handle lake geometries is a significant addition to the rapid_master_files function. The logic for constructing the directed graph of lakes, identifying inlets and streams that feed into the lakes, and updating the geometries in the GeoDataFrame by removing streams that are now part of the lakes is complex but well-structured.
The generated JSON output file mod_dissolve_lakes.json captures the relationships between lakes and their inlets, which could be useful for further analysis or visualization.
Overall, the changes enhance the functionality of the module by enabling the handling of lake geometries in the hydrological data processing workflow.
160-173: Review the logic for identifying initial inlet streams and streams that feed into the lake. The logic for identifying initial inlet streams and streams that feed into the lake appears to be sound. Here's a breakdown of the steps:
Initial inlet streams are identified as the leaf nodes (nodes with no incoming edges) in the lake ancestors subgraph. Among these leaf nodes, the ones with a stream order of 1 are considered as part of the lake, while the rest are classified as inlets.
Streams that feed into the lake but are not part of the lake are identified by finding the predecessors of the inside streams (streams within the lake) that are not already part of the lake ancestors.
This approach effectively distinguishes between the initial inlet streams, streams that are part of the lake, and streams that feed into the lake from outside.
The logic is clear and concise, making it easy to understand and maintain.
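For concreteness, here is a minimal sketch of that inlet-identification idea using networkx, assuming a hypothetical stream table with LINKNO, DSLINKNO, and strmOrder columns (the column names come from elsewhere in this PR; the data and variable names are invented):
import networkx as nx
import pandas as pd

# Hypothetical stream table: each segment points at its downstream link.
streams = pd.DataFrame({
    'LINKNO':    [1, 2, 3, 4, 5],
    'DSLINKNO':  [3, 3, 4, 5, -1],
    'strmOrder': [1, 2, 2, 3, 3],
})
lake_stream_ids = {3, 4}  # streams whose geometry falls inside a lake polygon

G = nx.from_pandas_edgelist(streams, source='LINKNO', target='DSLINKNO', create_using=nx.DiGraph)

# All streams upstream of the lake streams (the "lake ancestors" subgraph).
ancestors = set().union(*(nx.ancestors(G, s) for s in lake_stream_ids)) | lake_stream_ids
sub = G.subgraph(ancestors)

# Leaf nodes (no incoming edges) are the initial inlets; order-1 leaves are absorbed into the lake.
leaves = [n for n in sub.nodes if sub.in_degree(n) == 0]
order = streams.set_index('LINKNO')['strmOrder'].to_dict()
inlets = [n for n in leaves if order.get(n, 1) != 1]

# Streams that feed the lake from outside: predecessors of inside streams not already in the subgraph.
feeders = {p for s in lake_stream_ids for p in G.predecessors(s) if p not in ancestors}
print(inlets, feeders)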
178-179: Approve the generation of the JSON output file. The generation of the JSON output file mod_dissolve_lakes.json is a valuable addition to the functionality. It captures the relationships between lakes and their inlets, providing a structured representation of the lake dissolution results. This JSON file can be used for further analysis, visualization, or integration with other tools and workflows.
The code for writing the JSON file is straightforward and follows the standard json.dump() approach, ensuring compatibility and ease of use.
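As an illustration only, a minimal sketch of writing such a lake-to-inlets mapping with json.dump() (the file name matches the PR; the directory and dictionary contents are invented):
import json
import os

# Hypothetical mapping: lake outlet stream ID -> list of inlet stream IDs.
lakes_to_inlets = {760000123: [760000045, 760000046], 760000987: [760000311]}

save_dir = 'test/rapid_inputs'  # assumed output directory for this sketch
os.makedirs(save_dir, exist_ok=True)
with open(os.path.join(save_dir, 'mod_dissolve_lakes.json'), 'w') as f:
    json.dump(lakes_to_inlets, f)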
180-180: Approve the update of the GeoDataFrame by removing lake streams. The update of the GeoDataFrame by removing streams that are now part of the lakes is a necessary step to maintain the integrity and consistency of the stream network.
By removing the lake streams from the GeoDataFrame, the function ensures that the subsequent processing steps operate on a clean and updated representation of the stream network.
The code for updating the GeoDataFrame is concise and uses boolean indexing to efficiently filter out the lake streams.
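A tiny sketch of that boolean-indexing step, with a made-up GeoDataFrame and ID set (assuming the LINKNO field used elsewhere in this PR):
import geopandas as gpd
from shapely.geometry import LineString

# Hypothetical stream network and the IDs that were dissolved into lakes.
sgdf = gpd.GeoDataFrame(
    {'LINKNO': [1, 2, 3]},
    geometry=[LineString([(0, 0), (1, 1)])] * 3,
    crs='EPSG:4326',
)
lake_stream_ids = {2, 3}

# Boolean indexing keeps only streams that are not part of a lake.
sgdf = sgdf[~sgdf['LINKNO'].isin(lake_stream_ids)]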
182-182: Approve the recreation of the directed graph after removing lake streams. The recreation of the directed graph after removing the lake streams is crucial to maintain the correct topology and connectivity of the stream network.
By recreating the directed graph based on the updated GeoDataFrame, the function ensures that the graph accurately represents the modified stream network, taking into account the removal of lake streams.
This step is essential for subsequent processing tasks that rely on the graph structure, such as identifying downstream nodes or performing network traversals.
The code for recreating the directed graph is clear and reuses the existing create_directed_graphs() function, promoting code reuse and maintainability.
304-313: Approve the implementation of the is_stream_a_straight_line function. The is_stream_a_straight_line function is a helper function used in the lake dissolution logic to determine if a stream is a straight line based on its geometry. The function takes the stream ID, ID field, and the GeoDataFrame as input parameters and returns a boolean value indicating whether the stream is a straight line or not.
The logic of the function is as follows:
If the number of coordinates in the stream geometry is less than or equal to 2, the function immediately considers the stream as a straight line since a line with two points is always straight.
For geometries with more than 2 coordinates, the function calculates the straight-line distance between the first and last coordinates using the Euclidean distance formula.
It then compares the squared straight-line distance with the squared actual length of the stream geometry.
If the absolute difference between the squared distances is less than a small threshold (1e-7), the function considers the stream as a straight line. The threshold value is used to account for minor variations or numerical precision issues when comparing the distances.
The function is well-documented with clear parameter names and return values. It efficiently determines if a stream is a straight line based on its geometry, which is a crucial aspect of the lake dissolution logic.
The implementation is concise and easy to understand, making it maintainable and reusable in other parts of the codebase if needed.
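For reference, a sketch of such a helper reconstructed from the description above and the search pattern quoted later in this review; it may not match the PR's implementation byte for byte:
import numpy as np
import geopandas as gpd
from shapely.geometry import LineString


def is_stream_a_straight_line(stream_id: int, id_field: str, gdf: gpd.GeoDataFrame) -> bool:
    """Return True if the stream's geometry is (numerically) a straight line."""
    geom = gdf.loc[gdf[id_field] == stream_id, 'geometry'].values[0]
    if len(geom.coords) <= 2:
        return True  # a two-point line is always straight
    # Compare squared endpoint-to-endpoint distance against squared path length.
    straight_line_len = ((np.asarray(geom.coords[-1]) - np.asarray(geom.coords[0])) ** 2).sum()
    true_len = geom.length ** 2
    return abs(straight_line_len - true_len) < 1e-7


demo = gpd.GeoDataFrame({'LINKNO': [1]}, geometry=[LineString([(0, 0), (1, 1), (2, 2)])])
print(is_stream_a_straight_line(1, 'LINKNO', demo))  # True: vertices are collinear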
132-158: Verify the logic for identifying the true outlet of a lake. The logic for identifying the true outlet of a lake involves traversing the downstream nodes until a non-straight stream is encountered. This approach assumes that lake streams are typically straight lines, and the first non-straight stream encountered is considered the true outlet of the lake.
While this assumption may hold true in most cases, there could be scenarios where a lake stream is not a perfect straight line due to geographical variations or data granularity. It's important to verify that this assumption is valid for the specific dataset being processed.
Consider adding some tolerance or threshold when comparing the straight-line distance with the actual stream length to account for minor variations.
Verification successful
Existing implementation verified, but improvements suggested for robustness
The is_stream_a_straight_line function has been located and analyzed. While it does implement a tolerance check, the current approach may be too strict for real-world scenarios:
- The function uses a very small tolerance (1e-7) on squared lengths, which may not account for minor variations in stream shapes.
- Comparing squared lengths amplifies differences, potentially leading to false negatives for nearly straight streams.
Suggested improvements:
- Consider using a relative tolerance instead of an absolute one, e.g., abs(straight_line_len - true_len) / true_len < tolerance.
- Adjust the tolerance value based on the dataset's characteristics and required accuracy.
- Add a maximum absolute difference threshold to handle very short streams.
These changes would make the function more robust while maintaining the original intent of identifying straight lake streams.
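A sketch of the suggested relative-tolerance variant, assuming shapely geometries; the tolerance values are illustrative, not taken from the PR:
import numpy as np
from shapely.geometry import LineString


def is_approximately_straight(geom, rel_tol: float = 1e-6, abs_tol: float = 1e-9) -> bool:
    """Treat a stream as straight if endpoint distance and path length agree within
    a relative tolerance, or within an absolute tolerance for very short streams."""
    if len(geom.coords) <= 2:
        return True
    straight = float(np.linalg.norm(np.asarray(geom.coords[-1]) - np.asarray(geom.coords[0])))
    true_len = geom.length
    diff = abs(straight - true_len)
    if true_len == 0 or diff < abs_tol:
        return True
    return diff / true_len < rel_tol


print(is_approximately_straight(LineString([(0, 0), (1, 1.0000001), (2, 2)])))  # True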
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the assumption that lake streams are typically straight lines.
# Test: Check if there are any lake streams that are not perfectly straight lines.
# Expect: No results or only a few edge cases.
rg --type python -A 5 $'if len\\(geom.coords\\) <= 2:\n.*return True\n.*straight_line_len = \\(\\(np\\.asarray\\(geom\\.coords\\[-1\\]\\) - np\\.asarray\\(geom\\.coords\\[0\\]\\)\\) \\*\\* 2\\)\\.sum\\(\\)\n.*true_len = geom\\.length \\*\\* 2\n.*return abs\\(straight_line_len - true_len\\) < 1e-7'

Length of output: 459
Script:
#!/bin/bash
# Description: Search for the implementation of is_stream_a_straight_line function

# Use ripgrep with multiline mode to find the function definition
rg --type python -U "def is_stream_a_straight_line\(" -A 15

# If the above doesn't yield results, try a more general search
rg --type python -U "straight_line_len.*true_len" -A 5 -B 5

Length of output: 185
Script:
#!/bin/bash
# Description: Search for the implementation of is_stream_a_straight_line function without relying on file type recognition

# Search for the function definition in .py files
rg -U "def is_stream_a_straight_line\(" -g "*.py" -A 15

# If the above doesn't yield results, try a more general search
rg -U "straight_line_len.*true_len" -g "*.py" -A 5 -B 5

# If still no results, search for any mention of the function name
rg "is_stream_a_straight_line" -g "*.py" -A 5 -B 5

Length of output: 3479
tdxhydrorapid/network.py
Outdated
@@ -1,6 +1,7 @@
 import glob
 import logging
 import os
+import json
Remove unused import.
The json module is imported but not used in the code. Please remove this unused import to keep the code clean.
-import json
Tools
Ruff
4-4: json imported but unused. Remove unused import: json (F401)
@rileyhales Here is a pull request with a correct, functioning version. I have also increased performance significantly, about a 10x speed-up. To accomplish this, I use dask-geopandas for intersecting with grids, plus lots of optimizations to some slow functions. Attached are some profiles. I've verified that my changes do not change the output files from what they were before (unless removing lakes, islands, etc.).
profile_new_lakes.txt
Actionable comments posted: 8
🧹 Outside diff range and nitpick comments (16)
environment.yml (1)
6-7
: Document version requirement rationale. Consider adding comments to explain why these specific minimum versions are required. This helps future maintainers understand the constraints.
 dependencies:
-  - python>=3.11
-  - geopandas>=1.0
+  # Python 3.11+ required for improved error handling and performance
+  - python>=3.11
+  # geopandas 1.0+ needed for enhanced geospatial operations
+  - geopandas>=1.0
🧰 Tools
🪛 yamllint
[warning] 6-6: wrong indentation: expected 0 but found 2
(indentation)
run_4_make_vpu_files.py (1)
Line range hint
1-13
: Add documentation and separate test configurations.The script lacks:
- A module-level docstring explaining its purpose and usage
- Clear separation between test and production configurations
Consider:
- Adding comprehensive documentation
- Moving test configurations to a separate test configuration file
- Adding type hints for better code maintainability
+""" +Create VPU (Vector Processing Unit) files from TDXHydro data. + +This script processes hydrological data and creates VPU files by: +1. Creating a master table from TDXHydro regions +2. Processing each VPU separately +3. Generating necessary input files for each VPU + +Environment Variables: + TDX_INPUTS_DIR: Directory containing TDXHydro input files + PARQUETS_DIR: Directory containing parquet files + FINAL_OUTPUT_DIR: Directory for output files +"""run_1_tdx_gpkg_to_geoparquet.py (2)
Line range hint
57-75
: Remove redundant LINKNO assignment and add data validation.There are several concerns in this block:
- The LINKNO assignment on line 57 is redundant as it's already processed on line 54
- There's no validation of input data ranges before applying the header number multiplication
Apply this diff to fix the issues:
- gdf['LINKNO'] = gdf['LINKNO'].astype(int) + (tdx_header_number * 10_000_000)
+ # Validate input ranges to prevent integer overflow
+ if (gdf['LINKNO'].max() + (tdx_header_number * 10_000_000) > sys.maxsize or
+         gdf['DSLINKNO'].max() + (tdx_header_number * 10_000_000) > sys.maxsize):
+     raise ValueError("Integer overflow detected in ID generation")
+
  gdf['DSLINKNO'] = gdf['DSLINKNO'].astype(int)
Line range hint
6-11
: Consider architectural improvements for better maintainability.Several architectural improvements could enhance the script:
- Replace hardcoded paths with environment variables or configuration files
- Add proper error handling around file operations
- Add progress tracking for large datasets
Consider these improvements:
+import os
+from pathlib import Path
+from tqdm import tqdm
+
-gpkg_dir = '/Volumes/T9Hales4TB/TDXHydro'
-gpq_dir = '/Volumes/T9Hales4TB/TDXHydroGeoParquet'
+gpkg_dir = os.getenv('TDX_HYDRO_DIR', '/Volumes/T9Hales4TB/TDXHydro')
+gpq_dir = os.getenv('TDX_PARQUET_DIR', '/Volumes/T9Hales4TB/TDXHydroGeoParquet')
And wrap the main processing loop with error handling and progress tracking:
- for gpkg in sorted(glob.glob(os.path.join(gpkg_dir, 'TDX*.gpkg'))):
+ gpkg_files = sorted(glob.glob(os.path.join(gpkg_dir, 'TDX*.gpkg')))
+ for gpkg in tqdm(gpkg_files, desc="Processing GeoPackage files"):
+     try:
          # ... existing processing code ...
+     except Exception as e:
+         logging.error(f"Error processing {gpkg}: {str(e)}")
+         continue
run_3_tdxhydro_rapid_masters.py (1)
91-108
: Consider refactoring to reduce parameter count.While the implementation is functional, the high number of parameters (15+) suggests that the
rapid_master_files
function might be handling too many responsibilities. Consider refactoring to use a configuration object pattern.Example approach:
class RapidConfig:
    def __init__(self):
        self.id_field = None
        self.ds_id_field = None
        # ... other config fields

    @classmethod
    def from_excel(cls, excel_df, region_num):
        config = cls()
        config.dissolve_lakes = bool(excel_df.loc[excel_df['region_number'] == region_num, 'dissolve_lakes'].values[0])
        # ... other fields
        return config

# Usage
config = RapidConfig.from_excel(net_df, region_num)
rp.inputs.rapid_master_files(streams_gpq, save_dir=save_dir, config=config)
tdxhydrorapid/weights.py (3)
83-83
: Typo in comment: Correct 'calcualting' to 'calculating'There's a minor typo in the comment. The word 'calcualting' should be corrected to 'calculating' for clarity.
Apply this diff to fix the typo:
-).sort_index() # Sort index is needed (used when calcualting area_sqm) +).sort_index() # Sort index is needed (used when calculating area_sqm)
257-258
: Type consistency when checking foroutlet
inwt[id_field].values
The
outlet
variable is converted to an integer, butwt[id_field].values
may contain values of a different type (e.g.,numpy.int64
). This could cause the membership test to fail even if the values are numerically equal.Ensure type consistency by converting
wt[id_field]
to integers before the check:
if outlet in wt[id_field].astype(int).values:
262-264
: Consider regrouping after deleting streamsAfter modifying
wt
by deleting streams, it may be beneficial to regroup and sum thearea_sqm
to ensure data consistency, similar to previous steps.Apply this diff to regroup the DataFrame:
  wt = wt[~wt[id_field].isin(streams_to_delete)]
+ wt = wt.groupby(wt.columns.drop('area_sqm').tolist()).sum().reset_index()
tdxhydrorapid/network.py (5)
57-57
: Simplify conditional expression for clarityThe condition
not len([x for x in pred_orders if x == (min_order_to_keep - 1)]) == 2
can be simplified using the!=
operator for better readability.Apply this change:
-if len(pred_orders) == 2 and not len([x for x in pred_orders if x == (min_order_to_keep - 1)]) == 2:
+if len(pred_orders) == 2 and len([x for x in pred_orders if x == (min_order_to_keep - 1)]) != 2:
🧰 Tools
🪛 Ruff
57-57: Use
len([x for x in pred_orders if x == min_order_to_keep - 1]) != 2
instead ofnot len([x for x in pred_orders if x == min_order_to_keep - 1]) == 2
Replace with
!=
operator(SIM201)
61-61
: Simplify conditional expression for claritySimilarly, the condition
not max(pred_orders) == min_order_to_keep - 1
can be simplified using the!=
operator.Apply this change:
-if (len(pred_orders) > 2) and \
-    (not max(pred_orders) == min_order_to_keep - 1) and \
-    (len([x for x in pred_orders if x == (min_order_to_keep - 1)]) >= 2):
+if (len(pred_orders) > 2) and \
+    (max(pred_orders) != min_order_to_keep - 1) and \
+    (len([x for x in pred_orders if x == (min_order_to_keep - 1)]) >= 2):
🧰 Tools
🪛 Ruff
61-61: Use
max(pred_orders) != min_order_to_keep - 1
instead ofnot max(pred_orders) == min_order_to_keep - 1
Replace with
!=
operator(SIM201)
88-88
: Rename unused variableindex
to_
The loop variable
index
is not used within the loop body. Renaming it to_
indicates that it is intentionally unused.Apply this change:
-for index, row in order1s.iterrows():
+for _, row in order1s.iterrows():
🧰 Tools
🪛 Ruff
88-88: Loop control variable
index
not used within loop bodyRename unused
index
to_index
(B007)
127-129
: Use logging instead of print statements for error handlingUsing
logging
calls to maintain consistent and configurable logging.Apply this change:
-    except Exception as e:
-        print(e)
-        print(siblings)
-        print(row)
+    except Exception as e:
+        logger.error(f"An error occurred: {e}")
+        logger.debug(f"Siblings: {siblings}")
+        logger.debug(f"Row data: {row}")
351-380
: Ensure consistent use of filesystem operations and add type annotationsIn the
dissolve_catchments
function, the filesystem objectfs
is used alongsideos.path
functions. For consistency and to support various filesystem backends, consider usingfs
methods for all filesystem operations. Also, add a type annotation forfs
for clarity.Apply these changes:
Add type annotation for
fs
:
-def dissolve_catchments(save_dir: str, gdf: gpd.GeoDataFrame, fs, id_field: str = 'LINKNO') -> gpd.GeoDataFrame:
+def dissolve_catchments(save_dir: str, gdf: gpd.GeoDataFrame, fs: fsspec.AbstractFileSystem, id_field: str = 'LINKNO') -> gpd.GeoDataFrame:
Use
fs.path.join
instead ofos.path.join
:
- headwater_dissolve_path = os.path.join(save_dir, 'mod_dissolve_headwater.csv')
+ headwater_dissolve_path = fs.path.join(save_dir, 'mod_dissolve_headwater.csv')
Replace
os.path.exists
withfs.exists
where appropriate.Ensure that all path operations and file manipulations consistently use the same filesystem interface.
tdxhydrorapid/inputs.py (3)
239-242
: Simplifyif
-else
block using a ternary operatorYou can make the code more concise by replacing the
if
-else
block with a ternary operator.Apply this diff:
 if downstream:
     downstream = downstream[0]
 else:
     downstream = -1
+downstream = downstream[0] if downstream else -1
🧰 Tools
🪛 Ruff
239-242: Use ternary operator
downstream = downstream[0] if downstream else -1
instead ofif
-else
-blockReplace
if
-else
-block withdownstream = downstream[0] if downstream else -1
(SIM108)
460-461
: Typographical error in commentThere's a typo in the comment. It should be "Check if the stream is a straight line."
Apply this diff:
- # heck if the stream is a straight line
+ # Check if the stream is a straight line
527-527
: Unused loop variableidx
The variable
idx
is not used within the loop, which is unnecessary.Rename
idx
to_
to indicate it's intentionally unused.Apply this diff:
-for idx, (river_id_to_keep, river_id_to_drop) in short_streams.iterrows():
+for _, (river_id_to_keep, river_id_to_drop) in short_streams.iterrows():
🧰 Tools
🪛 Ruff
527-527: Loop control variable
idx
not used within loop bodyRename unused
idx
to_idx
(B007)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (3)
- tdxhydrorapid/network_data/bad_streams.csv is excluded by !**/*.csv
- tdxhydrorapid/network_data/processing_options.xlsx is excluded by !**/*.xlsx
- tdxhydrorapid/network_data/rivids_lt_5_cms.csv is excluded by !**/*.csv
📒 Files selected for processing (7)
- environment.yml (1 hunks)
- run_1_tdx_gpkg_to_geoparquet.py (2 hunks)
- run_3_tdxhydro_rapid_masters.py (3 hunks)
- run_4_make_vpu_files.py (2 hunks)
- tdxhydrorapid/inputs.py (8 hunks)
- tdxhydrorapid/network.py (8 hunks)
- tdxhydrorapid/weights.py (5 hunks)
🧰 Additional context used
🪛 yamllint
environment.yml
[warning] 6-6: wrong indentation: expected 0 but found 2
(indentation)
🪛 Ruff
tdxhydrorapid/inputs.py
195-195: Loop control variable key
not used within loop body
(B007)
239-242: Use ternary operator downstream = downstream[0] if downstream else -1
instead of if
-else
-block
Replace if
-else
-block with downstream = downstream[0] if downstream else -1
(SIM108)
329-329: Loop control variable key
not used within loop body
(B007)
527-527: Loop control variable idx
not used within loop body
Rename unused idx
to _idx
(B007)
tdxhydrorapid/network.py
57-57: Use len([x for x in pred_orders if x == min_order_to_keep - 1]) != 2
instead of not len([x for x in pred_orders if x == min_order_to_keep - 1]) == 2
Replace with !=
operator
(SIM201)
61-61: Use max(pred_orders) != min_order_to_keep - 1
instead of not max(pred_orders) == min_order_to_keep - 1
Replace with !=
operator
(SIM201)
88-88: Loop control variable index
not used within loop body
Rename unused index
to _index
(B007)
🔇 Additional comments (13)
environment.yml (1)
6-7
: Consider specifying upper version bounds.
While using minimum version constraints (>=
) provides flexibility, it might lead to compatibility issues if future major versions introduce breaking changes. Consider using version ranges (e.g., python>=3.11,<3.12
and geopandas>=1.0,<2.0
) to ensure reproducible environments.
🧰 Tools
🪛 yamllint
[warning] 6-6: wrong indentation: expected 0 but found 2
(indentation)
run_4_make_vpu_files.py (1)
31-31
: Verify the impact of the new og_pqs
parameter.
The addition of the og_pqs
parameter to concat_tdxregions
could affect the master table creation process. Please ensure:
- The function handles empty or invalid parquet file lists gracefully
- The behavior is documented in the function's docstring
Let's verify the function implementation and its other callers:
✅ Verification successful
The og_pqs
parameter is properly handled and documented
The function implementation shows:
- The
original_streams
parameter (aliased asog_pqs
) is optional with a default value ofNone
- The docstring clearly explains the parameter's purpose for VPU code assignment using downstream terminal nodes
- The implementation gracefully handles both cases (with and without the parameter) through conditional logic
- The function is only called from one location, which has been properly updated
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check the function implementation and documentation
ast-grep --pattern 'def concat_tdxregions($$$)'
# Find other callers of this function
rg 'concat_tdxregions\(' --type py
Length of output: 369
Script:
#!/bin/bash
# Let's check the implementation details of concat_tdxregions
rg -A 20 'def concat_tdxregions' tdxhydrorapid/inputs.py
Length of output: 1498
run_1_tdx_gpkg_to_geoparquet.py (1)
77-78
: 🛠️ Refactor suggestion
Verify the different processing logic for non-streamnet files.
The non-streamnet files are processed differently with minimal transformations:
- No validation of streamID values before processing
- Missing fields that are present in streamnet files (strmOrder, Magnitude, etc.)
- Dropping streamID might lose valuable debugging/tracking information
Let's verify if this difference is intentional:
Consider adding validation and preserving more information:
+ # Validate streamID values
+ if gdf['streamID'].isnull().any():
+ raise ValueError("Found null streamID values")
+
gdf['LINKNO'] = gdf['streamID'].astype(int) + (tdx_header_number * 10_000_000)
- gdf = gdf.drop(columns=['streamID'])
+ # Add common fields for consistency
+ gdf['TDXHydroRegion'] = region_number
+ gdf['LengthGeodesicMeters'] = gdf['geometry'].apply(_calculate_geodesic_length)
run_3_tdxhydro_rapid_masters.py (4)
66-70
: LGTM: New configuration parameters are well-structured.
The new configuration parameters for lakes, islands, and ocean watersheds follow consistent naming and implementation patterns.
80-84
: LGTM: Type casting is properly implemented.
Explicit boolean type casting ensures type safety and consistency with other configuration parameters.
126-126
: LGTM: Improved weight table existence check.
The simplified check using list comprehension improves code readability while maintaining the same functionality.
Line range hint 66-108
: Verify performance impact of new filtering options.
The addition of multiple new filtering options (lakes, islands, ocean watersheds) could impact processing time. Consider measuring and documenting the performance impact, especially for large regions.
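One simple way to do that measurement is a small timing wrapper around the per-region processing call; this is only a sketch with a stand-in workload, not code from the PR:
import logging
import time

logging.basicConfig(level=logging.INFO)


def timed(label, func, *args, **kwargs):
    """Run func, log its wall-clock duration, and return its result."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    logging.info('%s finished in %.1f s', label, time.perf_counter() - start)
    return result


# Stand-in workload; in the script this would wrap the per-region call such as
# rp.inputs.rapid_master_files(...).
timed('region 101 master files', lambda: sum(range(10_000_000)))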
tdxhydrorapid/weights.py (4)
270-277
: Potential issue with outlet value -1
in island streams processing
In the loop for processing island streams, if outlet == -1
, all streams in island_streams
are added to streams_to_delete
. Ensure that this logic aligns with the intended behavior. Also, verify that the wt[id_field]
contains the correct data types for comparison.
Double-check the handling of -1
as an outlet and confirm that streams are being merged or deleted as expected.
290-292
: Possible data inconsistency due to repeated grouping
Repeated grouping and summing without careful handling can sometimes lead to data inconsistencies. Review the grouping operations to ensure that the area_sqm
and other aggregated fields correctly represent the data.
Ensure that the grouping fields are appropriate and that no critical information is lost during aggregation.
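A small sanity-check sketch of that aggregation pattern, using made-up weight-table columns; it groups on every column except area_sqm and asserts that total area is preserved:
import pandas as pd

# Hypothetical weight table after remapping stream IDs onto lake outlets.
wt = pd.DataFrame({
    'LINKNO':    [10, 10, 11],
    'lon_index': [5, 5, 6],
    'lat_index': [2, 2, 2],
    'area_sqm':  [100.0, 50.0, 75.0],
})

total_before = wt['area_sqm'].sum()
wt = wt.groupby(wt.columns.drop('area_sqm').tolist(), as_index=False)['area_sqm'].sum()
assert abs(wt['area_sqm'].sum() - total_before) < 1e-6  # aggregation must preserve total area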
106-109
:
Ensure consistent projection before area calculation
In the make_weight_table_from_thiessen_grid
function, the area is computed without ensuring the geometries are in a projected CRS. Since area calculations are inaccurate in geographic coordinates, please confirm that the geometries are projected appropriately before calculating the area.
Consider adding a projection step or verify that the overlay
function outputs geometries in the desired CRS.
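A minimal sketch of such a projection step before the area calculation; the CRS dictionary mirrors the {'proj': 'cea'} usage elsewhere in this PR, while the geometry is invented:
import geopandas as gpd
from shapely.geometry import Polygon

gdf = gpd.GeoDataFrame(
    geometry=[Polygon([(0, 0), (0.1, 0), (0.1, 0.1), (0, 0.1)])],
    crs='EPSG:4326',  # geographic coordinates: degrees, not meters
)

# Reproject to an equal-area CRS before computing areas in square meters.
area_sqm = gdf.to_crs({'proj': 'cea'}).area
print(area_sqm)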
225-226
: Verify the mapping keys and values for pruning streams
When creating ids_to_prune
, the mapping from LINKTODROP
to LINKNO
assumes that these columns exist and are correctly set as integers. Verify that the CSV file has the appropriate columns and that the mapping does not unintentionally alter id_field
.
Run the following script to check for potential issues:
tdxhydrorapid/inputs.py (2)
462-464
: Potential IndexError
when accessing geometry
When accessing sgdf[sgdf[id_field]==stream_id].geometry.values[0]
, if stream_id
does not exist in sgdf
, this will raise an IndexError
.
Please verify that stream_id
always exists in sgdf
. If not, consider adding a check before accessing .values[0]
:
if sgdf[sgdf[id_field] == stream_id].empty:
# Handle the case where stream_id is not found
else:
geom = sgdf[sgdf[id_field] == stream_id].geometry.values[0]
467-469
:
Logical error in straight-line detection algorithm
The method for determining if a stream is a straight line by comparing squared lengths may not be reliable due to floating-point precision issues.
Consider using a geometric approach to determine if the stream is a straight line, such as checking if all intermediate points lie on the line defined by the start and end points.
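A sketch of that geometric alternative: test whether every intermediate vertex lies within a tolerance of the chord between the endpoints (2D geometry assumed; the tolerance is illustrative):
import numpy as np
from shapely.geometry import LineString


def is_geometrically_straight(geom: LineString, tol: float = 1e-7) -> bool:
    """True if every intermediate vertex lies (within tol) on the line through the endpoints."""
    coords = np.asarray(geom.coords)
    if len(coords) <= 2:
        return True
    start, end = coords[0], coords[-1]
    chord = end - start
    chord_len = np.linalg.norm(chord)
    if chord_len == 0:
        return False  # endpoints coincide, so the stream cannot be a straight line
    # Perpendicular distance of each vertex from the start->end line (2D cross product).
    offsets = coords[1:-1] - start
    dists = np.abs(offsets[:, 0] * chord[1] - offsets[:, 1] * chord[0]) / chord_len
    return bool(np.all(dists < tol))


print(is_geometrically_straight(LineString([(0, 0), (1, 1), (2, 2)])))  # True
print(is_geometrically_straight(LineString([(0, 0), (1, 2), (2, 0)])))  # False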
run_4_make_vpu_files.py
Outdated
tdx_inputs_dir = 'test/rapid_inputs'
og_pqs = glob.glob("/Volumes/EB406_T7_3/geoglows_v3/parquets/TDX_streamnet_*_01.parquet")
final_output_dir = 'test/outputs'
Remove hardcoded absolute paths and standardize path handling.
The current implementation has several issues:
- Hardcoded absolute path
/Volumes/EB406_T7_3/...
makes the code non-portable and environment-dependent - Inconsistent path style (mixing relative and absolute paths)
Consider:
- Using environment variables or configuration files for path management
- Making all paths relative to the project root
- Adding path validation
-tdx_inputs_dir = 'test/rapid_inputs'
-og_pqs = glob.glob("/Volumes/EB406_T7_3/geoglows_v3/parquets/TDX_streamnet_*_01.parquet")
-final_output_dir = 'test/outputs'
+# Load paths from environment or config
+tdx_inputs_dir = os.getenv('TDX_INPUTS_DIR', 'test/rapid_inputs')
+parquets_dir = os.getenv('PARQUETS_DIR', 'data/parquets')
+og_pqs = glob.glob(os.path.join(parquets_dir, "TDX_streamnet_*_01.parquet"))
+final_output_dir = os.getenv('FINAL_OUTPUT_DIR', 'test/outputs')
Committable suggestion skipped: line range outside the PR's diff.
merge_dict = {stream: streams_to_merge[0] for streams_to_merge in o2_to_dissolve.values for stream in streams_to_merge}
wt[id_field] = wt[id_field].map(merge_dict).fillna(wt[id_field]).astype(int)
Possible issue with merge dictionary creation
When creating merge_dict
, the comprehension uses o2_to_dissolve.values
, which may not produce the expected structure if o2_to_dissolve
is a DataFrame. This could lead to incorrect mapping.
Ensure that o2_to_dissolve.values
yields the correct lists for streams_to_merge
. You might need to adjust the code to iterate over the DataFrame rows properly.
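A sketch of iterating the DataFrame rows explicitly instead of relying on .values; the column layout of o2_to_dissolve is assumed for illustration:
import pandas as pd

# Hypothetical dissolve table: each row lists a group of streams to merge together.
o2_to_dissolve = pd.DataFrame({
    'keep':     [200, 300],
    'sibling1': [201, 301],
    'sibling2': [202, 302],
})

merge_dict = {}
for row in o2_to_dissolve.itertuples(index=False):
    streams_to_merge = [s for s in row if pd.notna(s)]
    # Map every stream in the group (including the kept one) onto the first/kept ID.
    merge_dict.update({stream: streams_to_merge[0] for stream in streams_to_merge})

print(merge_dict)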
if merge_stream[0] not in wt[id_field].values or merge_stream[1] not in wt[id_field].values:
    continue
wt[id_field] = wt[id_field].replace(merge_stream[1], merge_stream[0])
Ensure proper handling when merging short streams
In the loop for merging short streams, the condition checks if either stream ID is not present and continues. This may skip necessary merges if one of the IDs is missing due to previous operations.
Consider adjusting the logic to handle cases where one ID is missing, or log the occurrence for further inspection.
tdxhydrorapid/weights.py
Outdated
tg_ddf: dgpd.GeoDataFrame = dgpd.from_geopandas(tg_gdf, npartitions=1)
basins_ddf: dgpd.GeoDataFrame = dgpd.from_geopandas(basins_gdf, npartitions=80)
intersections = overlay(basins_ddf, tg_ddf)
intersections['area_sqm'] = dgpd.from_geopandas(intersections, npartitions=80).to_crs({'proj':'cea'}).area.compute()
🛠️ Refactor suggestion
Redundant reprojection before area computation
The intersections
GeoDataFrame is already projected to Cylindrical Equal Area in the overlay
function. Reprojecting again in line 195 is unnecessary and may impact performance.
Apply this diff to remove the redundant to_crs
call:
intersections['area_sqm'] = dgpd.from_geopandas(intersections, npartitions=80)\
- .to_crs({'proj': 'cea'}).area.compute()
+ .area.compute()
for outlet, _, lake_streams in lake_streams_df.itertuples():
    outlet = int(outlet)
    if outlet in wt[id_field].values:
        wt.loc[wt[id_field].isin(lake_streams), id_field] = outlet
    else:
        streams_to_delete.update(lake_streams)
Incorrect unpacking in loop over lake_streams_df.itertuples()
When iterating over lake_streams_df.itertuples()
, the unpacking should consider that the first element is the Index. The current code may raise a ValueError
due to mismatched unpacking.
Adjust the loop to unpack correctly. Here's a corrected version:
- for outlet, _, lake_streams in lake_streams_df.itertuples():
+ for idx, outlet, _, lake_streams in lake_streams_df.itertuples():
Or, if the DataFrame columns are known, consider using itertuples(index=False)
:
- for outlet, _, lake_streams in lake_streams_df.itertuples():
+ for outlet, _, lake_streams in lake_streams_df.itertuples(index=False):
Ensure the unpacking matches the structure of the tuples returned by itertuples()
.
tdxhydrorapid/inputs.py
Outdated
for key, items in dissolve_lake_dict.items():
    if outlet in items:
        break
real_outlet = key
Potential use of loop variable key
outside its scope
In the loop where you iterate over dissolve_lake_dict.items()
, the variable key
may not be defined if the loop completes without finding a matching outlet
. Using key
outside the loop without ensuring it was set can lead to a UnboundLocalError
.
Apply this diff to ensure real_outlet
is properly assigned:
for key, items in dissolve_lake_dict.items():
if outlet in items:
break
+else:
+ raise ValueError(f"Outlet {outlet} not found in dissolve_lake_dict")
-real_outlet = key
+real_outlet = key
🧰 Tools
🪛 Ruff
195-195: Loop control variable key
not used within loop body
(B007)
for key, items in islands_dict.items():
    if outlet in items:
        break
real_outlet = key
Potential use of loop variable key
outside its scope
Similar to lines 195-198, the variable key
may be undefined outside the loop if the loop doesn't break. This can cause a UnboundLocalError
when real_outlet = key
is executed.
Apply this diff to handle the case properly:
for key, items in islands_dict.items():
if outlet in items:
break
+else:
+ raise ValueError(f"Outlet {outlet} not found in islands_dict")
-real_outlet = key
+real_outlet = key
🧰 Tools
🪛 Ruff
329-329: Loop control variable key
not used within loop body
(B007)
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
tdxhydrorapid/weights.py
(5 hunks)
🔇 Additional comments (2)
tdxhydrorapid/weights.py (2)
106-109
: LGTM! Efficient spatial operations setup
The conversion to Dask GeoDataFrames with appropriate partition sizes looks good. Using 1 partition for the thiessen grid (smaller dataset) and 80 for the basins (larger dataset) is a good optimization.
Also applies to: 192-195
286-288
: LGTM! Correct handling of missing stream IDs
The check for missing stream IDs before merging is appropriate and matches the PR's objective of removing unwanted geometries.
def overlay(left: dgpd.GeoDataFrame, right: dgpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Performs a spatial intersection overlay of two GeoDataFrames using Dask.
    Reproject geometries to a Cylindrical Equal Area projection.
    """
    return gpd.GeoDataFrame(
        left.sjoin(right.assign(right_geometry=right.geometry))
        .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry).to_crs({'proj':'cea'}))
        .drop(columns="right_geometry")
        .compute()).sort_index()  # Sort index is needed (used when calcualting area_sqm)
🛠️ Refactor suggestion
Optimize projection timing in overlay function
The projection to CEA should occur before the intersection operation to ensure geometric accuracy. Currently, it's done after the intersection which could lead to less accurate results.
Apply this diff to fix the projection timing:
def overlay(left: dgpd.GeoDataFrame, right: dgpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Performs a spatial intersection overlay of two GeoDataFrames using Dask.
Reproject geometries to a Cylindrical Equal Area projection.
"""
+ left = left.to_crs({'proj': 'cea'})
+ right = right.to_crs({'proj': 'cea'})
return gpd.GeoDataFrame(
left.sjoin(right.assign(right_geometry=right.geometry))
- .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry).to_crs({'proj':'cea'}))
+ .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry))
.drop(columns="right_geometry")
.compute()).sort_index()
lake_streams_path = os.path.join(save_dir, 'mod_dissolve_lakes.json')
if os.path.exists(lake_streams_path):
    lake_streams_df = pd.read_json(lake_streams_path, orient='index', convert_axes=False, convert_dates=False)
    streams_to_delete = set()
    for outlet, _, lake_streams in lake_streams_df.itertuples():
        outlet = int(outlet)
        if outlet in wt[id_field].values:
            wt.loc[wt[id_field].isin(lake_streams), id_field] = outlet
        else:
            streams_to_delete.update(lake_streams)

    wt = wt[~wt[id_field].isin(streams_to_delete)]
    wt = wt.groupby(wt.columns.drop('area_sqm').tolist()).sum().reset_index()
🛠️ Refactor suggestion
Add error handling for lake streams processing
The lake streams processing lacks validation of JSON data structure and type conversion safety checks.
Add error handling:
lake_streams_path = os.path.join(save_dir, 'mod_dissolve_lakes.json')
if os.path.exists(lake_streams_path):
+ try:
lake_streams_df = pd.read_json(lake_streams_path, orient='index', convert_axes=False, convert_dates=False)
streams_to_delete = set()
for outlet, _, lake_streams in lake_streams_df.itertuples():
- outlet = int(outlet)
+ try:
+ outlet = int(outlet)
+ except ValueError:
+ logger.warning(f"Invalid outlet ID format in lake streams: {outlet}")
+ continue
if outlet in wt[id_field].values:
wt.loc[wt[id_field].isin(lake_streams), id_field] = outlet
else:
streams_to_delete.update(lake_streams)
+ except ValueError as e:
+ logger.error(f"Invalid JSON format in lake streams file: {e}")
+ raise
island_streams_path = os.path.join(save_dir, 'mod_dissolve_islands.json')
if os.path.exists(island_streams_path):
    with open(island_streams_path, 'r') as f:
        island_streams_dict: dict = json.load(f)

    streams_to_delete = set()
    for outlet, island_streams in island_streams_dict.items():
        outlet = int(outlet)
        if outlet == -1 or outlet not in wt[id_field].values:
            streams_to_delete.update(island_streams)
        else:
            wt.loc[wt[id_field].isin(island_streams), id_field] = outlet

    wt = wt[~wt[id_field].isin(streams_to_delete)]
🛠️ Refactor suggestion
Add error handling for island streams processing
The island streams processing also needs validation and error handling.
Add error handling:
if os.path.exists(island_streams_path):
+ try:
with open(island_streams_path, 'r') as f:
- island_streams_dict: dict = json.load(f)
+ island_streams_dict: dict = json.load(f)
+ if not isinstance(island_streams_dict, dict):
+ raise ValueError("Expected dictionary in island streams file")
streams_to_delete = set()
for outlet, island_streams in island_streams_dict.items():
- outlet = int(outlet)
+ try:
+ outlet = int(outlet)
+ except ValueError:
+ logger.warning(f"Invalid outlet ID format in island streams: {outlet}")
+ continue
if outlet == -1 or outlet not in wt[id_field].values:
streams_to_delete.update(island_streams)
else:
wt.loc[wt[id_field].isin(island_streams), id_field] = outlet
+ except (json.JSONDecodeError, ValueError) as e:
+ logger.error(f"Error processing island streams file: {e}")
+ raise
low_flow_streams_path = os.path.join(save_dir, 'mod_drop_low_flow.json')
if os.path.exists(low_flow_streams_path):
    with open(low_flow_streams_path, 'r') as f:
        low_flow_streams_dict: dict = json.load(f)

    streams_to_delete = set()
    for outlet, low_flow_streams in low_flow_streams_dict.items():
        outlet = int(outlet)
        if outlet in wt[id_field].values:
            wt.loc[wt[id_field].isin(low_flow_streams), id_field] = outlet
        else:
            streams_to_delete.update(low_flow_streams)

    wt = wt[~wt[id_field].isin(streams_to_delete)]
    wt = wt.groupby(wt.columns.drop('area_sqm').tolist()).sum().reset_index()
🛠️ Refactor suggestion
Add error handling for low flow streams processing
Similar to lake streams, the low flow streams processing needs validation and error handling.
Add error handling:
if os.path.exists(low_flow_streams_path):
+ try:
with open(low_flow_streams_path, 'r') as f:
- low_flow_streams_dict: dict = json.load(f)
+ low_flow_streams_dict: dict = json.load(f)
+ if not isinstance(low_flow_streams_dict, dict):
+ raise ValueError("Expected dictionary in low flow streams file")
streams_to_delete = set()
for outlet, low_flow_streams in low_flow_streams_dict.items():
- outlet = int(outlet)
+ try:
+ outlet = int(outlet)
+ except ValueError:
+ logger.warning(f"Invalid outlet ID format in low flow streams: {outlet}")
+ continue
if outlet in wt[id_field].values:
wt.loc[wt[id_field].isin(low_flow_streams), id_field] = outlet
else:
streams_to_delete.update(low_flow_streams)
+ except (json.JSONDecodeError, ValueError) as e:
+ logger.error(f"Error processing low flow streams file: {e}")
+ raise
Actionable comments posted: 10
🧹 Outside diff range and nitpick comments (10)
run_1.1_lakes.py (2)
51-185
: Refactor the Main Execution Block into Smaller Functions. The main execution block is lengthy and contains nested loops and conditional statements, which can be difficult to read and maintain. Refactor the code into smaller, well-named functions that perform specific tasks. This will improve readability and make the code easier to debug and test.
64-64
: Optimize Spatial Join Performance. Performing a spatial join on large datasets can be resource-intensive. Ensure that spatial indexing is utilized to optimize performance. In Dask GeoDataFrame, spatial partitions and indexing can significantly improve the efficiency of spatial joins.
Consider verifying that spatial partitions are set up correctly, for example with dask-geopandas' spatial_shuffle:
dgdf = dgdf.spatial_shuffle()
lakes_subset = dgpd.from_geopandas(lakes_subset, npartitions=4).spatial_shuffle()
intersect = dgpd.sjoin(dgdf, lakes_subset, how='inner', predicate='intersects').compute()
run_7_catchments.py (1)
6-6: Remove Unused Import
The tqdm module is imported but not used in the script. Removing unused imports improves code clarity and reduces potential confusion. Apply this diff to remove the unused import:
-import tqdm
🧰 Tools
🪛 Ruff (0.8.0)
6-6: tqdm imported but unused
Remove unused import: tqdm
(F401)
run_1_tdx_gpkg_to_geoparquet.py (1)
9-9: Remove Unused Import
The Point class from shapely.geometry is imported but not used in the script. Removing unused imports helps keep the code clean and maintainable. Apply this diff to remove the unused import:
-from shapely.geometry import Point
🧰 Tools
🪛 Ruff (0.8.0)
9-9: shapely.geometry.Point imported but unused
Remove unused import: shapely.geometry.Point
(F401)
run_4_make_vpu_files.py (2)
65-67: Remove unused exception variable.
The error handling is improved with the use of traceback.format_exc(), but the exception variable e is unused.
-    except Exception as e:
+    except Exception:
         logging.error(vpu)
         logging.error(tdx_region)
         logging.error(traceback.format_exc())
50-50: Improve readability of the conditional check.
The current condition combines multiple checks in a way that's hard to read. Consider splitting it for better clarity.
-    if os.path.exists(vpu_dir) and (not MAKE_GPKG or os.path.exists(os.path.join(gpkg_dir, f'streams_{vpu}.gpkg'))):
+    gpkg_path = os.path.join(gpkg_dir, f'streams_{vpu}.gpkg')
+    if os.path.exists(vpu_dir) and (not MAKE_GPKG or os.path.exists(gpkg_path)):
Line range hint
40-77
: Consider extracting validation logic into separate functions.While the changes effectively support both rapid and non-rapid validation paths, the function could be more maintainable if split into smaller, focused functions.
Consider refactoring like this:
def check_outputs_are_valid(input_dir: str, use_rapid: bool = False) -> bool: + def _validate_rapid_files(): + return { + 'comid_lat_lon_z': pd.read_csv(os.path.join(input_dir, 'comid_lat_lon_z.csv')).shape[0], + 'rapid_connect': pd.read_csv(os.path.join(input_dir, 'rapid_connect.csv'), header=None).shape[0], + 'riv_bas_id': pd.read_csv(os.path.join(input_dir, 'riv_bas_id.csv'), header=None).shape[0], + 'k': pd.read_csv(os.path.join(input_dir, 'k.csv'), header=None).shape[0], + 'x': pd.read_csv(os.path.join(input_dir, 'x.csv'), header=None).shape[0], + } + def _validate_parquet_files(): + return { + 'connectivity': pd.read_parquet(os.path.join(input_dir, 'connectivity.parquet')).shape[0], + 'routing_parameters': pd.read_parquet(os.path.join(input_dir, 'routing_parameters.parquet')).shape[0], + } file_counts = _validate_rapid_files() if use_rapid else _validate_parquet_files() n_weights = [] for f in sorted(glob.glob(os.path.join(input_dir, 'weight_*.csv'))): df = pd.read_csv(f) n_weights.append((os.path.basename(f), df.iloc[:, 0].unique().shape[0])) logger.info('Checking for consistent numbers of basins in generated files') - if use_rapid: - all_nums = [n_comid_lat_lon_z, n_rapidconnect, n_rivbasid, n_k, n_x] + [n for f, n in n_weights if 'full' not in f] - else: - all_nums = [n_connectivity, n_routing_params] + [n for f, n in n_weights if 'full' not in f] + all_nums = list(file_counts.values()) + [n for f, n in n_weights if 'full' not in f]run_3_tdxhydro_rapid_masters.py (1)
92-93
: Simplify the complex file existence check.The condition for checking file existence is hard to read with multiple conditions and line breaks.
- if not os.path.exists(os.path.join(save_dir, 'rapid_inputs_master.parquet')) or \ - (CACHE_GEOMETRY and not len(list(glob.glob(os.path.join(save_dir, '*.geoparquet'))))) or \ - not os.path.exists(os.path.join(save_dir, 'rapid_inputs_master.parquet')): + master_file = os.path.join(save_dir, 'rapid_inputs_master.parquet') + geoparquet_files = list(glob.glob(os.path.join(save_dir, '*.geoparquet'))) + needs_processing = ( + not os.path.exists(master_file) or + (CACHE_GEOMETRY and not len(geoparquet_files)) + ) + if needs_processing:tdxhydrorapid/network.py (1)
59-65
: Simplify complex conditional logicThe nested conditions are hard to read and maintain. Consider simplifying the logic using clearer variable names and separate conditions.
Apply this diff to improve readability:
- if len(pred_orders) == 2 and not len([x for x in pred_orders if x == (min_order_to_keep - 1)]) == 2: + has_two_predecessors = len(pred_orders) == 2 + both_are_lower_order = len([x for x in pred_orders if x == (min_order_to_keep - 1)]) == 2 + if has_two_predecessors and not both_are_lower_order: continue - if (len(pred_orders) > 2) and \ - (not max(pred_orders) == min_order_to_keep - 1) and \ - (len([x for x in pred_orders if x == (min_order_to_keep - 1)]) >= 2): + has_multiple_predecessors = len(pred_orders) > 2 + highest_order_matches = max(pred_orders) == min_order_to_keep - 1 + has_two_lower_order = len([x for x in pred_orders if x == (min_order_to_keep - 1)]) >= 2 + if has_multiple_predecessors and not highest_order_matches and has_two_lower_order: continue🧰 Tools
🪛 Ruff (0.8.0)
59-59: Use
len([x for x in pred_orders if x == min_order_to_keep - 1]) != 2
instead ofnot len([x for x in pred_orders if x == min_order_to_keep - 1]) == 2
Replace with
!=
operator(SIM201)
63-63: Use
max(pred_orders) != min_order_to_keep - 1
instead ofnot max(pred_orders) == min_order_to_keep - 1
Replace with
!=
operator(SIM201)
tdxhydrorapid/inputs.py (1)
631-666
: Add type hints to river_route_inputsAdding type hints would improve code maintainability and IDE support.
Apply this diff to add type hints:
def river_route_inputs(sdf: pd.DataFrame, save_dir: str, id_field: str = 'LINKNO', ds_id_field: str = 'DSLINKNO', ) -> None: """ Create river route input files from a stream network dataframe Produces the following files: + - connectivity.parquet: Contains river connectivity information + - routing_parameters.parquet: Contains routing parameters Args: sdf: Stream network dataframe save_dir: Directory to save outputs id_field: Field containing unique stream IDs ds_id_field: Field containing downstream stream IDs Returns: None + + Raises: + ValueError: If required columns are missing """ + required_columns = {id_field, ds_id_field, 'musk_x', 'musk_k'} + missing_columns = required_columns - set(sdf.columns) + if missing_columns: + raise ValueError(f"Missing required columns: {missing_columns}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (2)
tdxhydrorapid/network_data/lake_table.csv
is excluded by!**/*.csv
tdxhydrorapid/network_data/processing_options.xlsx
is excluded by!**/*.xlsx
📒 Files selected for processing (11)
run_1.1_lakes.py
(1 hunks)run_1_tdx_gpkg_to_geoparquet.py
(4 hunks)run_3_tdxhydro_rapid_masters.py
(5 hunks)run_4_make_vpu_files.py
(3 hunks)run_5_global_geometry.py
(1 hunks)run_7_catchments.py
(1 hunks)tdxhydrorapid/__init__.py
(1 hunks)tdxhydrorapid/_validate.py
(2 hunks)tdxhydrorapid/inputs.py
(16 hunks)tdxhydrorapid/network.py
(9 hunks)tdxhydrorapid/weights.py
(5 hunks)
✅ Files skipped from review due to trivial changes (1)
- tdxhydrorapid/init.py
🧰 Additional context used
🪛 Ruff (0.8.0)
run_7_catchments.py
6-6: tqdm
imported but unused
Remove unused import: tqdm
(F401)
tdxhydrorapid/weights.py
302-305: Use ternary operator gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid
instead of if
-else
-block
Replace if
-else
-block with gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid
(SIM108)
308-308: Multiple statements on one line (colon)
(E701)
309-309: Undefined name dys
(F821)
315-315: Multiple statements on one line (colon)
(E701)
run_4_make_vpu_files.py
19-19: Use '/Users/ricky/Downloads/vpu-boundaries.gpkg'
instead of '/Users/ricky/Downloads/vpu-boundaries.gpkg' or ...
Replace with '/Users/ricky/Downloads/vpu-boundaries.gpkg'
(SIM222)
64-64: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
tdxhydrorapid/network.py
59-59: Use len([x for x in pred_orders if x == min_order_to_keep - 1]) != 2
instead of not len([x for x in pred_orders if x == min_order_to_keep - 1]) == 2
Replace with !=
operator
(SIM201)
63-63: Use max(pred_orders) != min_order_to_keep - 1
instead of not max(pred_orders) == min_order_to_keep - 1
Replace with !=
operator
(SIM201)
90-90: Loop control variable index
not used within loop body
Rename unused index
to _index
(B007)
tdxhydrorapid/inputs.py
32-33: Undefined name riverconcat_tdxregions
in __all__
(F822)
212-215: Use ternary operator downstream = downstream[0] if downstream else -1
instead of if
-else
-block
Replace if
-else
-block with downstream = downstream[0] if downstream else -1
(SIM108)
302-302: Loop control variable key
not used within loop body
(B007)
465-465: Use feat[ds_id_field].values == -1
instead of not feat[ds_id_field].values != -1
Replace with ==
operator
(SIM202)
544-544: Loop control variable idx
not used within loop body
Rename unused idx
to _idx
(B007)
run_1_tdx_gpkg_to_geoparquet.py
9-9: shapely.geometry.Point
imported but unused
Remove unused import: shapely.geometry.Point
(F401)
56-56: Undefined name gdf
(F821)
56-56: Undefined name gdf
(F821)
🔇 Additional comments (10)
run_1.1_lakes.py (2)
25-75
: Ensure Type Hint Compatibility with Python Version
The type hints using dict[int, int]
require Python 3.9 or later. If the project needs to support earlier Python versions, use Dict[int, int]
from the typing
module instead.
Apply this diff to update type hints for compatibility:
+from typing import Dict
def create_directed_graphs(df: gpd.GeoDataFrame,
id_field='LINKNO',
ds_id_field='DSLINKNO', ) -> nx.DiGraph:
G: nx.DiGraph = nx.from_pandas_edgelist(df[df[ds_id_field] != -1],
source=id_field,
target=ds_id_field,
create_using=nx.DiGraph)
G.add_nodes_from(df[id_field].values)
return G
# ...
- lake_id_dict: dict[int, int] = intersect['Lake_id'].to_dict()
- lake_polygon_dict: dict[int, Polygon] = lakes_subset.set_index('Lake_id')['geometry'].to_dict()
- geom_dict: dict[int, Polygon] = gdf.set_index('LINKNO')['geometry'].to_dict()
+ lake_id_dict: Dict[int, int] = intersect['Lake_id'].to_dict()
+ lake_polygon_dict: Dict[int, Polygon] = lakes_subset.set_index('Lake_id')['geometry'].to_dict()
+ geom_dict: Dict[int, Polygon] = gdf.set_index('LINKNO')['geometry'].to_dict()
83-84
: 🛠️ Refactor suggestion
Handle Lakes with Multiple Outlets Gracefully
Currently, the script raises a RuntimeError
if a lake has multiple outlets:
if len(outlets) != 1:
raise RuntimeError(f'Lake has {len(outlets)} outlets')
Consider handling lakes with multiple outlets more gracefully, such as logging a warning and continuing processing, or implementing the logic to support multiple outlets if applicable.
Would you like assistance in modifying the code to handle multiple outlets?
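A minimal sketch of that graceful handling, assuming the surrounding loop has the lake identifier available as lake_id (the helper name and the selection rule are invented here, not taken from the script):
import logging

logger = logging.getLogger(__name__)


def pick_single_outlet(outlets: list, lake_id: int):
    """Return one outlet per lake instead of raising when the count is not exactly one."""
    if len(outlets) == 1:
        return outlets[0]
    if not outlets:
        logger.warning(f'Lake {lake_id} has no outlets; skipping it')
        return None
    # Multiple outlets: warn and fall back to a deterministic choice so processing can continue.
    logger.warning(f'Lake {lake_id} has {len(outlets)} outlets; keeping the lowest-numbered one')
    return sorted(outlets)[0]
Whether the lowest-numbered outlet, the highest stream order, or splitting the lake is the right behaviour depends on the hydrography, so the fallback above is only a placeholder.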
run_1_tdx_gpkg_to_geoparquet.py (1)
43-44
: Ensure Output Directories Are Correctly Named
The os.makedirs
calls are creating directories with potentially incorrect paths:
os.makedirs(gpq_dir, exist_ok=True)
os.makedirs(save_dir, exist_ok=True)
Verify that gpq_dir
and save_dir
are correctly set and that the directories are intended to be created at these locations. Additionally, consider whether save_dir
is necessary if it's not used elsewhere in the script.
run_4_make_vpu_files.py (1)
13-15
:
Replace hardcoded absolute paths with environment variables or configuration.
The hardcoded absolute paths make the code environment-dependent and non-portable. This is particularly problematic for paths that reference user-specific directories.
Consider using environment variables or a configuration file:
-tdx_inputs_dir = '/Users/ricky/tdxhydro-postprocessing/test/rapid_inputs'
-og_pqs = glob.glob("/Users/ricky/tdxhydro-postprocessing/test/pqs/TDX_streamnet_*_01.parquet")
-final_output_dir = '/Users/ricky/tdxhydro-postprocessing/test/vpus'
-vpu_boundaries = '/Users/ricky/Downloads/vpu-boundaries.gpkg' or None
+tdx_inputs_dir = os.getenv('TDX_INPUTS_DIR', 'test/rapid_inputs')
+og_pqs = glob.glob(os.path.join(os.getenv('TDX_PQS_DIR', 'test/pqs'), "TDX_streamnet_*_01.parquet"))
+final_output_dir = os.getenv('FINAL_OUTPUT_DIR', 'test/vpus')
+vpu_boundaries = os.getenv('VPU_BOUNDARIES_PATH')
Also applies to: 19-19
run_3_tdxhydro_rapid_masters.py (2)
Line range hint 21-22
: Replace hardcoded paths with environment variables or configuration.
Similar to other files, the hardcoded paths make the code environment-dependent.
-inputs_path = '/Volumes/T9Hales4TB/TDXHydroGeoParquet'
-outputs_path = '/Volumes/T9Hales4TB/geoglows2/tdxhydro-inputs'
+inputs_path = os.getenv('TDX_HYDRO_INPUTS', 'data/TDXHydroGeoParquet')
+outputs_path = os.getenv('TDX_HYDRO_OUTPUTS', 'data/tdxhydro-inputs')
Also applies to: 46-46
69-73
: LGTM! Well-structured lake processing configuration.
The new configuration options for lake processing are well-organized and properly typed. The changes align with the PR objective of removing unwanted lake geometries.
Also applies to: 83-87, 103-107
tdxhydrorapid/weights.py (3)
4-13
: LGTM! The new imports align with the performance objectives.
The addition of dask_geopandas
and related imports supports the goal of improving performance through parallel processing.
Also applies to: 22-22
325-328
: LGTM! Clean and efficient implementation.
The generator function is well-designed for memory efficiency when dealing with multiple grids.
79-88
: 🛠️ Refactor suggestion
Optimize projection timing in overlay function
The projection to CEA should occur before the intersection operation to ensure geometric accuracy. Currently, it's done after the intersection which could lead to less accurate results.
Apply this diff to fix the projection timing:
def overlay(left: dgpd.GeoDataFrame, right: dgpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Performs a spatial intersection overlay of two GeoDataFrames using Dask.
Reproject geometries to a Cylindrical Equal Area projection.
"""
+ left = left.to_crs({'proj': 'cea'})
+ right = right.to_crs({'proj': 'cea'})
return gpd.GeoDataFrame(
left.sjoin(right.assign(right_geometry=right.geometry))
- .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry).to_crs({'proj':'cea'}))
+ .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry))
.drop(columns="right_geometry")
.compute()).sort_index()
Likely invalid or redundant comment.
tdxhydrorapid/network.py (1)
6-7
: LGTM! Consistent with performance improvements.
The addition of Dask-related imports aligns with the performance optimization goals stated in the PR.
Also applies to: 22-22
run_1.1_lakes.py
Outdated
lakes_parquet = '/Users/ricky/Downloads/GLAKES/all_lakes_filtered.parquet'  # https://garslab.com/?p=234 August 24, 2022
gpq_dir = '/Volumes/EB406_T7_3/geoglows_v3/parquets*'
save_dir = './tdxhydrorapid/network_data/'
🛠️ Refactor suggestion
Replace Hardcoded Paths with Configurable Parameters
The paths lakes_parquet, gpq_dir, and save_dir are hardcoded with absolute paths specific to your local file system. This can cause issues when running the script in different environments or when sharing the code with others. Consider using command-line arguments, environment variables, or a configuration file to make these paths configurable.
run_1.1_lakes.py
Outdated
lakes_gdf = gpd.read_parquet(lakes_parquet)
lakes_gdf = lakes_gdf[lakes_gdf['Area_PW'] > 1]
lakes_gdf['geometry'] = lakes_gdf['geometry'].apply(fill_holes)  # Remove holes from lakes, could lead to issues...
Add Error Handling for Missing or Incorrect Data Columns
The script assumes that the Area_PW column exists in the lakes_gdf DataFrame. If this column is missing or contains unexpected data types, the script will raise a KeyError or other exceptions. Add error handling to check for the existence and validity of the Area_PW column before performing operations.
Apply this diff to add error handling:
if __name__ == "__main__":
logging.info('Getting Lake Polygons')
lakes_gdf = gpd.read_parquet(lakes_parquet)
+ if 'Area_PW' not in lakes_gdf.columns:
+ raise KeyError("Column 'Area_PW' not found in lakes data.")
lakes_gdf = lakes_gdf[lakes_gdf['Area_PW'] > 1]
lakes_gdf['geometry'] = lakes_gdf['geometry'].apply(fill_holes) # Remove holes from lakes, could lead to issues...
Committable suggestion skipped: line range outside the PR's diff.
gdf = dgpd.read_parquet('/Users/ricky/tdxhydro-postprocessing/test/rapid_inputs/*/*geoparquet', filesystem='arrow', columns=['LINKNO', 'geometry'])
gdf['geometry'] = gdf['geometry'].simplify(0.001, preserve_topology=False)
gdf.compute().to_file('test_global_streams_simplified_dask.gpkg', driver='GPKG')
🛠️ Refactor suggestion
Ensure File Path Patterns Are Correct and Configurable
The file path pattern used in dgpd.read_parquet is hardcoded and may not correctly match all intended files:
gdf = dgpd.read_parquet('/Users/ricky/tdxhydro-postprocessing/test/rapid_inputs/*/*geoparquet', filesystem='arrow', columns=['LINKNO', 'geometry'])
Consider making the file path configurable and verify that the pattern matches all required files. Also, ensure that the use of wildcards (*) is appropriate for the directory structure.
Apply this diff to use a configurable file path:
+import sys
+import argparse
+parser = argparse.ArgumentParser(description='Process global geometries.')
+parser.add_argument('--input_path', type=str, required=True, help='Path to input geoparquet files.')
+parser.add_argument('--output_file', type=str, default='global_streams_simplified.gpkg', help='Output file name.')
+args = parser.parse_args()
-gdf = dgpd.read_parquet('/Users/ricky/tdxhydro-postprocessing/test/rapid_inputs/*/*geoparquet', filesystem='arrow', columns=['LINKNO', 'geometry'])
+gdf = dgpd.read_parquet(args.input_path, filesystem='arrow', columns=['LINKNO', 'geometry'])
gdf['geometry'] = gdf['geometry'].simplify(0.001, preserve_topology=False)
-gdf.compute().to_file('test_global_streams_simplified_dask.gpkg', driver='GPKG')
+gdf.compute().to_file(args.output_file, driver='GPKG')
gdf = gpd.read_parquet(catchments_gpq)
gdf, lake_gdf = (
    rp
    .network
    .dissolve_catchments(tdx, gdf)
)
Handle Missing Catchments Data Gracefully
When reading the catchments GeoDataFrame and processing it with dissolve_catchments, there may be cases where the catchment data is missing or empty. Add error handling to check if gdf is empty before proceeding.
Apply this diff to add a check:
gdf = gpd.read_parquet(catchments_gpq)
+if gdf.empty:
+ logging.warning(f"No catchments data found for {tdxnumber}. Skipping.")
+ continue
gdf, lake_gdf = (
rp
.network
.dissolve_catchments(tdx, gdf)
)
Committable suggestion skipped: line range outside the PR's diff.
@@ -73,6 +73,7 @@ def _calculate_geodesic_length(line) -> float:
    ]]

else:
    gdf['LINKNO'] = gdf['streamID'].astype(int) + (tdx_header_number * 10_000_000)
Fix Undefined Variable gdf
Similar to the previous issue, gdf is undefined in this block. Ensure gdf is read correctly.
Apply this diff to read gdf:
else:
+ gdf = gpd.read_file(gpkg)
gdf['LINKNO'] = gdf['streamID'].astype(int) + (tdx_header_number * 10_000_000)
gdf = gdf.drop(columns=['streamID'])
Committable suggestion skipped: line range outside the PR's diff.
if 'streamnet' in os.path.basename(gpkg):
    gdf['LINKNO'] = gdf['LINKNO'].astype(int) + (tdx_header_number * 10_000_000)
Fix Undefined Variable gdf
The variable gdf is used without being defined in this block. This will raise a NameError.
Apply this diff to define gdf before using it:
if 'streamnet' in os.path.basename(gpkg):
+ gdf = gpd.read_file(gpkg)
gdf['LINKNO'] = gdf['LINKNO'].astype(int) + (tdx_header_number * 10_000_000)
gdf['DSLINKNO'] = gdf['DSLINKNO'].astype(int)
gdf.loc[gdf['DSLINKNO'] != -1, 'DSLINKNO'] = gdf['DSLINKNO'] + (tdx_header_number * 10_000_000)
gdf['strmOrder'] = gdf['strmOrder'].astype(int)
gdf['LengthGeodesicMeters'] = gdf['geometry'].apply(_calculate_geodesic_length)
gdf['TDXHydroRegion'] = region_number
Ensure that gdf is properly read from the GeoPackage file before processing.
📝 Committable suggestion
if 'streamnet' in os.path.basename(gpkg):
    gdf = gpd.read_file(gpkg)
    gdf['LINKNO'] = gdf['LINKNO'].astype(int) + (tdx_header_number * 10_000_000)
    gdf['DSLINKNO'] = gdf['DSLINKNO'].astype(int)
    gdf.loc[gdf['DSLINKNO'] != -1, 'DSLINKNO'] = gdf['DSLINKNO'] + (tdx_header_number * 10_000_000)
    gdf['strmOrder'] = gdf['strmOrder'].astype(int)
    gdf['LengthGeodesicMeters'] = gdf['geometry'].apply(_calculate_geodesic_length)
    gdf['TDXHydroRegion'] = region_number
🧰 Tools
🪛 Ruff (0.8.0)
56-56: Undefined name gdf
(F821)
56-56: Undefined name gdf
(F821)
tdxhydrorapid/weights.py
Outdated
def get_weight_table_name_from_grid(grid: Union[str, gpd.GeoDataFrame], warn=False) -> str:
    if isinstance(grid, str):
        gpq = gpd.read_parquet(grid)
    else:
        gpq = grid
    dxs = gpq['lon'].round(12).drop_duplicates().sort_values().diff().dropna().unique()
    if len(dxs) > 1:
        if warn: logger.warning(f'Multiple dx values found in {grid}, using median')
        dx = round(np.median(dys), 12)
    else:
        dx = round(dxs[0], 12)

    dys = gpq['lat'].round(12).drop_duplicates().sort_values().diff().dropna().unique()
    if len(dys) > 1:
        if warn: logger.warning(f'Multiple dy values found in {grid}, using median')
        dy = round(np.median(dys), 12)
    else:
        dy = round(dys[0], 12)

    xinit = gpq['lon'].round(12).min()
    yinit = gpq['lat'].round(12).min()

    return f"weight_xinit={xinit}_yinit={yinit}_dx={dx}_dy={dy}_.csv"
|
Fix undefined variable in get_weight_table_name_from_grid
There's a critical bug in the median calculation where dys is used instead of dxs.
Apply this diff to fix the bug:
if len(dxs) > 1:
if warn: logger.warning(f'Multiple dx values found in {grid}, using median')
- dx = round(np.median(dys), 12)
+ dx = round(np.median(dxs), 12)
else:
dx = round(dxs[0], 12)
📝 Committable suggestion
def get_weight_table_name_from_grid(grid: Union[str, gpd.GeoDataFrame], warn=False) -> str:
    if isinstance(grid, str):
        gpq = gpd.read_parquet(grid)
    else:
        gpq = grid
    dxs = gpq['lon'].round(12).drop_duplicates().sort_values().diff().dropna().unique()
    if len(dxs) > 1:
        if warn: logger.warning(f'Multiple dx values found in {grid}, using median')
        dx = round(np.median(dxs), 12)
    else:
        dx = round(dxs[0], 12)

    dys = gpq['lat'].round(12).drop_duplicates().sort_values().diff().dropna().unique()
    if len(dys) > 1:
        if warn: logger.warning(f'Multiple dy values found in {grid}, using median')
        dy = round(np.median(dys), 12)
    else:
        dy = round(dys[0], 12)

    xinit = gpq['lon'].round(12).min()
    yinit = gpq['lat'].round(12).min()

    return f"weight_xinit={xinit}_yinit={yinit}_dx={dx}_dy={dy}_.csv"
🧰 Tools
🪛 Ruff (0.8.0)
302-305: Use ternary operator gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid
instead of if
-else
-block
Replace if
-else
-block with gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid
(SIM108)
308-308: Multiple statements on one line (colon)
(E701)
309-309: Undefined name dys
(F821)
315-315: Multiple statements on one line (colon)
(E701)
tdxhydrorapid/network.py
Outdated
def dissolve_catchments(save_dir: str, gdf: gpd.GeoDataFrame, id_field: str = 'LINKNO') -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    headwater_dissolve_path = os.path.join(save_dir, 'mod_dissolve_headwater.csv')

    if os.path.exists(headwater_dissolve_path):
        with open(headwater_dissolve_path) as f:
            o2_to_dissolve = pd.read_csv(f).fillna(-1).astype(int)
        updates = {}
        for streams_to_merge in o2_to_dissolve.values:
            valid_streams = streams_to_merge[streams_to_merge != -1]
            updates.update({stream: valid_streams[0] for stream in valid_streams})

        gdf[id_field] = gdf[id_field].map(updates).fillna(gdf[id_field]).astype(int)

    streams_to_prune_path = os.path.join(save_dir, 'mod_prune_streams.csv')
    if os.path.exists(streams_to_prune_path):
        with open(streams_to_prune_path) as f:
            ids_to_prune = pd.read_csv(f).astype(int).set_index('LINKTODROP')['LINKNO'].to_dict()
        gdf[id_field] = gdf[id_field].map(ids_to_prune).fillna(gdf[id_field]).astype(int)

    drop_streams_path = os.path.join(save_dir, 'mod_drop_small_trees.csv')
    if os.path.exists(drop_streams_path):
        with open(drop_streams_path) as f:
            ids_to_drop = pd.read_csv(f).astype(int)
        gdf = gdf[~gdf[id_field].isin(ids_to_drop.values.flatten())]

    short_streams_path = os.path.join(save_dir, 'mod_merge_short_streams.csv')
    if os.path.exists(short_streams_path):
        with open(short_streams_path) as f:
            short_streams = pd.read_csv(f).astype(int)
        for streams_to_merge in short_streams.values:
            gdf.loc[gdf[id_field].isin(streams_to_merge), id_field] = streams_to_merge[0]

    lake_streams_path = os.path.join(save_dir, 'mod_dissolve_lakes.json')
    lake_outlets = set()
    if os.path.exists(lake_streams_path):
        lake_streams_df = pd.read_json(lake_streams_path, orient='index', convert_axes=False, convert_dates=False)
        streams_to_delete = set()
        for outlet, _, lake_streams in lake_streams_df.itertuples():
            outlet = int(outlet)
            lake_outlets.add(outlet)
            if outlet in gdf[id_field].values:
                gdf.loc[gdf[id_field].isin(lake_streams), id_field] = outlet
            else:
                streams_to_delete.update(lake_streams)

        gdf = gdf[~gdf[id_field].isin(streams_to_delete)]

    # dissolve the geometries based on shared value in the id field
    logging.info('\tDissolving catchments')
    dgdf: dd.DataFrame = dd.from_pandas(gdf, npartitions=os.cpu_count()*2)
    dgdf: dgpd.GeoDataFrame = dgpd.from_dask_dataframe(dgdf.shuffle(on=id_field))
    gdf = dgdf.dissolve(by=id_field).compute().reset_index()
    if lake_outlets:
        lake_gdf = gdf[gdf[id_field].isin(lake_outlets)]
    else:
        lake_gdf = gpd.GeoDataFrame()
    return gdf, lake_gdf
🛠️ Refactor suggestion
Add error handling for dissolve_catchments
The function should handle potential errors when reading files and processing data.
Apply this diff to add error handling:
def dissolve_catchments(save_dir: str, gdf: gpd.GeoDataFrame, id_field: str = 'LINKNO') -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
+ if not isinstance(gdf, gpd.GeoDataFrame):
+ raise TypeError("gdf must be a GeoDataFrame")
+ if id_field not in gdf.columns:
+ raise ValueError(f"id_field '{id_field}' not found in GeoDataFrame columns")
headwater_dissolve_path = os.path.join(save_dir, 'mod_dissolve_headwater.csv')
if os.path.exists(headwater_dissolve_path):
- with open(headwater_dissolve_path) as f:
- o2_to_dissolve = pd.read_csv(f).fillna(-1).astype(int)
+ try:
+ with open(headwater_dissolve_path) as f:
+ o2_to_dissolve = pd.read_csv(f).fillna(-1).astype(int)
+ except (IOError, pd.errors.EmptyDataError) as e:
+ logger.error(f"Error reading headwater dissolve file: {e}")
+ raise
Committable suggestion skipped: line range outside the PR's diff.
dissolve_lakes: bool = True,
drop_islands: bool = True,
drop_ocean_watersheds: bool = True,
drop_within_sea: bool = True,
drop_low_flow: bool = False,
🛠️ Refactor suggestion
Improve documentation and parameter validation
The new parameters need better documentation and validation to ensure correct usage.
Apply this diff to improve the function:
def rapid_master_files(streams_gpq: str,
save_dir: str,
id_field: str = 'LINKNO',
ds_id_field: str = 'DSLINKNO',
length_field: str = 'Length',
default_velocity_factor: float = None,
default_x: float = .25,
drop_small_watersheds: bool = True,
dissolve_headwaters: bool = True,
prune_branches_from_main_stems: bool = True,
merge_short_streams: bool = True,
cache_geometry: bool = True,
dissolve_lakes: bool = True,
drop_islands: bool = True,
drop_ocean_watersheds: bool = True,
drop_within_sea: bool = True,
drop_low_flow: bool = False,
min_drainage_area_m2: float = 200_000_000,
min_headwater_stream_order: int = 3,
min_velocity_factor: float = 0.25,
min_k_value: int = 900,
lake_min_k: int = 3600, ) -> None:
"""
Create RAPID master files from a stream network
Args:
streams_gpq: Path to the streams geoparquet file
save_dir: Path to save the master files
id_field: Field name for the link id
ds_id_field: Field name for the downstream link id
length_field: Field name for stream segment length
default_velocity_factor: Default velocity factor for Muskingum routing
default_x: Default attenuation factor for Muskingum routing
drop_small_watersheds: Whether to drop small watersheds
dissolve_headwaters: Whether to dissolve headwater branches
prune_branches_from_main_stems: Whether to prune branches
merge_short_streams: Whether to merge short streams
cache_geometry: Whether to cache geometry
+ dissolve_lakes: Whether to dissolve lake geometries
+ drop_islands: Whether to remove island streams
+ drop_ocean_watersheds: Whether to remove ocean watersheds
+ drop_within_sea: Whether to remove streams within sea
+ drop_low_flow: Whether to remove low flow streams
min_drainage_area_m2: Minimum drainage area to keep
min_headwater_stream_order: Minimum stream order for headwaters
min_velocity_factor: Minimum velocity factor
min_k_value: Target minimum k value
lake_min_k: Minimum k value for lakes
Raises:
FileNotFoundError: If required files are missing
ValueError: If input parameters are invalid
"""
+ if not os.path.exists(streams_gpq):
+ raise FileNotFoundError(f"Streams file not found: {streams_gpq}")
+ if not os.path.exists(save_dir):
+ raise FileNotFoundError(f"Save directory not found: {save_dir}")
+ if min_drainage_area_m2 <= 0:
+ raise ValueError("min_drainage_area_m2 must be positive")
+ if min_headwater_stream_order < 1:
+ raise ValueError("min_headwater_stream_order must be at least 1")
Also applies to: 71-71, 94-94
tdxhydrorapid/inputs.py
Outdated
for river_id in gdf[gdf['LengthGeodesicMeters'] <= 0.01][id_field].values:
    feat = gdf[gdf[id_field] == river_id]
    upstreams = set(gdf[gdf[ds_id_field] == river_id][id_field])
    if not feat[ds_id_field].values != -1 and all([x != -1 for x in upstreams]):
Fix incorrect condition in create_nexus_points
The condition uses a confusing double negative that could lead to bugs.
Apply this diff to fix the logic:
- if not feat[ds_id_field].values != -1 and all([x != -1 for x in upstreams]):
+ if feat[ds_id_field].values == -1 and all([x != -1 for x in upstreams]):
📝 Committable suggestion
if feat[ds_id_field].values == -1 and all([x != -1 for x in upstreams]):
🧰 Tools
🪛 Ruff (0.8.0)
465-465: Use feat[ds_id_field].values == -1
instead of not feat[ds_id_field].values != -1
Replace with ==
operator
(SIM202)
Actionable comments posted: 2
🧹 Nitpick comments (17)
tdxhydrorapid/inputs.py (6)
51-55
: Ensure robust validation for new boolean parameters.
The newly introduced parameters (dissolve_lakes, drop_islands, drop_ocean_watersheds, drop_within_sea, drop_low_flow) provide granular control over stream processing. However, no checks confirm these parameters are used meaningfully (e.g., not mutually exclusive, or required in certain scenarios). Consider validating parameter combinations or providing better error messages for misconfigurations.
252-314
: Check edge cases for island removal.
The island removal logic relies on multiple loops and conditions to determine valid outlet replacements. Edge cases could arise if an island merges with multiple outlets or if there are unresolved references to partially removed streams. Consider adding more defensive checks or a fallback scenario to handle unrecognized conditions.
🧰 Tools
🪛 Ruff (0.8.2)
301-301: Loop control variable
key
not used within loop body(B007)
388-389
: Clarify rationale for musk_x = 0.01 on lakes.
Setting musk_x to a very low value provides substantial attenuation, but are there edge cases where 0.01 is still too high or too low? If you rely on hydrological data, a data-driven approach or an override parameter might be safer.
449-472
: Expand unit testing for create_nexus_points.
While generating nexus points for extremely short streams, edge cases involving unusual geometry or partial merges are possible. To prevent silent failures, ensure robust unit tests cover:
- Streams with multiple upstreams but no valid downstream.
- Streams bridging multiple confluences.
Would you like help creating or updating these tests?
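A pytest sketch of those two cases could look like the following; the column names mirror the fields used throughout this PR, but the call to create_nexus_points and its return type are assumptions that would need to match the real function:
import geopandas as gpd
from shapely.geometry import LineString

from tdxhydrorapid import inputs  # assumed import path


def _toy_network() -> gpd.GeoDataFrame:
    # Two upstream branches joining an extremely short terminal segment that drains to -1.
    return gpd.GeoDataFrame(
        {
            'LINKNO': [1, 2, 3],
            'DSLINKNO': [3, 3, -1],
            'LengthGeodesicMeters': [1000.0, 1200.0, 0.005],
        },
        geometry=[
            LineString([(0, 0), (1, 1)]),
            LineString([(2, 0), (1, 1)]),
            LineString([(1, 1), (1.000001, 1.000001)]),
        ],
        crs='EPSG:4326',
    )


def test_short_terminal_stream_with_multiple_upstreams():
    gdf = _toy_network()
    nexus = inputs.create_nexus_points(gdf)  # signature assumed
    # The short terminal segment should surface as a nexus point rather than vanish silently.
    assert len(nexus) >= 1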
624-662
: Consider factoring out repeated connectivity code.
Both rapid_input_csvs and river_route_inputs build connectivity structures from the same graph approach. You might unify the logic in a shared helper function, reducing duplication.
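One possible shape for that helper, sketched under the assumption that both functions start from a dataframe with LINKNO/DSLINKNO columns (the function name is invented):
import pandas as pd


def build_connectivity(sdf: pd.DataFrame,
                       id_field: str = 'LINKNO',
                       ds_id_field: str = 'DSLINKNO') -> pd.DataFrame:
    """One row per stream: its downstream ID plus the list of upstream IDs draining into it."""
    upstream_lists = (
        sdf[sdf[ds_id_field] != -1]
        .groupby(ds_id_field)[id_field]
        .apply(list)
    )
    connectivity = sdf[[id_field, ds_id_field]].copy()
    connectivity['upstreams'] = connectivity[id_field].map(upstream_lists)
    connectivity['upstreams'] = connectivity['upstreams'].apply(
        lambda ups: ups if isinstance(ups, list) else []
    )
    return connectivity
rapid_input_csvs and river_route_inputs would then only reshape this one table into their respective output formats.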
663-699
: Gracefully handle missing VPU codes during concatenation.
The fallback mechanism attempts to find a valid VPU for each terminal link. If a link still can't be resolved, the process raises a RuntimeError. For better usability, consider providing more context (e.g., partial merges, suggested solutions) or a partial fallback if feasible.
environment.yml (2)
7-8
: Double-check ifgeopandas>=1.0
covers known bug fixes.Ensure there's no need for a higher pinned version to avoid potential geometry or coordinate reference system issues on large datasets.
10-10
: Add a comment aboutnetworkx
usage.Since
networkx
is newly added, consider documenting how or why it is used for future maintainers.run_1.1_lakes.py (3)
33-34
: Allow external configuration for hardcoded paths.
Paths such as gpq_dir and save_dir are set in code, making them environment-specific. Providing overrides (e.g., CLI args or environment variables) can boost portability.
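For example, environment variables with the current values as defaults would keep today's behaviour while allowing overrides (the variable names are placeholders):
import os

gpq_dir = os.environ.get('TDX_GPQ_DIR', '/Volumes/EB406_T7_3/geoglows_v3/parquets*')
save_dir = os.environ.get('TDX_SAVE_DIR', './tdxhydrorapid/network_data/')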
61-71
: Include fallback for empty lake intersections.
You skip post-processing if no intersections are found, but partial or malformed files might produce false empties. Log potential mismatches to help debugging.
136-139
: Optimize assignment foroutlet_list
.A ternary operator or short guard clause can replace the
if not isinstance(...)
block to make code tidier and avoid potential confusion aboutoutlet
being a list.🧰 Tools
🪛 Ruff (0.8.2)
136-139: Use ternary operator
outlet_list = [outlet] if not isinstance(outlet, list) else outlet
instead ofif
-else
-blockReplace
if
-else
-block withoutlet_list = [outlet] if not isinstance(outlet, list) else outlet
(SIM108)
tdxhydrorapid/weights.py (2)
240-257
: Add error handling or logging for lake merges.
If reading mod_dissolve_lakes.json results in unexpected structures, consider logging mismatches or partial merges to help diagnose.
295-295
: Safeguard merges when IDs are missing.
The code only continues if both IDs exist. Some merges might remain partially applied, leaving orphan references. Consider logging or storing these partial merges for later inspection.
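A small accumulator is usually enough for that bookkeeping; this is only a sketch with invented names, since the merge loop in weights.py is not reproduced here:
import logging

logger = logging.getLogger(__name__)


def apply_id_merges(id_map: dict, known_ids: set) -> dict:
    """Apply only the merges whose source and target IDs both exist; log the ones that were skipped."""
    applied, skipped = {}, []
    for old_id, new_id in id_map.items():
        if old_id in known_ids and new_id in known_ids:
            applied[old_id] = new_id
        else:
            skipped.append((old_id, new_id))
    if skipped:
        logger.warning(f'{len(skipped)} merges skipped because an ID was missing, e.g. {skipped[:5]}')
    return applied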
tdxhydrorapid/network.py (4)
23-24
: Elevate or remove dissolve_catchments if widely needed.
If dissolving catchments is a core operation, consider moving it into a more central location or clarifying its usage.
36-37
: Validate presence of required columns.
create_directed_graphs uses df[df[ds_id_field] != -1], but no checks confirm ds_id_field indeed exists or that values are valid integers. A safeguard can prevent silent failures.
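A guard at the top of the function could look like this; the body below simply repeats the excerpt shown earlier in this thread, with the checks added as a sketch:
import geopandas as gpd
import networkx as nx
import pandas as pd


def create_directed_graphs(df: gpd.GeoDataFrame,
                           id_field='LINKNO',
                           ds_id_field='DSLINKNO', ) -> nx.DiGraph:
    # Fail fast if the expected columns are absent or not integer-typed.
    missing = {id_field, ds_id_field} - set(df.columns)
    if missing:
        raise KeyError(f'Missing required columns: {sorted(missing)}')
    if not pd.api.types.is_integer_dtype(df[ds_id_field]):
        raise TypeError(f'{ds_id_field} must be an integer column')

    G: nx.DiGraph = nx.from_pandas_edgelist(df[df[ds_id_field] != -1],
                                            source=id_field,
                                            target=ds_id_field,
                                            create_using=nx.DiGraph)
    G.add_nodes_from(df[id_field].values)
    return G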
85-125
: Cascade merges in find_branches_to_prune.
Logic for merging sibling streams can become complex if multiple merges happen in a chain. Track merges carefully to avoid losing references or skipping partial merges.
Would you like me to draft a solution that systematically updates merges across repeated passes?
🧰 Tools
🪛 Ruff (0.8.2)
92-92: Loop control variable
index
not used within loop bodyRename unused
index
to_index
(B007)
351-409
: Collect metrics on dissolved catchments for debugging.
After dissolving catchments, it may help to record how many merges took place or how many streams got dropped. This data can assist in troubleshooting complex merges.
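Even a single log line around the existing dissolve call would do; a sketch, not the module's actual logging setup:
import logging

logger = logging.getLogger(__name__)


def log_dissolve_metrics(n_before: int, n_after: int, n_lake_outlets: int) -> None:
    """Record how much the catchment table shrank during dissolving."""
    logger.info(f'Dissolved catchments: {n_before} -> {n_after} rows '
                f'({n_before - n_after} merged or dropped, {n_lake_outlets} lake outlets kept)')
Called as log_dissolve_metrics(len(gdf_before), len(gdf), len(lake_outlets)) right after the dissolve(...).compute() step.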
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
tdxhydrorapid/network_data/lake_table.csv
is excluded by!**/*.csv
📒 Files selected for processing (7)
environment.yml
(1 hunks)run_1.1_lakes.py
(1 hunks)run_1_tdx_gpkg_to_geoparquet.py
(4 hunks)run_7_catchments.py
(1 hunks)tdxhydrorapid/inputs.py
(19 hunks)tdxhydrorapid/network.py
(8 hunks)tdxhydrorapid/weights.py
(5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- run_1_tdx_gpkg_to_geoparquet.py
- run_7_catchments.py
🧰 Additional context used
🪛 Ruff (0.8.2)
run_1.1_lakes.py
136-139: Use ternary operator outlet_list = [outlet] if not isinstance(outlet, list) else outlet
instead of if
-else
-block
Replace if
-else
-block with outlet_list = [outlet] if not isinstance(outlet, list) else outlet
(SIM108)
tdxhydrorapid/weights.py
306-309: Use ternary operator gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid
instead of if
-else
-block
Replace if
-else
-block with gpq = gpd.read_parquet(grid) if isinstance(grid, str) else grid
(SIM108)
312-312: Multiple statements on one line (colon)
(E701)
319-319: Multiple statements on one line (colon)
(E701)
tdxhydrorapid/network.py
92-92: Loop control variable index
not used within loop body
Rename unused index
to _index
(B007)
tdxhydrorapid/inputs.py
33-34: Undefined name riverconcat_tdxregions
in __all__
(F822)
211-214: Use ternary operator downstream = downstream[0] if downstream else -1
instead of if
-else
-block
Replace if
-else
-block with downstream = downstream[0] if downstream else -1
(SIM108)
301-301: Loop control variable key
not used within loop body
(B007)
539-539: Loop control variable idx
not used within loop body
Rename unused idx
to _idx
(B007)
🔇 Additional comments (9)
tdxhydrorapid/inputs.py (3)
354-375
: Re-evaluate the outlet distance update logic for lakes.This section adds the maximum straight-line distance to the outlet’s length. In some cases, merging multiple inlets with varied distances might cause underestimation or overestimation of the lake’s effective length. You might consider averaging or applying a more stable metric if multiple inlets differ significantly.
722-724
: Double-check the usage of new parametersuse_rapid
andvpu_boundaries
.The updated parameters control whether RAPID input files or alternative route files are generated. Validate that the code paths remain consistent if
vpu_boundaries
is missing or ifuse_rapid
toggles mid-run.
141-182
: Potential for partial updates tosgdf
when dissolving lakes.While updating stream connectivity for inlets and removing ancestral streams, complex networks can occur if multiple lakes share partial upstreams. Consider adding comprehensive checks or logging to detect partial merges or edge cases.
Run the following script to scan the codebase for references to potential multi-lake conflicts:
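The script itself did not survive in this export; a rough Python equivalent (paths and search terms are guesses based on the names used in this PR) would be:
import pathlib
import re

# Flag code that touches the lake-dissolve bookkeeping, to review multi-lake edge cases by hand.
pattern = re.compile(r'mod_dissolve_lakes|lake_streams|lake_outlets')
for path in pathlib.Path('.').rglob('*.py'):
    text = path.read_text(encoding='utf-8', errors='ignore')
    for lineno, line in enumerate(text.splitlines(), start=1):
        if pattern.search(line):
            print(f'{path}:{lineno}: {line.strip()}')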
environment.yml (2)
6-7
: Confirm thatdask-geopandas
works with Python≥3.11.While these version constraints often suffice, confirm that
dask-geopandas
is fully compatible with Python 3.11 in your environment.
15-15
: Confirmpyogrio
usage.
pyogrio
can improve I/O performance, but ensure you’ve tested it thoroughly. If it’s not used heavily, you might avoid extra dependencies.run_1.1_lakes.py (3)
49-50
: Confirm the threshold forArea_PW > 3
.Filtering lakes by
Area_PW > 3
is presumably empirical. Re-verify the threshold if large but critical lakes are accidentally excluded.
81-85
: Recheck projection handling increate_directed_graphs
.This function relies on raw geometry but does not forcibly reproject. For large-scale or multi-CRS data, consider ensuring consistent coordinate reference systems before building graphs.
242-244
: Ensure partial merges handle multiple inlets.When multiple inlets converge on the same
outlet
, you update the stream references. Confirm that partial merges or overlapping references do not leave the data in an inconsistent state.tdxhydrorapid/weights.py (1)
229-230
: Confirm structured iteration foro2_to_dissolve
.You map each
streams_to_merge
array in a single comprehension, which can misalign if the DataFrame columns aren’t in the expected order. Double-check columns or consider a safer row-wise approach likeitertuples()
.
'river' | ||
'concat_tdxregions', |
Fix missing comma in __all__ list.
There's a missing comma between 'river' and 'concat_tdxregions', so the two string literals concatenate into the single undefined name 'riverconcat_tdxregions' in __all__.
- 'river'
- 'concat_tdxregions',
+ 'river',
+ 'concat_tdxregions',
📝 Committable suggestion
'river',
'concat_tdxregions',
🧰 Tools
🪛 Ruff (0.8.2)
33-34: Undefined name riverconcat_tdxregions
in __all__
(F822)
def overlay(left: dgpd.GeoDataFrame, right: dgpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Performs a spatial intersection overlay of two GeoDataFrames using Dask.
    Reproject geometries to a Cylindrical Equal Area projection.
    """
    return gpd.GeoDataFrame(
        left.sjoin(right.assign(right_geometry=right.geometry))
        .assign(geometry=lambda x: x.geometry.intersection(x.right_geometry).to_crs({'proj':'cea'}))
        .drop(columns="right_geometry")
🛠️ Refactor suggestion
Refine overlay logic to avoid potential re-projection confusion.
Applying .to_crs({'proj': 'cea'}) directly in the lambda might lead to a partial or repeated re-projection if used further. Also consider re-projecting inputs strictly once before intersection for clarity.
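A version that projects both inputs exactly once before the join, as a sketch of that suggestion (it mirrors the diff proposed earlier in this review rather than the current code):
import dask_geopandas as dgpd
import geopandas as gpd


def overlay_cea(left: dgpd.GeoDataFrame, right: dgpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Intersect two Dask GeoDataFrames after a single reprojection to a Cylindrical Equal Area CRS."""
    left = left.to_crs({'proj': 'cea'})
    right = right.to_crs({'proj': 'cea'})
    joined = left.sjoin(right.assign(right_geometry=right.geometry))
    joined = joined.assign(geometry=lambda x: x.geometry.intersection(x.right_geometry))
    return gpd.GeoDataFrame(joined.drop(columns='right_geometry').compute()).sort_index()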
Summary by CodeRabbit
New Features
Bug Fixes