Skip to content

Commit

Permalink
doc: more doc and type hints.
Browse files Browse the repository at this point in the history
  • Loading branch information
seballgeyer committed Jul 22, 2024
1 parent fefbaba commit e7a4747
Showing 1 changed file with 44 additions and 21 deletions.
65 changes: 44 additions & 21 deletions gnssanalysis/gn_io/sp3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import io as _io
import os as _os
import re as _re
from typing import Literal, Union, List
from typing import Literal, Union, List, Tuple

import numpy as _np
import pandas as _pd
Expand Down Expand Up @@ -69,8 +69,14 @@ def sp3_clock_nodata_to_nan(sp3_df: _pd.DataFrame) -> None:
sp3_df.loc[nan_mask, ("EST", "CLK")] = _np.nan


def mapparm(old, new):
"""scipy function f map values"""
def mapparm(old: Tuple[float, float], new: Tuple[float, float]) -> Tuple[float, float]:
"""
Maps values from the old range to the new range.
:param Tuple[float, float] old: The old range of values.
:param Tuple[float, float] new: The new range of values.
:return Tuple[float, float]: The offset and scale factor for the mapping.
"""
oldlen = old[1] - old[0]
newlen = new[1] - new[0]
off = (old[1] * new[0] - old[0] * new[1]) / oldlen
Expand Down Expand Up @@ -242,7 +248,7 @@ def getVelSpline(sp3Df: _pd.DataFrame) -> _pd.DataFrame:
return _pd.concat([sp3Df, _pd.concat([velDf], keys=["VELi"], axis=1)], axis=1)


def getVelPoly(sp3Df: _pd.DataFrame, deg: int = 35):
def getVelPoly(sp3Df: _pd.DataFrame, deg: int = 35) -> _pd.DataFrame:
"""
Interpolates the positions for -1s and +1s in the sp3_df DataFrame and outputs velocities.
Expand Down Expand Up @@ -278,7 +284,7 @@ def getVelPoly(sp3Df: _pd.DataFrame, deg: int = 35):
return _pd.concat([sp3Df, vel_i], axis=1)


def gen_sp3_header(sp3_df):
def gen_sp3_header(sp3_df: _pd.DataFrame) -> str:
"""
Generate the header for an SP3 file based on the given DataFrame.
Expand Down Expand Up @@ -341,7 +347,7 @@ def gen_sp3_header(sp3_df):
return "".join(line1 + line2 + sats_header.tolist() + sv_orb_head.tolist() + head_c + head_fi + comment)


def gen_sp3_content(sp3_df: _pd.DataFrame, sort_outputs: bool = False, buf: Union[None, _io.TextIOBase] = None):
def gen_sp3_content(sp3_df: _pd.DataFrame, sort_outputs: bool = False, buf: Union[None, _io.TextIOBase] = None) -> str:
"""
Organises, formats (including nodata values), then writes out SP3 content to a buffer if provided, or returns
it otherwise.
Expand Down Expand Up @@ -490,28 +496,43 @@ def clk_std_formatter(x):
return None


def write_sp3(sp3_df, path):
"""sp3 writer, dataframe to sp3 file"""
def write_sp3(sp3_df: _pd.DataFrame, path: str) -> None:
"""sp3 writer, dataframe to sp3 file
:param pandas.DataFrame sp3_df: The DataFrame containing the SP3 data.
:param str path: The path to write the SP3 file to.
"""
content = gen_sp3_header(sp3_df) + gen_sp3_content(sp3_df) + "EOF"
with open(path, "w") as file:
file.write(content)


def merge_attrs(df_list):
"""Merges attributes of a list of sp3 dataframes into a single set of attributes"""
df = _pd.concat(list(map(lambda obj: obj.attrs["HEADER"], df_list)), axis=1)
def merge_attrs(df_list: List[_pd.DataFrame]) -> _pd.Series:
"""Merges attributes of a list of sp3 dataframes into a single set of attributes.
:param List[pd.DataFrame] df_list: The list of sp3 dataframes.
:return pd.Series: The merged attributes.
"""
df = _pd.concat(list(map(lambda obj: obj.attrs["HEADER"], df_list)), axis=1)
mask_mixed = ~_gn_aux.unique_cols(df.loc["HEAD"])
values_if_mixed = _np.asarray(["MIX", "MIX", "MIX", None, "M", None, "MIX", "P", "MIX", "d"])
head = df[0].loc["HEAD"].values
head[mask_mixed] = values_if_mixed[mask_mixed]
sv_info = df.loc["SV_INFO"].max(axis=1).values.astype(int)

return _pd.Series(_np.concatenate([head, sv_info]), index=df.index)


def sp3merge(sp3paths, clkpaths=None, nodata_to_nan=False):
"""Reads in a list of sp3 files and optianl list of clk file and merges them into a single sp3 file"""
def sp3merge(
sp3paths: List[str], clkpaths: Union[List[str], None] = None, nodata_to_nan: bool = False
) -> _pd.DataFrame:
"""Reads in a list of sp3 files and optional list of clk files and merges them into a single sp3 file.
:param List[str] sp3paths: The list of paths to the sp3 files.
:param Union[List[str], None] clkpaths: The list of paths to the clk files, or None if no clk files are provided.
:param bool nodata_to_nan: Flag indicating whether to convert nodata values to NaN.
:return pd.DataFrame: The merged sp3 DataFrame.
"""
sp3_dfs = [read_sp3(sp3_file, nodata_to_nan=nodata_to_nan) for sp3_file in sp3paths]
merged_sp3 = _pd.concat(sp3_dfs)
merged_sp3.attrs["HEADER"] = merge_attrs(sp3_dfs)
Expand Down Expand Up @@ -542,11 +563,15 @@ def diff_sp3_rac(
sp3_test: _pd.DataFrame,
hlm_mode: Literal[None, "ECF", "ECI"] = None,
use_cubic_spline: bool = True,
):
) -> _pd.DataFrame:
"""
Computes the difference between the two sp3 files in the radial, along-track and cross-track coordinates
the interpolator used for computation of the velocities can be based on cubic spline (default) or polynomial
Breaks if NaNs appear on unstack in getVelSpline function
Computes the difference between the two sp3 files in the radial, along-track and cross-track coordinates.
:param DataFrame sp3_baseline: The baseline sp3 DataFrame.
:param DataFrame sp3_test: The test sp3 DataFrame.
:param string hlm_mode: The mode for HLM transformation. Can be None, "ECF", or "ECI".
:param bool use_cubic_spline: Flag indicating whether to use cubic spline for velocity computation.
:return: The DataFrame containing the difference in RAC coordinates.
"""
hlm_modes = [None, "ECF", "ECI"]
if hlm_mode not in hlm_modes:
Expand Down Expand Up @@ -576,9 +601,7 @@ def diff_sp3_rac(
nd_rac = diff_eci.values[:, _np.newaxis] @ _gn_transform.eci2rac_rot(sp3_baseline_eci_vel)
df_rac = _pd.DataFrame(
nd_rac.reshape(-1, 3),
index=sp3_baseline.index, # Note that if the test and baseline have different SVs, this index will refer to
# data which is missing in the 'test' dataframe (and so is likely to be missing in
# the diff too).
index=sp3_baseline.index,
columns=[["EST_RAC"] * 3, ["Radial", "Along-track", "Cross-track"]],
)

Expand Down

0 comments on commit e7a4747

Please sign in to comment.