From 01063bc164b59d6df653260fb6ed07c9649b2bde Mon Sep 17 00:00:00 2001 From: Andrei Rusu Date: Fri, 12 Jul 2024 13:38:11 +0200 Subject: [PATCH] Added export location functionality, fixed some issues with parallel processing, and added more logging --- oceanstream/cli/main.py | 126 ++++++++++++--- oceanstream/echodata/sv_computation.py | 10 +- oceanstream/echodata/sv_dataset_extension.py | 35 ++-- oceanstream/echodata/sv_interpolation.py | 19 ++- oceanstream/exports/__init__.py | 8 +- oceanstream/exports/csv/csv_export_from_Sv.py | 103 +++++++++++- oceanstream/plot/echogram.py | 26 +-- oceanstream/process/__init__.py | 7 +- oceanstream/process/aws/s3.py | 5 +- oceanstream/process/file_processor.py | 149 ++++++++++++----- oceanstream/process/folder_processor.py | 152 ++++++++++++++---- oceanstream/process/process.py | 6 +- oceanstream/process/utils.py | 69 +++++++- 13 files changed, 568 insertions(+), 147 deletions(-) diff --git a/oceanstream/cli/main.py b/oceanstream/cli/main.py index 2527f16..8aea9ad 100644 --- a/oceanstream/cli/main.py +++ b/oceanstream/cli/main.py @@ -1,9 +1,12 @@ +from asyncio import CancelledError + import typer import asyncio import os import logging import sys import warnings +import dask from pathlib import Path from rich import print @@ -11,7 +14,7 @@ from oceanstream.settings import load_config from dask.distributed import LocalCluster, Client, Variable from rich.console import Console - +from oceanstream.process import compute_and_export_single_file, process_zarr_files install(show_locals=False, width=120) @@ -30,6 +33,11 @@ logging.basicConfig(level="ERROR", format='%(asctime)s - %(levelname)s - %(message)s') +dask.config.set({ + 'distributed.comm.timeouts.connect': '60s', # Increase the connection timeout + 'distributed.comm.timeouts.tcp': '120s', # Increase the TCP timeout + 'distributed.comm.retry.count': 0 +}) def initialize(settings, file_path, log_level=None): config_data = load_config(settings["config"]) @@ -115,7 +123,8 @@ def convert( if filePath.is_file(): from oceanstream.process import convert_raw_file convert_raw_file(filePath, configData) - print(f"[blue]✅ Converted raw file {source} to Zarr and wrote output to: {configData['output_folder']} [/blue]") + print( + f"[blue]✅ Converted raw file {source} to Zarr and wrote output to: {configData['output_folder']} [/blue]") elif filePath.is_dir(): from oceanstream.process import convert_raw_files convert_raw_files(configData, workers_count=workers_count) @@ -193,7 +202,8 @@ def compute_sv( sonar_model: str = typer.Option(DEFAULT_SONAR_MODEL, help="Sonar model used to collect the data", show_choices=["AZFP", "EK60", "ES70", "EK80", "ES80", "EA640", "AD2CP"]), plot_echogram: bool = typer.Option(False, help="Plot the echogram after processing"), - use_dask: bool = typer.Option(False, help="Start a Local Dask cluster for parallel processing (always enabled for multiple files)"), + use_dask: bool = typer.Option(False, + help="Start a Local Dask cluster for parallel processing (always enabled for multiple files)"), depth_offset: float = typer.Option(0.0, help="Depth offset for the echogram plot"), waveform_mode: str = typer.Option("CW", help="Waveform mode, can be either CW or BB", show_choices=["CW", "BB"]), @@ -209,6 +219,87 @@ def compute_sv( "sonar_model": sonar_model, "output_folder": output or DEFAULT_OUTPUT_FOLDER } + dask.config.set({'distributed.comm.retry.count': 1}) + + file_path = Path(source) + config_data = initialize(settings_dict, file_path, log_level=log_level) + single_file = 
file_path.is_dir() and source.endswith(".zarr")
+
+    client = None
+    cluster = None
+
+    if use_dask or not single_file:
+        cluster = LocalCluster(n_workers=workers_count, threads_per_worker=1)
+        client = Client(cluster)
+
+    try:
+        if file_path.is_dir() and source.endswith(".zarr"):
+            console = Console()
+            with console.status("Processing...", spinner="dots") as status:
+                status.start()
+                status.update(
+                    f"[blue] Computing Sv for {file_path}...[/blue]" + use_dask * "– navigate to "
+                                                                                  "http://localhost:8787/status for "
+                                                                                  "progress")
+
+                chunks = None
+                if use_dask:
+                    chunks = config_data.get('base_chunk_sizes')
+
+                compute_and_export_single_file(config_data,
+                                               chunks=chunks,
+                                               plot_echogram=plot_echogram,
+                                               waveform_mode=waveform_mode,
+                                               depth_offset=depth_offset)
+
+                status.stop()
+                print("✅ The file has been processed successfully.")
+        elif file_path.is_dir():
+            print(f"Dashboard available at {client.dashboard_link}")
+            process_zarr_files(config_data,
+                               client,
+                               workers_count=workers_count,
+                               chunks=config_data.get('base_chunk_sizes'),
+                               plot_echogram=plot_echogram,
+                               waveform_mode=waveform_mode,
+                               depth_offset=depth_offset)
+        else:
+            print(f"[red]❌ The provided path '{source}' is not a valid Zarr root.[/red]")
+            sys.exit(1)
+    except KeyboardInterrupt:
+        logging.info("KeyboardInterrupt received, terminating processes...")
+    except Exception as e:
+        logging.exception("Error while processing %s", config_data['raw_path'])
+        print(Traceback())
+    finally:
+        if client is not None:
+            client.close()
+
+        if cluster is not None:
+            cluster.close()
+
+
+@app.command()
+def export_location(
+        source: str = typer.Option(..., help="Path to a Zarr root file or a directory containing Zarr files"),
+        output: str = typer.Option(None,
+                                   help="Destination path for saving the exported data. Defaults to a predefined "
+                                        "directory if not specified."),
+        workers_count: int = typer.Option(os.cpu_count(), help="Number of CPU workers to use for parallel processing"),
+        use_dask: bool = typer.Option(False,
+                                      help="Start a Local Dask cluster for parallel processing (always enabled for "
+                                           "multiple files)"),
+        config: str = typer.Option(None, help="Path to a configuration file"),
+        log_level: str = typer.Option("WARNING", help="Set the logging level",
+                                      show_choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])
+):
+    """
+    Given a Zarr dataset containing Sv data, exports the GPS location data to a JSON file.
+ """ + settings_dict = { + "config": config, + "output_folder": output or DEFAULT_OUTPUT_FOLDER + } file_path = Path(source) config_data = initialize(settings_dict, file_path, log_level=log_level) @@ -225,27 +316,17 @@ def compute_sv( if file_path.is_dir() and source.endswith(".zarr"): status.update( f"[blue] Computing Sv for {file_path}...[/blue] – navigate to http://localhost:8787/status for progress") - from oceanstream.process import compute_single_file - - compute_single_file(config_data, - chunks=config_data.get('base_chunk_sizes'), - plot_echogram=plot_echogram, - waveform_mode=waveform_mode, - depth_offset=depth_offset) + # TODO: Implement export_location_json elif file_path.is_dir(): status.update( f"[blue] Processing zarr files in {file_path}...[/blue] – navigate to " f"http://localhost:8787/status for progress") - from oceanstream.process import process_zarr_files - processed_count_var = Variable('processed_count', client) - process_zarr_files(config_data, - workers_count=workers_count, - status=status, - chunks=config_data.get('base_chunk_sizes'), - plot_echogram=plot_echogram, - waveform_mode=waveform_mode, - processed_count_var=processed_count_var, - depth_offset=depth_offset) + from oceanstream.process import export_location_from_zarr_files + + export_location_from_zarr_files(config_data, + workers_count=workers_count, + client=client, + chunks=config_data.get('base_chunk_sizes')) else: print(f"[red]❌ The provided path '{source}' is not a valid Zarr root.[/red]") sys.exit(1) @@ -261,11 +342,6 @@ def compute_sv( status.stop() -@app.command() -def export(): - typer.echo("Export data...") - - def main(): print(BANNER) warnings.filterwarnings("ignore", category=UserWarning) diff --git a/oceanstream/echodata/sv_computation.py b/oceanstream/echodata/sv_computation.py index 903986f..06dae9a 100644 --- a/oceanstream/echodata/sv_computation.py +++ b/oceanstream/echodata/sv_computation.py @@ -93,6 +93,7 @@ def compute_sv(echodata: EchoData, **kwargs) -> xr.Dataset: raise ValueError(str(e)) # Check if the sonar model is supported sonar_model = echodata.sonar_model + try: SupportedSonarModelsForSv(sonar_model) except ValueError: @@ -103,6 +104,7 @@ def compute_sv(echodata: EchoData, **kwargs) -> xr.Dataset: {list(SupportedSonarModelsForSv)}." 
) # Compute Sv + print(f"Computing Sv for {sonar_model} sonar model...", kwargs) Sv = ep.calibrate.compute_Sv(echodata, **kwargs) # Check if the computed Sv is empty if Sv["Sv"].values.size == 0: @@ -113,6 +115,12 @@ def compute_sv(echodata: EchoData, **kwargs) -> xr.Dataset: def compute_sv_with_encode_mode( echodata: EchoData, waveform_mode: str = "CW", encode_mode: str = "power" ) -> xr.Dataset: - sv_dataset = compute_sv(echodata, waveform_mode=waveform_mode, encode_mode=encode_mode) + try: + sv_dataset = compute_sv(echodata, waveform_mode=waveform_mode, encode_mode=encode_mode) + except Exception as e: + print(f"\n--------Error computing Sv with encode mode: {e}--------\n") + import traceback + traceback.print_exc() + raise e return sv_dataset diff --git a/oceanstream/echodata/sv_dataset_extension.py b/oceanstream/echodata/sv_dataset_extension.py index e8506d8..54de663 100644 --- a/oceanstream/echodata/sv_dataset_extension.py +++ b/oceanstream/echodata/sv_dataset_extension.py @@ -22,13 +22,13 @@ """ import warnings import numpy as np +import xarray as xr from echopype.consolidate import add_location, add_splitbeam_angle -from echopype.echodata.echodata import EchoData -from xarray import Dataset +from echopype.echodata import EchoData -def enrich_sv_dataset(sv: Dataset, echodata: EchoData, **kwargs) -> Dataset: +def enrich_sv_dataset(sv: xr.Dataset, echodata: EchoData, **kwargs) -> xr.Dataset: """ Enhances the input `sv` dataset by adding depth, location, and split-beam angle information. @@ -58,28 +58,26 @@ def enrich_sv_dataset(sv: Dataset, echodata: EchoData, **kwargs) -> Dataset: "return_dataset", ] splitbeam_args = {k: kwargs[k] for k in splitbeam_keys if k in kwargs} - enriched_sv = sv try: - add_depth(enriched_sv, **depth_args) + add_depth(sv, **depth_args) except Exception as e: warnings.warn(f"Failed to add depth due to error: {str(e)}") - # try: - # enriched_sv = add_location(enriched_sv, echodata, **location_args) - # except Exception as e: - # warnings.warn(f"Failed to add location due to error: {str(e)}") + try: + sv = add_location(sv, echodata, **location_args) + except Exception as e: + warnings.warn(f"Failed to add location due to error: {str(e)}") - # try: - # add_splitbeam_angle(enriched_sv, echodata, **splitbeam_args) - # except Exception as e: - # warnings.warn(f"Failed to add split-beam angle due to error: {str(e)}") - # traceback.print_exc() + try: + add_splitbeam_angle(sv, echodata, **splitbeam_args) + except Exception as e: + warnings.warn(f"Failed to add split-beam angle due to error: {str(e)}") - return enriched_sv + return sv -def add_depth(Sv: Dataset, depth_offset: float = 0, tilt: float = 0, downward: bool = True): +def add_depth(Sv: xr.Dataset, depth_offset: float = 0, tilt: float = 0, downward: bool = True): """ Given an existing Sv dataset, it adds a data variable called depth containing the depth of each ping. 
@@ -101,11 +99,14 @@ def add_depth(Sv: Dataset, depth_offset: float = 0, tilt: float = 0, downward: b selected_echo_range = selected_echo_range.values.tolist() selected_echo_range = [mult * value * np.cos(tilt / 180 * np.pi) + depth_offset for value in selected_echo_range] Sv = Sv.assign_coords(range_sample=selected_echo_range) + min_val = np.nanmin(selected_echo_range) + max_val = np.nanmax(selected_echo_range) + Sv = Sv.sel(range_sample=slice(min_val, max_val)) return Sv -def add_seabed_depth(Sv: Dataset): +def add_seabed_depth(Sv: xr.Dataset): """ Given an existing Sv dataset with a seabed mask attached, it adds a data variable called seabed depth containing the location of the seabed on diff --git a/oceanstream/echodata/sv_interpolation.py b/oceanstream/echodata/sv_interpolation.py index 9df852a..07972c9 100644 --- a/oceanstream/echodata/sv_interpolation.py +++ b/oceanstream/echodata/sv_interpolation.py @@ -26,7 +26,7 @@ def linear_to_db(linear: xr.DataArray) -> xr.DataArray: def interpolate_sv( - sv: Union[xr.Dataset, str, Path], method: str = "linear", with_edge_fill: bool = False + ds_Sv: xr.Dataset, method: str = "linear", with_edge_fill: bool = False ) -> xr.Dataset: """ Apply masks to the Sv DataArray in the dataset and interpolate over the resulting NaN values. @@ -43,23 +43,22 @@ def interpolate_sv( >> interpolate_sv(Sv, method) Expected Output """ - # Load the dataset - if isinstance(sv, xr.Dataset): - dataset = sv - # Initialize an empty list to store the processed channels processed_channels = [] # Loop over each channel - for channel in sv_dataarray["channel"]: - channel_data = sv_dataarray.sel(channel=channel) + for channel in ds_Sv.Sv["channel"]: + channel_data = ds_Sv.Sv.sel(channel=channel) # Convert from dB to linear scale channel_data_linear = db_to_linear(channel_data) # Perform interpolation to fill NaN values in linear scale using Xarray's interpolate_na interpolated_channel_data_linear = channel_data_linear.interpolate_na( - dim="ping_time", method=method, use_coordinate=True + dim="ping_time", + method=method, + use_coordinate=True, + dask_gufunc_kwargs={"allow_rechunk": True}, ) if with_edge_fill: @@ -76,9 +75,9 @@ def interpolate_sv( interpolated_sv = xr.concat(processed_channels, dim="channel") # Update the Sv DataArray in the dataset with the interpolated values - dataset["Sv"] = interpolated_sv + ds_Sv["Sv"] = interpolated_sv - return dataset + return ds_Sv def find_impacted_variables( diff --git a/oceanstream/exports/__init__.py b/oceanstream/exports/__init__.py index 2bfa9fe..7c9f421 100644 --- a/oceanstream/exports/__init__.py +++ b/oceanstream/exports/__init__.py @@ -1,2 +1,6 @@ -from .nasc import compute_and_write_nasc -from .shoals.shoals_handler import get_shoals_list, write_csv +from .csv.csv_export_from_Sv import create_location, export_location_json + +__all__ = [ + "create_location", + "export_location_json" +] \ No newline at end of file diff --git a/oceanstream/exports/csv/csv_export_from_Sv.py b/oceanstream/exports/csv/csv_export_from_Sv.py index 07b60c6..4c6cab2 100644 --- a/oceanstream/exports/csv/csv_export_from_Sv.py +++ b/oceanstream/exports/csv/csv_export_from_Sv.py @@ -1,18 +1,59 @@ +import json import os - +import numpy as np import pandas as pd import xarray as xr +import tempfile from haversine import haversine +from scipy.signal import savgol_filter + + +def ramer_douglas_peucker(points, epsilon): + if len(points) < 3: + return points + + def get_perpendicular_distance(point, line_start, line_end): + if np.allclose(line_start, 
line_end): + return np.linalg.norm(point - line_start) + + line_vec = line_end - line_start + point_vec = point - line_start + line_len = np.linalg.norm(line_vec) + line_unitvec = line_vec / line_len + point_vec_scaled = point_vec / line_len + t = np.dot(line_unitvec, point_vec_scaled) + t = np.clip(t, 0, 1) + nearest = line_start + t * line_vec + return np.linalg.norm(point - nearest) + + max_distance = 0 + index = 0 + for i in range(1, len(points) - 1): + distance = get_perpendicular_distance(points[i], points[0], points[-1]) + if distance > max_distance: + index = i + max_distance = distance + if max_distance > epsilon: + left_points = ramer_douglas_peucker(points[:index + 1], epsilon) + right_points = ramer_douglas_peucker(points[index:], epsilon) + return np.vstack((left_points[:-1], right_points)) -def create_location(data: xr.Dataset) -> pd.DataFrame: + return np.vstack((points[0], points[-1])) + + +def create_location(data: xr.Dataset, epsilon=0.00001, min_distance=0.01) -> pd.DataFrame: """ Given a processed Sv file, enriched with lat/lon, it returns location data: - lat,lon,time, speed + lat, lon, time, speed Parameters: - data: xr.Dataset The raw data to extract information from. + - epsilon: float + The epsilon parameter for the Ramer-Douglas-Peucker algorithm. + - min_distance: float + The minimum distance between points after thinning (in nautical miles). Returns: - pd.DataFrame @@ -23,6 +64,22 @@ def create_location(data: xr.Dataset) -> pd.DataFrame: ).to_dataframe() df["dt"] = data.coords["ping_time"] df.columns = ["lat", "lon", "dt"] + + # Filter out rows with null lat/lon values and invalid lat/lon ranges + df = df.dropna(subset=["lat", "lon"]) + df = df[(df["lat"] >= -90) & (df["lat"] <= 90) & (df["lon"] >= -180) & (df["lon"] <= 180)] + + if df.empty: + return pd.DataFrame(columns=["lat", "lon", "dt", "knt"]) + + # Apply smoothing to lat/lon to reduce noise + window_size = min(11, len(df)) # Window size for smoothing filter + poly_order = 2 # Polynomial order for smoothing filter + + if len(df) > window_size: + df["lat"] = savgol_filter(df["lat"], window_size, poly_order) + df["lon"] = savgol_filter(df["lon"], window_size, poly_order) + df["distance"] = [ haversine( (df["lat"].iloc[i], df["lon"].iloc[i]), @@ -37,7 +94,32 @@ def create_location(data: xr.Dataset) -> pd.DataFrame: df["knt"] = (df["distance"] / df["time_interval"].dt.total_seconds()) * 3600 df = df[["lat", "lon", "dt", "knt"]] - return df + # Additional check for outliers based on unrealistic speed + df = df[df["knt"] < 100] # Removing points with speeds higher than 100 knots + + # Apply Ramer-Douglas-Peucker algorithm for thinning coordinates + points = df[["lat", "lon"]].values + thinned_points = ramer_douglas_peucker(points, epsilon) + + # Create a thinned DataFrame + thinned_df = pd.DataFrame(thinned_points, columns=["lat", "lon"]) + + # Ensure the datetime and speed values are correctly associated + thinned_df["dt"] = thinned_df.apply( + lambda row: df.loc[(df["lat"] == row["lat"]) & (df["lon"] == row["lon"]), "dt"].values[0], axis=1) + thinned_df["knt"] = thinned_df.apply( + lambda row: df.loc[(df["lat"] == row["lat"]) & (df["lon"] == row["lon"]), "knt"].values[0], axis=1) + + # Further thin by minimum distance + final_points = [thinned_df.iloc[0]] + for i in range(1, len(thinned_df)): + if haversine((final_points[-1]["lat"], final_points[-1]["lon"]), + (thinned_df.iloc[i]["lat"], thinned_df.iloc[i]["lon"]), unit="nmi") >= min_distance: + final_points.append(thinned_df.iloc[i]) + + final_df = 
pd.DataFrame(final_points) + + return final_df def create_Sv(data: xr.Dataset, channel: str) -> pd.DataFrame: @@ -79,7 +161,7 @@ def export_Sv_csv(data: xr.Dataset, folder: str, root_name: str): - None """ location = create_location(data) - Sv = create_Sv(data, data["channel"][1]) # for R Shiny compat reasons + Sv = create_Sv(data, data["channel"][1]) location_filename = os.path.join(folder, root_name + "_GPS.csv") Sv_filename = os.path.join(folder, root_name + "_Sv_38000.0.csv") try: @@ -87,3 +169,14 @@ def export_Sv_csv(data: xr.Dataset, folder: str, root_name: str): Sv.to_csv(Sv_filename) except Exception as e: raise ValueError(str(e)) + + +def export_location_json(data: xr.Dataset): + location = create_location(data) + with tempfile.NamedTemporaryFile(delete=False, suffix='.json') as temp_file: + location.to_json(temp_file.name, orient="records") + temp_file_path = temp_file.name + with open(temp_file_path, 'r') as file: + gps_data = json.load(file) + os.remove(temp_file_path) + return gps_data diff --git a/oceanstream/plot/echogram.py b/oceanstream/plot/echogram.py index 83dca6a..ed47178 100644 --- a/oceanstream/plot/echogram.py +++ b/oceanstream/plot/echogram.py @@ -295,8 +295,6 @@ def plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_na """Plot and save echogram for a single channel with optional regions and enhancements.""" full_channel_name = ds_Sv.channel.values[channel] channel_name = "_".join(full_channel_name.split()[:3]) - - plt.figure(figsize=(30, 18)) ds = ds_Sv filtered_ds = ds['Sv'] @@ -305,9 +303,12 @@ def plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_na if 'channel' in filtered_ds.dims: filtered_ds = filtered_ds.assign_coords({'frequency': ds.frequency_nominal}) - filtered_ds = filtered_ds.swap_dims({'channel': 'frequency'}) - if filtered_ds.frequency.size == 1: - filtered_ds = filtered_ds.isel(frequency=0) + try: + filtered_ds = filtered_ds.swap_dims({'channel': 'frequency'}) + if filtered_ds.frequency.size == 1: + filtered_ds = filtered_ds.isel(frequency=0) + except Exception as e: + print(f"Error in swapping dims while plotting echogram: {e}") # Update axis labels filtered_ds['ping_time'].attrs = { @@ -323,7 +324,7 @@ def plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_na filtered_ds = filtered_ds.dropna(dim='ping_time', how='all') filtered_ds = filtered_ds.dropna(dim='range_sample', how='all') - # Create the echogram using xarray's plot method + plt.figure(figsize=(30, 18)) filtered_ds.isel(frequency=channel).T.plot( x='ping_time', y='range_sample', @@ -353,6 +354,8 @@ def plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_na plt.savefig(echogram_output_path, dpi=300, bbox_inches='tight') plt.close() + return echogram_output_path + def plot_sv_data_parallel(ds_Sv, file_base_name=None, output_path=None, cmap=None, client=None): """Plot the echogram data and the regions.""" @@ -375,14 +378,17 @@ def plot_sv_data(ds_Sv, file_base_name=None, output_path=None, echogram_path=Non if not plt.isinteractive(): plt.switch_backend('Agg') + echogram_files = [] for channel in range(ds_Sv.dims['channel']): - plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_name, echogram_path=echogram_path, - config_data=config_data, - cmap='ocean_r') + echogram_file_path = plot_individual_channel_simplified(ds_Sv, channel, output_path, file_base_name, + echogram_path=echogram_path, + config_data=config_data, + cmap='ocean_r') + echogram_files.append(echogram_file_path) # 
plot_individual_channel_image_only(ds_Sv, channel, output_path, file_base_name, cmap) # plot_individual_channel_shaders(ds_Sv=ds_Sv, channel=channel, output_path=output_path, # file_base_name=file_base_name, cmap='ocean_r') - + return echogram_files async def plot_sv_data_with_progress(ds_Sv, file_base_name=None, output_path=None, progress=None, plot_task=None, cmap='viridis', diff --git a/oceanstream/process/__init__.py b/oceanstream/process/__init__.py index 6a30dd4..46187d9 100644 --- a/oceanstream/process/__init__.py +++ b/oceanstream/process/__init__.py @@ -1,8 +1,9 @@ from .process import compute_sv, process_file_with_progress, read_file_with_progress from .processed_data_io import read_processed, write_processed from .combine_zarr import combine_zarr_files, read_zarr_files -from .file_processor import process_raw_file_with_progress, compute_Sv_to_zarr, convert_raw_file, compute_single_file -from .folder_processor import convert_raw_files, process_zarr_files +from .file_processor import process_raw_file_with_progress, compute_Sv_to_zarr, convert_raw_file, compute_single_file, \ + compute_and_export_single_file +from .folder_processor import convert_raw_files, process_zarr_files, export_location_from_zarr_files __all__ = [ "compute_sv", @@ -10,9 +11,11 @@ "process_raw_file_with_progress", "convert_raw_file", "process_zarr_files", + "export_location_from_zarr_files", "compute_Sv_to_zarr", "convert_raw_files", "compute_single_file", + "compute_and_export_single_file", "read_processed", "write_processed", "read_file_with_progress", diff --git a/oceanstream/process/aws/s3.py b/oceanstream/process/aws/s3.py index 7dd6f97..22d103a 100644 --- a/oceanstream/process/aws/s3.py +++ b/oceanstream/process/aws/s3.py @@ -2,6 +2,7 @@ import tempfile import botocore import boto3 +from pathlib import Path from asyncio import CancelledError from rich import print from rich.traceback import install, Traceback @@ -76,8 +77,8 @@ def list_raw_files_from_bucket(bucket_name, prefix): def download_file_from_bucket(bucket_name, s3_key, local_dir): """Download a file from S3.""" s3_client = boto3.client('s3', config=Config(signature_version=botocore.UNSIGNED)) - local_path = os.path.join(local_dir, os.path.basename(s3_key)) - s3_client.download_file(bucket_name, s3_key, local_path) + local_path = Path(local_dir) / os.path.basename(s3_key) + s3_client.download_file(bucket_name, s3_key, str(local_path)) return local_path diff --git a/oceanstream/process/file_processor.py b/oceanstream/process/file_processor.py index 5ad0cad..495431a 100644 --- a/oceanstream/process/file_processor.py +++ b/oceanstream/process/file_processor.py @@ -1,10 +1,15 @@ +import json import logging import os import sys import time import traceback +import shutil +from asyncio import CancelledError + import echopype as ep +import xarray as xr from pathlib import Path from rich import print from rich.traceback import install, Traceback @@ -13,14 +18,87 @@ from oceanstream.plot import plot_sv_data_with_progress, plot_sv_data from oceanstream.echodata import get_campaign_metadata, read_file +from oceanstream.denoise import apply_background_noise_removal +from oceanstream.exports import export_location_json +from .utils import save_output_data, append_gps_data from .process import compute_sv, process_file_with_progress, read_file_with_progress -install(show_locals=True, width=120) +install(show_locals=False, width=120) -def get_chunk_sizes(var_dims, chunk_sizes): - return {dim: chunk_sizes[dim] for dim in var_dims if dim in chunk_sizes} +def 
compute_single_file(config_data, **kwargs): + file_path = config_data["raw_path"] + start_time = time.time() + chunks = kwargs.get("chunks") + echodata = ep.open_converted(file_path, chunks=chunks) + + try: + output_path = compute_Sv_to_zarr(echodata, config_data, **kwargs) + print(f"[blue]✅ Computed Sv and saved to: {output_path}[/blue]") + except Exception as e: + logging.error(f"Error computing Sv for {file_path}") + print(Traceback()) + finally: + end_time = time.time() + total_time = end_time - start_time + print(f"Total time taken: {total_time:.2f} seconds") + + +def compute_and_export_single_file(config_data, **kwargs): + file_path = config_data["raw_path"] + chunks = kwargs.get("chunks") + echodata = ep.open_converted(file_path, chunks=chunks) + print(f"File {file_path} opened successfully.") + + try: + file_data = [] + output_path, ds_processed, echogram_files = compute_Sv_to_zarr(echodata, config_data, **kwargs) + gps_data = export_location_json(ds_processed) + + output_message = { + "filename": str(output_path), # Convert PosixPath to string + "file_npings": len(ds_processed["ping_time"].values), + "file_nsamples": len(ds_processed["range_sample"].values), + "file_start_time": str(ds_processed["ping_time"].values[0]), + "file_end_time": str(ds_processed["ping_time"].values[-1]), + "file_freqs": ",".join(map(str, ds_processed["frequency_nominal"].values)), + "file_start_depth": str(ds_processed["range_sample"].values[0]), + "file_end_depth": str(ds_processed["range_sample"].values[-1]), + "file_start_lat": echodata["Platform"]["latitude"].values[0], + "file_start_lon": echodata["Platform"]["longitude"].values[0], + "file_end_lat": echodata["Platform"]["latitude"].values[-1], + "file_end_lon": echodata["Platform"]["longitude"].values[-1], + "echogram_files": echogram_files + } + file_data.append(output_message) + + json_file_path = Path(config_data["output_folder"]) / "output_data.json" + gps_json_file_path = Path(config_data["output_folder"]) / "gps_data.json" + + save_output_data(output_message, json_file_path) + append_gps_data(gps_data, gps_json_file_path, str(output_path)) + + echogram_folder = Path(config_data["output_folder"]) / "echograms" + echogram_folder.mkdir(parents=True, exist_ok=True) + + for png_file in Path(output_path).glob("*.png"): + shutil.copy(png_file, echogram_folder / png_file.name) + except CancelledError as e: + logging.info("CancelledError received, terminating processes...") + except (ValueError, RuntimeError) as e: + logging.error(f"Error computing Sv for {file_path}") + print(Traceback()) + + +def export_location_from_Sv_dataset(store, config_data, chunks=None): + dataset = xr.open_zarr(store, chunks=chunks, consolidated=True) + file_name = Path(store).stem + file_name = file_name.replace("_Sv", "") + gps_data = export_location_json(dataset) + + gps_json_file_path = Path(config_data["output_folder"]) / f"gps_data_{file_name}.json" + append_gps_data(gps_data, gps_json_file_path, file_name) def compute_Sv_to_zarr(echodata, config_data, base_path=None, chunks=None, plot_echogram=False, **kwargs): @@ -42,6 +120,13 @@ def compute_Sv_to_zarr(echodata, config_data, base_path=None, chunks=None, plot_ encode_mode = waveform_mode == "CW" and "power" or "complex" Sv = compute_sv(echodata, encode_mode=encode_mode, **kwargs) + ds_processed = Sv + # ds_processed = apply_background_noise_removal(Sv, config=config_data) + + output_path = None + file_base_name = None + file_path = None + if config_data.get('raw_path') is not None: file_path = config_data["raw_path"] 
file_base_name = file_path.stem @@ -62,43 +147,30 @@ def compute_Sv_to_zarr(echodata, config_data, base_path=None, chunks=None, plot_ else: zarr_path = base_path zarr_file_name = f"{zarr_path}_Sv.zarr" - output_path = None - file_base_name = None - file_path = None echogram_path = zarr_path if chunks is not None: for var in Sv.data_vars: - var_chunk_sizes = get_chunk_sizes(Sv[var].dims, chunks) + var_chunk_sizes = _get_chunk_sizes(Sv[var].dims, chunks) Sv[var] = Sv[var].chunk(var_chunk_sizes) # Remove chunk encoding to avoid conflicts if 'chunks' in Sv[var].encoding: del Sv[var].encoding['chunks'] - - ds_processed = Sv - - # ds_processed = apply_background_noise_removal(Sv, config=config_data) + print(f"\nWriting zarr file to: {output_path}/{zarr_file_name}") write_zarr_file(zarr_path, zarr_file_name, ds_processed, config_data, output_path) + echogram_files = None if plot_echogram: try: - plot_sv_data(ds_processed, file_base_name=file_base_name, output_path=output_path, - echogram_path=echogram_path, config_data=config_data) + echogram_files = plot_sv_data(ds_processed, file_base_name=file_base_name, output_path=output_path, + echogram_path=echogram_path, config_data=config_data) except Exception as e: - logging.exception(f"Error plotting echogram for {file_path}:") - raise e + logging.error(f"Error plotting echogram for {file_path}: {e}") - return output_path + return output_path, ds_processed, echogram_files -def write_zarr_file(zarr_path, zarr_file_name, ds_processed, config_data=None, output_path=None): - if 'cloud_storage' in config_data: - store = get_chunk_store(config_data['cloud_storage'], Path(zarr_path) / zarr_file_name) - else: - store = os.path.join(output_path, zarr_file_name) - - ds_processed.to_zarr(store, mode='w') - +#################################################################################################### async def process_raw_file_with_progress(config_data, plot_echogram, waveform_mode="CW", depth_offset=0): try: @@ -163,7 +235,7 @@ def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None if 'cloud_storage' in config_data: file_name = file_path_obj.stem + ".zarr" - store = get_chunk_store(config_data['cloud_storage'], Path(relative_path) / file_name) + store = _get_chunk_store(config_data['cloud_storage'], Path(relative_path) / file_name) echodata.to_zarr(save_path=store, overwrite=True, parallel=False) else: output_path = Path(config_data["output_folder"]) / relative_path @@ -177,25 +249,16 @@ def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None print(Traceback()) -def compute_single_file(config_data, **kwargs): - file_path = config_data["raw_path"] - start_time = time.time() - chunks = kwargs.get("chunks") - echodata = ep.open_converted(file_path, chunks=chunks) +def write_zarr_file(zarr_path, zarr_file_name, ds_processed, config_data=None, output_path=None): + if 'cloud_storage' in config_data: + store = _get_chunk_store(config_data['cloud_storage'], Path(zarr_path) / zarr_file_name) + else: + store = os.path.join(output_path, zarr_file_name) - try: - output_path = compute_Sv_to_zarr(echodata, config_data, **kwargs) - print(f"[blue]✅ Computed Sv and saved to: {output_path}[/blue]") - except Exception as e: - logging.error(f"Error computing Sv for {file_path}") - print(Traceback()) - finally: - end_time = time.time() - total_time = end_time - start_time - print(f"Total time taken: {total_time:.2f} seconds") + ds_processed.to_zarr(store, mode='w') -def get_chunk_store(storage_config, path): +def 
_get_chunk_store(storage_config, path): if storage_config['storage_type'] == 'azure': from adlfs import AzureBlobFileSystem azfs = AzureBlobFileSystem(**storage_config['storage_options']) @@ -204,3 +267,7 @@ def get_chunk_store(storage_config, path): else: raise ValueError(f"Unsupported storage type: {storage_config['storage_type']}") + + +def _get_chunk_sizes(var_dims, chunk_sizes): + return {dim: chunk_sizes[dim] for dim in var_dims if dim in chunk_sizes} diff --git a/oceanstream/process/folder_processor.py b/oceanstream/process/folder_processor.py index cbec95a..cbe1714 100644 --- a/oceanstream/process/folder_processor.py +++ b/oceanstream/process/folder_processor.py @@ -1,3 +1,4 @@ +import json import os import logging import time @@ -5,9 +6,13 @@ import signal import sys import traceback +import inspect import warnings +from queue import Queue +from threading import Thread from dask import delayed, compute +from distributed import Semaphore from pathlib import Path from datetime import datetime from rich import print @@ -20,7 +25,8 @@ from .combine_zarr import read_zarr_files from .process import compute_sv from .processed_data_io import write_processed -from .file_processor import convert_raw_file, compute_single_file +from .file_processor import convert_raw_file, compute_single_file, compute_and_export_single_file, \ + export_location_from_Sv_dataset # logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') warnings.filterwarnings("ignore", module="echopype") @@ -117,6 +123,12 @@ def process_single_file(file_path, config_data, progress_queue, logging.exception("Error processing file %s: %s", file_path, e) +def print_call_stack(): + stack = inspect.stack() + for frame in stack: + print(f"File: {frame.filename}, Line: {frame.lineno}, Function: {frame.function}") + + def signal_handler(sig, frame): global pool print('Terminating processes...') @@ -204,26 +216,20 @@ def convert_raw_files(config_data, workers_count=os.cpu_count()): logging.exception("Error processing folder %s: %s", config_data['raw_path'], e) -def process_single_zarr_file(file_path, config_data, base_path=None, chunks=None, plot_echogram=False, waveform_mode="CW", - depth_offset=0, total_files=1, processed_count_var=None): +def process_single_zarr_file(file_path, config_data, base_path=None, chunks=None, plot_echogram=False, + waveform_mode="CW", + depth_offset=0): file_config_data = {**config_data, 'raw_path': Path(file_path)} + print(f"Processing file: {file_path}") + # print_call_stack() - processed_count = None - - if processed_count_var: - processed_count = processed_count_var.get() + 1 - processed_count_var.set(processed_count) - - compute_single_file(file_config_data, base_path=base_path, chunks=chunks, plot_echogram=plot_echogram, - waveform_mode=waveform_mode, depth_offset=depth_offset) + compute_and_export_single_file(file_config_data, base_path=base_path, chunks=chunks, plot_echogram=plot_echogram, + waveform_mode=waveform_mode, depth_offset=depth_offset) + print(f"Finished processing file: {file_path}") - if processed_count: - print(f"Processed file: {file_path}. 
{processed_count}/{total_files}") - -def process_zarr_files(config_data, workers_count=os.cpu_count(), status=None, chunks=None, processed_count_var=None, - plot_echogram=False, - waveform_mode="CW", depth_offset=0): +def process_zarr_files(config_data, client, workers_count=None, chunks=None, plot_echogram=False, waveform_mode="CW", depth_offset=0): + from tqdm.auto import tqdm dir_path = config_data['raw_path'] zarr_files = read_zarr_files(dir_path) @@ -231,28 +237,115 @@ def process_zarr_files(config_data, workers_count=os.cpu_count(), status=None, c logging.error("No valid .zarr files with creation time found in directory: %s", dir_path) return - if status: - status.update(f"Found {len(zarr_files)} Zarr files in directory: {dir_path}\n") + max_concurrent_tasks = workers_count or os.cpu_count() + total_files = len(zarr_files) + print(f"Found {total_files} Zarr files in directory: {dir_path}\n") + progress_bar = tqdm(total=total_files, desc="Processing Files", unit="file", ncols=100) + task_queue = Queue() + results = [] + + def process_file_task(file_path): + @delayed + def process_task(): + process_single_zarr_file(file_path, config_data, chunks=chunks, base_path=dir_path, + plot_echogram=plot_echogram, waveform_mode=waveform_mode, + depth_offset=depth_offset) + return file_path + + return process_task() + + def worker(): + while True: + file_path = task_queue.get() + if file_path is None: + break + try: + task = process_file_task(file_path) + result = task.compute() + results.append(result) + progress_bar.update() + print(f"Task completed: {file_path}") + except Exception as e: + print(f"Error processing file: {file_path}, error: {e}") + finally: + task_queue.task_done() + + # Enqueue initial tasks + for file_path in zarr_files: + task_queue.put(file_path) + + # Create and start worker threads + threads = [] + for _ in range(max_concurrent_tasks): + thread = Thread(target=worker) + thread.start() + threads.append(thread) + + # Block until all tasks are done + task_queue.join() + + # Stop workers + for _ in range(max_concurrent_tasks): + task_queue.put(None) + for thread in threads: + thread.join() + + progress_bar.close() + print("✅ All files have been processed") + + +def export_location_from_zarr_files(config_data, client=None, workers_count=os.cpu_count(), chunks=None): + from tqdm.auto import tqdm + semaphore = Semaphore(max_leases=workers_count) tasks = [] - total_files = len(zarr_files) - if processed_count_var: - processed_count_var.set(0) + dir_path = config_data['raw_path'] + zarr_files = read_zarr_files(dir_path) + progress_bar = tqdm(total=len(zarr_files), desc="Processing Files", unit="file", ncols=100) + + def update_progress_fn(*args): + progress_bar.update() + + if not zarr_files: + logging.error("No valid .zarr files with creation time found in directory: %s", dir_path) + return + + logging.info(f"Found {len(zarr_files)} Zarr files in directory: {dir_path}\n") for file_path in zarr_files: - if status: - status.update(f"Computing Sv for {file_path}...\n") - task = delayed(process_single_zarr_file)(file_path, config_data, chunks=chunks, base_path=dir_path, - plot_echogram=plot_echogram, - waveform_mode=waveform_mode, depth_offset=depth_offset, - total_files=total_files, processed_count_var=processed_count_var) + task = delayed(export_location_from_Sv_dataset)(file_path, config_data, chunks=chunks) tasks.append(task) # Execute all tasks in parallel - compute(*tasks) + futures = client.compute(tasks) + for future in futures: + future.add_done_callback(update_progress_fn) + 
+ client.gather(futures) + progress_bar.close() logging.info("✅ All files have been processed") + merge_json_files(dir_path) + + +def merge_json_files(output_folder): + json_files = list(Path(output_folder).glob("gps_data_*.json")) + all_data = [] + + for json_file in json_files: + with open(json_file, 'r') as file: + data = json.load(file) + if isinstance(data, list): + all_data.extend(data) + else: + all_data.append(data) + + merged_json_file_path = Path(output_folder) / "gps_data.json" + with open(merged_json_file_path, 'w') as merged_file: + json.dump(all_data, merged_file, indent=4) + + logging.info(f"✅ Merged {len(json_files)} JSON files into {merged_json_file_path}") def from_filename(file_name): @@ -268,5 +361,4 @@ def from_filename(file_name): return None - signal.signal(signal.SIGINT, signal_handler) diff --git a/oceanstream/process/process.py b/oceanstream/process/process.py index ee0f5b3..3b47eee 100644 --- a/oceanstream/process/process.py +++ b/oceanstream/process/process.py @@ -4,6 +4,7 @@ read_file, compute_sv_with_encode_mode, enrich_sv_dataset, + interpolate_sv, regrid_dataset ) @@ -41,14 +42,17 @@ async def process_file_with_progress(progress, compute_task_id, echodata, encode def compute_sv(echodata, encode_mode="power", waveform_mode="CW", depth_offset=0): sv_dataset = compute_sv_with_encode_mode(echodata, waveform_mode, encode_mode) + # sv_processed = interpolate_sv(sv_dataset) + sv_processed = sv_dataset sv_enriched = enrich_sv_dataset( - sv_dataset, + sv_processed, echodata, waveform_mode=waveform_mode, encode_mode=encode_mode, depth_offset=depth_offset ) + # del echodata # sv_enriched_downsampled = regrid_dataset(sv_enriched) # sv_enriched = sv_enriched_downsampled diff --git a/oceanstream/process/utils.py b/oceanstream/process/utils.py index 4491b98..2c75cef 100644 --- a/oceanstream/process/utils.py +++ b/oceanstream/process/utils.py @@ -1,3 +1,4 @@ +import json from pathlib import Path from rich.console import Console from rich.table import Table @@ -6,6 +7,7 @@ from IPython.display import display, HTML import re from datetime import datetime, timedelta +from typing import Dict, Any, List split_by_day_pattern = r'D(\d{8})-T(\d{6})' @@ -156,4 +158,69 @@ def format_duration(duration): total_seconds = int(duration.total_seconds()) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) - return f"{hours:02}:{minutes:02}:{seconds:02}" \ No newline at end of file + return f"{hours:02}:{minutes:02}:{seconds:02}" + + +def append_gps_data(gps_data: List[Dict[str, Any]], gps_json_file_path: Path, filename: str): + """ + Append GPS data to a JSON file. + + Args: + gps_data (List[Dict[str, Any]]): The GPS data to append. + gps_json_file_path (Path): The path to the JSON file where the GPS data will be saved. + filename (str): The filename to associate with the GPS data. 
+ """ + gps_entry = { + "filename": filename, + "gps_data": gps_data + } + + # Check if the file already exists + if gps_json_file_path.exists(): + with open(gps_json_file_path, 'r+') as json_file: + try: + # Load existing data + existing_data = json.load(json_file) + except json.JSONDecodeError: + existing_data = [] + + # Ensure existing data is a list + if not isinstance(existing_data, list): + existing_data = [existing_data] + + # Append new data + existing_data.append(gps_entry) + + # Move file pointer to the beginning + json_file.seek(0) + json_file.truncate() + + # Write updated data + json.dump(existing_data, json_file, indent=4) + else: + # Create a new file with the GPS data + with open(gps_json_file_path, 'w') as json_file: + json.dump([gps_entry], json_file, indent=4) + + +def save_output_data(output_message: dict, output_data_path: Path): + """ + Save the output message data to a JSON file. + + Args: + output_message (dict): The output message data. + output_data_path (Path): The path to the JSON file where the output data will be saved. + """ + # Check if the JSON file already exists and read its content + if output_data_path.exists(): + with open(output_data_path, "r") as json_file: + existing_data = json.load(json_file) + else: + existing_data = [] + + # Append the new data to the existing data + existing_data.append(output_message) + + # Write the updated data back to the JSON file + with open(output_data_path, "w") as json_file: + json.dump(existing_data, json_file, indent=4)
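
Usage sketch for the export_location_json helper introduced in oceanstream/exports/csv/csv_export_from_Sv.py, assuming an Sv Zarr store enriched with latitude/longitude already exists on disk; the store path below is a placeholder, not a file from this patch.

import json

import xarray as xr

from oceanstream.exports import export_location_json

# Placeholder path to an Sv Zarr store produced by the compute-sv command.
ds_sv = xr.open_zarr("cruise_D20240712-T123456_Sv.zarr", consolidated=True)

# create_location() drops invalid lat/lon rows, smooths the track, derives speed in
# knots, and thins the points (Ramer-Douglas-Peucker plus a minimum-distance pass);
# export_location_json() round-trips that DataFrame through a temporary JSON file
# and returns it as a list of records.
gps_records = export_location_json(ds_sv)
print(f"{len(gps_records)} GPS records")
print(json.dumps(gps_records[:2], indent=2))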
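
For the coordinate thinning that create_location() relies on, a self-contained sketch of the ramer_douglas_peucker helper on a made-up track; the coordinates and epsilon below are illustrative only.

import numpy as np

from oceanstream.exports.csv.csv_export_from_Sv import ramer_douglas_peucker

# A toy lat/lon track: two straight legs with tiny jitter between the corner points.
track = np.array([
    [0.0, 0.0],
    [0.00001, 0.5],   # nearly on the first leg -> dropped
    [0.0, 1.0],       # corner -> kept
    [0.5, 1.00001],   # nearly on the second leg -> dropped
    [1.0, 1.0],
])

thinned = ramer_douglas_peucker(track, epsilon=0.001)
print(thinned)  # keeps only the three points that define the two legs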