Skip to content

Commit

Permalink
feat: extract IMU from GoPro videos (#692)
Browse files Browse the repository at this point in the history
* add imu structs

* add the extraction

* use orin/orio/mtrx to calibrate IMUs

* refactor

* refactor

* fix types

* fix types
  • Loading branch information
ptpt authored Dec 10, 2024
1 parent 8dec895 commit 8bcc73d
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 34 deletions.
251 changes: 221 additions & 30 deletions mapillary_tools/geotag/gpmf_parser.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import dataclasses
import io
import itertools
import pathlib
import typing as T

import construct as C

from .. import geo
from ..mp4 import mp4_sample_parser as sample_parser
from .. import geo, imu
from ..mp4.mp4_sample_parser import TrackBoxParser, MovieBoxParser, Sample

"""
Parsing GPS from GPMF data format stored in GoPros. See the GPMF spec: https://github.com/gopro/gpmf-parser
Expand Down Expand Up @@ -125,6 +127,14 @@ class KLVDict(T.TypedDict):
GPMFSampleData = C.GreedyRange(KLV)


@dataclasses.dataclass
class TelemetryData:
gps: T.List[geo.PointWithFix]
accl: T.List[imu.AccelerationData]
gyro: T.List[imu.GyroscopeData]
magn: T.List[imu.MagnetometerData]


# A GPS5 stream example:
# key = b'STRM' type = b'\x00' structure_size = 1 repeat = 400
# data = ListContainer:
Expand Down Expand Up @@ -298,8 +308,120 @@ def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[geo.PointWithF
return sample_points


# a sensor matrix with only [1,0,0, 0,-1,0, 0,0,1], is just a form of non-calibrated sensor orientation
def _is_matrix_calibration(matrix: T.Sequence[float]) -> bool:
for v in matrix:
if v not in [0, -1, 1]:
return True
return False


def _build_matrix(
orin: T.Union[bytes, T.Sequence[int]], orio: T.Union[bytes, T.Sequence[int]]
) -> T.Sequence[float]:
matrix = []

# list(b'aA') == [97, 65]
lower_a, upper_A = 97, 65

for out_char in orin:
for in_char in orio:
if in_char == out_char:
matrix.append(1.0)
elif (in_char - lower_a) == (out_char - upper_A):
matrix.append(-1.0)
elif (in_char - upper_A) == (out_char - lower_a):
matrix.append(-1.0)
else:
matrix.append(0.0)

return matrix


def _apply_matrix(
matrix: T.Sequence[float], values: T.Sequence[float]
) -> T.Generator[float, None, None]:
size = len(values)
assert (
len(matrix) == size * size
), f"expecting a square matrix of size {size} x {size} but got {len(matrix)}"

for y in range(size):
row_start = y * size
yield sum(matrix[row_start + x] * values[x] for x in range(size))


def _flatten(nested: T.Sequence[T.Sequence[float]]) -> T.List[float]:
output: T.List[float] = []
for row in nested:
output.extend(row)
return output


def _get_matrix(klv: T.Dict[bytes, KLVDict]) -> T.Optional[T.Sequence[float]]:
mtrx = klv.get(b"MTRX")
if mtrx is not None:
matrix: T.Sequence[float] = _flatten(mtrx["data"])
if _is_matrix_calibration(matrix):
return matrix

orin = klv.get(b"ORIN")
orio = klv.get(b"ORIO")

if orin is not None and orio is not None:
matrix = _build_matrix(b"".join(orin["data"]), b"".join(orio["data"]))
return matrix

return None


def _scale_and_calibrate(
stream: T.Sequence[KLVDict], key: bytes
) -> T.Generator[T.Sequence[float], None, None]:
indexed: T.Dict[bytes, KLVDict] = {klv["key"]: klv for klv in stream}

klv = indexed.get(key)
if klv is None:
return

scal_klv = indexed.get(b"SCAL")

if scal_klv is not None:
# replace 0s with 1s to avoid division by zero
scals = [s or 1 for s in _flatten(scal_klv["data"])]

if not scals:
scals = [1]

if len(scals) == 1:
# infinite repeat
scales: T.Iterable[float] = itertools.repeat(scals[0])
else:
scales = scals

matrix = _get_matrix(indexed)

for values in klv["data"]:
if matrix is None:
yield tuple(v / s for v, s in zip(values, scales))
else:
yield tuple(v / s for v, s in zip(_apply_matrix(matrix, values), scales))


def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes):
values: T.List[T.Sequence[float]] = []

for klv in stream:
if klv["key"] == b"STRM":
values = list(_scale_and_calibrate(klv["data"], key))
if values:
break

return values


def _extract_dvnm_from_samples(
fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample]
fp: T.BinaryIO, samples: T.Iterable[Sample]
) -> T.Dict[int, bytes]:
dvnm_by_dvid: T.Dict[int, bytes] = {}

Expand All @@ -322,10 +444,13 @@ def _extract_dvnm_from_samples(


def _extract_points_from_samples(
fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample]
) -> T.List[geo.PointWithFix]:
fp: T.BinaryIO, samples: T.Iterable[Sample]
) -> TelemetryData:
# To keep GPS points from different devices separated
points_by_dvid: T.Dict[int, T.List[geo.PointWithFix]] = {}
accls_by_dvid: T.Dict[int, T.List[imu.AccelerationData]] = {}
gyros_by_dvid: T.Dict[int, T.List[imu.GyroscopeData]] = {}
magns_by_dvid: T.Dict[int, T.List[imu.MagnetometerData]] = {}

for sample in samples:
fp.seek(sample.raw_sample.offset, io.SEEK_SET)
Expand All @@ -335,58 +460,124 @@ def _extract_points_from_samples(
# iterate devices
devices = (klv for klv in gpmf_sample_data if klv["key"] == b"DEVC")
for device in devices:
device_id = _find_first_device_id(device["data"])

sample_points = _find_first_gps_stream(device["data"])
if sample_points:
# interpolate timestamps in between
avg_timedelta = sample.exact_timedelta / len(sample_points)
for idx, point in enumerate(sample_points):
point.time = sample.exact_time + avg_timedelta * idx

device_id = _find_first_device_id(device["data"])
device_points = points_by_dvid.setdefault(device_id, [])
device_points.extend(sample_points)

values = list(points_by_dvid.values())
return values[0] if values else []
sample_accls = _find_first_telemetry_stream(device["data"], b"ACCL")
if sample_accls:
# interpolate timestamps in between
avg_delta = sample.exact_timedelta / len(sample_accls)
accls_by_dvid.setdefault(device_id, []).extend(
imu.AccelerationData(
time=sample.exact_time + avg_delta * idx,
x=x,
y=y,
z=z,
)
for idx, (z, x, y, *_) in enumerate(sample_accls)
)

sample_gyros = _find_first_telemetry_stream(device["data"], b"GYRO")
if sample_gyros:
# interpolate timestamps in between
avg_delta = sample.exact_timedelta / len(sample_gyros)
gyros_by_dvid.setdefault(device_id, []).extend(
imu.GyroscopeData(
time=sample.exact_time + avg_delta * idx,
x=x,
y=y,
z=z,
)
for idx, (z, x, y, *_) in enumerate(sample_gyros)
)

sample_magns = _find_first_telemetry_stream(device["data"], b"MAGN")
if sample_magns:
# interpolate timestamps in between
avg_delta = sample.exact_timedelta / len(sample_magns)
magns_by_dvid.setdefault(device_id, []).extend(
imu.MagnetometerData(
time=sample.exact_time + avg_delta * idx,
x=x,
y=y,
z=z,
)
for idx, (z, x, y, *_) in enumerate(sample_magns)
)

return TelemetryData(
gps=list(points_by_dvid.values())[0] if points_by_dvid else [],
accl=list(accls_by_dvid.values())[0] if accls_by_dvid else [],
gyro=list(gyros_by_dvid.values())[0] if gyros_by_dvid else [],
magn=list(magns_by_dvid.values())[0] if magns_by_dvid else [],
)


def _is_gpmd_description(description: T.Dict) -> bool:
return description["format"] == b"gpmd"


def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.PointWithFix]]:
def _contains_gpmd_description(track: TrackBoxParser) -> bool:
descriptions = track.extract_sample_descriptions()
return any(_is_gpmd_description(d) for d in descriptions)


def _filter_gpmd_samples(track: TrackBoxParser) -> T.Generator[Sample, None, None]:
for sample in track.extract_samples():
if _is_gpmd_description(sample.description):
yield sample


def extract_points(fp: T.BinaryIO) -> T.List[geo.PointWithFix]:
"""
Return a list of points (could be empty) if it is a valid GoPro video,
otherwise None
"""
points = None
moov = sample_parser.MovieBoxParser.parse_stream(fp)
moov = MovieBoxParser.parse_stream(fp)
for track in moov.extract_tracks():
descriptions = track.extract_sample_descriptions()
if any(_is_gpmd_description(d) for d in descriptions):
gpmd_samples = (
sample
for sample in track.extract_samples()
if _is_gpmd_description(sample.description)
)
points = list(_extract_points_from_samples(fp, gpmd_samples))
if _contains_gpmd_description(track):
gpmd_samples = _filter_gpmd_samples(track)
telemetry = _extract_points_from_samples(fp, gpmd_samples)
# return the firstly found non-empty points
if points:
return points
if telemetry.gps:
return telemetry.gps

# points could be empty list or None here
return points
return []


def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[TelemetryData]:
"""
Return the telemetry data from the first found GoPro GPMF track
"""
moov = MovieBoxParser.parse_stream(fp)

for track in moov.extract_tracks():
if _contains_gpmd_description(track):
gpmd_samples = _filter_gpmd_samples(track)
telemetry = _extract_points_from_samples(fp, gpmd_samples)
# return the firstly found non-empty points
if telemetry.gps:
return telemetry

# points could be empty list or None here
return None


def extract_all_device_names(fp: T.BinaryIO) -> T.Dict[int, bytes]:
moov = sample_parser.MovieBoxParser.parse_stream(fp)
moov = MovieBoxParser.parse_stream(fp)
for track in moov.extract_tracks():
descriptions = track.extract_sample_descriptions()
if any(_is_gpmd_description(d) for d in descriptions):
gpmd_samples = (
sample
for sample in track.extract_samples()
if _is_gpmd_description(sample.description)
)
if _contains_gpmd_description(track):
gpmd_samples = _filter_gpmd_samples(track)
device_names = _extract_dvnm_from_samples(fp, gpmd_samples)
if device_names:
return device_names
Expand Down
25 changes: 25 additions & 0 deletions mapillary_tools/imu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import typing as T


# Gyroscope signal in radians/seconds around XYZ axes of the camera. Rotation is positive in the counterclockwise direction.
class GyroscopeData(T.NamedTuple):
time: float
x: float
y: float
z: float


# Accelerometer reading in meters/second^2 along XYZ axes of the camera.
class AccelerationData(T.NamedTuple):
time: float
x: float
y: float
z: float


# Ambient magnetic field.
class MagnetometerData(T.NamedTuple):
time: float
x: float
y: float
z: float
Loading

0 comments on commit 8bcc73d

Please sign in to comment.