
Docker: Video recorder/uploader listen on session events #3070

Merged: VietND96 merged 5 commits into trunk from video-event-driven on Feb 18, 2026
Conversation

VietND96 (Member) commented on Feb 4, 2026

User description

Thanks for contributing to the Docker-Selenium project!
A PR well described will help maintainers to quickly review and merge it

Before submitting your PR, please check our contributing guidelines, applied for this repository.
Avoid large PRs, help reviewers by making them as simple and short as possible.

Description

Fixes #3067

Motivation and Context

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist

  • I have read the contributing document.
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have added tests to cover my changes.
  • All new and existing tests passed.

PR Type

Enhancement


Description

  • Implement event-driven video recording and uploading service

    • Unified service subscribes to Grid's ZeroMQ event bus for session lifecycle events
    • Replaces polling-based approach with reactive event handling
  • Add Python entry points for recorder and uploader with fallback support

    • video_recorder.py switches between event-driven and shell-based modes
    • video_uploader.py idles in event-driven mode, delegates to unified service
  • Introduce comprehensive session state management and internal async queue

    • Tracks session lifecycle, recording status, and failure events
    • Decouples recording from uploading via async task queue
  • Add support for selective upload based on session failure status

    • Configurable via SE_UPLOAD_FAILURE_SESSION_ONLY environment variable
  • Update Docker configuration and supervisor configs

    • Add pyzmq dependency to base image
    • Update recorder and uploader supervisor configs to use Python entry points
  • Include test docker-compose file for event-driven setup
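The selective-upload behavior described above can be sketched as a small decision helper. The flag name SE_UPLOAD_FAILURE_SESSION_ONLY comes from this PR's description; the exact truthiness parsing and function shape are assumptions for illustration, not the PR's actual code.

```python
import os


def should_upload(session_failed: bool, env=os.environ) -> bool:
    """Decide whether a recorded video should be queued for upload.

    When SE_UPLOAD_FAILURE_SESSION_ONLY is "true", only videos of failed
    sessions are uploaded; otherwise every recorded session is uploaded.
    (Illustrative sketch only.)
    """
    failure_only = env.get("SE_UPLOAD_FAILURE_SESSION_ONLY", "false").strip().lower() == "true"
    if not failure_only:
        return True
    return session_failed
```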


Diagram Walkthrough

flowchart LR
  A["ZeroMQ Event Bus"] -->|session-created| B["VideoService"]
  A -->|session-closed| B
  A -->|session-event| B
  B -->|start ffmpeg| C["Recording"]
  B -->|stop ffmpeg| C
  C -->|queue task| D["Upload Queue"]
  D -->|process| E["Rclone Upload"]
  F["video_recorder.py"] -->|SE_EVENT_DRIVEN_SERVICES| B
  G["video_uploader.py"] -->|idles in event-driven| B

File Walkthrough

Relevant files
Enhancement
video_recorder.py
Video recorder entry point with mode detection                     

Video/video_recorder.py

  • New entry point script for video recording service
  • Detects event-driven mode via SE_EVENT_DRIVEN_SERVICES environment
    variable
  • Imports and runs unified VideoService in event-driven mode
  • Falls back to shell-based /opt/bin/video.sh in traditional mode
  • Includes error handling for missing dependencies with helpful messages
+39/-0   
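The mode detection described above can be sketched roughly as follows. The environment variable, the video_service import, and the /opt/bin/video.sh fallback path are taken from the walkthrough; the helper name is_event_driven and the exact parsing are assumptions for illustration.

```python
import os
import subprocess


def is_event_driven(env=os.environ) -> bool:
    """Return True when SE_EVENT_DRIVEN_SERVICES selects the unified service.

    The flag name comes from the walkthrough; treating only "true"
    (case-insensitive) as enabled is an assumption.
    """
    return env.get("SE_EVENT_DRIVEN_SERVICES", "false").strip().lower() == "true"


def run_recorder() -> None:
    """Entry-point sketch: event-driven service with shell-script fallback."""
    if is_event_driven():
        try:
            import asyncio
            from video_service import main as service_main  # unified service from this PR
            asyncio.run(service_main())
        except ImportError as e:
            print(f"Failed to import video service: {e}")
            print("Ensure pyzmq is installed: pip install pyzmq")
            print("Falling back to shell-based recording...")
            subprocess.run(["/opt/bin/video.sh"], check=True)
    else:
        # Traditional mode: delegate to the legacy shell recorder.
        subprocess.run(["/opt/bin/video.sh"], check=True)
```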
video_service.py
Unified event-driven video recording and upload service   

Video/video_service.py

  • Comprehensive unified event-driven video service implementation (842
    lines)
  • Subscribes to ZeroMQ event bus for session-created, session-closed,
    and session-event events
  • Manages complete session lifecycle with state tracking (SessionState
    dataclass)
  • Handles ffmpeg recording with configurable codec, bitrate, and audio
    options
  • Implements async upload queue and batch processing with rclone
  • Supports selective upload based on session failure status via
    SE_UPLOAD_FAILURE_SESSION_ONLY
  • Resolves Node ID from Node /status endpoint for distributed Hub-Nodes
    filtering
  • Includes graceful shutdown with signal handling and resource cleanup
+842/-0 
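The SessionState dataclass mentioned above might look roughly like this. The CREATED, RECORDING, and STOPPING statuses and the session_id/video_file/end_time fields appear in the referred code elsewhere in this review; the remaining fields and defaults are assumptions sketched for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from typing import Optional


class SessionStatus(Enum):
    """Session lifecycle status (subset confirmed by the referred code)."""

    CREATED = auto()
    RECORDING = auto()
    STOPPING = auto()


@dataclass
class SessionState:
    """Per-session tracking state (field set partly assumed)."""

    session_id: str
    status: SessionStatus = SessionStatus.CREATED
    video_file: Optional[str] = None
    failed: bool = False  # set when a conventional failure session-event arrives
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
```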
video_uploader.py
Video uploader entry point with mode detection                     

Video/video_uploader.py

  • New entry point script for video upload service
  • Detects event-driven mode via SE_EVENT_DRIVEN_SERVICES environment
    variable
  • In event-driven mode, idles indefinitely (unified service handles
    uploads)
  • Falls back to shell-based /opt/bin/upload.sh in traditional mode
  • Maintains process alive for supervisor compatibility
+39/-0   
Dependencies
Dockerfile
Add pyzmq dependency to base image                                             

Base/Dockerfile

  • Add pyzmq to pip dependencies for ZeroMQ async support
  • Enables Python-based event-driven services to communicate with Grid
    event bus
+1/-1     
Configuration changes
recorder.conf
Update recorder supervisor config to use Python entry point

Video/recorder.conf

  • Update supervisor command from shell script to Python entry point
  • Changed from /opt/bin/video.sh to python3 /opt/bin/video_recorder.py
  • Maintains same autostart and autorestart behavior
+1/-1     
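Based on the walkthrough, the resulting supervisor stanza presumably resembles the following; only the command line is confirmed above, while the program name and the autostart/autorestart keys (stated to be unchanged) are filled in as assumptions:

```ini
[program:video-recording]
command=python3 /opt/bin/video_recorder.py
autostart=true
autorestart=true
```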
uploader.conf
Update uploader supervisor config to use Python entry point

Video/uploader.conf

  • Update supervisor command from shell script to Python entry point
  • Changed from /opt/bin/upload.sh to python3 /opt/bin/video_uploader.py
  • Maintains same autostart and autorestart behavior
+1/-1     
Tests
docker-compose-v3-event-driven-arm64.yml
Test docker-compose for event-driven video recording         

tests/docker-compose-v3-event-driven-arm64.yml

  • New test docker-compose file for event-driven video recording setup
  • Configures Chrome and Firefox nodes with SE_EVENT_DRIVEN_SERVICES=true
  • Includes optional environment variables for failure-only upload and
    rclone configuration
  • Sets up file browser service for video management
  • Demonstrates complete event-driven architecture with Hub and Nodes
+73/-0   
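A node stanza in that compose file presumably resembles the fragment below. Only the SE_EVENT_DRIVEN_SERVICES=true setting and the optional failure-only upload flag are confirmed by the walkthrough; the service name and image tag are placeholders.

```yaml
services:
  chrome:
    image: selenium/node-chrome:latest  # hypothetical tag
    environment:
      - SE_EVENT_DRIVEN_SERVICES=true
      - SE_UPLOAD_FAILURE_SESSION_ONLY=true  # optional: upload failed sessions only
```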

Signed-off-by: Viet Nguyen Duc <viet.dnguyen@katalon.com>
qodo-code-review bot (Contributor) commented on Feb 4, 2026

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Unauthenticated file browser

Description: The test compose provisions filebrowser/filebrowser with FB_NOAUTH=true, which would
expose any mounted videos over HTTP without authentication if this compose is used beyond
local testing (e.g., deployed on a shared network/host). docker-compose-v3-event-driven-arm64.yml [64-73]

Referred Code
file_browser:
  image: filebrowser/filebrowser:latest
  container_name: file_browser
  restart: always
  ports:
    - "8081:80"
  volumes:
    - /tmp/videos:/srv
  environment:
    - FB_NOAUTH=true
Ticket Compliance: 🟡

🎫 #3067

  • 🟢 Switch video recorder/uploader implementation from Bash polling (/status) to a Python implementation that subscribes to the Grid Event Bus (ZeroMQ/pyzmq) for SessionCreatedEvent and SessionClosedEvent to start/stop recording.
  • Support a client-driven “failure-only upload” flow where client code fires a conventional session event and the uploader decides whether to upload based on that failure signal.
  • Keep the solution generic so users can implement sidecar services that react to test lifecycle events in a Grid ecosystem.
  • Maintain backward compatibility by retaining the legacy Bash-based approach and allowing switching via environment variables.
  • 🔴 Provide/document a usage example (noted as TBU in the ticket).
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

🔴
Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Swallowed exceptions: The display readiness loop suppresses exceptions without logging, which can hide root
causes and impede production debugging.

Referred Code
while not self.shutdown_event.is_set():
    try:
        proc = await asyncio.create_subprocess_exec(
            "xset",
            "b",
            "off",
            env=env,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        await proc.wait()
        if proc.returncode == 0:
            logger.info(f"Display ready: {self.display}")
            return
    except Exception:
        pass
    await asyncio.sleep(2)

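A minimal remedy for this finding is to log the caught exception instead of passing silently. The helper below is a hypothetical sketch of that pattern (generic probe/retry names, not the PR's code), keeping the retry loop but making failures visible:

```python
import asyncio
import logging

logger = logging.getLogger("video.service")


async def wait_until_ready(probe, interval: float = 2.0, attempts=None) -> bool:
    """Retry an async readiness probe, logging failures instead of swallowing them.

    `probe` is an async callable returning True when the resource is ready;
    `attempts=None` retries indefinitely (as the original loop does until shutdown).
    """
    attempt = 0
    while attempts is None or attempt < attempts:
        attempt += 1
        try:
            if await probe():
                return True
        except Exception as e:
            # Log at debug level so root causes stay visible in production logs.
            logger.debug("Readiness probe failed (attempt %d): %s", attempt, e)
        await asyncio.sleep(interval)
    return False
```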

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status:
Sensitive stderr logging: Upload failures log full rclone stderr output which can contain sensitive details (e.g.,
remote paths/bucket names or error payloads), and logs are unstructured plain text rather
than structured (e.g., JSON) for auditing.

Referred Code
# Configure logging
LOG_FORMAT = "%(asctime)s [video.service] - %(message)s"
LOG_DATEFMT = os.environ.get("SE_LOG_TIMESTAMP_FORMAT", "%Y-%m-%d %H:%M:%S,%f")[:-3]
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=LOG_DATEFMT)
logger = logging.getLogger(__name__)


class SessionClosedReason(Enum):
    """Reasons why a session was closed."""

    QUIT_COMMAND = "QUIT_COMMAND"
    TIMEOUT = "TIMEOUT"
    NODE_REMOVED = "NODE_REMOVED"
    NODE_RESTARTED = "NODE_RESTARTED"


class SessionStatus(Enum):
    """Session lifecycle status."""

    CREATED = auto()
    RECORDING = auto()


 ... (clipped 426 lines)


Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Missing user context: Recording/upload actions are logged but the events do not include any user identifier
(only session_id/nodeId), so audit trails may be insufficient if a user identity is
required for compliance.

Referred Code
        logger.info(f"Started recording: session={session.session_id}, file={session.video_file}")
        return True
    except Exception as e:
        logger.error(f"Failed to start recording for {session.session_id}: {e}")
        session.status = SessionStatus.CREATED
        return False

async def stop_recording(self, session: SessionState) -> bool:
    """Stop ffmpeg recording for a session."""
    if session.ffmpeg_process is None:
        logger.warning(f"No recording in progress for session {session.session_id}")
        return False

    session.status = SessionStatus.STOPPING
    session.end_time = datetime.now()

    try:
        session.ffmpeg_process.terminate()
        try:
            await asyncio.wait_for(session.ffmpeg_process.wait(), timeout=10.0)
        except asyncio.TimeoutError:


 ... (clipped 63 lines)


Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status:
Stack trace exposure: Fatal errors are logged with exc_info=True, which may expose internal stack traces to
non-secure stdout logs depending on deployment log aggregation/access controls.

Referred Code
    await service.run()
except KeyboardInterrupt:
    logger.info("Interrupted")
except Exception as e:
    logger.error(f"Fatal error: {e}", exc_info=True)
    sys.exit(1)


Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Unvalidated env inputs: Environment-provided values like SE_AUDIO_SOURCE and SE_UPLOAD_OPTS are split into
subprocess arguments without validation/allowlisting, which may be unsafe if these inputs
can be influenced by untrusted sources.

Referred Code
if self.record_audio and self.audio_source:
    cmd.extend(self.audio_source.split())

cmd.extend(
    [
        "-codec:v",
        self.codec,
        *self.preset.split(),
        "-tune",
        "zerolatency",
        "-crf",
        self.crf,
        "-maxrate",
        self.maxrate,
        "-bufsize",
        self.bufsize,
        "-pix_fmt",
        "yuv420p",
        "-movflags",
        "+faststart",
        video_path,


 ... (clipped 106 lines)


Compliance status legend:
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

qodo-code-review bot commented on Feb 4, 2026

PR Code Suggestions ✨

Explore these optional code suggestions:

Possible issue
Fix incorrect secret validation logic

Fix the secret validation logic to prevent type mismatches during comparison.
Compare the raw secret string from the event bus with the environment variable
after stripping potential JSON quotes.

Video/video_service.py [691-698]

 # Validate secret
 if self.registration_secret:
-    try:
-        received = json.loads(secret)
-        if received != self.registration_secret:
-            continue
-    except json.JSONDecodeError:
+    # The secret from the event bus is a JSON-encoded string.
+    # For a simple string secret like "foo", it will be "\"foo\"".
+    # We need to compare the raw string values.
+    secret_from_bus = secret.strip('"')
+    if secret_from_bus != self.registration_secret:
         continue
Suggestion importance [1-10]: 8

Why: The suggestion correctly identifies a bug in the secret validation logic where a type mismatch can occur, causing authentication to fail. This is a critical fix for the security feature to work as intended.

Impact: Medium
Use a semaphore for robust concurrency control

Refactor the upload_worker to use an asyncio.Semaphore for managing concurrency.
This provides a more robust way to limit the number of parallel uploads and
simplifies task management.

Video/video_service.py [499-535]

 async def upload_worker(self) -> None:
     """Background worker that processes upload queue."""
     logger.info("Upload worker started")
-    active_tasks: List[asyncio.Task] = []
+    semaphore = asyncio.Semaphore(self.upload_batch_size)
+    tasks = []
+
+    async def _process_with_semaphore(task: UploadTask):
+        async with semaphore:
+            await self.process_upload(task)
 
     try:
         while not self.shutdown_event.is_set() or not self.upload_queue.empty():
             try:
                 # Get task with timeout to check shutdown
-                try:
-                    task = await asyncio.wait_for(self.upload_queue.get(), timeout=1.0)
-                except asyncio.TimeoutError:
-                    continue
-
-                # Process upload (could run multiple in parallel up to batch_size)
-                upload_task = asyncio.create_task(self.process_upload(task))
-                active_tasks.append(upload_task)
-
-                # Clean up completed tasks
-                active_tasks = [t for t in active_tasks if not t.done()]
-
-                # Wait if we've hit batch limit
-                if len(active_tasks) >= self.upload_batch_size:
-                    done, pending = await asyncio.wait(active_tasks, return_when=asyncio.FIRST_COMPLETED)
-                    active_tasks = list(pending)
-
+                task = await asyncio.wait_for(self.upload_queue.get(), timeout=1.0)
+                upload_task = asyncio.create_task(_process_with_semaphore(task))
+                tasks.append(upload_task)
+            except asyncio.TimeoutError:
+                continue
             except Exception as e:
                 logger.error(f"Upload worker error: {e}")
 
         # Wait for remaining uploads
-        if active_tasks:
-            logger.info(f"Waiting for {len(active_tasks)} pending uploads...")
-            await asyncio.gather(*active_tasks, return_exceptions=True)
+        if tasks:
+            logger.info(f"Waiting for {len(tasks)} pending uploads...")
+            await asyncio.gather(*tasks, return_exceptions=True)
 
     finally:
         self.uploader_done.set()
         logger.info("Upload worker stopped")
Suggestion importance [1-10]: 7

Why: The suggestion correctly identifies a potential issue with unbounded concurrency in the upload_worker and proposes a more robust solution using asyncio.Semaphore to strictly control the number of parallel uploads, improving resource management.

Impact: Medium
Guarantee active upload cleanup

Wrap the proc.communicate() call and subsequent removal of proc from
self.active_uploads in a try/finally block. This ensures the process is always
removed from the list, even if an error occurs.

Video/video_service.py [487-489]

 self.active_uploads.append(proc)
-stdout, stderr = await proc.communicate()
-self.active_uploads.remove(proc)
+try:
+    stdout, stderr = await proc.communicate()
+finally:
+    self.active_uploads.remove(proc)

Suggestion importance [1-10]: 6

Why: This suggestion correctly identifies a potential resource leak where a process might not be removed from self.active_uploads if an exception occurs. Using a try/finally block ensures cleanup happens, making the shutdown process more reliable.

Impact: Low
High-level
Consolidate to a single supervisor service

Refactor the deployment to use a single supervisor service for the event-driven
mode. This change would eliminate the unnecessary idling uploader process,
aligning the deployment with the unified service architecture.

Examples:

Video/video_uploader.py [20-32]
    if event_driven:
        print("Event-driven mode enabled.")
        print("Upload is handled by the unified video_service.py - this process will idle.")
        print("To disable this, set SE_EVENT_DRIVEN_SERVICES=false")

        # Keep process alive but idle (supervisord expects it to run)
        # The actual uploading is done by video_service.py
        try:
            while True:
                time.sleep(60)

 ... (clipped 3 lines)
Video/uploader.conf [3]
command=python3 /opt/bin/video_uploader.py

Solution Walkthrough:

Before:

// recorder.conf
[program:video-recording]
command=python3 /opt/bin/video_recorder.py
...

// uploader.conf
[program:video-upload]
command=python3 /opt/bin/video_uploader.py
...

// video_recorder.py
if event_driven:
    run_unified_video_service() // Handles recording and uploading
else:
    run_legacy_video_sh()

// video_uploader.py
if event_driven:
    while True: // Idles forever
        sleep(60)
else:
    run_legacy_upload_sh()

After:

// video.conf (single supervisor config)
[program:video-service]
command=python3 /opt/bin/video_entrypoint.py
...

// video_entrypoint.py
event_driven = os.environ.get("SE_EVENT_DRIVEN_SERVICES") == "true"

if event_driven:
    run_unified_video_service() // Handles both recording and uploading
else:
    // Start legacy services as separate background processes
    start_process("/opt/bin/video.sh")
    start_process("/opt/bin/upload.sh")
    // Keep main process alive to manage them
    wait_for_children()
Suggestion importance [1-10]: 7

Why: The suggestion correctly identifies a design flaw where a redundant, idling process is used, which is inefficient and confusing; improving this would significantly clean up the architecture.

Impact: Medium
General
Add broad fallback for service errors

Add a broad except Exception block around the service_main() call. This ensures
the script falls back to the shell-based recorder on any runtime error, not just
ImportError.

Video/video_recorder.py [22-32]

 try:
     import asyncio
     from video_service import main as service_main
     asyncio.run(service_main())
 except ImportError as e:
     print(f"Failed to import video service: {e}")
     print("Ensure pyzmq is installed: pip install pyzmq")
     print("Falling back to shell-based recording...")
     subprocess.run(["/opt/bin/video.sh"], check=True)
+except Exception as e:
+    print(f"Error running video service: {e}")
+    print("Falling back to shell-based recording...")
+    subprocess.run(["/opt/bin/video.sh"], check=True)
Suggestion importance [1-10]: 7

Why: The suggestion improves the robustness of the script by adding a generic exception handler. This ensures that any unexpected error in the new event-driven service will trigger the fallback to the stable shell-based recorder, preventing a total failure of video recording.

Impact: Medium
Update deprecated asyncio usage and improve signal handling

Replace the deprecated asyncio.get_event_loop() with asyncio.get_running_loop().
Also, improve signal handling by removing handlers during cleanup to prevent
unexpected behavior.

Video/video_service.py [824-838]

 async def main():
     """Main entry point."""
     service = VideoService()
 
-    loop = asyncio.get_event_loop()
+    loop = asyncio.get_running_loop()
+    shutdown_handler = lambda: service.shutdown_event.set()
+
     for sig in (signal.SIGTERM, signal.SIGINT):
-        loop.add_signal_handler(sig, lambda: service.shutdown_event.set())
+        loop.add_signal_handler(sig, shutdown_handler)
 
     try:
         await service.run()
     except KeyboardInterrupt:
         logger.info("Interrupted")
     except Exception as e:
         logger.error(f"Fatal error: {e}", exc_info=True)
+        # Ensure cleanup is attempted on fatal error
+        if not service.shutdown_event.is_set():
+            service.shutdown_event.set()
+        await service.cleanup()
         sys.exit(1)
+    finally:
+        for sig in (signal.SIGTERM, signal.SIGINT):
+            loop.remove_signal_handler(sig)
Suggestion importance [1-10]: 5

Why: The suggestion correctly points out the use of a deprecated function and proposes using asyncio.get_running_loop(). It also improves signal handling robustness by removing handlers in a finally block, which is a good practice.

Impact: Low
Learned best practice
Pin dependency versions

Pin pyzmq (and ideally the other pip deps) to an explicit version to avoid
unexpected breakages from upstream releases.

Base/Dockerfile [195-200]

 RUN python3 -m venv $VENV_PATH \
-    && $VENV_PATH/bin/python3 -m pip install --upgrade pip psutil requests pyzmq \
+    && $VENV_PATH/bin/python3 -m pip install --upgrade pip psutil requests "pyzmq==26.2.0" \
     && wget -q https://github.com/Supervisor/supervisor/archive/refs/heads/main.zip -O /tmp/supervisor.zip \
     && unzip /tmp/supervisor.zip -d /tmp \
     && cd /tmp/supervisor-main \
     && $VENV_PATH/bin/python3 -m pip install . \
Suggestion importance [1-10]: 6

Why: Relevant best practice - Pin versions for externally fetched/installed tooling (e.g., pip packages) to ensure reproducible builds.

Impact: Low
Avoid floating Docker image tags

Replace latest with a specific, known-good version tag (or immutable digest) so
test environments are stable.

tests/docker-compose-v3-event-driven-arm64.yml [64-65]

 file_browser:
-  image: filebrowser/filebrowser:latest
+  image: filebrowser/filebrowser:v2.27.0
Suggestion importance [1-10]: 5

Why: Relevant best practice - Avoid using floating image tags like latest; pin Docker images to a specific version for reproducible deployments.

Impact: Low

VietND96 force-pushed the video-event-driven branch 4 times, most recently from 3eafe9c to abc5320, on February 8, 2026 at 19:27
@VietND96 VietND96 merged commit b5550d3 into trunk Feb 18, 2026
29 checks passed
@VietND96 VietND96 deleted the video-event-driven branch February 18, 2026 06:18

Development

Successfully merging this pull request may close these issues.

[🚀 Feature]: Refactor video recorder & uploader subscribe to session events
