From 3b32f9a32654a9e57b74b2227b22c4709230a6ec Mon Sep 17 00:00:00 2001 From: Bee Date: Mon, 9 Feb 2026 07:38:10 -0800 Subject: [PATCH] Adds ability to specify ice candidates the vision service WebRTC feed --- .vscode/settings.json | 4 +- src/basic_bot/commons/webrtc_server.py | 72 ++++++++++++++++++++++---- src/basic_bot/services/vision.py | 3 +- 3 files changed, 68 insertions(+), 11 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index a13d032..e928bf8 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,12 +11,14 @@ "killall", "linenums", "Matlike", + "MJPEG", "Picamera", "pids", "pygments", "pymdownx", "pyyaml", "setuptools", - "superfences" + "superfences", + "webrtc" ] } \ No newline at end of file diff --git a/src/basic_bot/commons/webrtc_server.py b/src/basic_bot/commons/webrtc_server.py index 1d7f246..b229db2 100644 --- a/src/basic_bot/commons/webrtc_server.py +++ b/src/basic_bot/commons/webrtc_server.py @@ -5,9 +5,10 @@ import time from typing import Any, Optional import numpy as np +import uuid from aiohttp import web -from aiortc import RTCPeerConnection, RTCSessionDescription +from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate from aiortc.contrib.media import MediaRelay, MediaPlayer from aiortc.mediastreams import MediaStreamTrack from av import VideoFrame @@ -51,7 +52,7 @@ async def recv(self) -> VideoFrame: class WebrtcPeers: def __init__(self, camera: BaseCamera): - self.pcs: Any = set() + self.pcs: dict[str, RTCPeerConnection] = dict() self.relay = MediaRelay() self.camera = camera # Audio streaming variables @@ -117,7 +118,11 @@ def _initialize_audio(self) -> Optional[MediaStreamTrack]: # Initialize audio relay for sharing between multiple peers self.audio_relay = MediaRelay() log.info("Audio streaming initialized successfully") - return self.audio_relay.subscribe(self.microphone.audio) if self.microphone.audio else None + return ( + self.audio_relay.subscribe(self.microphone.audio) + if self.microphone.audio + else None + ) except Exception as e: log.error(f"Failed to initialize audio streaming: {e}") @@ -147,8 +152,8 @@ def _cleanup_audio(self) -> None: async def close_all_connections(self) -> None: # close peer connections log.info("Closing all webrtc peer connections") - coros = [pc.close() for pc in self.pcs] - await asyncio.gather(*coros) + promises = [pc.close() for pc in self.pcs.values()] + await asyncio.gather(*promises) self.pcs.clear() # cleanup audio resources @@ -157,12 +162,14 @@ async def close_all_connections(self) -> None: async def respond_to_offer(self, request: web.Request) -> web.Response: params = await request.json() + log.info(f"Received WebRTC offer from {request.remote}; {params}") offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) pc = RTCPeerConnection() - self.pcs.add(pc) + client_id = str(uuid.uuid4()) + self.pcs[client_id] = pc - log.info(f"Creating webrtc offer for {request.remote}") + log.info(f"Creating webrtc answer for {request.remote}") @pc.on("datachannel") def on_datachannel(channel: Any) -> None: @@ -176,7 +183,7 @@ async def on_connectionstatechange() -> None: log.info(f"Connection state is {pc.connectionState}") if pc.connectionState == "failed": await pc.close() - self.pcs.discard(pc) + self.pcs.pop(client_id, None) # Add video track pc.addTrack(CameraStreamTrack(self.camera)) @@ -199,6 +206,53 @@ async def on_connectionstatechange() -> None: return web.Response( content_type="application/json", text=json.dumps( - {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type} + { + "sdp": pc.localDescription.sdp, + "type": pc.localDescription.type, + "client_id": client_id, + } ), ) + + async def respond_to_ice_candidate(self, request: web.Request) -> web.Response: + params = await request.json() + client_id = params.get("client_id") + candidate = params.get("candidate") + log.info( + f"Received ICE candidate from {request.remote}; client_id={client_id}, candidate={candidate}" + ) + + pc = self.pcs.get(client_id) + if not pc: + log.error(f"No peer connection found for client_id={client_id}") + return web.Response(status=404, text="Peer connection not found") + + try: + await pc.addIceCandidate(self.create_RTCIceCandidate(candidate)) + log.debug(f"Added ICE candidate for client_id={client_id}") + return web.Response(status=200, text="ICE candidate added") + except Exception as e: + log.error(f"Error adding ICE candidate for client_id={client_id}: {e}") + return web.Response(status=500, text="Failed to add ICE candidate") + + def create_RTCIceCandidate(self, candidate: dict) -> RTCIceCandidate: + parts = candidate["candidate"].split(" ") + foundation = parts[0].split(":")[1] + component = int(parts[1]) + protocol = parts[2] + priority = int(parts[3]) + ip = parts[4] + port = int(parts[5]) + type = parts[7] + + return RTCIceCandidate( + component=component, + foundation=foundation, + ip=ip, + port=port, + priority=priority, + protocol=protocol, + type=type, + sdpMid=candidate["sdpMid"], + sdpMLineIndex=candidate["sdpMLineIndex"], + ) diff --git a/src/basic_bot/services/vision.py b/src/basic_bot/services/vision.py index b53a075..1be9263 100644 --- a/src/basic_bot/services/vision.py +++ b/src/basic_bot/services/vision.py @@ -326,8 +326,9 @@ def main() -> None: app.router.add_get("/record_video", record_video) app.router.add_get("/recorded_video", recorded_video) app.router.add_get("/recorded_video/{filename}", get_recorded_video_file) - # this is for handling WebRTC video handshake + # these are for handling WebRTC video handshake app.router.add_post("/offer", webrtc_peers.respond_to_offer) + app.router.add_post("/ice_candidate", webrtc_peers.respond_to_ice_candidate) # this is for MJPEG video streaming app.router.add_get("/video_feed", video_feed)