Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
"killall",
"linenums",
"Matlike",
"MJPEG",
"Picamera",
"pids",
"pygments",
"pymdownx",
"pyyaml",
"setuptools",
"superfences"
"superfences",
"webrtc"
]
}
72 changes: 63 additions & 9 deletions src/basic_bot/commons/webrtc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import time
from typing import Any, Optional
import numpy as np
import uuid

from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate
from aiortc.contrib.media import MediaRelay, MediaPlayer
from aiortc.mediastreams import MediaStreamTrack
from av import VideoFrame
Expand Down Expand Up @@ -51,7 +52,7 @@ async def recv(self) -> VideoFrame:

class WebrtcPeers:
def __init__(self, camera: BaseCamera):
self.pcs: Any = set()
self.pcs: dict[str, RTCPeerConnection] = dict()
self.relay = MediaRelay()
self.camera = camera
# Audio streaming variables
Expand Down Expand Up @@ -117,7 +118,11 @@ def _initialize_audio(self) -> Optional[MediaStreamTrack]:
# Initialize audio relay for sharing between multiple peers
self.audio_relay = MediaRelay()
log.info("Audio streaming initialized successfully")
return self.audio_relay.subscribe(self.microphone.audio) if self.microphone.audio else None
return (
self.audio_relay.subscribe(self.microphone.audio)
if self.microphone.audio
else None
)

except Exception as e:
log.error(f"Failed to initialize audio streaming: {e}")
Expand Down Expand Up @@ -147,8 +152,8 @@ def _cleanup_audio(self) -> None:
async def close_all_connections(self) -> None:
# close peer connections
log.info("Closing all webrtc peer connections")
coros = [pc.close() for pc in self.pcs]
await asyncio.gather(*coros)
promises = [pc.close() for pc in self.pcs.values()]
await asyncio.gather(*promises)
self.pcs.clear()

# cleanup audio resources
Expand All @@ -157,12 +162,14 @@ async def close_all_connections(self) -> None:

async def respond_to_offer(self, request: web.Request) -> web.Response:
params = await request.json()
log.info(f"Received WebRTC offer from {request.remote}; {params}")
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

pc = RTCPeerConnection()
self.pcs.add(pc)
client_id = str(uuid.uuid4())
self.pcs[client_id] = pc

log.info(f"Creating webrtc offer for {request.remote}")
log.info(f"Creating webrtc answer for {request.remote}")

@pc.on("datachannel")
def on_datachannel(channel: Any) -> None:
Expand All @@ -176,7 +183,7 @@ async def on_connectionstatechange() -> None:
log.info(f"Connection state is {pc.connectionState}")
if pc.connectionState == "failed":
await pc.close()
self.pcs.discard(pc)
self.pcs.pop(client_id, None)

# Add video track
pc.addTrack(CameraStreamTrack(self.camera))
Expand All @@ -199,6 +206,53 @@ async def on_connectionstatechange() -> None:
return web.Response(
content_type="application/json",
text=json.dumps(
{"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
{
"sdp": pc.localDescription.sdp,
"type": pc.localDescription.type,
"client_id": client_id,
}
),
)

async def respond_to_ice_candidate(self, request: web.Request) -> web.Response:
params = await request.json()
client_id = params.get("client_id")
candidate = params.get("candidate")
log.info(
f"Received ICE candidate from {request.remote}; client_id={client_id}, candidate={candidate}"
)

pc = self.pcs.get(client_id)
if not pc:
log.error(f"No peer connection found for client_id={client_id}")
return web.Response(status=404, text="Peer connection not found")

try:
await pc.addIceCandidate(self.create_RTCIceCandidate(candidate))
log.debug(f"Added ICE candidate for client_id={client_id}")
return web.Response(status=200, text="ICE candidate added")
except Exception as e:
log.error(f"Error adding ICE candidate for client_id={client_id}: {e}")
return web.Response(status=500, text="Failed to add ICE candidate")

def create_RTCIceCandidate(self, candidate: dict) -> RTCIceCandidate:
parts = candidate["candidate"].split(" ")
foundation = parts[0].split(":")[1]
component = int(parts[1])
protocol = parts[2]
priority = int(parts[3])
ip = parts[4]
port = int(parts[5])
type = parts[7]

return RTCIceCandidate(
component=component,
foundation=foundation,
ip=ip,
port=port,
priority=priority,
protocol=protocol,
type=type,
sdpMid=candidate["sdpMid"],
sdpMLineIndex=candidate["sdpMLineIndex"],
)
3 changes: 2 additions & 1 deletion src/basic_bot/services/vision.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,9 @@ def main() -> None:
app.router.add_get("/record_video", record_video)
app.router.add_get("/recorded_video", recorded_video)
app.router.add_get("/recorded_video/{filename}", get_recorded_video_file)
# this is for handling WebRTC video handshake
# these are for handling WebRTC video handshake
app.router.add_post("/offer", webrtc_peers.respond_to_offer)
app.router.add_post("/ice_candidate", webrtc_peers.respond_to_ice_candidate)
# this is for MJPEG video streaming
app.router.add_get("/video_feed", video_feed)

Expand Down