diff --git a/mapillary_tools/exiftool_read_video.py b/mapillary_tools/exiftool_read_video.py index 1ec7714c..f7fee661 100644 --- a/mapillary_tools/exiftool_read_video.py +++ b/mapillary_tools/exiftool_read_video.py @@ -87,7 +87,7 @@ def _aggregate_gps_track( alt_tag: T.Optional[str] = None, direction_tag: T.Optional[str] = None, ground_speed_tag: T.Optional[str] = None, -) -> T.List[geo.PointWithFix]: +) -> T.List[geo.GpsPoint]: """ Aggregate all GPS data by the tags. It requires lat, lon to be present, and their lengths must match. @@ -173,7 +173,7 @@ def _aggregate_float_values_same_length( if timestamp is None or lon is None or lat is None: continue track.append( - geo.PointWithFix( + geo.GpsPoint( time=timestamp, lon=lon, lat=lat, @@ -182,6 +182,7 @@ def _aggregate_float_values_same_length( gps_fix=None, gps_precision=None, gps_ground_speed=ground_speed, + unix_timestamp_ms=int(timestamp * 1000), ) ) @@ -230,8 +231,8 @@ def _aggregate_gps_track_by_sample_time( ground_speed_tag: T.Optional[str] = None, gps_fix_tag: T.Optional[str] = None, gps_precision_tag: T.Optional[str] = None, -) -> T.List[geo.PointWithFix]: - track: T.List[geo.PointWithFix] = [] +) -> T.List[geo.GpsPoint]: + track: T.List[geo.GpsPoint] = [] expanded_gps_fix_tag = None if gps_fix_tag is not None: @@ -298,21 +299,21 @@ def __init__( self._texts_by_tag = _index_text_by_tag(self.etree.getroot()) self._all_tags = set(self._texts_by_tag.keys()) - def extract_gps_track(self) -> T.List[geo.Point]: + def extract_gps_track(self) -> T.List[geo.GpsPoint]: # blackvue and many other cameras track_with_fix = self._extract_gps_track_from_quicktime() if track_with_fix: - return T.cast(T.List[geo.Point], track_with_fix) + return track_with_fix # insta360 has its own tag track_with_fix = self._extract_gps_track_from_quicktime(namespace="Insta360") if track_with_fix: - return T.cast(T.List[geo.Point], track_with_fix) + return track_with_fix # mostly for gopro track_with_fix = self._extract_gps_track_from_track() if track_with_fix: - return T.cast(T.List[geo.Point], track_with_fix) + return track_with_fix return [] @@ -355,7 +356,7 @@ def extract_model(self) -> T.Optional[str]: _, model = self._extract_make_and_model() return model - def _extract_gps_track_from_track(self) -> T.List[geo.PointWithFix]: + def _extract_gps_track_from_track(self) -> T.List[geo.GpsPoint]: for track_id in range(1, MAX_TRACK_ID + 1): track_ns = f"Track{track_id}" if self._all_tags_exists( @@ -397,7 +398,7 @@ def _all_tags_exists(self, tags: T.Set[str]) -> bool: def _extract_gps_track_from_quicktime( self, namespace: str = "QuickTime" - ) -> T.List[geo.PointWithFix]: + ) -> T.List[geo.GpsPoint]: if not self._all_tags_exists( { expand_tag(f"{namespace}:GPSDateTime"), diff --git a/mapillary_tools/geo.py b/mapillary_tools/geo.py index b4e78408..0e2570a3 100644 --- a/mapillary_tools/geo.py +++ b/mapillary_tools/geo.py @@ -40,10 +40,12 @@ class GPSFix(Enum): @dataclasses.dataclass -class PointWithFix(Point): - gps_fix: T.Optional[GPSFix] - gps_precision: T.Optional[float] - gps_ground_speed: T.Optional[float] +class GpsPoint(Point): + gps_fix: T.Optional[GPSFix] = None + gps_precision: T.Optional[float] = None + gps_ground_speed: T.Optional[float] = None + unix_timestamp_ms: T.Optional[int] = None + gps_horizontal_accuracy: T.Optional[float] = None def _ecef_from_lla_DEPRECATED( diff --git a/mapillary_tools/geotag/blackvue_parser.py b/mapillary_tools/geotag/blackvue_parser.py index a34d5327..3cde904a 100644 --- a/mapillary_tools/geotag/blackvue_parser.py +++ b/mapillary_tools/geotag/blackvue_parser.py @@ -26,7 +26,7 @@ ) -def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]: +def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.GpsPoint, None, None]: for line_bytes in gps_data.splitlines(): match = NMEA_LINE_REGEX.match(line_bytes) if match is None: @@ -44,7 +44,7 @@ def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]: if not nmea.is_valid: continue epoch_ms = int(match.group(1)) - yield geo.Point( + yield geo.GpsPoint( time=epoch_ms, lat=nmea.latitude, lon=nmea.longitude, @@ -90,7 +90,7 @@ def extract_camera_model(fp: T.BinaryIO) -> str: return "" -def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]: +def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.GpsPoint]]: gps_data = simple_mp4_parser.parse_mp4_data_first(fp, [b"free", b"gps "]) if gps_data is None: return None @@ -108,7 +108,7 @@ def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]: return points -def parse_gps_points(path: pathlib.Path) -> T.List[geo.Point]: +def parse_gps_points(path: pathlib.Path) -> T.List[geo.GpsPoint]: with path.open("rb") as fp: points = extract_points(fp) diff --git a/mapillary_tools/geotag/camm_parser.py b/mapillary_tools/geotag/camm_parser.py index 994769d4..966f7ad6 100644 --- a/mapillary_tools/geotag/camm_parser.py +++ b/mapillary_tools/geotag/camm_parser.py @@ -3,6 +3,7 @@ import dataclasses import io import logging +import math import pathlib import typing as T from enum import Enum @@ -81,12 +82,12 @@ class CAMMType(Enum): def _parse_point_from_sample( fp: T.BinaryIO, sample: sample_parser.Sample -) -> T.Optional[geo.Point]: +) -> T.Optional[geo.GpsPoint]: fp.seek(sample.offset, io.SEEK_SET) data = fp.read(sample.size) box = CAMMSampleData.parse(data) if box.type == CAMMType.MIN_GPS.value: - return geo.Point( + return geo.GpsPoint( time=sample.time_offset, lat=box.data[0], lon=box.data[1], @@ -96,19 +97,39 @@ def _parse_point_from_sample( elif box.type == CAMMType.GPS.value: # Not using box.data.time_gps_epoch as the point timestamp # because it is from another clock - return geo.Point( + speed = math.sqrt( + math.pow(box.data.velocity_east, 2) + math.pow(box.data.velocity_north, 2) + ) + + return geo.GpsPoint( time=sample.time_offset, lat=box.data.latitude, lon=box.data.longitude, alt=box.data.altitude, angle=None, + unix_timestamp_ms=int( + _gps_timestamp_to_unix(box.data.time_gps_epoch) * 1000 + ), + gps_fix=geo.GPSFix(box.data.gps_fix_type), + gps_ground_speed=speed, + gps_horizontal_accuracy=box.data.horizontal_accuracy, ) return None +def _gps_timestamp_to_unix(ts: float) -> float: + """ + Convert a GPS timestamp to UNIX timestamp. + See https://stackoverflow.com/questions/20521750/ticks-between-unix-epoch-and-gps-epoch + for some info on GPS timestamp. Here we arbitrarily use the offset as of 2023, since we + are not interested in precision to the second. + """ + return ts + 315964800 - 18 + + def filter_points_by_elst( - points: T.Iterable[geo.Point], elst: T.Sequence[T.Tuple[float, float]] -) -> T.Generator[geo.Point, None, None]: + points: T.Iterable[geo.GpsPoint], elst: T.Sequence[T.Tuple[float, float]] +) -> T.Generator[geo.GpsPoint, None, None]: empty_elst = [entry for entry in elst if entry[0] == -1] if empty_elst: offset = empty_elst[-1][1] @@ -159,7 +180,7 @@ def _extract_camm_samples( yield from camm_samples -def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]: +def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.GpsPoint]]: """ Return a list of points (could be empty) if it is a valid CAMM video, otherwise None @@ -223,7 +244,7 @@ def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]: return points -def parse_gpx(path: pathlib.Path) -> T.List[geo.Point]: +def parse_gpx(path: pathlib.Path) -> T.List[geo.GpsPoint]: with path.open("rb") as fp: points = extract_points(fp) if points is None: diff --git a/mapillary_tools/geotag/geotag_images_from_gpx_file.py b/mapillary_tools/geotag/geotag_images_from_gpx_file.py index 23337f8e..e69b632c 100644 --- a/mapillary_tools/geotag/geotag_images_from_gpx_file.py +++ b/mapillary_tools/geotag/geotag_images_from_gpx_file.py @@ -32,7 +32,11 @@ def __init__( len(tracks), source_path, ) - self.points: T.List[geo.Point] = sum(tracks, []) + points: T.List[geo.GpsPoint] = sum(tracks, []) + self.points = [ + geo.Point(time=p.time, lat=p.lat, lon=p.lon, alt=p.alt, angle=p.angle) + for p in points + ] self.image_paths = image_paths self.source_path = source_path self.use_gpx_start_time = use_gpx_start_time @@ -120,7 +124,7 @@ def to_description(self) -> T.List[types.ImageMetadataOrError]: ) -Track = T.List[geo.Point] +Track = T.List[geo.GpsPoint] def parse_gpx(gpx_file: Path) -> T.List[Track]: @@ -134,13 +138,18 @@ def parse_gpx(gpx_file: Path) -> T.List[Track]: tracks.append([]) for point in segment.points: if point.time is not None: + time = geo.as_unix_time(point.time) if point.time else 0 tracks[-1].append( - geo.Point( - time=geo.as_unix_time(point.time), + geo.GpsPoint( + time=time, lat=point.latitude, lon=point.longitude, alt=point.elevation, angle=None, + gps_fix=None, + gps_precision=None, + gps_ground_speed=point.speed, + unix_timestamp_ms=int(time * 1000), ) ) diff --git a/mapillary_tools/geotag/geotag_images_from_nmea_file.py b/mapillary_tools/geotag/geotag_images_from_nmea_file.py index 7be409d1..3472ced9 100644 --- a/mapillary_tools/geotag/geotag_images_from_nmea_file.py +++ b/mapillary_tools/geotag/geotag_images_from_nmea_file.py @@ -27,7 +27,7 @@ def __init__( ) -def get_lat_lon_time_from_nmea(nmea_file: Path) -> T.List[geo.Point]: +def get_lat_lon_time_from_nmea(nmea_file: Path) -> T.List[geo.GpsPoint]: with nmea_file.open("r") as f: lines = f.readlines() lines = [line.rstrip("\n\r") for line in lines] @@ -49,10 +49,19 @@ def get_lat_lon_time_from_nmea(nmea_file: Path) -> T.List[geo.Point]: if "$GPGGA" in line: data = pynmea2.parse(line) dt = datetime.datetime.combine(date, data.timestamp) + time = geo.as_unix_time(dt) lat, lon, alt = data.latitude, data.longitude, data.altitude points.append( - geo.Point( - time=geo.as_unix_time(dt), lat=lat, lon=lon, alt=alt, angle=None + geo.GpsPoint( + time=time, + lat=lat, + lon=lon, + alt=alt, + angle=None, + unix_timestamp_ms=int(time * 1000), + gps_fix=None, + gps_ground_speed=None, + gps_precision=None, ) ) diff --git a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py b/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py index aebe9de4..248af109 100644 --- a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py @@ -45,15 +45,9 @@ def geotag_video(element: ET.Element) -> types.VideoMetadataOrError: points = geo.extend_deduplicate_points(points) assert points, "must have at least one point" - if all(isinstance(p, geo.PointWithFix) for p in points): - points = T.cast( - T.List[geo.Point], - gpmf_gps_filter.remove_noisy_points( - T.cast(T.List[geo.PointWithFix], points) - ), - ) - if not points: - raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") + points = list(gpmf_gps_filter.remove_noisy_points(points)) + if not points: + raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") stationary = video_utils.is_video_stationary( geo.get_max_distance_from_start([(p.lat, p.lon) for p in points]) @@ -66,7 +60,7 @@ def geotag_video(element: ET.Element) -> types.VideoMetadataOrError: video_path, md5sum=None, filetype=types.FileType.VIDEO, - points=points, + points=list(points), make=exif.extract_make(), model=exif.extract_model(), ) diff --git a/mapillary_tools/geotag/geotag_videos_from_video.py b/mapillary_tools/geotag/geotag_videos_from_video.py index 77be8c6f..acc02f48 100644 --- a/mapillary_tools/geotag/geotag_videos_from_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_video.py @@ -160,11 +160,11 @@ def geotag_video( video_metadata.points = geo.extend_deduplicate_points(video_metadata.points) assert video_metadata.points, "must have at least one point" - if all(isinstance(p, geo.PointWithFix) for p in video_metadata.points): + if all(isinstance(p, geo.GpsPoint) for p in video_metadata.points): video_metadata.points = T.cast( T.List[geo.Point], gpmf_gps_filter.remove_noisy_points( - T.cast(T.List[geo.PointWithFix], video_metadata.points) + T.cast(T.List[geo.GpsPoint], video_metadata.points) ), ) if not video_metadata.points: diff --git a/mapillary_tools/geotag/gpmf_gps_filter.py b/mapillary_tools/geotag/gpmf_gps_filter.py index dad3769a..b1093211 100644 --- a/mapillary_tools/geotag/gpmf_gps_filter.py +++ b/mapillary_tools/geotag/gpmf_gps_filter.py @@ -13,8 +13,8 @@ def remove_outliers( - sequence: T.Sequence[geo.PointWithFix], -) -> T.Sequence[geo.PointWithFix]: + sequence: T.Sequence[geo.GpsPoint], +) -> T.Sequence[geo.GpsPoint]: distances = [ geo.gps_distance((left.lat, left.lon), (right.lat, right.lon)) for left, right in geo.pairwise(sequence) @@ -50,14 +50,14 @@ def remove_outliers( ) return T.cast( - T.List[geo.PointWithFix], + T.List[geo.GpsPoint], gps_filter.find_majority(merged.values()), ) def remove_noisy_points( - sequence: T.Sequence[geo.PointWithFix], -) -> T.Sequence[geo.PointWithFix]: + sequence: T.Sequence[geo.GpsPoint], +) -> T.Sequence[geo.GpsPoint]: num_points = len(sequence) sequence = [ p diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index 06716571..f9218655 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -172,7 +172,7 @@ class KLVDict(T.TypedDict): # ] def gps_from_stream( stream: T.Sequence[KLVDict], -) -> T.Generator[geo.PointWithFix, None, None]: +) -> T.Generator[geo.GpsPoint, None, None]: indexed: T.Dict[bytes, T.List[T.List[T.Any]]] = { klv["key"]: klv["data"] for klv in stream } @@ -204,7 +204,7 @@ def gps_from_stream( lat, lon, alt, ground_speed, _speed_3d = [ v / s for v, s in zip(point, scal_values) ] - yield geo.PointWithFix( + yield geo.GpsPoint( # will figure out the actual timestamp later time=0, lat=lat, @@ -233,8 +233,8 @@ def _find_first_device_id(stream: T.Sequence[KLVDict]) -> int: return device_id -def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[geo.PointWithFix]: - sample_points: T.List[geo.PointWithFix] = [] +def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[geo.GpsPoint]: + sample_points: T.List[geo.GpsPoint] = [] for klv in stream: if klv["key"] == b"STRM": @@ -270,9 +270,9 @@ def _extract_dvnm_from_samples( def _extract_points_from_samples( fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample] -) -> T.List[geo.PointWithFix]: +) -> T.List[geo.GpsPoint]: # To keep GPS points from different devices separated - points_by_dvid: T.Dict[int, T.List[geo.PointWithFix]] = {} + points_by_dvid: T.Dict[int, T.List[geo.GpsPoint]] = {} for sample in samples: fp.seek(sample.offset, io.SEEK_SET) @@ -297,7 +297,7 @@ def _extract_points_from_samples( return values[0] if values else [] -def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.PointWithFix]]: +def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.GpsPoint]]: """ Return a list of points (could be empty) if it is a valid GoPro video, otherwise None @@ -380,7 +380,7 @@ def extract_camera_model(fp: T.BinaryIO) -> str: return unicode_names[0].strip() -def parse_gpx(path: pathlib.Path) -> T.List[geo.PointWithFix]: +def parse_gpx(path: pathlib.Path) -> T.List[geo.GpsPoint]: with path.open("rb") as fp: points = extract_points(fp) if points is None: diff --git a/mapillary_tools/video_data_extraction/extract_video_data.py b/mapillary_tools/video_data_extraction/extract_video_data.py index 4c5a9915..65a0d866 100644 --- a/mapillary_tools/video_data_extraction/extract_video_data.py +++ b/mapillary_tools/video_data_extraction/extract_video_data.py @@ -125,7 +125,7 @@ def _extract_points( if parser.must_rebase_times_to_zero: points = self._rebase_times(points) - return points + return self._gps_points_to_points(points) @staticmethod def _check_paths(import_paths: T.Sequence[Path]): @@ -146,7 +146,7 @@ def _check_sources_cardinality(self, files: T.Sequence[Path]): ) @staticmethod - def _sanitize_points(points: T.Sequence[geo.Point]) -> T.Sequence[geo.Point]: + def _sanitize_points(points: T.Sequence[geo.GpsPoint]) -> T.Sequence[geo.GpsPoint]: """ Deduplicates points, when possible removes noisy ones, and checks against stationary videos @@ -158,16 +158,10 @@ def _sanitize_points(points: T.Sequence[geo.Point]) -> T.Sequence[geo.Point]: ) points = geo.extend_deduplicate_points(points) + points = gpmf_gps_filter.remove_noisy_points(points) - if all(isinstance(p, geo.PointWithFix) for p in points): - points = T.cast( - T.Sequence[geo.Point], - gpmf_gps_filter.remove_noisy_points( - T.cast(T.Sequence[geo.PointWithFix], points) - ), - ) - if not points: - raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") + if not points: + raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") stationary = video_utils.is_video_stationary( geo.get_max_distance_from_start([(p.lat, p.lon) for p in points]) @@ -179,7 +173,7 @@ def _sanitize_points(points: T.Sequence[geo.Point]) -> T.Sequence[geo.Point]: return points @staticmethod - def _rebase_times(points: T.Sequence[geo.Point]): + def _rebase_times(points: T.Sequence[geo.GpsPoint]): """ Make point times start from 0 """ @@ -188,3 +182,10 @@ def _rebase_times(points: T.Sequence[geo.Point]): for p in points: p.time = p.time - first_timestamp return points + + @staticmethod + def _gps_points_to_points(points: T.Sequence[geo.GpsPoint]): + return [ + geo.Point(time=p.time, lat=p.lat, lon=p.lon, alt=p.alt, angle=p.angle) + for p in points + ] diff --git a/mapillary_tools/video_data_extraction/extractors/base_parser.py b/mapillary_tools/video_data_extraction/extractors/base_parser.py index d0a29ee6..bb995ac1 100644 --- a/mapillary_tools/video_data_extraction/extractors/base_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/base_parser.py @@ -43,7 +43,7 @@ def must_rebase_times_to_zero(self) -> bool: raise NotImplementedError @abc.abstractmethod - def extract_points(self) -> T.Sequence[geo.Point]: + def extract_points(self) -> T.Sequence[geo.GpsPoint]: raise NotImplementedError @abc.abstractmethod diff --git a/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py b/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py index 7f088677..f3f96b6a 100644 --- a/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py @@ -12,7 +12,7 @@ class BlackVueParser(BaseParser): pointsFound: bool = False - def extract_points(self) -> T.Sequence[geo.Point]: + def extract_points(self) -> T.Sequence[geo.GpsPoint]: source_path = self.geotag_source_path if not source_path: return [] diff --git a/mapillary_tools/video_data_extraction/extractors/camm_parser.py b/mapillary_tools/video_data_extraction/extractors/camm_parser.py index 98e0b8d6..7aa360b4 100644 --- a/mapillary_tools/video_data_extraction/extractors/camm_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/camm_parser.py @@ -16,7 +16,7 @@ def __camera_info(self) -> T.Tuple[str, str]: with self.videoPath.open("rb") as fp: return camm_parser.extract_camera_make_and_model(fp) - def extract_points(self) -> T.Sequence[geo.Point]: + def extract_points(self) -> T.Sequence[geo.GpsPoint]: source_path = self.geotag_source_path if not source_path: return [] diff --git a/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py b/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py index c21dd4ec..81712863 100644 --- a/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py @@ -44,7 +44,7 @@ def __init__( video_path, options, parser_options, xml_content ) - def extract_points(self) -> T.Sequence[geo.Point]: + def extract_points(self) -> T.Sequence[geo.GpsPoint]: return self.exiftoolXmlParser.extract_points() if self.exiftoolXmlParser else [] def extract_make(self) -> T.Optional[str]: diff --git a/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py b/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py index 6c8751b9..4e6c4ec9 100644 --- a/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py @@ -41,7 +41,7 @@ def __init__( element = next(etree.iterfind(_DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES)) self.exifToolReadVideo = ExifToolReadVideo(ET.ElementTree(element)) - def extract_points(self) -> T.Sequence[geo.Point]: + def extract_points(self) -> T.Sequence[geo.GpsPoint]: return ( self.exifToolReadVideo.extract_gps_track() if self.exifToolReadVideo else [] ) diff --git a/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py b/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py index d48418b4..af7feeff 100644 --- a/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py @@ -35,7 +35,7 @@ def __init__( blackvue_parser = BlackVueParser(video_path, options, parser_options) self.parsers = [camm_parser, gopro_parser, blackvue_parser] - def extract_points(self) -> T.Sequence[geo.Point]: + def extract_points(self) -> T.Sequence[geo.GpsPoint]: for parser in self.parsers: points = parser.extract_points() if points: diff --git a/mapillary_tools/video_data_extraction/extractors/gopro_parser.py b/mapillary_tools/video_data_extraction/extractors/gopro_parser.py index 3a4c3efd..8867b5fc 100644 --- a/mapillary_tools/video_data_extraction/extractors/gopro_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/gopro_parser.py @@ -12,7 +12,7 @@ class GoProParser(BaseParser): pointsFound: bool = False - def extract_points(self) -> T.Sequence[geo.Point]: + def extract_points(self) -> T.Sequence[geo.GpsPoint]: source_path = self.geotag_source_path if not source_path: return [] diff --git a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py index d38347b8..b1c3d78f 100644 --- a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py @@ -10,7 +10,7 @@ class GpxParser(BaseParser): must_rebase_times_to_zero = True parser_label = "gpx" - def extract_points(self) -> T.Sequence[geo.Point]: + def extract_points(self) -> T.Sequence[geo.GpsPoint]: path = self.geotag_source_path if not path: return [] @@ -19,7 +19,7 @@ def extract_points(self) -> T.Sequence[geo.Point]: except Exception as e: return [] - points: T.Sequence[geo.Point] = sum(tracks, []) + points: T.Sequence[geo.GpsPoint] = sum(tracks, []) return points def extract_make(self) -> T.Optional[str]: diff --git a/mapillary_tools/video_data_extraction/extractors/nmea_parser.py b/mapillary_tools/video_data_extraction/extractors/nmea_parser.py index 22805dea..ca19d112 100644 --- a/mapillary_tools/video_data_extraction/extractors/nmea_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/nmea_parser.py @@ -10,7 +10,7 @@ class NmeaParser(BaseParser): must_rebase_times_to_zero = True parser_label = "nmea" - def extract_points(self) -> T.Sequence[geo.Point]: + def extract_points(self) -> T.Sequence[geo.GpsPoint]: source_path = self.geotag_source_path if not source_path: return [] diff --git a/tests/cli/gpmf_parser.py b/tests/cli/gpmf_parser.py index 0ad1ce39..6b09a7a5 100644 --- a/tests/cli/gpmf_parser.py +++ b/tests/cli/gpmf_parser.py @@ -14,7 +14,7 @@ def _convert_points_to_gpx_track_segment( - points: T.Sequence[geo.PointWithFix], + points: T.Sequence[geo.GpsPoint], ) -> gpxpy.gpx.GPXTrackSegment: gpx_segment = gpxpy.gpx.GPXTrackSegment() gps_fix_map = { diff --git a/tests/cli/gps_filter.py b/tests/cli/gps_filter.py index 96b630a6..8c8e29e5 100644 --- a/tests/cli/gps_filter.py +++ b/tests/cli/gps_filter.py @@ -36,7 +36,7 @@ def _parse_args(): def _gpx_track_segment_to_points( segment: gpxpy.gpx.GPXTrackSegment, -) -> T.List[geo.PointWithFix]: +) -> T.List[geo.GpsPoint]: gps_fix_map = { "none": geo.GPSFix.NO_FIX, "2d": geo.GPSFix.FIX_2D, @@ -57,7 +57,7 @@ def _gpx_track_segment_to_points( else: ground_speed = None - point = geo.PointWithFix( + point = geo.GpsPoint( time=geo.as_unix_time(T.cast(datetime.datetime, p.time)), lat=p.latitude, lon=p.longitude, @@ -74,10 +74,10 @@ def _gpx_track_segment_to_points( def _filter_noise( - points: T.Sequence[geo.PointWithFix], + points: T.Sequence[geo.GpsPoint], gps_fix: T.Set[int], max_dop: float, -) -> T.List[geo.PointWithFix]: +) -> T.List[geo.GpsPoint]: return [ p for p in points @@ -87,9 +87,9 @@ def _filter_noise( def _filter_outliers( - points: T.List[geo.PointWithFix], + points: T.List[geo.GpsPoint], gps_precision: float, -) -> T.List[geo.PointWithFix]: +) -> T.List[geo.GpsPoint]: if gps_precision == 0: return points @@ -118,7 +118,7 @@ def _filter_outliers( merged = gps_filter.dbscan(subseqs, gps_filter.speed_le(max_speed)) return T.cast( - T.List[geo.PointWithFix], + T.List[geo.GpsPoint], gps_filter.find_majority(merged.values()), ) diff --git a/tests/unit/test_blackvue_parser.py b/tests/unit/test_blackvue_parser.py index 9ec65a45..6b872fe9 100644 --- a/tests/unit/test_blackvue_parser.py +++ b/tests/unit/test_blackvue_parser.py @@ -44,13 +44,13 @@ def test_parse_points(): x = blackvue_parser.extract_points(io.BytesIO(data)) assert x is not None assert [ - geo.Point( + geo.GpsPoint( time=0.0, lat=38.8861575, lon=-76.99239516666667, alt=10.2, angle=None ), - geo.Point( + geo.GpsPoint( time=0.968, lat=38.88615816666667, lon=-76.992434, alt=7.7, angle=None ), - geo.Point( + geo.GpsPoint( time=0.968, lat=38.88615816666667, lon=-76.992434, alt=7.7, angle=None ), ] == list(x) diff --git a/tests/unit/test_camm_parser.py b/tests/unit/test_camm_parser.py index ca22b571..74295da1 100644 --- a/tests/unit/test_camm_parser.py +++ b/tests/unit/test_camm_parser.py @@ -15,10 +15,10 @@ def test_filter_points_by_edit_list(): assert [] == list(camm_parser.filter_points_by_elst([], [])) points = [ - geo.Point(time=0, lat=0, lon=0, alt=None, angle=None), - geo.Point(time=0.23, lat=0, lon=0, alt=None, angle=None), - geo.Point(time=0.29, lat=0, lon=0, alt=None, angle=None), - geo.Point(time=0.31, lat=0, lon=0, alt=None, angle=None), + geo.GpsPoint(time=0, lat=0, lon=0, alt=None, angle=None), + geo.GpsPoint(time=0.23, lat=0, lon=0, alt=None, angle=None), + geo.GpsPoint(time=0.29, lat=0, lon=0, alt=None, angle=None), + geo.GpsPoint(time=0.31, lat=0, lon=0, alt=None, angle=None), ] assert points == list(camm_parser.filter_points_by_elst(points, [])) assert [dataclasses.replace(p, time=p.time + 4.4) for p in points] == list( @@ -26,8 +26,8 @@ def test_filter_points_by_edit_list(): ) assert [ - geo.Point(time=0.23 + 4.4, lat=0, lon=0, alt=None, angle=None), - geo.Point(time=0.31 + 4.4, lat=0, lon=0, alt=None, angle=None), + geo.GpsPoint(time=0.23 + 4.4, lat=0, lon=0, alt=None, angle=None), + geo.GpsPoint(time=0.31 + 4.4, lat=0, lon=0, alt=None, angle=None), ] == list( camm_parser.filter_points_by_elst( points, [(-1, 3), (-1, 4.4), (0.21, 0.04), (0.30, 0.04)] @@ -35,8 +35,8 @@ def test_filter_points_by_edit_list(): ) assert [ - geo.Point(time=0.29 + 4.4, lat=0, lon=0, alt=None, angle=None), - geo.Point(time=0.31 + 4.4, lat=0, lon=0, alt=None, angle=None), + geo.GpsPoint(time=0.29 + 4.4, lat=0, lon=0, alt=None, angle=None), + geo.GpsPoint(time=0.31 + 4.4, lat=0, lon=0, alt=None, angle=None), ] == list(camm_parser.filter_points_by_elst(points, [(-1, 4.4), (0.24, 0.3)])) @@ -88,10 +88,10 @@ def approximate(expected, actual): def test_build_and_parse(): points = [ - geo.Point(time=0.1, lat=0.01, lon=0.2, alt=None, angle=None), - geo.Point(time=0.23, lat=0.001, lon=0.21, alt=None, angle=None), - geo.Point(time=0.29, lat=0.002, lon=0.203, alt=None, angle=None), - geo.Point(time=0.31, lat=0.0025, lon=0.2004, alt=None, angle=None), + geo.GpsPoint(time=0.1, lat=0.01, lon=0.2, alt=None, angle=None), + geo.GpsPoint(time=0.23, lat=0.001, lon=0.21, alt=None, angle=None), + geo.GpsPoint(time=0.29, lat=0.002, lon=0.203, alt=None, angle=None), + geo.GpsPoint(time=0.31, lat=0.0025, lon=0.2004, alt=None, angle=None), ] metadata = types.VideoMetadata( Path(""), @@ -106,16 +106,16 @@ def test_build_and_parse(): assert x.model == "test_model" assert approximate( [ - geo.Point( + geo.GpsPoint( time=0.09999999988358467, lat=0.01, lon=0.2, alt=-1.0, angle=None ), - geo.Point( + geo.GpsPoint( time=0.22999999580209396, lat=0.001, lon=0.21, alt=-1.0, angle=None ), - geo.Point( + geo.GpsPoint( time=0.2899999996391125, lat=0.002, lon=0.203, alt=-1.0, angle=None ), - geo.Point( + geo.GpsPoint( time=0.3099999994295649, lat=0.0025, lon=0.2004, alt=-1.0, angle=None ), ], @@ -125,7 +125,7 @@ def test_build_and_parse(): def test_build_and_parse2(): points = [ - geo.Point(time=0.1, lat=0.01, lon=0.2, alt=None, angle=None), + geo.GpsPoint(time=0.1, lat=0.01, lon=0.2, alt=None, angle=None), ] metadata = types.VideoMetadata( Path(""), @@ -139,14 +139,18 @@ def test_build_and_parse2(): assert x.make == "test_make汉字" assert x.model == "test_model汉字" assert approximate( - [geo.Point(time=0.09999999988358468, lat=0.01, lon=0.2, alt=-1.0, angle=None)], + [ + geo.GpsPoint( + time=0.09999999988358468, lat=0.01, lon=0.2, alt=-1.0, angle=None + ) + ], x.points, ) def test_build_and_parse9(): points = [ - geo.Point(time=0.0, lat=0.01, lon=0.2, alt=None, angle=None), + geo.GpsPoint(time=0.0, lat=0.01, lon=0.2, alt=None, angle=None), ] metadata = types.VideoMetadata( Path(""), @@ -157,13 +161,13 @@ def test_build_and_parse9(): model="test_model汉字", ) x = build_mp4(metadata) - assert [geo.Point(time=0.0, lat=0.01, lon=0.2, alt=-1.0, angle=None)] == x.points + assert [geo.GpsPoint(time=0.0, lat=0.01, lon=0.2, alt=-1.0, angle=None)] == x.points def test_build_and_parse10(): points = [ - geo.Point(time=0.0, lat=0.01, lon=0.2, alt=None, angle=None), - geo.Point(time=0.1, lat=0.03, lon=0.2, alt=None, angle=None), + geo.GpsPoint(time=0.0, lat=0.01, lon=0.2, alt=None, angle=None), + geo.GpsPoint(time=0.1, lat=0.03, lon=0.2, alt=None, angle=None), ] metadata = types.VideoMetadata( Path(""), @@ -176,8 +180,8 @@ def test_build_and_parse10(): x = build_mp4(metadata) assert approximate( [ - geo.Point(time=0.0, lat=0.01, lon=0.2, alt=-1.0, angle=None), - geo.Point(time=0.1, lat=0.03, lon=0.2, alt=-1.0, angle=None), + geo.GpsPoint(time=0.0, lat=0.01, lon=0.2, alt=-1.0, angle=None), + geo.GpsPoint(time=0.1, lat=0.03, lon=0.2, alt=-1.0, angle=None), ], x.points, )