Skip to content

Commit

Permalink
Separate usage of Point and GpsPoint
Browse files Browse the repository at this point in the history
  • Loading branch information
malconsei committed Sep 13, 2023
1 parent c43c482 commit d145411
Show file tree
Hide file tree
Showing 24 changed files with 155 additions and 114 deletions.
21 changes: 11 additions & 10 deletions mapillary_tools/exiftool_read_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _aggregate_gps_track(
alt_tag: T.Optional[str] = None,
direction_tag: T.Optional[str] = None,
ground_speed_tag: T.Optional[str] = None,
) -> T.List[geo.PointWithFix]:
) -> T.List[geo.GpsPoint]:
"""
Aggregate all GPS data by the tags.
It requires lat, lon to be present, and their lengths must match.
Expand Down Expand Up @@ -173,7 +173,7 @@ def _aggregate_float_values_same_length(
if timestamp is None or lon is None or lat is None:
continue
track.append(
geo.PointWithFix(
geo.GpsPoint(
time=timestamp,
lon=lon,
lat=lat,
Expand All @@ -182,6 +182,7 @@ def _aggregate_float_values_same_length(
gps_fix=None,
gps_precision=None,
gps_ground_speed=ground_speed,
unix_timestamp_ms=int(timestamp * 1000),
)
)

Expand Down Expand Up @@ -230,8 +231,8 @@ def _aggregate_gps_track_by_sample_time(
ground_speed_tag: T.Optional[str] = None,
gps_fix_tag: T.Optional[str] = None,
gps_precision_tag: T.Optional[str] = None,
) -> T.List[geo.PointWithFix]:
track: T.List[geo.PointWithFix] = []
) -> T.List[geo.GpsPoint]:
track: T.List[geo.GpsPoint] = []

expanded_gps_fix_tag = None
if gps_fix_tag is not None:
Expand Down Expand Up @@ -298,21 +299,21 @@ def __init__(
self._texts_by_tag = _index_text_by_tag(self.etree.getroot())
self._all_tags = set(self._texts_by_tag.keys())

def extract_gps_track(self) -> T.List[geo.Point]:
def extract_gps_track(self) -> T.List[geo.GpsPoint]:
# blackvue and many other cameras
track_with_fix = self._extract_gps_track_from_quicktime()
if track_with_fix:
return T.cast(T.List[geo.Point], track_with_fix)
return track_with_fix

# insta360 has its own tag
track_with_fix = self._extract_gps_track_from_quicktime(namespace="Insta360")
if track_with_fix:
return T.cast(T.List[geo.Point], track_with_fix)
return track_with_fix

# mostly for gopro
track_with_fix = self._extract_gps_track_from_track()
if track_with_fix:
return T.cast(T.List[geo.Point], track_with_fix)
return track_with_fix

return []

Expand Down Expand Up @@ -355,7 +356,7 @@ def extract_model(self) -> T.Optional[str]:
_, model = self._extract_make_and_model()
return model

def _extract_gps_track_from_track(self) -> T.List[geo.PointWithFix]:
def _extract_gps_track_from_track(self) -> T.List[geo.GpsPoint]:
for track_id in range(1, MAX_TRACK_ID + 1):
track_ns = f"Track{track_id}"
if self._all_tags_exists(
Expand Down Expand Up @@ -397,7 +398,7 @@ def _all_tags_exists(self, tags: T.Set[str]) -> bool:

def _extract_gps_track_from_quicktime(
self, namespace: str = "QuickTime"
) -> T.List[geo.PointWithFix]:
) -> T.List[geo.GpsPoint]:
if not self._all_tags_exists(
{
expand_tag(f"{namespace}:GPSDateTime"),
Expand Down
10 changes: 6 additions & 4 deletions mapillary_tools/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ class GPSFix(Enum):


@dataclasses.dataclass
class PointWithFix(Point):
gps_fix: T.Optional[GPSFix]
gps_precision: T.Optional[float]
gps_ground_speed: T.Optional[float]
class GpsPoint(Point):
gps_fix: T.Optional[GPSFix] = None
gps_precision: T.Optional[float] = None
gps_ground_speed: T.Optional[float] = None
unix_timestamp_ms: T.Optional[int] = None
gps_horizontal_accuracy: T.Optional[float] = None


def _ecef_from_lla_DEPRECATED(
Expand Down
8 changes: 4 additions & 4 deletions mapillary_tools/geotag/blackvue_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
)


def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]:
def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.GpsPoint, None, None]:
for line_bytes in gps_data.splitlines():
match = NMEA_LINE_REGEX.match(line_bytes)
if match is None:
Expand All @@ -44,7 +44,7 @@ def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]:
if not nmea.is_valid:
continue
epoch_ms = int(match.group(1))
yield geo.Point(
yield geo.GpsPoint(
time=epoch_ms,
lat=nmea.latitude,
lon=nmea.longitude,
Expand Down Expand Up @@ -90,7 +90,7 @@ def extract_camera_model(fp: T.BinaryIO) -> str:
return ""


def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]:
def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.GpsPoint]]:
gps_data = simple_mp4_parser.parse_mp4_data_first(fp, [b"free", b"gps "])
if gps_data is None:
return None
Expand All @@ -108,7 +108,7 @@ def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]:
return points


def parse_gps_points(path: pathlib.Path) -> T.List[geo.Point]:
def parse_gps_points(path: pathlib.Path) -> T.List[geo.GpsPoint]:
with path.open("rb") as fp:
points = extract_points(fp)

Expand Down
35 changes: 28 additions & 7 deletions mapillary_tools/geotag/camm_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dataclasses
import io
import logging
import math
import pathlib
import typing as T
from enum import Enum
Expand Down Expand Up @@ -81,12 +82,12 @@ class CAMMType(Enum):

def _parse_point_from_sample(
fp: T.BinaryIO, sample: sample_parser.Sample
) -> T.Optional[geo.Point]:
) -> T.Optional[geo.GpsPoint]:
fp.seek(sample.offset, io.SEEK_SET)
data = fp.read(sample.size)
box = CAMMSampleData.parse(data)
if box.type == CAMMType.MIN_GPS.value:
return geo.Point(
return geo.GpsPoint(
time=sample.time_offset,
lat=box.data[0],
lon=box.data[1],
Expand All @@ -96,19 +97,39 @@ def _parse_point_from_sample(
elif box.type == CAMMType.GPS.value:
# Not using box.data.time_gps_epoch as the point timestamp
# because it is from another clock
return geo.Point(
speed = math.sqrt(
math.pow(box.data.velocity_east, 2) + math.pow(box.data.velocity_north, 2)
)

return geo.GpsPoint(
time=sample.time_offset,
lat=box.data.latitude,
lon=box.data.longitude,
alt=box.data.altitude,
angle=None,
unix_timestamp_ms=int(
_gps_timestamp_to_unix(box.data.time_gps_epoch) * 1000
),
gps_fix=geo.GPSFix(box.data.gps_fix_type),
gps_ground_speed=speed,
gps_horizontal_accuracy=box.data.horizontal_accuracy,
)
return None


def _gps_timestamp_to_unix(ts: float) -> float:
"""
Convert a GPS timestamp to UNIX timestamp.
See https://stackoverflow.com/questions/20521750/ticks-between-unix-epoch-and-gps-epoch
for some info on GPS timestamp. Here we arbitrarily use the offset as of 2023, since we
are not interested in precision to the second.
"""
return ts + 315964800 - 18


def filter_points_by_elst(
points: T.Iterable[geo.Point], elst: T.Sequence[T.Tuple[float, float]]
) -> T.Generator[geo.Point, None, None]:
points: T.Iterable[geo.GpsPoint], elst: T.Sequence[T.Tuple[float, float]]
) -> T.Generator[geo.GpsPoint, None, None]:
empty_elst = [entry for entry in elst if entry[0] == -1]
if empty_elst:
offset = empty_elst[-1][1]
Expand Down Expand Up @@ -159,7 +180,7 @@ def _extract_camm_samples(
yield from camm_samples


def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]:
def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.GpsPoint]]:
"""
Return a list of points (could be empty) if it is a valid CAMM video,
otherwise None
Expand Down Expand Up @@ -223,7 +244,7 @@ def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]:
return points


