Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions mapillary_tools/blackvue_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import pynmea2

from . import geo
from . import telemetry
from .mp4 import simple_mp4_parser as sparser


Expand All @@ -31,7 +31,7 @@
class BlackVueInfo:
# None and [] are equivalent here. Use None as default because:
# ValueError: mutable default <class 'list'> for field gps is not allowed: use default_factory
gps: list[geo.Point] | None = None
gps: list[telemetry.GPSPoint] | None = None
make: str = "BlackVue"
model: str = ""

Expand All @@ -49,9 +49,11 @@ def extract_blackvue_info(fp: T.BinaryIO) -> BlackVueInfo | None:
points.sort(key=lambda p: p.time)

if points:
# Convert the time field to relative time to the first point
# epoch_time stays as the original time in seconds
first_point_time = points[0].time
for p in points:
p.time = (p.time - first_point_time) / 1000
p.time = p.time - first_point_time

# Camera model
try:
Expand Down Expand Up @@ -113,24 +115,24 @@ def _extract_camera_model_from_cprt(cprt_bytes: bytes) -> str:
return ""


def _parse_gps_box(gps_data: bytes) -> list[geo.Point]:
def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
"""
>>> list(_parse_gps_box(b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61"))
[Point(time=1623057074211, lat=51.150436666666664, lon=-114.03067833333333, alt=1097.36, angle=None)]
[GPSPoint(time=1623057074.211, lat=51.150436666666664, lon=-114.03067833333333, alt=1097.36, angle=None, epoch_time=1623057074.211, fix=<GPSFix.FIX_3D: 3>, precision=None, ground_speed=None)]

>>> list(_parse_gps_box(b"[1629874404069]$GNGGA,175322.00,3244.53126,N,11710.97811,W,1,12,0.84,17.4,M,-34.0,M,,*45"))
[Point(time=1629874404069, lat=32.742187666666666, lon=-117.1829685, alt=17.4, angle=None)]
[GPSPoint(time=1629874404.069, lat=32.742187666666666, lon=-117.1829685, alt=17.4, angle=None, epoch_time=1629874404.069, fix=<GPSFix.FIX_3D: 3>, precision=None, ground_speed=None)]

>>> list(_parse_gps_box(b"[1629874404069]$GNGLL,4404.14012,N,12118.85993,W,001037.00,A,A*67"))
[Point(time=1629874404069, lat=44.069002, lon=-121.31433216666667, alt=None, angle=None)]
[GPSPoint(time=1629874404.069, lat=44.069002, lon=-121.31433216666667, alt=None, angle=None, epoch_time=1629874404.069, fix=None, precision=None, ground_speed=None)]

>>> list(_parse_gps_box(b"[1629874404069]$GNRMC,001031.00,A,4404.13993,N,12118.86023,W,0.146,,100117,,,A*7B"))
[Point(time=1629874404069, lat=44.06899883333333, lon=-121.31433716666666, alt=None, angle=None)]
[GPSPoint(time=1629874404.069, lat=44.06899883333333, lon=-121.31433716666666, alt=None, angle=None, epoch_time=1629874404.069, fix=None, precision=None, ground_speed=None)]

>>> list(_parse_gps_box(b"[1623057074211]$GPVTG,,T,,M,0.078,N,0.144,K,D*28[1623057075215]"))
[]
"""
points_by_sentence_type: dict[str, list[geo.Point]] = {}
points_by_sentence_type: dict[str, list[telemetry.GPSPoint]] = {}

for line_bytes in gps_data.splitlines():
match = NMEA_LINE_REGEX.match(line_bytes)
Expand Down Expand Up @@ -160,24 +162,32 @@ def _parse_gps_box(gps_data: bytes) -> list[geo.Point]:
if message.sentence_type in ["GGA"]:
if not message.is_valid:
continue
point = geo.Point(
time=epoch_ms,
point = telemetry.GPSPoint(
time=epoch_ms / 1000,
lat=message.latitude,
lon=message.longitude,
alt=message.altitude,
angle=None,
epoch_time=epoch_ms / 1000,
fix=telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None,
precision=None,
ground_speed=None,
)
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)

elif message.sentence_type in ["RMC", "GLL"]:
if not message.is_valid:
continue
point = geo.Point(
time=epoch_ms,
point = telemetry.GPSPoint(
time=epoch_ms / 1000,
lat=message.latitude,
lon=message.longitude,
alt=None,
angle=None,
epoch_time=epoch_ms / 1000,
fix=None,
precision=None,
ground_speed=None,
)
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)

Expand Down
12 changes: 9 additions & 3 deletions tests/cli/blackvue_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@

import gpxpy
import gpxpy.gpx
from mapillary_tools import blackvue_parser, geo, utils
from mapillary_tools import blackvue_parser, telemetry, utils


def _convert_points_to_gpx_segment(
points: T.Sequence[geo.Point],
points: T.Sequence[telemetry.GPSPoint],
) -> gpxpy.gpx.GPXTrackSegment:
gpx_segment = gpxpy.gpx.GPXTrackSegment()
for point in points:
# Use epoch_time for the timestamp if available, otherwise fall back to time
timestamp = (
point.epoch_time
if (point.epoch_time is not None and point.epoch_time > 0)
else point.time
)
gpx_segment.points.append(
gpxpy.gpx.GPXTrackPoint(
point.lat,
point.lon,
elevation=point.alt,
time=datetime.datetime.fromtimestamp(point.time, datetime.timezone.utc),
time=datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc),
)
)
return gpx_segment
Expand Down
39 changes: 36 additions & 3 deletions tests/unit/test_blackvue_parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import io

import mapillary_tools.geo as geo
from mapillary_tools import blackvue_parser
from mapillary_tools import blackvue_parser, telemetry
from mapillary_tools.mp4 import construct_mp4_parser as cparser


Expand Down Expand Up @@ -41,10 +41,43 @@ def test_parse_points():
info = blackvue_parser.extract_blackvue_info(io.BytesIO(data))
assert info == blackvue_parser.BlackVueInfo(
gps=[
geo.Point(
time=0.0, lat=38.88615816666667, lon=-76.992434, alt=None, angle=None
telemetry.GPSPoint(
time=0.0,
lat=38.88615816666667,
lon=-76.992434,
alt=None,
angle=None,
epoch_time=1623057129.256,
fix=None,
precision=None,
ground_speed=None,
)
],
make="BlackVue",
model="",
)


def test_gpspoint_gga():
gps_data = b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61"
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 1
point = points[0]
assert point.time == 1623057074.211
assert point.lat == 51.150436666666664
assert point.lon == -114.03067833333333
assert point.epoch_time == 1623057074.211
assert point.fix == telemetry.GPSFix.FIX_3D


def test_gpspoint_gll():
gps_data = b"[1629874404069]$GNGLL,4404.14012,N,12118.85993,W,001037.00,A,A*67"
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 1
point = points[0]
assert point.time == 1629874404.069
assert point.lat == 44.069002
assert point.lon == -121.31433216666667
assert point.epoch_time == 1629874404.069