diff --git a/dev-environment.yml b/dev-environment.yml index fb682146..f4030799 100644 --- a/dev-environment.yml +++ b/dev-environment.yml @@ -47,8 +47,5 @@ dependencies: # Optional dependency - -e ./ - # "Headless" needed for opencv to install without requiring system dependencies - - opencv-contrib-python-headless - # To run CI against latest GeoUtils # - git+https://github.com/rhugonnet/geoutils.git diff --git a/doc/source/coregistration.md b/doc/source/coregistration.md index efc4e930..03303f04 100644 --- a/doc/source/coregistration.md +++ b/doc/source/coregistration.md @@ -311,8 +311,6 @@ plt.tight_layout() Iterative Closest Point (ICP) coregistration is an iterative point cloud registration method from [Besl and McKay (1992)](https://doi.org/10.1117/12.57955). It aims at iteratively minimizing the distance between closest neighbours by applying sequential rigid transformations. If DEMs are used as inputs, they are converted to point clouds. As for Nuth and Kääb (2011), the iteration stops if it reaches the maximum number of iteration limit or if the iterative transformation amplitude falls below a specified tolerance. -ICP is currently based on [OpenCV's implementation](https://docs.opencv.org/4.x/dc/d9b/classcv_1_1ppf__match__3d_1_1ICP.html) (an optional dependency), which includes outlier removal arguments. This may improve results significantly on outlier-prone data, but care should still be taken, as the risk of landing in [local minima](https://en.wikipedia.org/wiki/Maxima_and_minima) increases. - ```{code-cell} ipython3 :tags: [hide-cell] :mystnb: diff --git a/pyproject.toml b/pyproject.toml index ceef909f..abc3bd0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,3 +21,6 @@ testpaths = [ "tests", "xdem" ] +log_cli = "True" +log_cli_level = "INFO" + diff --git a/setup.cfg b/setup.cfg index a31d36ff..7f831aad 100644 --- a/setup.cfg +++ b/setup.cfg @@ -49,8 +49,6 @@ include = [options.extras_require] opt = - opencv - openh264 pytransform3d noisyopt test = diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index 9e82d556..931fffd9 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -2,6 +2,7 @@ from __future__ import annotations import warnings +import logging import geopandas as gpd import geoutils @@ -18,7 +19,6 @@ from xdem import coreg, examples from xdem.coreg.affine import AffineCoreg, _reproject_horizontal_shift_samecrs - def load_examples(crop: bool = True) -> tuple[RasterType, RasterType, Vector]: """Load example files to try coregistration methods with.""" @@ -199,8 +199,8 @@ def test_raise_all_nans(self) -> None: pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore - @pytest.mark.parametrize("shifts", [(20, 5, 2), (-50, 100, 2)]) # type: ignore - @pytest.mark.parametrize("coreg_method", [coreg.NuthKaab, coreg.DhMinimize, coreg.ICP]) # type: ignore + @pytest.mark.parametrize("shifts", [(-1000, 2000, 200)]) # type: ignore + @pytest.mark.parametrize("coreg_method", [coreg.ICP]) # type: ignore def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> None: """ Test the horizontal/vertical shift coregistrations with synthetic shifted data. These tests include NuthKaab, diff --git a/xdem/coreg/affine.py b/xdem/coreg/affine.py index 76f705be..f5ffb659 100644 --- a/xdem/coreg/affine.py +++ b/xdem/coreg/affine.py @@ -8,14 +8,6 @@ import affine -import xdem.coreg.base - -try: - import cv2 - - _has_cv2 = True -except ImportError: - _has_cv2 = False import geopandas as gpd import numpy as np import rasterio as rio @@ -29,7 +21,7 @@ from xdem.coreg.base import ( Coreg, CoregDict, - _apply_matrix_pts_arr, + _apply_matrix_pts_mat, InFitOrBinDict, InRandomDict, OutAffineDict, @@ -56,170 +48,6 @@ _HAS_NOISYOPT = False -######################### -# Iterative closest point -######################### - -def _matrix_from_translations_rotations(t1: float, t2: float, t3: float, - alpha1: float, alpha2: float, alpha3: float) -> NDArrayf: - """ - Build rigid matrix based on 3 translations (unit of coordinates) and 3 rotations (degrees). - """ - matrix = np.zeros((4, 4)) - e = np.deg2rad(np.array([alpha1, alpha2, alpha3])) - rot_matrix = pytransform3d.rotations.matrix_from_euler(e=e, i=0, j=1, k=2, extrinsic=True) - matrix[0:3, 0:3] = rot_matrix - matrix[:3, 3] = [t1, t2, t3] - - return matrix - -def _icp_fit_func(inputs: tuple[NDArrayf, NDArrayf, NDArrayf | None], t1: float, t2: float, t3: float, alpha1: float, - alpha2: float, alpha3: float, method: Literal["point-to-point", "point-to-plane"]) -> NDArrayf: - """ - The ICP function to optimize is a rigid transformation with 6 parameters (3 translations and 3 rotations) - between nearest neighbour points (that are fixed for the optimization, and update at each iterative step). - - To more easily support any curve_fit options, we return the residuals and will have them match zero. - """ - - # Get inputs - ref, tba, norm = inputs - - # Build an affine matrix for 3D translations and rotations - matrix = _matrix_from_translations_rotations(t1, t2, t3, alpha1, alpha2, alpha3) - - # Apply affine transformation - trans_tba = _apply_matrix_pts_arr(tba, matrix=matrix) - - # Define residuals depending on type of ICP method - # Point-to-point is simply the difference, from Besl and McKay (1992), https://doi.org/10.1117/12.57955 - if method == "point-to-point": - diffs = trans_tba - ref - # Point-to-plane used the normals, from Chen and Medioni (1992), https://doi.org/10.1016/0262-8856(92)90066-C - # A priori, this method is faster based on Rusinkiewicz and Levoy (2001), https://doi.org/10.1109/IM.2001.924423 - elif method == "point-to-plane": - diffs = (trans_tba - ref) * norm - - else: - raise ValueError("ICP method must be 'point-to-point' or 'point-to-plane'.") - - # Sum residuals for any dimension - res = np.sum(diffs**2, axis=1) - - return res - - -def _icp_fit(ref: NDArrayf, normals: NDArrayf, tba: NDArrayf, method: Literal["point-to-point", "point-to-plane"], - params_fit_or_bin: InFitOrBinDict, **kwargs: Any) -> NDArrayf: - - # Group inputs into a single array - inputs = (ref, normals, tba) - - # For this type of method, the procedure can only be fit - if params_fit_or_bin["fit_or_bin"] not in ["fit"]: - raise ValueError("ICP method only supports 'fit'.") - - params_fit_or_bin["fit_func"] = _icp_fit_func - - # Define partial function - loss_func = params_fit_or_bin["fit_loss_func"] - - def fit_func(offsets: tuple[float, float, float, float, float, float]) -> np.floating[Any]: - return loss_func(_icp_fit_func(inputs=inputs, t1=offsets[0], t2=offsets[1], t3=offsets[2], alpha1=offsets[3], - alpha2=offsets[4], alpha3=offsets[5], method=method)) - - # Initial offset near zero - init_offsets = (0., 0., 0., 0., 0., 0.) - - results = params_fit_or_bin["fit_minimizer"](fit_func, init_offsets, **kwargs) - - # Mypy: having results as "None" is impossible, but not understood through overloading of _bin_or_and_fit_nd... - assert results is not None - # Build matrix out of optimized parameters - matrix = _matrix_from_translations_rotations(*results[0]) - - return matrix - -def _icp_iteration_step(matrix, ref_epc, tba_epc, normals, centroid, distance_upper_bound): - - # Apply transform matrix from previous steps - trans_tba_epc = _apply_matrix_pts_arr(tba_epc, matrix, centroid=centroid) - - # Create nearest neighbour tree from reference elevations, and query for transformed point cloud - ref_epc_nearest_tree = scipy.spatial.cKDTree(ref_epc) - _, ind = ref_epc_nearest_tree.query(trans_tba_epc, k=1, distance_upper_bound=distance_upper_bound) - - # Index points to get nearest - ind_ref = ind[ind < ref_epc.shape[0]] - step_ref = ref_epc[ind_ref] - step_normals = normals[ind_ref] - ind_tba = ind < ref_epc.shape[0] - step_trans_tba = trans_tba_epc[ind_tba] - - # Fit step to get new step transform - step_matrix = _icp_fit(step_ref, step_normals, step_trans_tba) - - # Increment transformation matrix by step - new_matrix = step_matrix @ matrix - - return new_matrix - -def _icp_norms(dem: NDArrayf, transform: affine.Affine): - """ - Derive normals from the DEM for "point-to-plane" method. - - :param dem: - :param transform: - :return: - """ - resolution = _res(transform) - - # Generate the x and y coordinates for the reference_dem - gradient_x, gradient_y = np.gradient(dem) - - normal_east = np.sin(np.arctan(gradient_y / resolution[1])) * -1 - normal_north = np.sin(np.arctan(gradient_x / resolution[0])) - normal_up = 1 - np.linalg.norm([normal_east, normal_north], axis=0) - - return np.dstack(normal_east, normal_north, normal_up) - -def icp(ref_elev: NDArrayf | gpd.GeoDataFrame, - tba_elev: NDArrayf | gpd.GeoDataFrame, - transform: affine.Affine, - area_or_point: Literal["Area", "Point"] | None, - max_iterations: int, - tolerance: float, - method: Literal["point-to-point", "point-to-plane"] = "point-to-plane", - verbose: bool = False): - """ - Main function for ICP method. - - The function assumes we have a DEM and an elevation point cloud in the same CRS. - The normals (for point-to-plane) are computed on the DEM for speed. - """ - - # Pre-process inputs - - # Derive normals if method is point-to-plane - if method == "point-to-plane": - normals = _icp_norms(dem, transform) - else: - normals = None - - # Iterate through method until tolerance or max number of iterations is reached - init_matrix = np.zeros(6) - constant_inputs = (ref_epc, tba_epc, ref_norms) - final_offsets = _iterate_method( - method=_icp_iteration_step, - iterating_input=init_matrix, - constant_inputs=constant_inputs, - tolerance=tolerance, - max_iterations=max_iterations, - verbose=verbose, - ) - - return final_offsets, subsample_final - ###################################### # Generic functions for affine methods ###################################### @@ -298,7 +126,7 @@ def _iterate_method( # Print final results # TODO: Allow to pass a string to _iterate_method on how to print/describe exactly the iterating input - if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: + if logging.getLogger().getEffectiveLevel() <= logging.INFO: pbar.write(f" Iteration #{i + 1:d} - Offset: {new_inputs}; Magnitude: {new_statistic}") if i > 1 and new_statistic < tolerance: @@ -864,7 +692,7 @@ def vertical_shift( logging.info("Running vertical shift coregistration") # Pre-process point-raster inputs to the same subsampled points - sub_ref, sub_tba, _ = _preprocess_pts_rst_subsample( + sub_ref, sub_tba, _, _ = _preprocess_pts_rst_subsample( params_random=params_random, ref_elev=ref_elev, tba_elev=tba_elev, @@ -891,6 +719,265 @@ def vertical_shift( return vshift, subsample_final +############################ +# 4/ Iterative closest point +############################ + +def _matrix_from_translations_rotations(t1: float, t2: float, t3: float, + alpha1: float, alpha2: float, alpha3: float) -> NDArrayf: + """ + Build rigid matrix based on 3 translations (unit of coordinates) and 3 rotations (degrees). + """ + matrix = np.eye(4) + e = np.array([alpha1, alpha2, alpha3]) + rot_matrix = pytransform3d.rotations.matrix_from_euler(e=e, i=0, j=1, k=2, extrinsic=True) + matrix[0:3, 0:3] = rot_matrix + matrix[:3, 3] = [t1, t2, t3] + + return matrix + +def _icp_fit_func(inputs: tuple[NDArrayf, NDArrayf, NDArrayf | None], t1: float, t2: float, t3: float, alpha1: float, + alpha2: float, alpha3: float, method: Literal["point-to-point", "point-to-plane"]) -> NDArrayf: + """ + The ICP function to optimize is a rigid transformation with 6 parameters (3 translations and 3 rotations) + between nearest neighbour points (that are fixed for the optimization, and update at each iterative step). + + To more easily support any curve_fit options, we return the residuals and will have them match zero. + """ + + # Get inputs + ref, tba, norm = inputs + + # Build an affine matrix for 3D translations and rotations + matrix = _matrix_from_translations_rotations(t1, t2, t3, alpha1, alpha2, alpha3) + + # Apply affine transformation + trans_tba = _apply_matrix_pts_mat(mat=tba, matrix=matrix) + + # Define residuals depending on type of ICP method + # Point-to-point is simply the difference, from Besl and McKay (1992), https://doi.org/10.1117/12.57955 + if method == "point-to-point": + diffs = trans_tba - ref + # Point-to-plane used the normals, from Chen and Medioni (1992), https://doi.org/10.1016/0262-8856(92)90066-C + # A priori, this method is faster based on Rusinkiewicz and Levoy (2001), https://doi.org/10.1109/IM.2001.924423 + elif method == "point-to-plane": + diffs = (trans_tba - ref) * norm + + else: + raise ValueError("ICP method must be 'point-to-point' or 'point-to-plane'.") + + # Sum residuals for any dimension + res = np.sum(diffs**2, axis=1) + + return res + + +def _icp_fit(ref: NDArrayf, tba: NDArrayf, norms: NDArrayf, method: Literal["point-to-point", "point-to-plane"], + params_fit_or_bin: InFitOrBinDict, **kwargs: Any) -> NDArrayf: + + # Group inputs into a single array + inputs = (ref, tba, norms) + + # For this type of method, the procedure can only be fit + # if params_fit_or_bin["fit_or_bin"] not in ["fit"]: + # raise ValueError("ICP method only supports 'fit'.") + # + # params_fit_or_bin["fit_func"] = _icp_fit_func + + # Define partial function + loss_func = params_fit_or_bin["fit_loss_func"] + + def fit_func(offsets: tuple[float, float, float, float, float, float]) -> NDArrayf: + return _icp_fit_func(inputs=inputs, t1=offsets[0], t2=offsets[1], t3=offsets[2], + alpha1=offsets[3], alpha2=offsets[4], alpha3=offsets[5], method=method) + + # Initial offset near zero + init_offsets = (0., 0., 0., 0., 0., 0.) + + results = params_fit_or_bin["fit_minimizer"](fit_func, init_offsets, **kwargs, loss=loss_func) + + # Mypy: having results as "None" is impossible, but not understood through overloading of _bin_or_and_fit_nd... + assert results is not None + # Build matrix out of optimized parameters + matrix = _matrix_from_translations_rotations(*results.x) + + # Linear approx with least-squares + + # b1 = np.sum(ref * norms, axis=1) + # b2 = np.sum(tba * norms, axis=1) + # b = np.expand_dims(b1 - b2, axis=1) + # A1 = np.cross(tba, norms) + # A2 = norms + # A = np.hstack((A1, A2)) + # + # # if self.config["ICP_ROBUST"]: + # # x = np.linalg.inv(A.T @ weights @ A) @ A.T @ weights @ b + # x = np.linalg.inv(A.T @ A) @ A.T @ b + # + # R = np.eye(3) + # T = np.zeros(3) + # R[0, 0] = np.cos(x[2]) * np.cos(x[1]) + # R[0, 1] = -np.sin(x[2]) * np.cos(x[0]) + np.cos(x[2]) * np.sin(x[1]) * np.sin( + # x[0] + # ) + # R[0, 2] = np.sin(x[2]) * np.sin(x[0]) + np.cos(x[2]) * np.sin(x[1]) * np.cos( + # x[0] + # ) + # R[1, 0] = np.sin(x[2]) * np.cos(x[1]) + # R[1, 1] = np.cos(x[2]) * np.cos(x[0]) + np.sin(x[2]) * np.sin(x[1]) * np.sin( + # x[0] + # ) + # R[1, 2] = -np.cos(x[2]) * np.sin(x[0]) + np.sin(x[2]) * np.sin(x[1]) * np.cos( + # x[0] + # ) + # R[2, 0] = -np.sin(x[1]) + # R[2, 1] = np.cos(x[1]) * np.sin(x[0]) + # R[2, 2] = np.cos(x[1]) * np.cos(x[0]) + # T[0] = x[3] + # T[1] = x[4] + # T[2] = x[5] + # + # matrix = np.eye(4) + # matrix[:3, :3] = R + # matrix[:3, 3] = T + + return matrix + +def _icp_iteration_step(matrix, ref_epc, tba_epc, norms, ref_epc_nearest_tree, params_fit_or_bin, method): + + # Apply transform matrix from previous steps + trans_tba_epc = _apply_matrix_pts_mat(tba_epc, matrix=matrix) + + # Create nearest neighbour tree from reference elevations, and query for transformed point cloud + _, ind = ref_epc_nearest_tree.query(trans_tba_epc, k=1) + + # Index points to get nearest + ind_ref = ind[ind < ref_epc.shape[1]] + step_ref = ref_epc[ind_ref] + step_normals = norms[ind_ref] + ind_tba = ind < ref_epc.shape[1] + step_trans_tba = trans_tba_epc[ind_tba] + + # Fit to get new step transform + step_matrix = _icp_fit(step_ref, step_normals, step_trans_tba, params_fit_or_bin=params_fit_or_bin, method=method) + + # Increment transformation matrix by step + new_matrix = step_matrix @ matrix + + # Compute statistic on offset to know if it reached tolerance + translations = step_matrix[:3, 3] + rotations = step_matrix[:3, :3] + print(translations) + tolerance_translation = np.sqrt(np.sum(translations)**2) + tolerance_rotation = np.rad2deg(np.arccos(np.clip((np.trace(rotations) - 1) / 2, -1, 1))) + + return new_matrix, tolerance_translation + +def _icp_norms(dem: NDArrayf, transform: affine.Affine) -> tuple[NDArrayf, NDArrayf, NDArrayf]: + """ + Derive normals from the DEM for "point-to-plane" method. + """ + + # Get DEM resolution + resolution = _res(transform) + + # Generate the X, Y and Z normals + gradient_x, gradient_y = np.gradient(dem) + normal_east = np.sin(np.arctan(gradient_y / resolution[1])) * -1 + normal_north = np.sin(np.arctan(gradient_x / resolution[0])) + normal_up = 1 - np.linalg.norm([normal_east, normal_north], axis=0) + + return normal_east, normal_north, normal_up + +def icp(ref_elev: NDArrayf | gpd.GeoDataFrame, + tba_elev: NDArrayf | gpd.GeoDataFrame, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + max_iterations: int, + tolerance: float, + params_random: InRandomDict, + params_fit_or_bin: InFitOrBinDict, + method: Literal["point-to-point", "point-to-plane"] = "point-to-plane", + ) -> tuple[NDArrayf, tuple[float, float, float], int]: + """ + Main function for ICP method. + + The function assumes we have a DEM and an elevation point cloud in the same CRS. + The normals (for point-to-plane) are computed on the DEM for speed. + """ + + # Derive normals if method is point-to-plane, otherwise not + if method == "point-to-plane": + # We use the DEM to derive the normals + if isinstance(ref_elev, np.ndarray): + dem = ref_elev + else: + dem = tba_elev + nx, ny, nz = _icp_norms(dem, transform) + aux_vars = {"nx": nx, "ny": ny, "nz": nz} + else: + aux_vars = None + + # Pre-process point-raster inputs to the same subsampled points + sub_ref, sub_tba, sub_aux_vars, sub_coords = _preprocess_pts_rst_subsample( + params_random=params_random, + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + aux_vars=aux_vars, + return_coords=True, + ) + + # TODO: Enforce that _preprocess function returns no NaNs + # (Temporary) + ind_valid = np.logical_and(np.isfinite(sub_ref), np.isfinite(sub_tba)) + sub_ref = sub_ref[ind_valid] + sub_tba = sub_tba[ind_valid] + sub_coords = (sub_coords[0][ind_valid], sub_coords[1][ind_valid]) + if sub_aux_vars is not None: + sub_aux_vars["nx"] = sub_aux_vars["nx"][ind_valid] + sub_aux_vars["ny"] = sub_aux_vars["ny"][ind_valid] + sub_aux_vars["nz"] = sub_aux_vars["nz"][ind_valid] + + # Convert point clouds to Nx3 arrays for efficient calculations below + ref_epc = np.vstack((sub_coords[0], sub_coords[1], sub_ref)) + tba_epc = np.vstack((sub_coords[0], sub_coords[1], sub_tba)) + if sub_aux_vars is not None: + norms = np.vstack((sub_aux_vars["nx"], sub_aux_vars["ny"], sub_aux_vars["nz"])) + else: + norms = None + + # Remove centroid + centroid = np.mean(ref_epc, axis=1) + ref_epc = ref_epc - centroid[:, None] + tba_epc = tba_epc - centroid[:, None] + + # Define search tree outside of loop for performance + ref_epc_nearest_tree = scipy.spatial.KDTree(ref_epc) + + # Iterate through method until tolerance or max number of iterations is reached + init_matrix = np.eye(4) # Initial matrix is the identity transform + constant_inputs = (ref_epc, tba_epc, norms, ref_epc_nearest_tree, params_fit_or_bin, method) + final_matrix = _iterate_method( + method=_icp_iteration_step, + iterating_input=init_matrix, + constant_inputs=constant_inputs, + tolerance=tolerance, + max_iterations=max_iterations, + ) + + # Get subsample size + subsample_final = len(sub_ref) + + return final_matrix, centroid, subsample_final + ################################## # Affine coregistration subclasses ################################## @@ -1203,7 +1290,8 @@ def _to_matrix_func(self) -> NDArrayf: class ICP(AffineCoreg): """ - Iterative closest point registration, based on Besl and McKay (1992), https://doi.org/10.1117/12.57955. + Iterative closest point registration, based on Besl and McKay (1992), https://doi.org/10.1117/12.57955 for + "point-to-point" and on Chen and Medioni (1992), https://doi.org/10.1016/0262-8856(92)90066-C for "point-to-plane". Estimates a rigid transform (rotation + translation) between two elevation datasets. @@ -1211,36 +1299,32 @@ class ICP(AffineCoreg): on the coordinates in the key "centroid". The translation parameters are also stored individually in the keys "shift_x", "shift_y" and "shift_z" (in georeferenced units for horizontal shifts, and unit of the elevation dataset inputs for the vertical shift). - - Requires 'opencv'. See opencv doc for more info: - https://docs.opencv.org/master/dc/d9b/classcv_1_1ppf__match__3d_1_1ICP.html """ def __init__( self, - max_iterations: int = 100, + icp_method: Literal["point-to-point", "point-to-plane"] = "point-to-plane", + fit_minimizer: Callable[..., tuple[NDArrayf, Any]] = scipy.optimize.least_squares, + fit_loss_func: Callable[[NDArrayf], np.floating[Any]] = "linear", + max_iterations: int = 20, tolerance: float = 0.05, - rejection_scale: float = 2.5, - num_levels: int = 6, subsample: float | int = 5e5, ) -> None: """ Instantiate an ICP coregistration object. - :param max_iterations: The maximum allowed iterations before stopping. - :param tolerance: The residual change threshold after which to stop the iterations. - :param rejection_scale: The threshold (std * rejection_scale) to consider points as outliers. - :param num_levels: Number of octree levels to consider. A higher number is faster but may be more inaccurate. + :param icp_method: Method of iterative closest point registration, either "point-to-point" or "point-to-plane". + :param max_iterations: Maximum allowed iterations before stopping. + :param tolerance: Residual change threshold after which to stop the iterations. :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count. """ - if not _has_cv2: - raise ValueError("Optional dependency needed. Install 'opencv'") meta = { + "icp_method": icp_method, + "fit_minimizer": fit_minimizer, + "fit_loss_func": fit_loss_func, "max_iterations": max_iterations, "tolerance": tolerance, - "rejection_scale": rejection_scale, - "num_levels": num_levels, } super().__init__(subsample=subsample, meta=meta) @@ -1259,42 +1343,18 @@ def _fit_rst_rst( ) -> None: """Estimate the rigid transform from tba_dem to ref_dem.""" - if weights is not None: - warnings.warn("ICP was given weights, but does not support it.") - - resolution = _res(transform) - - # Generate the x and y coordinates for the reference_dem - x_coords, y_coords = _coords(transform, ref_elev.shape, area_or_point=area_or_point) - gradient_x, gradient_y = np.gradient(ref_elev) - - normal_east = np.sin(np.arctan(gradient_y / resolution[1])) * -1 - normal_north = np.sin(np.arctan(gradient_x / resolution[0])) - normal_up = 1 - np.linalg.norm([normal_east, normal_north], axis=0) - - valid_mask = np.logical_and.reduce( - (inlier_mask, np.isfinite(ref_elev), np.isfinite(normal_east), np.isfinite(normal_north)) - ) - subsample_mask = self._get_subsample_on_valid_mask(valid_mask=valid_mask) - - ref_pts = gpd.GeoDataFrame( - geometry=gpd.points_from_xy(x=x_coords[subsample_mask], y=y_coords[subsample_mask], crs=None), - data={ - "z": ref_elev[subsample_mask], - "nx": normal_east[subsample_mask], - "ny": normal_north[subsample_mask], - "nz": normal_up[subsample_mask], - }, - ) - + # Method is the same for 2D or 1D elevation differences, so we can simply re-direct to fit_rst_pts self._fit_rst_pts( - ref_elev=ref_pts, + ref_elev=ref_elev, tba_elev=tba_elev, inlier_mask=inlier_mask, transform=transform, crs=crs, area_or_point=area_or_point, - z_name="z", + z_name=z_name, + weights=weights, + bias_vars=bias_vars, + **kwargs, ) def _fit_rst_pts( @@ -1311,102 +1371,26 @@ def _fit_rst_pts( **kwargs: Any, ) -> None: - # Check which one is reference - if isinstance(ref_elev, gpd.GeoDataFrame): - point_elev = ref_elev - rst_elev = tba_elev - ref = "point" - else: - point_elev = tba_elev - rst_elev = ref_elev - ref = "raster" - - # Pre-process point data - point_elev = point_elev.dropna(how="any", subset=[z_name]) - - bounds = _bounds(transform=transform, shape=rst_elev.shape) - resolution = _res(transform) - - # Generate the x and y coordinates for the TBA DEM - x_coords, y_coords = _coords(transform, rst_elev.shape, area_or_point=area_or_point) - centroid = (float(np.mean([bounds.left, bounds.right])), float(np.mean([bounds.bottom, bounds.top])), 0.0) - # Subtract by the bounding coordinates to avoid float32 rounding errors. - x_coords -= centroid[0] - y_coords -= centroid[1] - - gradient_x, gradient_y = np.gradient(rst_elev) - - # This CRS is temporary and doesn't affect the result. It's just needed for Raster instantiation. - normal_east = np.sin(np.arctan(gradient_y / resolution[1])) * -1 - normal_north = np.sin(np.arctan(gradient_x / resolution[0])) - normal_up = 1 - np.linalg.norm([normal_east.data, normal_north.data], axis=0) - - valid_mask = ~np.isnan(rst_elev) & ~np.isnan(normal_east.data) & ~np.isnan(normal_north.data) - - points: dict[str, NDArrayf] = {} - points["raster"] = np.dstack( - [ - x_coords[valid_mask], - y_coords[valid_mask], - rst_elev[valid_mask], - normal_east[valid_mask], - normal_north[valid_mask], - normal_up[valid_mask], - ] - ).squeeze() - - # TODO: Should be a way to not duplicate this column and just feed it directly - point_elev["E"] = point_elev.geometry.x.values - point_elev["N"] = point_elev.geometry.y.values - - if any(col not in point_elev for col in ["nx", "ny", "nz"]): - for key, arr in [("nx", normal_east), ("ny", normal_north), ("nz", normal_up)]: - point_elev[key] = _interp_points( - arr, - transform=transform, - area_or_point=area_or_point, - points=(point_elev["E"].values, point_elev["N"].values), - ) - - point_elev["E"] -= centroid[0] - point_elev["N"] -= centroid[1] - - points["point"] = point_elev[["E", "N", z_name, "nx", "ny", "nz"]].values - - for key in points: - points[key] = points[key][~np.any(np.isnan(points[key]), axis=1)].astype("float32") - points[key][:, 0] -= resolution[0] / 2 - points[key][:, 1] -= resolution[1] / 2 - - # Extract parameters and pass them to method - max_it = self._meta["inputs"]["iterative"]["max_iterations"] - tol = self._meta["inputs"]["iterative"]["tolerance"] - rej = self._meta["inputs"]["specific"]["rejection_scale"] - num_lv = self._meta["inputs"]["specific"]["num_levels"] - icp = cv2.ppf_match_3d_ICP(max_it, tol, rej, num_lv) - logging.info("Running ICP...") - try: - # Use points as reference - _, residual, matrix = icp.registerModelToScene(points["raster"], points["point"]) - except cv2.error as exception: - if "(expected: 'n > 0'), where" not in str(exception): - raise exception - - raise ValueError( - "Not enough valid points in input data." - f"'reference_dem' had {points['ref'].size} valid points." - f"'dem_to_be_aligned' had {points['tba'].size} valid points." - ) - - # If raster was reference, invert the matrix - if ref == "raster": - matrix = xdem.coreg.base.invert_matrix(matrix) - - logging.info("ICP finished") + # Get parameters stored in class + params_random = self._meta["inputs"]["random"] + params_fit_or_bin = self._meta["inputs"]["fitorbin"] - assert residual < 1000, f"ICP coregistration failed: residual={residual}, threshold: 1000" + # Call method + matrix, centroid, subsample_final = icp( + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + params_random=params_random, + params_fit_or_bin=params_fit_or_bin, + max_iterations=self._meta["inputs"]["iterative"]["max_iterations"], + tolerance=self._meta["inputs"]["iterative"]["tolerance"], + ) - # Save outputs + # Write output to class # (Mypy does not pass with normal dict, requires "OutAffineDict" here for some reason...) output_affine = OutAffineDict( centroid=centroid, @@ -1416,6 +1400,7 @@ def _fit_rst_pts( shift_z=matrix[2, 3], ) self._meta["outputs"]["affine"] = output_affine + self._meta["outputs"]["random"] = {"subsample_final": subsample_final} class NuthKaab(AffineCoreg): @@ -1442,8 +1427,8 @@ def __init__( """ Instantiate a new Nuth and Kääb (2011) coregistration object. - :param max_iterations: The maximum allowed iterations before stopping. - :param offset_threshold: The residual offset threshold after which to stop the iterations (in pixels). + :param max_iterations: Maximum allowed iterations before stopping. + :param offset_threshold: Residual offset threshold after which to stop the iterations (in pixels). :param bin_before_fit: Whether to bin data before fitting the coregistration function. For the Nuth and Kääb (2011) algorithm, this corresponds to bins of aspect to compute statistics on dh/tan(slope). :param fit_optimizer: Optimizer to minimize the coregistration function. diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index 78e992a1..2d1007d0 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -104,8 +104,7 @@ "shift_y": "Northward shift estimated (georeferenced unit)", "shift_z": "Vertical shift estimated (elevation unit)", "matrix": "Affine transformation matrix estimated", - "rejection_scale": "Rejection scale", - "num_levels": "Number of levels", + "icp_method": "Type of ICP method" } ##################################### # Generic functions for preprocessing @@ -683,13 +682,14 @@ def _subsample_on_mask( transform: rio.transform.Affine, area_or_point: Literal["Area", "Point"] | None, z_name: str, -) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf]]: + return_coords: bool = False, +) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], None | tuple[NDArrayf, NDArrayf]]: """ Perform subsampling on mask for raster-raster or point-raster datasets on valid points of all inputs (including potential auxiliary variables). Returns 1D arrays of subsampled inputs: reference elevation, to-be-aligned elevation and auxiliary variables - (in dictionary). + (in dictionary), and (optionally) tuple of X/Y coordinates. """ # For two rasters @@ -705,6 +705,13 @@ def _subsample_on_mask( else: sub_bias_vars = None + # Return coordinates if required + if return_coords: + coords = _coords(transform=transform, shape=ref_elev.shape, area_or_point=area_or_point) + sub_coords = (coords[0][sub_mask], coords[1][sub_mask]) + else: + sub_coords = None + # For one raster and one point cloud else: @@ -734,7 +741,13 @@ def _subsample_on_mask( else: sub_bias_vars = None - return sub_ref, sub_tba, sub_bias_vars + # Return coordinates if required + if return_coords: + sub_coords = pts + else: + sub_coords = None + + return sub_ref, sub_tba, sub_bias_vars, sub_coords def _preprocess_pts_rst_subsample( @@ -747,7 +760,8 @@ def _preprocess_pts_rst_subsample( area_or_point: Literal["Area", "Point"] | None, z_name: str, aux_vars: None | dict[str, NDArrayf] = None, -) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf]]: + return_coords: bool = False, +) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], None | tuple[NDArrayf, NDArrayf]]: """ Pre-process raster-raster or point-raster datasets into 1D arrays subsampled at the same points (and interpolated in the case of point-raster input). @@ -768,7 +782,7 @@ def _preprocess_pts_rst_subsample( ) # Perform subsampling on mask for all inputs - sub_ref, sub_tba, sub_bias_vars = _subsample_on_mask( + sub_ref, sub_tba, sub_bias_vars, sub_coords = _subsample_on_mask( ref_elev=ref_elev, tba_elev=tba_elev, aux_vars=aux_vars, @@ -776,10 +790,11 @@ def _preprocess_pts_rst_subsample( transform=transform, area_or_point=area_or_point, z_name=z_name, + return_coords=return_coords, ) # Return 1D arrays of subsampled points at the same location - return sub_ref, sub_tba, sub_bias_vars + return sub_ref, sub_tba, sub_bias_vars, sub_coords @overload @@ -979,6 +994,32 @@ def invert_matrix(matrix: NDArrayf) -> NDArrayf: return pytransform3d.transformations.invert_transform(checked_matrix) +def _apply_matrix_pts_mat( + mat: NDArrayf, + matrix: NDArrayf, + centroid: tuple[float, float, float] | None = None, + invert: bool = False,) -> NDArrayf: + + # Invert matrix if required + if invert: + matrix = invert_matrix(matrix) + + # First, get 4xN array, adding a column of ones for translations during matrix multiplication + points = np.concatenate((mat, np.ones((1, mat.shape[1])))) + + # Temporarily subtract centroid coordinates + if centroid is not None: + points[:3, :] -= np.array(centroid)[:, None] + + # Transform using matrix multiplication, and get only the first three columns + transformed_points = (matrix @ points)[:3, :] + + # Add back centroid coordinates + if centroid is not None: + transformed_points += np.array(centroid)[:, None] + + return transformed_points + def _apply_matrix_pts_arr( x: NDArrayf, y: NDArrayf, @@ -1543,11 +1584,8 @@ class InSpecificDict(TypedDict, total=False): angle: float # (Using Deramp) Polynomial order selected for deramping poly_order: int - - # (Using ICP) - rejection_scale: float - num_levels: int - + # (Using ICP) Method type + icp_method: Literal["point-to-point", "point-to-plane"] class OutSpecificDict(TypedDict, total=False): """Keys and types of outputs associated with specific methods.""" @@ -1899,7 +1937,7 @@ def _preprocess_rst_pts_subsample( ) # Perform subsampling on mask for all inputs - sub_ref, sub_tba, sub_bias_vars = _subsample_on_mask( + sub_ref, sub_tba, sub_bias_vars, _ = _subsample_on_mask( ref_elev=ref_elev, tba_elev=tba_elev, aux_vars=aux_vars,