def parse_gpx(path: pathlib.Path) -> T.List[geo.Point]:
def parse_gpx(path: pathlib.Path) -> T.List[geo.GpsPoint]:
with path.open("rb") as fp:
points = extract_points(fp)
if points is None:
Expand Down
17 changes: 13 additions & 4 deletions mapillary_tools/geotag/geotag_images_from_gpx_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ def __init__(
len(tracks),
source_path,
)
self.points: T.List[geo.Point] = sum(tracks, [])
points: T.List[geo.GpsPoint] = sum(tracks, [])
self.points = [
geo.Point(time=p.time, lat=p.lat, lon=p.lon, alt=p.alt, angle=p.angle)
for p in points
]
self.image_paths = image_paths
self.source_path = source_path
self.use_gpx_start_time = use_gpx_start_time
Expand Down Expand Up @@ -120,7 +124,7 @@ def to_description(self) -> T.List[types.ImageMetadataOrError]:
)


Track = T.List[geo.Point]
Track = T.List[geo.GpsPoint]


def parse_gpx(gpx_file: Path) -> T.List[Track]:
Expand All @@ -134,13 +138,18 @@ def parse_gpx(gpx_file: Path) -> T.List[Track]:
tracks.append([])
for point in segment.points:
if point.time is not None:
time = geo.as_unix_time(point.time) if point.time else 0
tracks[-1].append(
geo.Point(
time=geo.as_unix_time(point.time),
geo.GpsPoint(
time=time,
lat=point.latitude,
lon=point.longitude,
alt=point.elevation,
angle=None,
gps_fix=None,
gps_precision=None,
gps_ground_speed=point.speed,
unix_timestamp_ms=int(time * 1000),
)
)

Expand Down
15 changes: 12 additions & 3 deletions mapillary_tools/geotag/geotag_images_from_nmea_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
)


