From de3098be76b28314b54b917fb40f0cb6f0c275fd Mon Sep 17 00:00:00 2001 From: Caglar Pir Date: Thu, 15 Jan 2026 10:26:14 +0100 Subject: [PATCH] Use gps time stamps when calculating average speed on videos Summary: In timelapse videos video time and gps track time have different scales. Use the gps track timescale when calculating average speed of the gps track. --- mapillary_tools/geo.py | 176 ++++++++++++++-- tests/unit/test_geo.py | 462 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 620 insertions(+), 18 deletions(-) diff --git a/mapillary_tools/geo.py b/mapillary_tools/geo.py index 1e5fa3893..371210563 100644 --- a/mapillary_tools/geo.py +++ b/mapillary_tools/geo.py @@ -52,17 +52,49 @@ def gps_distance(latlon_1: tuple[float, float], latlon_2: tuple[float, float]) - def avg_speed(sequence: T.Sequence[PointLike]) -> float: + """ + Calculate average speed over a sequence of points. + Uses GPS epoch time when available (from GPSPoint.epoch_time or CAMMGPSPoint.time_gps_epoch), + otherwise falls back to the time field. + Returns 0.0 for empty or single-element sequences. + Returns NaN if time difference is zero (undefined speed). + """ + # Import here to avoid circular dependency + from . import telemetry + + # Need at least 2 points to calculate speed + if len(sequence) < 2: + return 0.0 + total_distance = 0.0 for cur, nxt in pairwise(sequence): total_distance += gps_distance((cur.lat, cur.lon), (nxt.lat, nxt.lon)) - if sequence: - time_diff = sequence[-1].time - sequence[0].time - else: - time_diff = 0.0 + first = sequence[0] + last = sequence[-1] + + # Try to use GPS epoch time if available + time_diff = 0.0 + if isinstance(first, telemetry.GPSPoint) and isinstance(last, telemetry.GPSPoint): + if ( + first.epoch_time is not None + and last.epoch_time is not None + and first.epoch_time > 0 + and last.epoch_time > 0 + ): + time_diff = last.epoch_time - first.epoch_time + elif isinstance(first, telemetry.CAMMGPSPoint) and isinstance( + last, telemetry.CAMMGPSPoint + ): + if first.time_gps_epoch > 0 and last.time_gps_epoch > 0: + time_diff = last.time_gps_epoch - first.time_gps_epoch + + # Fall back to time field if GPS epoch time not available + if time_diff == 0.0: + time_diff = last.time - first.time if time_diff == 0.0: - return float("inf") + return float("nan") return total_distance / time_diff @@ -141,7 +173,7 @@ def as_unix_time(dt: datetime.datetime | int | float) -> float: if sys.version_info < (3, 10): - def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point: + def interpolate(points: T.Sequence[PointLike], t: float, lo: int = 0) -> PointLike: """ Interpolate or extrapolate the point at time t along the sequence of points (sorted by time). """ @@ -152,12 +184,14 @@ def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point: # for cur, nex in pairwise(points): # assert cur.time <= nex.time, "Points not sorted" - p = Point(time=t, lat=float("-inf"), lon=float("-inf"), alt=None, angle=None) - idx = bisect.bisect_left(points, p, lo=lo) + times = [p.time for p in points] + # Use bisect_left on the times list + idx = bisect.bisect_left(times, t, lo=lo) + return _interpolate_at_segment_idx(points, t, idx) else: - def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point: + def interpolate(points: T.Sequence[PointLike], t: float, lo: int = 0) -> PointLike: """ Interpolate or extrapolate the point at time t along the sequence of points (sorted by time). """ @@ -172,18 +206,19 @@ def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point: return _interpolate_at_segment_idx(points, t, idx) -class Interpolator: +class Interpolator(T.Generic[PointLike]): """ Interpolator for interpolating a sequence of timestamps incrementally. + Preserves the type of input points (Point, GPSPoint, or CAMMGPSPoint). """ - tracks: T.Sequence[T.Sequence[Point]] + tracks: T.Sequence[T.Sequence[PointLike]] track_idx: int # interpolation starts from the lower bound point index in the current track lo: int prev_time: float | None - def __init__(self, tracks: T.Sequence[T.Sequence[Point]]): + def __init__(self, tracks: T.Sequence[T.Sequence[PointLike]]): # Remove empty tracks self.tracks = [track for track in tracks if track] @@ -204,7 +239,7 @@ def __init__(self, tracks: T.Sequence[T.Sequence[Point]]): @staticmethod def _lsearch_left( - track: T.Sequence[Point], t: float, lo: int = 0, hi: int | None = None + track: T.Sequence[PointLike], t: float, lo: int = 0, hi: int | None = None ) -> int: """ similar to bisect.bisect_left, but faster in the incremental search case @@ -221,14 +256,14 @@ def _lsearch_left( # assert track[lo - 1].time < t <= track[lo].time return lo - def interpolate(self, t: float) -> Point: + def interpolate(self, t: float) -> PointLike: if self.prev_time is not None: if not (self.prev_time <= t): raise ValueError( f"Require times to be monotonically increasing, but got {self.prev_time} then {t}" ) - interpolated: Point | None = None + interpolated: PointLike | None = None while self.track_idx < len(self.tracks): track = self.tracks[self.track_idx] @@ -318,7 +353,14 @@ def _ecef_from_lla2(lat: float, lon: float) -> tuple[float, float, float]: return x, y, z -def _interpolate_segment(start: Point, end: Point, t: float) -> Point: +def _interpolate_segment(start: PointLike, end: PointLike, t: float) -> PointLike: + """ + Interpolate between two points at time t, preserving the type of the input points. + Supports Point, GPSPoint, and CAMMGPSPoint types. + """ + # Import here to avoid circular dependency + from . import telemetry + try: weight = (t - start.time) / (end.time - start.time) except ZeroDivisionError: @@ -333,10 +375,108 @@ def _interpolate_segment(start: Point, end: Point, t: float) -> Point: else: alt = None - return Point(time=t, lat=lat, lon=lon, alt=alt, angle=angle) + # Determine the type and create the appropriate point + if isinstance(start, telemetry.GPSPoint) and isinstance(end, telemetry.GPSPoint): + # Interpolate GPSPoint-specific fields + epoch_time: float | None + if ( + start.epoch_time is not None + and end.epoch_time is not None + and start.epoch_time > 0 + and end.epoch_time > 0 + ): + epoch_time = start.epoch_time + (end.epoch_time - start.epoch_time) * weight + else: + epoch_time = None + + precision: float | None + if start.precision is not None and end.precision is not None: + precision = start.precision + (end.precision - start.precision) * weight + else: + precision = None + + ground_speed: float | None + if start.ground_speed is not None and end.ground_speed is not None: + ground_speed = ( + start.ground_speed + (end.ground_speed - start.ground_speed) * weight + ) + else: + ground_speed = None + + # For fix, use the start point's value (or could use majority voting logic) + fix = start.fix + + return T.cast( + PointLike, + telemetry.GPSPoint( + time=t, + lat=lat, + lon=lon, + alt=alt, + angle=angle, + epoch_time=epoch_time, + fix=fix, + precision=precision, + ground_speed=ground_speed, + ), + ) + + elif isinstance(start, telemetry.CAMMGPSPoint) and isinstance( + end, telemetry.CAMMGPSPoint + ): + # Interpolate CAMMGPSPoint-specific fields + time_gps_epoch = ( + start.time_gps_epoch + (end.time_gps_epoch - start.time_gps_epoch) * weight + ) + horizontal_accuracy = ( + start.horizontal_accuracy + + (end.horizontal_accuracy - start.horizontal_accuracy) * weight + ) + vertical_accuracy = ( + start.vertical_accuracy + + (end.vertical_accuracy - start.vertical_accuracy) * weight + ) + velocity_east = ( + start.velocity_east + (end.velocity_east - start.velocity_east) * weight + ) + velocity_north = ( + start.velocity_north + (end.velocity_north - start.velocity_north) * weight + ) + velocity_up = start.velocity_up + (end.velocity_up - start.velocity_up) * weight + speed_accuracy = ( + start.speed_accuracy + (end.speed_accuracy - start.speed_accuracy) * weight + ) + + # For gps_fix_type, use the start point's value + gps_fix_type = start.gps_fix_type + + return T.cast( + PointLike, + telemetry.CAMMGPSPoint( + time=t, + lat=lat, + lon=lon, + alt=alt, + angle=angle, + time_gps_epoch=time_gps_epoch, + gps_fix_type=gps_fix_type, + horizontal_accuracy=horizontal_accuracy, + vertical_accuracy=vertical_accuracy, + velocity_east=velocity_east, + velocity_north=velocity_north, + velocity_up=velocity_up, + speed_accuracy=speed_accuracy, + ), + ) + + else: + # Return base Point type + return T.cast(PointLike, Point(time=t, lat=lat, lon=lon, alt=alt, angle=angle)) -def _interpolate_at_segment_idx(points: T.Sequence[Point], t: float, idx: int) -> Point: +def _interpolate_at_segment_idx( + points: T.Sequence[PointLike], t: float, idx: int +) -> PointLike: """ Interpolate time t along the segment between idx - 1 and idx. If idx is out of range, extrapolate it to the nearest segment (first or last). diff --git a/tests/unit/test_geo.py b/tests/unit/test_geo.py index f3f1978f6..701701c2b 100644 --- a/tests/unit/test_geo.py +++ b/tests/unit/test_geo.py @@ -754,3 +754,465 @@ def test_negative_timestamps(self): self.assertAlmostEqual(point.lat, 3.05) self.assertAlmostEqual(point.lon, 3.05) self.assertAlmostEqual(point.alt, 305) + + +class TestAvgSpeedWithGPSEpochTime(unittest.TestCase): + """Test cases for avg_speed using GPS epoch time from GPSPoint and CAMMGPSPoint.""" + + def test_avg_speed_with_base_points(self): + """Test avg_speed with base Point types (uses time field).""" + points = [ + Point(time=0.0, lat=0.0, lon=0.0, alt=0.0, angle=None), + Point(time=10.0, lat=0.001, lon=0.0, alt=0.0, angle=None), + ] + speed = geo.avg_speed(points) + # Distance is approximately 111 meters (0.001 degree of latitude) + # Time is 10 seconds + # Speed should be approximately 11.1 m/s + self.assertAlmostEqual(speed, 11.1, delta=0.5) + + def test_avg_speed_with_gps_points_using_epoch_time(self): + """Test avg_speed with GPSPoint using epoch_time field.""" + from mapillary_tools.telemetry import GPSPoint, GPSFix + + # Video time is 0-10 seconds, but GPS epoch time spans 100 seconds + # This simulates timelapse where video time != GPS time + points = [ + GPSPoint( + time=0.0, # Video time + lat=0.0, + lon=0.0, + alt=0.0, + angle=None, + epoch_time=1000.0, # GPS epoch time + fix=GPSFix.FIX_3D, + precision=1.0, + ground_speed=10.0, + ), + GPSPoint( + time=10.0, # Video time (10 sec in video) + lat=0.01, + lon=0.0, + alt=0.0, + angle=None, + epoch_time=1100.0, # GPS epoch time (100 sec in real world) + fix=GPSFix.FIX_3D, + precision=1.0, + ground_speed=10.0, + ), + ] + speed = geo.avg_speed(points) + # Distance is approximately 1111 meters (0.01 degree of latitude) + # Using GPS epoch time: 1100 - 1000 = 100 seconds + # Speed should be approximately 11.1 m/s + self.assertAlmostEqual(speed, 11.1, delta=0.5) + + def test_avg_speed_with_gps_points_fallback_to_time(self): + """Test avg_speed with GPSPoint falls back to time when epoch_time is None.""" + from mapillary_tools.telemetry import GPSPoint, GPSFix + + points = [ + GPSPoint( + time=0.0, + lat=0.0, + lon=0.0, + alt=0.0, + angle=None, + epoch_time=None, # No epoch time + fix=GPSFix.FIX_3D, + precision=1.0, + ground_speed=10.0, + ), + GPSPoint( + time=10.0, + lat=0.001, + lon=0.0, + alt=0.0, + angle=None, + epoch_time=None, # No epoch time + fix=GPSFix.FIX_3D, + precision=1.0, + ground_speed=10.0, + ), + ] + speed = geo.avg_speed(points) + # Should use time field: 10 - 0 = 10 seconds + # Distance ~111 meters, speed ~11.1 m/s + self.assertAlmostEqual(speed, 11.1, delta=0.5) + + def test_avg_speed_with_gps_points_zero_epoch_time_fallback(self): + """Test avg_speed falls back to time when epoch_time is 0.""" + from mapillary_tools.telemetry import GPSPoint, GPSFix + + points = [ + GPSPoint( + time=0.0, + lat=0.0, + lon=0.0, + alt=0.0, + angle=None, + epoch_time=0.0, # Zero epoch time should trigger fallback + fix=GPSFix.FIX_3D, + precision=1.0, + ground_speed=10.0, + ), + GPSPoint( + time=10.0, + lat=0.001, + lon=0.0, + alt=0.0, + angle=None, + epoch_time=0.0, # Zero epoch time should trigger fallback + fix=GPSFix.FIX_3D, + precision=1.0, + ground_speed=10.0, + ), + ] + speed = geo.avg_speed(points) + # Should fall back to time field + self.assertAlmostEqual(speed, 11.1, delta=0.5) + + def test_avg_speed_with_camm_gps_points(self): + """Test avg_speed with CAMMGPSPoint using time_gps_epoch field.""" + from mapillary_tools.telemetry import CAMMGPSPoint + + # Video time is 0-10 seconds, but GPS epoch time spans 50 seconds + points = [ + CAMMGPSPoint( + time=0.0, # Video time + lat=0.0, + lon=0.0, + alt=0.0, + angle=None, + time_gps_epoch=2000.0, # GPS epoch time + gps_fix_type=3, + horizontal_accuracy=1.0, + vertical_accuracy=1.0, + velocity_east=0.0, + velocity_north=10.0, + velocity_up=0.0, + speed_accuracy=0.5, + ), + CAMMGPSPoint( + time=10.0, # Video time + lat=0.005, + lon=0.0, + alt=0.0, + angle=None, + time_gps_epoch=2050.0, # GPS epoch time (50 sec elapsed) + gps_fix_type=3, + horizontal_accuracy=1.0, + vertical_accuracy=1.0, + velocity_east=0.0, + velocity_north=10.0, + velocity_up=0.0, + speed_accuracy=0.5, + ), + ] + speed = geo.avg_speed(points) + # Distance is approximately 555 meters (0.005 degree of latitude) + # Using GPS epoch time: 2050 - 2000 = 50 seconds + # Speed should be approximately 11.1 m/s + self.assertAlmostEqual(speed, 11.1, delta=0.5) + + def test_avg_speed_with_camm_gps_points_zero_epoch_fallback(self): + """Test avg_speed with CAMMGPSPoint falls back when time_gps_epoch is 0.""" + from mapillary_tools.telemetry import CAMMGPSPoint + + points = [ + CAMMGPSPoint( + time=0.0, + lat=0.0, + lon=0.0, + alt=0.0, + angle=None, + time_gps_epoch=0.0, # Zero triggers fallback + gps_fix_type=3, + horizontal_accuracy=1.0, + vertical_accuracy=1.0, + velocity_east=0.0, + velocity_north=10.0, + velocity_up=0.0, + speed_accuracy=0.5, + ), + CAMMGPSPoint( + time=10.0, + lat=0.001, + lon=0.0, + alt=0.0, + angle=None, + time_gps_epoch=0.0, # Zero triggers fallback + gps_fix_type=3, + horizontal_accuracy=1.0, + vertical_accuracy=1.0, + velocity_east=0.0, + velocity_north=10.0, + velocity_up=0.0, + speed_accuracy=0.5, + ), + ] + speed = geo.avg_speed(points) + # Should fall back to time field + self.assertAlmostEqual(speed, 11.1, delta=0.5) + + def test_avg_speed_empty_sequence(self): + """Test avg_speed with empty sequence returns 0.""" + speed = geo.avg_speed([]) + self.assertEqual(speed, 0.0) + + def test_avg_speed_single_point(self): + """Test avg_speed with single point returns 0.""" + points = [Point(time=0.0, lat=0.0, lon=0.0, alt=0.0, angle=None)] + speed = geo.avg_speed(points) + self.assertEqual(speed, 0.0) + + def test_avg_speed_zero_time_diff_returns_nan(self): + """Test avg_speed returns NaN when time difference is zero.""" + import math + + # Two points at the same timestamp + points = [ + Point(time=100.0, lat=0.0, lon=0.0, alt=0.0, angle=None), + Point(time=100.0, lat=1.0, lon=1.0, alt=0.0, angle=None), + ] + speed = geo.avg_speed(points) + self.assertTrue(math.isnan(speed)) + + +class TestInterpolatePreservesPointType(unittest.TestCase): + """Test that interpolation preserves point types (GPSPoint, CAMMGPSPoint).""" + + def test_interpolate_gps_points_returns_gps_point(self): + """Test that interpolating GPSPoints returns a GPSPoint.""" + from mapillary_tools.telemetry import GPSPoint, GPSFix + + points = [ + GPSPoint( + time=0.0, + lat=0.0, + lon=0.0, + alt=100.0, + angle=0.0, + epoch_time=1000.0, + fix=GPSFix.FIX_3D, + precision=1.0, + ground_speed=10.0, + ), + GPSPoint( + time=10.0, + lat=1.0, + lon=1.0, + alt=200.0, + angle=45.0, + epoch_time=1010.0, + fix=GPSFix.FIX_3D, + precision=2.0, + ground_speed=20.0, + ), + ] + + result = geo.interpolate(points, 5.0) + + # Check type is preserved + self.assertIsInstance(result, GPSPoint) + + # Check base fields are interpolated + self.assertEqual(result.time, 5.0) + self.assertAlmostEqual(result.lat, 0.5) + self.assertAlmostEqual(result.lon, 0.5) + self.assertAlmostEqual(result.alt, 150.0) + + # Check GPSPoint-specific fields are interpolated + self.assertAlmostEqual(result.epoch_time, 1005.0) + self.assertAlmostEqual(result.precision, 1.5) + self.assertAlmostEqual(result.ground_speed, 15.0) + self.assertEqual(result.fix, GPSFix.FIX_3D) # Fix is taken from start point + + def test_interpolate_gps_points_with_none_epoch_time(self): + """Test interpolating GPSPoints when epoch_time is None.""" + from mapillary_tools.telemetry import GPSPoint, GPSFix + + points = [ + GPSPoint( + time=0.0, + lat=0.0, + lon=0.0, + alt=100.0, + angle=0.0, + epoch_time=None, + fix=GPSFix.FIX_2D, + precision=None, + ground_speed=None, + ), + GPSPoint( + time=10.0, + lat=1.0, + lon=1.0, + alt=200.0, + angle=45.0, + epoch_time=None, + fix=GPSFix.FIX_2D, + precision=None, + ground_speed=None, + ), + ] + + result = geo.interpolate(points, 5.0) + + self.assertIsInstance(result, GPSPoint) + self.assertIsNone(result.epoch_time) + self.assertIsNone(result.precision) + self.assertIsNone(result.ground_speed) + + def test_interpolate_camm_gps_points_returns_camm_gps_point(self): + """Test that interpolating CAMMGPSPoints returns a CAMMGPSPoint.""" + from mapillary_tools.telemetry import CAMMGPSPoint + + points = [ + CAMMGPSPoint( + time=0.0, + lat=0.0, + lon=0.0, + alt=100.0, + angle=0.0, + time_gps_epoch=2000.0, + gps_fix_type=3, + horizontal_accuracy=1.0, + vertical_accuracy=2.0, + velocity_east=10.0, + velocity_north=20.0, + velocity_up=5.0, + speed_accuracy=0.5, + ), + CAMMGPSPoint( + time=10.0, + lat=1.0, + lon=1.0, + alt=200.0, + angle=45.0, + time_gps_epoch=2010.0, + gps_fix_type=3, + horizontal_accuracy=3.0, + vertical_accuracy=4.0, + velocity_east=20.0, + velocity_north=30.0, + velocity_up=10.0, + speed_accuracy=1.0, + ), + ] + + result = geo.interpolate(points, 5.0) + + # Check type is preserved + self.assertIsInstance(result, CAMMGPSPoint) + + # Check base fields are interpolated + self.assertEqual(result.time, 5.0) + self.assertAlmostEqual(result.lat, 0.5) + self.assertAlmostEqual(result.lon, 0.5) + self.assertAlmostEqual(result.alt, 150.0) + + # Check CAMMGPSPoint-specific fields are interpolated + self.assertAlmostEqual(result.time_gps_epoch, 2005.0) + self.assertEqual(result.gps_fix_type, 3) # Taken from start point + self.assertAlmostEqual(result.horizontal_accuracy, 2.0) + self.assertAlmostEqual(result.vertical_accuracy, 3.0) + self.assertAlmostEqual(result.velocity_east, 15.0) + self.assertAlmostEqual(result.velocity_north, 25.0) + self.assertAlmostEqual(result.velocity_up, 7.5) + self.assertAlmostEqual(result.speed_accuracy, 0.75) + + def test_interpolate_base_points_returns_base_point(self): + """Test that interpolating base Points returns a Point.""" + points = [ + Point(time=0.0, lat=0.0, lon=0.0, alt=100.0, angle=0.0), + Point(time=10.0, lat=1.0, lon=1.0, alt=200.0, angle=45.0), + ] + + result = geo.interpolate(points, 5.0) + + # Should return base Point type + self.assertIsInstance(result, Point) + self.assertEqual(result.time, 5.0) + self.assertAlmostEqual(result.lat, 0.5) + self.assertAlmostEqual(result.lon, 0.5) + self.assertAlmostEqual(result.alt, 150.0) + + def test_interpolator_preserves_gps_point_type(self): + """Test that Interpolator preserves GPSPoint type.""" + from mapillary_tools.telemetry import GPSPoint, GPSFix + + track = [ + GPSPoint( + time=0.0, + lat=0.0, + lon=0.0, + alt=100.0, + angle=0.0, + epoch_time=1000.0, + fix=GPSFix.FIX_3D, + precision=1.0, + ground_speed=10.0, + ), + GPSPoint( + time=10.0, + lat=1.0, + lon=1.0, + alt=200.0, + angle=45.0, + epoch_time=1010.0, + fix=GPSFix.FIX_3D, + precision=2.0, + ground_speed=20.0, + ), + ] + + interpolator = geo.Interpolator([track]) + result = interpolator.interpolate(5.0) + + self.assertIsInstance(result, GPSPoint) + self.assertAlmostEqual(result.epoch_time, 1005.0) + + def test_interpolator_preserves_camm_gps_point_type(self): + """Test that Interpolator preserves CAMMGPSPoint type.""" + from mapillary_tools.telemetry import CAMMGPSPoint + + track = [ + CAMMGPSPoint( + time=0.0, + lat=0.0, + lon=0.0, + alt=100.0, + angle=0.0, + time_gps_epoch=2000.0, + gps_fix_type=3, + horizontal_accuracy=1.0, + vertical_accuracy=2.0, + velocity_east=10.0, + velocity_north=20.0, + velocity_up=5.0, + speed_accuracy=0.5, + ), + CAMMGPSPoint( + time=10.0, + lat=1.0, + lon=1.0, + alt=200.0, + angle=45.0, + time_gps_epoch=2010.0, + gps_fix_type=3, + horizontal_accuracy=3.0, + vertical_accuracy=4.0, + velocity_east=20.0, + velocity_north=30.0, + velocity_up=10.0, + speed_accuracy=1.0, + ), + ] + + interpolator = geo.Interpolator([track]) + result = interpolator.interpolate(5.0) + + self.assertIsInstance(result, CAMMGPSPoint) + self.assertAlmostEqual(result.time_gps_epoch, 2005.0) + self.assertAlmostEqual(result.velocity_east, 15.0)