Skip to content

Commit

Permalink
Adds velocity LSQ solution test (#63)
Browse files Browse the repository at this point in the history
* corrects for sat velocity

* fixes user velocity estimation test

* fixes signs
  • Loading branch information
jtec authored Mar 19, 2024
1 parent efa068e commit e5cf0af
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/prx/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def write_csv_file(
records = flat_records.loc[flat_records.observation_type.str.startswith("C")]
records["C_obs"] = records.observation_value
records = records.drop(columns=["observation_value", "observation_type"])
type_2_unit = {"D": "mps", "L": "m", "S": "dBHz", "C": "m"}
type_2_unit = {"D": "hz", "L": "cycles", "S": "dBHz", "C": "m"}
for obs_type in ["D", "L", "S"]:
obs = flat_records.loc[flat_records.observation_type.str.startswith(obs_type)][
[
Expand Down
8 changes: 8 additions & 0 deletions src/prx/test/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,11 @@ def test_gfzrnx_function_call(input_for_test):
except AssertionError:
log.info(f"gfzrnx binary did not execute with file {file_sp3}")
assert True


def test_row_wise_dot_product():
# Check whether the way we compute the row-wise dot product with numpy yields the expected result
A = np.array([[1, 2], [4, 5], [7, 8]])
B = np.array([[10, 20], [30, 40], [50, 60]])
row_wise_dot = np.sum(A * B, axis=1).reshape(-1, 1)
assert (row_wise_dot == np.array([[10 + 40], [120 + 200], [350 + 480]])).all()
8 changes: 5 additions & 3 deletions src/prx/test/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from prx import helpers
from prx import constants
from prx import main
from prx.user import parse_prx_csv_file, spp_pt_lsq
from prx.user import parse_prx_csv_file, spp_pt_lsq, spp_vt_lsq


# This function sets up a temporary directory, copies a rinex observations file into that directory
Expand Down Expand Up @@ -101,15 +101,17 @@ def test_spp_lsq(input_for_test):
]
for constellations_to_use in [("G", "E", "C"), ("G",), ("E",), ("C",)]:
obs = df_first_epoch[df.constellation.isin(constellations_to_use)]
x_lsq = spp_pt_lsq(obs)
pt_lsq = spp_pt_lsq(obs)
vt_lsq = spp_vt_lsq(obs, p_ecef_m=pt_lsq[0:3, :])
assert (
np.max(
np.abs(
x_lsq[0:3, :]
pt_lsq[0:3, :]
- np.array(
metadata["approximate_receiver_ecef_position_m"]
).reshape(-1, 1)
)
)
< 1e1
)
assert np.max(np.abs(vt_lsq[0:3, :])) < 1e-1
40 changes: 36 additions & 4 deletions src/prx/user.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
from pathlib import Path

import numpy as np
import pandas as pd

from prx.constants import cGpsSpeedOfLight_mps


def parse_prx_csv_file_metadata(prx_file: Path):
with open(prx_file, "r") as f:
Expand All @@ -15,7 +16,38 @@ def parse_prx_csv_file(prx_file: Path):
return pd.read_csv(prx_file, comment="#"), parse_prx_csv_file_metadata(prx_file)


def spp_vt_lsq(df, p_ecef_m):
df = df[df.D_obs_hz.notna()].reset_index(drop=True)
df["D_obs_mps"] = -df.D_obs_hz * cGpsSpeedOfLight_mps / df.carrier_frequency_hz
# Remove satellite velocity projected onto line of sight
rx_sat_vectors = df[["x_m", "y_m", "z_m"]].to_numpy() - p_ecef_m.T
row_sums = np.linalg.norm(rx_sat_vectors, axis=1)
unit_vectors = (rx_sat_vectors.T / row_sums).T
# The next line computes the row-wise dot product of the two matrices
df["satellite_los_velocities"] = np.sum(
unit_vectors * df[["dx_mps", "dy_mps", "dz_mps"]].to_numpy(), axis=1
).reshape(-1, 1)
df["D_obs_corrected_mps"] = (
df.D_obs_mps.to_numpy().reshape(-1, 1)
+ df.dclock_mps.to_numpy().reshape(-1, 1)
- df["satellite_los_velocities"].to_numpy().reshape(-1, 1)
)
# Jacobian of Doppler observation w.r.t. receiver clock offset drift w.r.t. constellation system clock
H_dclock = np.zeros(
(
len(df.D_obs_corrected_mps),
len(df.constellation.unique()),
)
)
for i, constellation in enumerate(df.constellation.unique()):
H_dclock[df.constellation == constellation, i] = 1
H = np.hstack((-unit_vectors, H_dclock))
x_lsq, residuals, _, _ = np.linalg.lstsq(H, df.D_obs_corrected_mps, rcond="warn")
return x_lsq.reshape(-1, 1)


def spp_pt_lsq(df, dx_convergence_l2=1e-6, max_iterations=10):
df = df[df.C_obs.notna()]
df["C_obs_corrected"] = (
df.C_obs
+ df.clock_m
Expand All @@ -25,7 +57,7 @@ def spp_pt_lsq(df, dx_convergence_l2=1e-6, max_iterations=10):
- df.tropo_delay_m
- df.group_delay_m
)
# Determine the constellation selection matrix of each obs
# Jacobian of pseudorange observation w.r.t. receiver clock offset w.r.t. constellation system clock
H_clock = np.zeros(
(
len(df.C_obs_corrected),
Expand All @@ -34,12 +66,12 @@ def spp_pt_lsq(df, dx_convergence_l2=1e-6, max_iterations=10):
)
for i, constellation in enumerate(df.constellation.unique()):
H_clock[df.constellation == constellation, i] = 1
# initial linearization point
# Initial linearization point
x_linearization = np.zeros((3 + len(df.constellation.unique()), 1))
solution_increment_l2 = np.inf
n_iterations = 0
while solution_increment_l2 > dx_convergence_l2:
# compute predicted pseudo-range as geometric distance + clock bias, predicted at x_linearization
# Compute predicted pseudo-range as geometric distance + receiver clock bias, predicted at x_linearization
C_obs_predicted = np.linalg.norm(
x_linearization[0:3].T - df[["x_m", "y_m", "z_m"]].to_numpy(), axis=1
) + np.squeeze( # geometric distance
Expand Down

0 comments on commit e5cf0af

Please sign in to comment.