def get_lat_lon_time_from_nmea(nmea_file: Path) -> T.List[geo.Point]:
def get_lat_lon_time_from_nmea(nmea_file: Path) -> T.List[geo.GpsPoint]:
with nmea_file.open("r") as f:
lines = f.readlines()
lines = [line.rstrip("\n\r") for line in lines]
Expand All @@ -49,10 +49,19 @@ def get_lat_lon_time_from_nmea(nmea_file: Path) -> T.List[geo.Point]:
if "$GPGGA" in line:
data = pynmea2.parse(line)
dt = datetime.datetime.combine(date, data.timestamp)
time = geo.as_unix_time(dt)
lat, lon, alt = data.latitude, data.longitude, data.altitude
points.append(
geo.Point(
time=geo.as_unix_time(dt), lat=lat, lon=lon, alt=alt, angle=None
geo.GpsPoint(
time=time,
lat=lat,
lon=lon,
alt=alt,
angle=None,
unix_timestamp_ms=int(time * 1000),
gps_fix=None,
gps_ground_speed=None,
gps_precision=None,
)
)

Expand Down
14 changes: 4 additions & 10 deletions mapillary_tools/geotag/geotag_videos_from_exiftool_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,9 @@ def geotag_video(element: ET.Element) -> types.VideoMetadataOrError:
points = geo.extend_deduplicate_points(points)
assert points, "must have at least one point"

if all(isinstance(p, geo.PointWithFix) for p in points):
points = T.cast(
T.List[geo.Point],
gpmf_gps_filter.remove_noisy_points(
T.cast(T.List[geo.PointWithFix], points)
),
)
if not points:
raise exceptions.MapillaryGPSNoiseError("GPS is too noisy")
points = list(gpmf_gps_filter.remove_noisy_points(points))
if not points:
raise exceptions.MapillaryGPSNoiseError("GPS is too noisy")

stationary = video_utils.is_video_stationary(
geo.get_max_distance_from_start([(p.lat, p.lon) for p in points])
Expand All @@ -66,7 +60,7 @@ def geotag_video(element: ET.Element) -> types.VideoMetadataOrError:
video_path,
md5sum=None,
filetype=types.FileType.VIDEO,
points=points,
points=list(points),
make=exif.extract_make(),
model=exif.extract_model(),
)
Expand Down
4 changes: 2 additions & 2 deletions mapillary_tools/geotag/geotag_videos_from_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ def geotag_video(
video_metadata.points = geo.extend_deduplicate_points(video_metadata.points)
assert video_metadata.points, "must have at least one point"

if all(isinstance(p, geo.PointWithFix) for p in video_metadata.points):
if all(isinstance(p, geo.GpsPoint) for p in video_metadata.points):
video_metadata.points = T.cast(
T.List[geo.Point],
gpmf_gps_filter.remove_noisy_points(
T.cast(T.List[geo.PointWithFix], video_metadata.points)
T.cast(T.List[geo.GpsPoint], video_metadata.points)
),
)
if not video_metadata.points:
Expand Down
10 changes: 5 additions & 5 deletions mapillary_tools/geotag/gpmf_gps_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@


def remove_outliers(
sequence: T.Sequence[geo.PointWithFix],
) -> T.Sequence[geo.PointWithFix]:
sequence: T.Sequence[geo.GpsPoint],
) -> T.Sequence[geo.GpsPoint]:
distances = [
geo.gps_distance((left.lat, left.lon), (right.lat, right.lon))
for left, right in geo.pairwise(sequence)
Expand Down Expand Up @@ -50,14 +50,14 @@ def remove_outliers(
)

return T.cast(
T.List[geo.PointWithFix],
T.List[geo.GpsPoint],
gps_filter.find_majority(merged.values()),
)


def remove_noisy_points(
sequence: T.Sequence[geo.PointWithFix],
) -> T.Sequence[geo.PointWithFix]:
sequence: T.Sequence[geo.GpsPoint],
) -> T.Sequence[geo.GpsPoint]:
num_points = len(sequence)
sequence = [
p
Expand Down
Loading

0 comments on commit d145411

Please sign in to comment.