diff --git a/CHANGELOG.md b/CHANGELOG.md index eca17ff..004252a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ See Git commit messages for full history. - Windows: improve error checking and messages for Win32 API calls (#448) - Mac: fix memory leak (#450, #453) - improve multithreading: allow multiple threads to use the same MSS object, allow multiple MSS objects to concurrently take screenshots, and document multithreading guarantees (#446, #452) +- Add full demos for different ways to use MSS (#444, #456) - :heart: contributors: @jholveck, @halldorfannar ## 10.1.0 (2025-08-16) diff --git a/demos/common/__init__.py b/demos/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/demos/common/pipeline.py b/demos/common/pipeline.py new file mode 100644 index 0000000..6ae9d67 --- /dev/null +++ b/demos/common/pipeline.py @@ -0,0 +1,300 @@ +from __future__ import annotations + +import contextlib +import itertools +from collections.abc import Callable, Generator, Iterable, Iterator +from threading import Condition, Lock, Thread +from typing import Generic, TypeVar, overload + +T = TypeVar("T") +U = TypeVar("U") + + +class MailboxShutDown(Exception): # noqa: N818 (An exception, but not an error) + """Exception to indicate that a Mailbox has been shut down. + + This will be raised if Mailbox.get() or Mailbox.put() is run on a + mailbox after its .shutdown() method has been called, or if it is + called while waiting. + """ + + def __init__(self, mailbox: Mailbox) -> None: + #: The mailbox that was shut down + self.mailbox = mailbox + + def __str__(self) -> str: + return f"Mailbox shut down: {self.mailbox}" + + +class Mailbox(Generic[T]): + """Thread-safe container to pass a single object at a time between threads. + + A Mailbox can be shut down to indicate that it is no longer + available. This can be used by a producer to indicate that no + more items will be forthcoming, or by a consumer to indicate that + it is no longer able to accept more objects. + + In Python 3.13, this has the same basic functionality as + queue.Queue(1). Prior to 3.13, there was no + queue.Queue.shutdown() method. The mechanisms for using mailboxes + as iterables, or adding items from iterables, are also not part of + queue.Queue in any version of Python. + """ + + def __init__(self) -> None: + #: Lock to protect mailbox state + self.lock = Lock() + self._condition = Condition(lock=self.lock) + #: Indicates whether an item is present in the mailbox + self.has_item = False + self._item: T | None = None + #: Indicates whether the mailbox has been shut down + self.is_shutdown = False + + def get(self) -> T: + """Return and remove the item being held by the mailbox. + + If an item is not presently available, block until another + thread calls .put(). + """ + with self._condition: + while True: + # We test to see if an item is present before testing if the queue is shut down. This is so that a + # non-immediate shutdown allows the mailbox to be drained. + if self.has_item: + rv = self._item + self._item = None # Don't hold an unnecessary reference + self.has_item = False + self._condition.notify_all() + return rv # type:ignore[return-value] + if self.is_shutdown: + raise MailboxShutDown(self) + self._condition.wait() + + def get_many(self) -> Iterable[T]: + """Yield items as they appear in the mailbox. + + The iterator exits when the mailbox is shut down; MailboxShutDown + is not raised into the caller.
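+
+        A minimal sketch of a typical consumer loop (``box`` and
+        ``handle`` are hypothetical stand-ins for a Mailbox created
+        elsewhere and for your own processing code):
+
+            for item in box.get_many():
+                handle(item)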
+ """ + return iter(self) + + def put(self, item: T) -> None: + """Store an item in the mailbox. + + If an item is already in the mailbox, block until another + thread calls .get(). + """ + with self._condition: + while True: + if self.is_shutdown: + raise MailboxShutDown(self) + if not self.has_item: + self._item = item + self.has_item = True + self._condition.notify() + return + self._condition.wait() + + def put_many(self, items: Iterable[T]) -> Iterator[T]: + """Put the elements of iterable in the mailbox, one at a time. + + If the mailbox is shut down before all the elements can be put + into it, a MailboxShutDown exception is _not_ raised. + + Returns an iterator containing any remaining items, including + the one that was being processed when the mailbox was shut + down. The first item (if any) of this iterator can be + immediately accessed with next; subsequent items defer to the + input iterable, so may block. + """ + iterator = iter(items) + for item in iterator: + # We put this try/except inside the for loop, to make sure we don't accidentally filter out an exception + # that escaped the items iterator. + try: + self.put(item) + except MailboxShutDown: + return itertools.chain([item], iterator) + # Remove references to the value once it's not needed. This lets objects with advanced buffer semantics + # reclaim the object's memory immediately, without waiting for the next iteration of the iterable. + del item + return iter([]) + + def shutdown(self, *, immediate: bool = False) -> None: + """Shut down the mailbox, marking it as unavailable for future use. + + Any callers currently blocked in .get or .put, or any future + caller to those methods, will recieve a MailboxShutDown + exception. Callers using .get_many or iterating over the + mailbox will see the iteration end. Callers to .put_many will + stop adding items. + + If immediate is False (the default), and an item is currently + in the mailbox, it will be returned by the next call to + .get(), and the one after that will raise MailboxShutDown. + + It is safe to call this method multiple times, including to + promote a non-immediate shutdown to an immediate one. + """ + with self._condition: + # We don't actually need to check whether we've been called already. + self.is_shutdown = True + if immediate: + self._item = None + self.has_item = False + self._condition.notify_all() + + def __iter__(self) -> Iterator[T]: + """Yield items as they appear in the mailbox. + + The iterator exits when the mailbox is shut down; + MailboxShutDown is not raised into the caller. + """ + with contextlib.suppress(MailboxShutDown): + while True: + yield self.get() + + +class PipelineStage(Thread, Generic[T, U]): + """A stage of a multi-threaded pipeline. + + The target function will be called once, and should yield one + value for each element. + + If an in_mailbox is provided, the function will get an iterable of + its successive elements. If an out_mailbox is provided, it will + be supplied with the successive outputs of the target function. + + If the either mailbox is shut down, the target function's loop + will stop being called. Both mailboxes will be shut down when the + target function ends. + + Note to readers adapting this class to their own programs: + + This is designed for linear pipelines: it is not meant to support + fan-in (multiple stages feeding one mailbox) or fan-out (one + mailbox feeding multiple stages). 
The shutdown semantics of these + sorts of pipelines will depend heavily on what it's used for, and + this demo only needs a simple pipeline. + """ + + # Source stage + @overload + def __init__( + self, + target: Callable[[], Generator[U]], + *, + out_mailbox: Mailbox[U], + name: str | None = None, + ) -> None: ... + + # Transformer stage + @overload + def __init__( + self, + target: Callable[[Iterable[T]], Generator[U]], + *, + in_mailbox: Mailbox[T], + out_mailbox: Mailbox[U], + name: str | None = None, + ) -> None: ... + + # Sink stage + @overload + def __init__( + self, + target: Callable[[Iterable[T]], None], + *, + in_mailbox: Mailbox[T], + name: str | None = None, + ) -> None: ... + + def __init__( + self, + target: Callable[[], Generator[U]] | Callable[[Iterable[T]], Generator[U]] | Callable[[Iterable[T]], None], + *, + in_mailbox: Mailbox[T] | None = None, + out_mailbox: Mailbox[U] | None = None, + name: str | None = None, + ) -> None: + """Initialize the PipelineStage. + + Either :param:`in_mailbox` or :param:`out_mailbox` is + required. Otherwise, it would be a pipeline stage that can't + connect to anything else. (You can always use + :class:`threading.Thread` directly if you need that behavior.) + + :param target: Function to run during the stage. This will be + called once, in a separate thread. This should take one + argument if :param:`in_mailbox` is provided, or no + arguments otherwise. If you want additional arguments + (such as configuration), use :func:`functools.partial`. + :param in_mailbox: An optional :class:`Mailbox` to provide + inputs to the target function. The target function will + be called with one argument, an iterable that you can use + in a for loop or similar construct, to get the successive + values. + :param out_mailbox: An optional :class:`Mailbox` to receive + outputs from the target function. If this is provided, + the target function must be a generator (a function that + uses ``yield`` instead of ``return``). The successive + outputs from the function will be placed in + :param:`out_mailbox`. + :param name: An optional name for debugging purposes; see + :attr:`threading.Thread.name`. + """ + if in_mailbox is None and out_mailbox is None: + msg = "Cannot have a pipeline stage with neither inputs nor outputs" + raise ValueError(msg) + self.in_mailbox = in_mailbox + self.out_mailbox = out_mailbox + self.target = target + #: The exception (if any) raised by the target function + self.exc: Exception | None = None + super().__init__(name=name, daemon=True) + + def run(self) -> None: + """Execute the pipeline stage. + + This should not be run directly. Instead, use the start() + method (inherited from threading.Thread) to run this in a + background thread. + + This will run the target function, managing input and output + mailboxes. When the stage completes, whether normally or with + an error, the mailboxes will be shut down. + """ + try: + if self.out_mailbox is None: + # This is a sink function, the easiest to deal with. Since a mailbox is iterable, we can just pass it + # to the target function. + assert self.in_mailbox is not None # noqa: S101 + self.target(self.in_mailbox) # type:ignore[call-arg] + return + # This is a source or transformation function. + out_iterable = self.target() if self.in_mailbox is None else self.target(self.in_mailbox) # type:ignore[call-arg] + if not isinstance(out_iterable, Generator): + msg = ( + "Pipeline target function was expected to be a generator; " + f"instead, it returned a {type(out_iterable)}." 
+ ) + raise TypeError(msg) # noqa: TRY301 + # Once a generator is closed, the yield call (where the generator blocks when it sends an object downstream) will + # raise GeneratorExit. That lets finally: blocks, with: exits, etc. run. This happens automatically when + # out_iterable is garbage-collected. We still close it explicitly so it gets the GeneratorExit, in case + # something (like an exception object) is holding a reference to out_iterable. + with contextlib.closing(out_iterable): + self.out_mailbox.put_many(out_iterable) + except Exception as e: + # We store the exception, so that our caller can choose what to do about it after they call join. + self.exc = e + raise + finally: + if self.in_mailbox is not None: + self.in_mailbox.shutdown() + if self.out_mailbox is not None: + self.out_mailbox.shutdown() + + def __str__(self) -> str: + return f"<PipelineStage {self.name}>" diff --git a/demos/tinytv-stream.py b/demos/tinytv-stream.py index dcbcb18..a399389 100755 --- a/demos/tinytv-stream.py +++ b/demos/tinytv-stream.py @@ -132,19 +132,15 @@ from __future__ import annotations import argparse -import contextlib import functools import io -import itertools import logging import os import re import sys import time from collections import deque -from collections.abc import Generator, Iterable, Iterator -from threading import Condition, Lock, Thread -from typing import TYPE_CHECKING, Generic, Literal, TypeVar, overload +from typing import TYPE_CHECKING, Literal import serial from PIL import Image, ImageOps @@ -153,8 +149,10 @@ import mss +from common.pipeline import Mailbox, PipelineStage + if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Generator, Iterable # The keys in this are substrings in the tvType query. Make sure that they're all distinct: having both "TinyTV2" and # "TinyTV2.1" in here would mean that a 2.1 might be misidentified as a 2. We use substrings instead of parsing the @@ -191,302 +189,9 @@ DEFAULT_JPEG_QUALITY = 75 -T = TypeVar("T") -U = TypeVar("U") - LOGGER = logging.getLogger("tinytv-stream") -class MailboxShutDown(Exception): # noqa: N818 (An exception, but not an error) - """Exception to indicate that a Mailbox has been shut down. - - This will be raised if Mailbox.get() or Mailbox.put() is run on a - mailbox after its .shutdown() method has been called, or if it is - called while waiting. - """ - - def __init__(self, mailbox: Mailbox) -> None: - #: The mailbox that was shut down - self.mailbox = mailbox - - def __str__(self) -> str: - return f"Mailbox shut down: {self.mailbox}" - - -class Mailbox(Generic[T]): - """Thread-safe container to pass a single object at a time between threads. - - A Mailbox can be shut down to indicate that it is no longer - available. This can be used by a producer to indicate that no - more items will be forthcoming, or by a consumer to indicate that - it is no longer able to accept more objects. - - In Python 3.13, this has the same basic functionality as - queue.Queue(1). Prior to 3.13, there was no - queue.Queue.shutdown() method. The mechanisms for using mailboxes - as iterables, or adding items from iterables, are also not part of - queue.Queue in any version of Python.
- """ - - def __init__(self) -> None: - #: Lock to protect mailbox state - self.lock = Lock() - self._condition = Condition(lock=self.lock) - #: Indicates whether an item is present in the mailbox - self.has_item = False - self._item: T | None = None - #: Indicates whether the mailbox has been shut down - self.is_shutdown = False - - def get(self) -> T: - """Return and remove the item being held by the mailbox. - - If an item is not presently available, block until another - thread calls .put(). - """ - with self._condition: - while True: - # We test to see if an item is present before testing if the queue is shut down. This is so that a - # non-immediate shutdown allows the mailbox to be drained. - if self.has_item: - rv = self._item - self._item = None # Don't hold an unnecessary reference - self.has_item = False - self._condition.notify_all() - return rv # type:ignore[return-value] - if self.is_shutdown: - raise MailboxShutDown(self) - self._condition.wait() - - def get_many(self) -> Iterable[T]: - """Yield items as they appear in the mailbox. - - The iterator exits the mailbox is shut down; MailboxShutDown - is not raised into the caller. - """ - return iter(self) - - def put(self, item: T) -> None: - """Store an item in the mailbox. - - If an item is already in the mailbox, block until another - thread calls .get(). - """ - with self._condition: - while True: - if self.is_shutdown: - raise MailboxShutDown(self) - if not self.has_item: - self._item = item - self.has_item = True - self._condition.notify() - return - self._condition.wait() - - def put_many(self, items: Iterable[T]) -> Iterator[T]: - """Put the elements of iterable in the mailbox, one at a time. - - If the mailbox is shut down before all the elements can be put - into it, a MailboxShutDown exception is _not_ raised. - - Returns an iterator containing any remaining items, including - the one that was being processed when the mailbox was shut - down. The first item (if any) of this iterator can be - immediately accessed with next; subsequent items defer to the - input iterable, so may block. - """ - iterator = iter(items) - for item in iterator: - # We put this try/except inside the for loop, to make sure we don't accidentally filter out an exception - # that escaped the items iterator. - try: - self.put(item) - except MailboxShutDown: - return itertools.chain([item], iterator) - # Remove references to the value once it's not needed. This lets objects with advanced buffer semantics - # reclaim the object's memory immediately, without waiting for the next iteration of the iterable. - del item - return iter([]) - - def shutdown(self, *, immediate: bool = False) -> None: - """Shut down the mailbox, marking it as unavailable for future use. - - Any callers currently blocked in .get or .put, or any future - caller to those methods, will recieve a MailboxShutDown - exception. Callers using .get_many or iterating over the - mailbox will see the iteration end. Callers to .put_many will - stop adding items. - - If immediate is False (the default), and an item is currently - in the mailbox, it will be returned by the next call to - .get(), and the one after that will raise MailboxShutDown. - - It is safe to call this method multiple times, including to - promote a non-immediate shutdown to an immediate one. - """ - with self._condition: - # We don't actually need to check whether we've been called already. 
- self.is_shutdown = True - if immediate: - self._item = None - self.has_item = False - self._condition.notify_all() - - def __iter__(self) -> Iterator[T]: - """Yield items as they appear in the mailbox. - - The iterator exits when the mailbox is shut down; - MailboxShutDown is not raised into the caller. - """ - with contextlib.suppress(MailboxShutDown): - while True: - yield self.get() - - -class PipelineStage(Thread, Generic[T, U]): - """A stage of a multi-threaded pipeline. - - The target function will be called once, and should yield one - value for each element. - - If an in_mailbox is provided, the function will get an iterable of - its successive elements. If an out_mailbox is provided, it will - be supplied with the successive outputs of the target function. - - If the either mailbox is shut down, the target function's loop - will stop being called. Both mailboxes will be shut down when the - target function ends. - - Note to readers adapting this class to their own programs: - - This is designed for linear pipelines: it is not meant to support - fan-in (multiple stages feeding one mailbox) or fan-out (one - mailbox feeding multiple stages). The shutdown semantics of these - sorts of pipelines will depend heavily on what it's used for, and - this demo only needs a simple pipeline. - """ - - # Source stage - @overload - def __init__( - self, - target: Callable[[], Generator[U]], - *, - out_mailbox: Mailbox[U], - name: str | None = None, - ) -> None: ... - - # Transformer stage - @overload - def __init__( - self, - target: Callable[[Iterable[T]], Generator[U]], - *, - in_mailbox: Mailbox[T], - out_mailbox: Mailbox[U], - name: str | None = None, - ) -> None: ... - - # Sink stage - @overload - def __init__( - self, - target: Callable[[Iterable[T]], None], - *, - in_mailbox: Mailbox[T], - name: str | None = None, - ) -> None: ... - - def __init__( - self, - target: Callable[[], Generator[U]] | Callable[[Iterable[T]], Generator[U]] | Callable[[Iterable[T]], None], - *, - in_mailbox: Mailbox[T] | None = None, - out_mailbox: Mailbox[U] | None = None, - name: str | None = None, - ) -> None: - """Initialize the PipelineStage. - - Either :param:`in_mailbox` or :param:`out_mailbox` is - required. Otherwise, it would be a pipeline stage that can't - connect to anything else. (You can always use - :class:`threading.Thread` directly if you need that behavior.) - - :param target: Function to run during the stage. This will be - called once, in a separate thread. This should take one - argument if :param:`in_mailbox` is provided, or no - arguments otherwise. If you want additional arguments - (such as configuration), use :func:`functools.partial`. - :param in_mailbox: An optional :class:`Mailbox` to provide - inputs to the target function. The target function will - be called with one argument, an iterable that you can use - in a for loop or similar construct, to get the successive - values. - :param out_mailbox: An optional :class:`Mailbox` to receive - outputs from the target function. If this is provided, - the target function must be a generator (a function that - uses ``yield`` instead of ``return``). The successive - outputs from the function will be placed in - :param:`out_mailbox`. - :param name: An optional name for debugging purposes; see - :attr:`threading.Thread.name`. 
- """ - if in_mailbox is None and out_mailbox is None: - msg = "Cannot have a pipeline stage with neither inputs nor outputs" - raise ValueError(msg) - self.in_mailbox = in_mailbox - self.out_mailbox = out_mailbox - self.target = target - #: The exception (if any) raised by the target function - self.exc: Exception | None = None - super().__init__(name=name, daemon=True) - - def run(self) -> None: - """Execute the pipeline stage. - - This should not be run directly. Instead, use the start() - method (inherited from threading.Thread) to run this in a - background thread. - - This will run the target function, managing input and output - mailboxes. When the stage completes, whether normally or with - an error, the mailboxes will be shut down. - """ - try: - if self.out_mailbox is None: - # This is a sink function, the easiest to deal with. Since a mailbox is iterable, we can just pass it - # to the target function. - assert self.in_mailbox is not None # noqa: S101 - self.target(self.in_mailbox) # type:ignore[call-arg] - return - # This is a source or transformation function. - out_iterable = self.target() if self.in_mailbox is None else self.target(self.in_mailbox) # type:ignore[call-arg] - if not isinstance(out_iterable, Generator): - msg = ( - "Pipeline target function was expected to be a generator; " - f"instead, it returned a {type(out_iterable)}." - ) - raise TypeError(msg) # noqa: TRY301 - # Once a generator is closed, the yield call (where they block when they send an object downstream) will - # raise GeneratorExit. That lets finally: blocks, with: exits, etc. run. This happens automatically when - # out_iterable is garbage-collected. We still close it explicitly to so it gets the GeneratorExit, in case - # something (like an exception object) is holding a reference to out_iterable. - with contextlib.closing(out_iterable): - self.out_mailbox.put_many(out_iterable) - except Exception as e: - # We store the exception, so that our caller can choose what to do about it after they call join. - self.exc = e - raise - finally: - if self.in_mailbox is not None: - self.in_mailbox.shutdown() - if self.out_mailbox is not None: - self.out_mailbox.shutdown() - - def __str__(self) -> str: - return f"" - - def list_devices() -> None: """Display all USB serial ports in a formatted table.""" ports = list(list_ports.comports()) diff --git a/demos/video-capture-simple.py b/demos/video-capture-simple.py new file mode 100755 index 0000000..086dbaa --- /dev/null +++ b/demos/video-capture-simple.py @@ -0,0 +1,170 @@ +#! /usr/bin/env python3 + +# A lot of people want to use MSS to record a video of the screen. Doing it really well can be difficult - there's a +# reason OBS is such a significant program - but the basics are surprisingly easy! +# +# There's a more advanced example, video-capture.py, that has more features, and better performance. But this simple +# demo is easier to understand, because it does everything in a straightforward way, without any complicated features. +# +# Here, we're going to record the screen for 10 seconds, and save the result in capture.mp4, as an H.264 video stream. +# +# Sometimes, in film, cameramen will "undercrank", filming the action at a slower frame rate than how it will +# eventually be projected. In that case, motion appears artificially sped up, either for comedy (like the Benny Hill +# TV show), or for fast and frenetic action (like Mad Max: Fury Road). +# +# In this demo, we put in the file a marker saying that it's at 30 fps. 
But since this is a simple demo, your +# computer might not be able to keep up with writing video frames at that speed. In that case, you'll see the same +# effect: sped-up motion. +# +# The full demo has several techniques to mitigate that. First, it uses pipelined threads to let the video encoder +# use a full CPU core (often more, internally), rather than having to share a CPU core with all the other tasks. +# Second, it puts a timestamp marker on each frame saying exactly when it's supposed to be shown, rather than just +# saying to show all the frames at 30 fps. +# +# For this simple demo, though, we just record the frames and add them to the file one at a time. +# +# We use three libraries that don't come with Python: Pillow, PyAV, and (of course) MSS. You'll need to install those +# with "pip install pillow av mss". Normally, you'll want to install these into a venv; if you don't know about +# those, there are lots of great tutorials online. + +import logging +import time + +# Install the necessary libraries with "pip install av mss pillow". +import av +from PIL import Image + +import mss + +# These are the options you'd give to ffmpeg that would affect the way the video is encoded. There are comments in +# the full demo that go into more detail. +CODEC_OPTIONS = { + "profile": "high", + "preset": "medium", + "b": "6M", + "rc-lookahead": "40", +} + +# We'll try to capture at 30 fps, if the system can keep up with it (typically, that's possible at 1080p, but not at +# 4k). Regardless of what the system can keep up with, we'll mark the file as being at 30 fps. +FPS = 30 + +# The program will exit after 10 seconds of recording. +CAPTURE_SECONDS = 10 + +# Within an MP4 file, the video can be stored in a lot of different formats. In this demo, we use H.264, since it's +# the most widely supported. +# +# In ffmpeg, and the av libraries that we use here, the best codec for H.264 that doesn't require any specific +# hardware is libx264. There are faster ones that are hardware-accelerated, such as h264_nvenc which uses specialized +# chips on Nvidia video cards. +CODEC = "libx264" + +FILENAME = "capture.mp4" + + +def main() -> None: + logging.basicConfig(level=logging.DEBUG) + # If we don't enable PyAV's own logging, a lot of important error messages from libav won't be shown. + av.logging.set_level(av.logging.VERBOSE) + + with mss.mss() as sct: + monitor = sct.monitors[1] + + with av.open(FILENAME, "w") as avmux: + # The "avmux" object we get back from "av.open" represents the MP4 file. That's a container that holds + # the video, as well as possibly audio and more. These are each called "streams". We only create one + # stream here, since we're just recording video. + video_stream = avmux.add_stream(CODEC, rate=FPS, options=CODEC_OPTIONS) + video_stream.width = monitor["width"] + video_stream.height = monitor["height"] + # There are more options you can set on the video stream; the full demo uses some of those. + + # Count how many frames we're capturing, so we can log the FPS later. + frame_count = 0 + + # Mark the times when we start and end the recording. + capture_start_time = time.monotonic() + capture_end_time = capture_start_time + CAPTURE_SECONDS + + # MSS can capture very fast, and libav can encode very fast, depending on your hardware and screen size. + # We don't want to capture faster than 30 fps (or whatever you set FPS to). To slow down to our desired + # rate, we keep a variable "next_frame_time" to track when it's time to capture the next frame.
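+            #
+            # As a rough sketch, the pacing below boils down to this pattern ("still_recording" and
+            # "capture_and_encode_one_frame" are hypothetical placeholders, not names from this demo):
+            #
+            #     next_frame_time = time.monotonic()
+            #     while still_recording:
+            #         while (now := time.monotonic()) < next_frame_time:
+            #             time.sleep(next_frame_time - now)
+            #         capture_and_encode_one_frame()
+            #         next_frame_time += 1 / FPS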
+ # + # Some programs will just sleep for 1/30 sec in each loop. But by tracking the time when we want to + # capture the next frame, instead of always sleeping for 1/30 sec, the time that is spent doing the + # capture and encode (which can be substantial) is counted as part of the total time we need to delay. + next_frame_time = capture_start_time + + print("Capturing to", FILENAME, "for", CAPTURE_SECONDS, "seconds") + while True: + # Wait until we reach the time for the next frame. + while (now := time.monotonic()) < next_frame_time: + time.sleep(next_frame_time - now) + + # Try to capture the next frame 1/30 sec after our target time for this frame. We update this based + # on the target time instead of the actual time so that, if we were a little slow capturing this + # frame, we'll be a little fast capturing the next one, and even things out. (There's a slightly + # better, but more complex, way to update next_frame_time in the full demo.) + next_frame_time = next_frame_time + 1 / FPS + + # See if we've finished the requested capture duration. + if now > capture_end_time: + break + + # Print dots for each frame, so you know it's not frozen. + print(".", end="", flush=True) + + # Grab a screenshot. + screenshot = sct.grab(monitor) + frame_count += 1 + + # There are a few ways to get the screenshot into a VideoFrame. The most obvious is to use PIL: you + # can create an Image from the screenshot, and create a VideoFrame from that. The highest-performance + # way isn't hard either, and can boost the fps rate by about 50%: check out the full demo, and search + # for from_numpy_buffer. + img = Image.frombytes("RGB", screenshot.size, screenshot.bgra, "raw", "BGRX") + frame = av.VideoFrame.from_image(img) + + # When we encode frames, we get back a list of packets. Often, we'll get no packets at first: the + # video encoder wants to wait and see the motion before it decides how it wants to encode the frames. + # Later, once it's decided about the earlier frames, we'll start getting those packets, while it's + # holding on to later frames. + # + # You can imagine that the encoder is a factory. You're providing it frames, one at a time, each as a + # box of raw materials. It cranks out packets as its finished product. But there's some delay while + # it's working. You can imagine these on a conveyor belt moving left to right as time progresses: + # + # FRAMES ENCODER PACKETS + # [1]________-> (Factory) ->____________ + # [3]_[2]_[1]-> (Factory) ->____________ + # [6]_[5]_[4]-> (Factory) ->{1}_________ + # [8]_[7]_[6]-> (Factory) ->{3}_{2}_{1}_ + # + # Sometimes, when you send in a frame, you'll get no packets, sometimes you'll get one, and sometimes + # you'll get a batch of several. It depends on how the encoder works. + # + # The point is, the packets you're getting back from this call are whatever the encoder is ready to + # give you, not necessarily the packets related to the frame you're handing it right now. + packets = video_stream.encode(frame) + + # As we said, the MP4 file is a bunch of packets from possibly many streams, all woven (or "muxed") + # together. So the ultimate destination of the data is to send it to the MP4 file, avmux. + avmux.mux(packets) + + # Print an empty line to end our line of dots. + print() + + # Earlier, we mentioned that the encoder might hold onto some frames, while it decides how to encode them + # based on future frames.
Now that we're done sending it frames, we need to get the packets for any + # frames it's still holding onto. This is referred to as "flushing" the stream. We do this by sending + # None instead of a frame object. + packets = video_stream.encode(None) + avmux.mux(packets) + + print(f"Capture complete: {frame_count / CAPTURE_SECONDS:.1f} fps") + + +if __name__ == "__main__": + main() diff --git a/demos/video-capture.py b/demos/video-capture.py new file mode 100755 index 0000000..18ff9f0 --- /dev/null +++ b/demos/video-capture.py @@ -0,0 +1,626 @@ +#! /usr/bin/env python3 + +# This demo shows one common use case for MSS: capture the screen and write a real video file (MP4) rather than saving +# individual images. +# +# It's intentionally not a full "video encoding" course. The goal is to explain the few concepts that show up +# throughout the program so you can read, tweak, and extend it. +# +# What tools are we using? +# ------------------------ +# +# You'll need a few libraries that don't come with Python: PyAV, NumPy, SI-Prefix, and (of course) MSS. You'll need +# to install those with "pip install av mss numpy si-prefix". Normally, you'll want to install these into a venv; if +# you don't know about those, there are lots of great tutorials online. The most critical one we use is PyAV. +# +# Most people first meet video encoding through the `ffmpeg` command. Under the hood, ffmpeg is built on the "libav*" +# C libraries. In this demo we use PyAV (`import av`), which is a Pythonic wrapper around those libraries. +# +# PyAV docs: +# Note: the older docs at pyav.org are outdated; see +# . +# Caveats: +# +# Containers, streams, and codecs +# ------------------------------- +# +# A file like `capture.mp4` is a *container*: it holds one or more *streams* (usually video and/or audio). This demo +# writes one video stream. +# +# The container interleaves ("muxes") stream data so players can read everything in timestamp order. libav calls those +# pieces "packets". (In MP4 they're not literally network-style packets; the term is a longstanding libav +# abstraction.) +# +# A *codec* is the algorithm that compresses/decompresses a stream. For MP4 video, common codecs include H.264 and +# H.265. This demo defaults to H.264 via `libx264`, because it's widely supported. You can switch to hardware +# encoders (e.g. `h264_nvenc`) if available. +# +# Frames and frame reordering (I/P/B) +# ---------------------------------- +# +# Video is encoded as a sequence of frames: +# - I-frames: complete images. +# - P-frames: changes from previous frames. +# - B-frames: changes predicted using both past *and future* frames. +# +# B-frames are why "the order frames are encoded/decoded" can differ from "the order frames are shown". That leads +# directly to timestamps. +# +# Timestamps (PTS/DTS) +# -------------------- +# +# Every frame has a *presentation timestamp* (PTS): when the viewer should see it. (See the next section for how +# these are represented.) +# +# Encoders may output packets in a different order due to B-frames. Those packets also have a *decode timestamp* +# (DTS): when the decoder must decode them so the PTS schedule can be met. +# +# In this demo we set PTS on `VideoFrame`s and let libav/PyAV propagate timestamps into the encoded packets. +# +# Time base +# --------- +# +# Timestamps are integers, and their unit is a fraction of a second called the *time base*. For example, with a time +# base of 1/90000, a timestamp of 90000 means "1 second". 
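As a tiny worked example with this +# demo's TIME_BASE constant (defined below): a frame captured 0.5 seconds into the recording is stamped +# pts = int(0.5 / TIME_BASE) = 45000, and a player maps that back to 45000 * (1/90000) = 0.5 seconds.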
PyAV will convert between time bases when needed, but you +# must set them consistently where you generate timestamps. +# +# See +# +# This demo uses a time base of 1/90000 (a common MPEG-derived choice). +# +# Constant Frame Rate (CFR) and Variable Frame Rate (VFR) +# ------------------------------------------------------- +# +# Many video files run at a fixed frame rate, like 30 fps. Each frame is shown at 1/30 sec intervals. This is called +# *constant frame rate*, or *CFR*, and that's what we do in the simple version of this demo. +# +# Applications using CFR usually set the time base to the frame rate, such as 1/30 sec. This lets them just use the +# frame number for the PTS. +# +# One problem with real-time recording to CFR is that, if the encoder can't keep up, the video will appear sped-up +# when played back. The comments at the beginning of the simple version of this demo go into more detail about that +# problem. +# +# In this advanced version, we use *variable frame rate*, or *VFR*. That's because we can't be sure that the encoder +# will be able to work fast enough: we haven't tuned its settings for your screen resolution and hardware. While the +# encoder might be fast enough, it might only be able to operate at 18 fps, or even less. +# +# Instead, we mark each frame with the correct time that it should be shown. Even if the encoder is falling behind, +# its frames are still marked with the right times, so the player will just keep the previous frame on the screen a +# little longer. +# +# Some video editing software historically has had problems with VFR video. It's much better now than it was a few +# years ago, but if you plan to edit the video, you may need to convert it to CFR. There are many resources online +# about how to do that. +# +# Performance (why multiple threads?) +# ---------------------------------- +# +# Capturing frames, converting them to `VideoFrame`s, encoding, and muxing are separate stages. This demo pipelines +# those stages across threads so that (for example) encoding can run while the next screen grab is happening. The +# comments at the top of common/pipeline.py describe pipelining in detail. +# +# The slowest stage typically limits overall FPS. Usually, that's the encoder. +# +# On an idle system (rough guide; will vary widely): +# - libx264, 1920x1080: ~80 fps +# - libx264, 3840x2160: ~18 fps +# - h264_nvenc, 1920x1080: ~190 fps +# - h264_nvenc, 3840x2160: ~41 fps + +import argparse +import logging +import signal +import time +from collections import deque +from collections.abc import Generator, Iterable, Sequence +from fractions import Fraction +from functools import partial +from math import floor +from threading import Event +from typing import Any + +# Install the necessary libraries with "pip install av mss numpy si-prefix". +import av +import numpy as np +from si_prefix import si_format + +import mss +from common.pipeline import Mailbox, PipelineStage + + +# These are the options you'd give to ffmpeg that it sends to the video codec. Because ffmpeg and PyAV both use the +# libav libraries, you can get the list of available flags with `ffmpeg -help encoder=libx264`, or whatever encoder +# you're using for this demo's `--codec` flag. The options for each encoder are described in more detail in `man +# ffmpeg-codecs`. +CODEC_OPTIONS = { + # The "high" profile means that the encoder can use some H.264 features that are widely supported, but not + # mandatory. 
If you're using a codec other than H.264, you'll need to comment out this line: the relevant + # features are already part of the main profile in later codecs like H.265, VP8, VP9, and AV1. + "profile": "high", + # The "medium" preset is as good of a preset as any for a demo like this. Different codecs have different + # presets; the h264_nvenc actually prefers "p4", but accepts "medium" as a similar preset. You might prefer + # "fast" if you're not getting enough FPS. + "preset": "medium", + # 6 Mbit/sec is vaguely the ballpark for a good-quality video at 1080p and 30 fps, but there's a lot of variation. + # We're just giving the target bitrate: the second-to-second bitrate will vary a lot, and slowly approach this + # bitrate. If you're trying this on a nearly-still screen, though, then the actual bitrate will be much lower, + # since there's not much motion to encode! + "b": "6M", + # Let the encoder hold some frames for analysis, and flush them later. This especially helps with the + # hardware-accelerated codecs. + "rc-lookahead": "40", +} + + +TIME_BASE = Fraction(1, 90000) + +# Currently, MSS doesn't give us information about the display's colorspace. See where this is used below for more +# information. +DISPLAY_IS_SRGB = False + +LOGGER = logging.getLogger("video-capture") + + +def video_capture( + fps: int, + sct: mss.base.MSSBase, + monitor: mss.models.Monitor, + shutdown_requested: Event, +) -> Generator[tuple[mss.screenshot.ScreenShot, float], None, None]: + # Keep track of the time when we want to get the next frame. We limit the frame time this way instead of sleeping + # 1/fps sec each frame, since we want to also account for the time taken to get the screenshot and other overhead. + # + # Repeatedly adding small floating-point numbers to a total does cause some numeric inaccuracies, but it's small + # enough for our purposes. The program would have to run for three months to accumulate one millisecond of + # inaccuracy. + next_frame_at = time.monotonic() + + # Keep running this loop until the main thread says we should stop. + while not shutdown_requested.is_set(): + # Wait until we're ready. This should, ideally, happen every 1/fps second. + while (now := time.monotonic()) < next_frame_at: + time.sleep(next_frame_at - now) + + # Capture a frame, and send it to the next processing stage. + screenshot = sct.grab(monitor) + yield screenshot, now + + # We try to keep the capture rate at the desired fps on average. If we can't quite keep up for a moment (such + # as if the computer is a little overloaded), then we'll accumulate a bit of "timing debt" in next_frame_at: + # it'll be a little sooner than now + one frame. We'll hopefully be able to catch up soon. + next_frame_at = next_frame_at + (1 / fps) + + # If we've accumulated over one frame's worth of timing debt, then that will say that next_frame_at is sooner + # than now. If we're accumulating too much debt, we want to wipe it out, rather than having a huge burst of + # closely-spaced captures as soon as we can get back to our desired capture rate. When we wipe that out, we + # still try to preserve the timing cycle's phase to keep the capture cadence smooth, rather than having a + # jittery burst of closely-spaced captures. In other words, we increment next_frame_at by a multiple of the + # desired capture period. 
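+        # Worked example with illustrative numbers: at fps=30, one frame period is 1/30 sec (about 33 ms). If
+        # next_frame_at has fallen 85 ms behind now, then missed_frames = floor(0.085 * 30) = 2, and we advance
+        # next_frame_at by (2 + 1) / 30 sec: the two frames we can no longer capture on time are skipped, but
+        # next_frame_at stays on the original 1/30 sec grid.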
+ if next_frame_at < now: + missed_frames = floor((now - next_frame_at) * fps) + next_frame_at += (missed_frames + 1) / fps + + +def video_process( + screenshot_and_timestamp: Iterable[tuple[mss.screenshot.ScreenShot, float]], +) -> Generator[av.VideoFrame, None, None]: + # We track when the first frame happened so we can make PTS start at 0. Many video players and other tools expect + # that. + first_frame_at: float | None = None + + for screenshot, timestamp in screenshot_and_timestamp: + # Avoiding extra pixel copies + # --------------------------- + # + # Copying a full frame of pixels is expensive. On typical hardware, a plain CPU memcpy of a 4K BGRA image can + # cost on the order of ~3ms by itself, which is a big chunk of a 30fps budget (33ms) and an even bigger chunk + # of a 60fps budget (16.7ms). + # + # So we want to be careful about the *conversion* step from an MSS `ScreenShot` to a PyAV `VideoFrame`. + # Ideally, that step should reuse the same underlying bytes rather than creating additional intermediate + # copies. + # + # Buffers in Python + # ----------------- + # + # Many Python objects expose their underlying memory via the "buffer protocol". A buffer is just a view of + # raw bytes that other libraries can interpret without copying. + # + # Common buffer objects include: `bytes`, `bytearray`, `memoryview`, and `array.array`. `screenshot.bgra` is + # also a buffer (currently it is a `bytes` object, though that detail may change in the future). + # + # Minimum-copy path: ScreenShot -> NumPy -> VideoFrame + # ---------------------------------------------------- + # + # `np.frombuffer()` creates an ndarray *view* of an existing buffer (no copy). Reshaping also stays as a + # view. + # + # PyAV's `VideoFrame.from_ndarray()` always copies the data into a new frame-owned buffer. For this demo we + # use the undocumented `VideoFrame.from_numpy_buffer()`, which creates a `VideoFrame` that shares memory with + # the ndarray. + ndarray = np.frombuffer(screenshot.bgra, dtype=np.uint8) + ndarray = ndarray.reshape(screenshot.height, screenshot.width, 4) + frame = av.VideoFrame.from_numpy_buffer(ndarray, format="bgra") + + # Set the PTS and time base for the frame. + # + # We compute PTS based on the actual time we captured the screenshot, relative to when we got the first + # frame. This gives us variable frame rate (VFR) video that accurately reflects the times the frames were + # captured. + # + # However, if we were muxing in an audio stream as well, we'd want to use a common clock for both audio and + # video PTS, preferably based on the audio clock. That's because audio glitches are more noticeable than + # video glitches, so audio timing should be prioritized. In that case, the video PTS would be based on the + # audio clock, not the actual capture time. + # + # The easiest way to do that is to record the monotonic clock in both the video and audio capture stages + # (taking the audio latency into account), record the audio PTS based on how many audio samples have been + # captured, and then adjust the video PTS based on the skew between the audio and monotonic clocks. + if first_frame_at is None: + first_frame_at = timestamp + frame.pts = int((timestamp - first_frame_at) / TIME_BASE) + frame.time_base = TIME_BASE + + # If we know the colorspace of our frames, mark them accordingly. See the comment where we set these + # attributes on video_stream for details. 
+ if DISPLAY_IS_SRGB: + frame.colorspace = av.video.reformatter.Colorspace.ITU709 + frame.color_range = av.video.reformatter.ColorRange.JPEG + + yield frame + + +def video_encode( + video_stream: av.video.stream.VideoStream, frames: Iterable[av.VideoFrame] +) -> Generator[Sequence[av.Packet], None, None]: + for frame in frames: + yield video_stream.encode(frame) + # Our input has run out. Flush the frames that the encoder still is holding internally (such as to compute + # B-frames). + yield video_stream.encode(None) + + +def show_stats( + packet_batches: Iterable[Sequence[av.Packet]], +) -> Iterable[Sequence[av.Packet]]: + """Display streaming statistics (FPS and throughput). + + Statistics are displayed over a 100-frame sliding window. + + FPS indicates how fast the entire pipeline can run as a whole, not + any individual stage. + """ + # The start time is only used for showing the clock. The actual timing stats use packet timestamps (ultimately + # derived from the frame PTS we compute during capture). + start_time = time.monotonic() + time_deque: deque[int] = deque(maxlen=100) + bit_count_deque: deque[int] = deque(maxlen=100) + next_display_update = 0.0 + last_status_len = 0 + + for frame_count, packet_batch in enumerate(packet_batches): + # Yield the packet data immediately, so the mux gets it as soon as possible, while we update our stats. + yield packet_batch + + for packet in packet_batch: + # FPS from timestamps: why DTS, not PTS? + # + # Intuitively, you'd expect to compute FPS from PTS (the time the viewer should *see* each frame). But + # encoders can reorder frames internally (especially with B-frames), so packets may come out in a + # different order than PTS. + # + # If we update a sliding window with out-of-order PTS values, the window start/end can "wiggle" even when + # the pipeline is steady, which makes the displayed FPS noisy. + # + # DTS is the time order the decoder must process packets. Packets are emitted in DTS order, so using DTS + # gives a stable, monotonic timeline for the sliding window. + time_deque.append(packet.dts) + bit_count = packet.size * 8 + bit_count_deque.append(bit_count) + + now = time.monotonic() + if now >= next_display_update and len(time_deque) > 1: + next_display_update = now + 0.1 + running_time = now - start_time + running_minutes = int(running_time / 60) + running_seconds = int(running_time % 60) + window_secs = (time_deque[-1] - time_deque[0]) * TIME_BASE + # We can't use the last frame in the window when we divide by window_secs; that would be a fencepost + # error. + window_frames = len(time_deque) - 1 + window_bits = sum(bit_count_deque) - bit_count_deque[-1] + fps = window_frames / window_secs + bits_per_sec = int(window_bits / window_secs) + line = ( + f"{running_minutes:02d}:{running_seconds:02d} " + f"frame {frame_count}: {fps:.2f} fps, " + f"{si_format(bits_per_sec, precision=2)}bps" + ) + this_status_len = len(line) + full_line = f"\r{line}{' ' * (last_status_len - this_status_len)}" + print(full_line, end="") + last_status_len = this_status_len + + # At shutdown, the encoder flush can emit packets in large bursts, and we also throttle status updates (to avoid + # spamming the terminal). That combination means the last displayed line may be stale or not representative of + # the final frames. Rather than leaving potentially misleading numbers on screen, erase the status display. 
+ print(f"\r{' ' * last_status_len}\r", end="") + + +def mux( + avmux: av.container.OutputContainer, + packet_batches: Iterable[Sequence[av.Packet]], +) -> None: + for packet_batch in packet_batches: + avmux.mux(packet_batch) + + +def parse_region(s: str) -> tuple[int, int, int, int]: + """Parse comma-separated region string into (left, top, right, bottom).""" + parts = s.split(",") + if len(parts) != 4: + msg = "region must be four comma-separated integers" + raise argparse.ArgumentTypeError(msg) + try: + return tuple(int(p.strip()) for p in parts) # type: ignore[return-value] + except ValueError as e: + msg = "region values must be integers" + raise argparse.ArgumentTypeError(msg) from e + + +def main() -> None: + logging.basicConfig(level=logging.DEBUG) + # If we don't enable PyAV's own logging, a lot of important error messages from libav won't be shown. + av.logging.set_level(av.logging.VERBOSE) + + parser = argparse.ArgumentParser(description="Capture screen video to MP4 file") + parser.add_argument( + "-f", + "--fps", + type=int, + default=30, + help="frames per second (default: 30)", + ) + monitor_group = parser.add_mutually_exclusive_group() + monitor_group.add_argument( + "-m", + "--monitor", + type=int, + default=1, + help="monitor ID to capture (default: 1)", + ) + monitor_group.add_argument( + "-r", + "--region", + type=parse_region, + metavar="LEFT,TOP,RIGHT,BOTTOM", + help="region to capture as comma-separated coordinates", + ) + parser.add_argument( + "-c", + "--codec", + default="libx264", + help=( + 'video codec implementation, same as the ffmpeg "-c:v" flag. ' + 'Run "python3 -m av --codecs" for a full list. ' + "(default: libx264. Try h264_nvenc for Nvidia " + "hardware encoding.)" + ), + ) + parser.add_argument( + "-d", + "--duration-secs", + type=float, + help="Duration to record (default: no limit)", + ) + parser.add_argument( + "-o", + "--output", + default="capture.mp4", + help="output filename (default: capture.mp4)", + ) + args = parser.parse_args() + + fps = args.fps + codec = args.codec + filename = args.output + duration_secs = args.duration_secs + + with mss.mss() as sct: + if args.region: + left, top, right, bottom = args.region + monitor = { + "left": left, + "top": top, + "width": right - left, + "height": bottom - top, + } + else: + monitor = sct.monitors[args.monitor] + + # We don't pass the container format to av.open here, so it will choose it based on the extension: .mp4, .mkv, + # etc. + with av.open(filename, "w") as avmux: + # We could initialize video_stream in video_encode, but doing it here means that we can open it before + # starting the capture thread, which avoids a warmup frame (one that takes longer to encode because the + # encoder is just starting). + # + # The rate= parameter here is just the nominal frame rate: some tools (like file browsers) might display + # this as the frame rate. But we actually control timing via the pts and time_base values on the frames + # themselves. + video_stream = avmux.add_stream(codec, rate=fps, options=CODEC_OPTIONS) + + # Ideally, we would set attributes such as colorspace, color_range, color_primaries, and color_trc here to + # describe the colorspace accurately. Otherwise, the player has to guess whether this was recorded on an + # sRGB Windows machine, a Display P3 Mac, or if it's using linear RGB. Currently, MSS doesn't give us + # colorspace information (DISPLAY_IS_SRGB is always False in this demo), so we don't try to specify a + # particular colorspace. 
However, if your application knows the colorspace you're recording from, then + # you can set those attributes on the stream and the frames accordingly. + # + # These properties on the stream (actually, they're attached to its CodecContext) are used to tell the + # stream and container how to label the video stream's colorspace. There are similar attributes on the + # frame itself; those are used to identify its colorspace, so the codec can do the correct RGB to YUV + # conversion. + if DISPLAY_IS_SRGB: + # color_primaries=1 is libavutil's AVCOL_PRI_BT709; PyAV doesn't define named constants for color + # primaries. + video_stream.color_primaries = 1 + # What PyAV refers to as ITU709 is more commonly known as BT.709. + video_stream.colorspace = av.video.reformatter.Colorspace.ITU709 + # The "JPEG" color range is saying that we're using a color range like a computer, not like broadcast + # TV. + video_stream.color_range = av.video.reformatter.ColorRange.JPEG + # PyAV doesn't define named constants for TRCs, so we pass it a numeric value. Technically, sRGB's + # transformation characteristic is AVCOL_TRC_IEC61966_2_1 (13). It's nearly the same as BT.709's TRC, + # so some video encoders will tag it as AVCOL_TRC_BT709 (1) instead. + video_stream.color_trc = 13 + + video_stream.width = monitor["width"] + video_stream.height = monitor["height"] + # There are multiple time bases in play (stream, codec context, per-frame). Depending on the container + # and codec, some of these might be ignored or overridden. We set the desired time base consistently + # everywhere, so that the saved timestamps are correct regardless of what format we're saving to. + video_stream.time_base = TIME_BASE + video_stream.codec_context.time_base = TIME_BASE + # `pix_fmt` here describes the pixel format we will *feed* into the encoder (not necessarily what the + # encoder will store in the bitstream). H.264 encoders ultimately convert to a YUV 4:2:0 format + # internally. + # + # If the encoder accepts BGRx input (e.g., h264_nvenc), we can hand it MSS's BGRx frames directly and + # avoid an extra pre-conversion step on our side. For a hardware encoder, that lets specialized hardware + # do the conversion to YUV efficiently. + # + # If the encoder doesn't accept BGRx input (e.g., libx264), PyAV will insert a conversion step + # automatically. In that case, we let the codec choose the pix_fmt it wants. + # + # Note: the alpha channel is ignored by H.264. We're usually sending BGRx/BGR0. But PyAV's + # VideoFrame only exposes "bgra" as the closest supported format, so that's how we tag our frames, and + # what we tell the codec to expect, if possible. You might need to change this for codecs like VP9 that + # can handle alpha channels. + if any(f.name == "bgra" for f in video_stream.codec.video_formats): + video_stream.pix_fmt = "bgra" + # We open (initialize) the codec explicitly here. PyAV will automatically open it the first time we + # call video_stream.encode, but the time it takes to set the codec up means the first frame would be + # particularly slow.
+ video_stream.open() + + shutdown_requested = Event() + + mailbox_screenshot: Mailbox[tuple[mss.screenshot.ScreenShot, float]] = Mailbox() + mailbox_frame: Mailbox[av.VideoFrame] = Mailbox() + mailbox_packet_to_stats: Mailbox[Sequence[av.Packet]] = Mailbox() + mailbox_packet_to_mux: Mailbox[Sequence[av.Packet]] = Mailbox() + + stage_video_capture = PipelineStage( + name="video_capture", + target=partial( + video_capture, + fps, + sct, + monitor, + shutdown_requested, + ), + out_mailbox=mailbox_screenshot, + ) + stage_video_process = PipelineStage( + name="video_process", + in_mailbox=mailbox_screenshot, + target=partial(video_process), + out_mailbox=mailbox_frame, + ) + stage_video_encode = PipelineStage( + name="video_encode", + in_mailbox=mailbox_frame, + target=partial(video_encode, video_stream), + out_mailbox=mailbox_packet_to_stats, + ) + stage_show_stats = PipelineStage( + name="show_stats", + in_mailbox=mailbox_packet_to_stats, + target=show_stats, + out_mailbox=mailbox_packet_to_mux, + ) + stage_mux = PipelineStage( + name="stream_mux", + in_mailbox=mailbox_packet_to_mux, + target=partial(mux, avmux), + ) + + stage_mux.start() + stage_show_stats.start() + stage_video_process.start() + stage_video_encode.start() + stage_video_capture.start() + + LOGGER.debug("Native thread IDs:") + LOGGER.debug(" Capture: %s", stage_video_capture.native_id) + LOGGER.debug(" Preprocess: %s", stage_video_process.native_id) + LOGGER.debug(" Encode: %s", stage_video_encode.native_id) + LOGGER.debug(" Mux: %s", stage_mux.native_id) + + # Handle Ctrl-C gracefully by requesting shutdown. + # + # Python always routes signals to the main thread, so we don't have to worry about another thread getting + # a SIGINT (the Ctrl-C signal). That's significant because if a signal handler ran on the video capture + # thread and tried to set the shutdown_requested event (which requires the event's internal lock) while + # the interrupted code was holding that same lock, it could end up deadlocked. The main thread doesn't + # ever acquire that lock. As another point of safety, Python will only invoke our signal handler at a + # "safe" point, such as between bytecode instructions. + + # We set old_sigint_handler twice: once here, and once when we change the handler. The first time is + # just in case a signal arrives in the tiny window between when we set the new handler (by calling + # signal.signal), and when we assign it to old_sigint_handler (with "="). Signal handling, like + # threading, is tricky to get right. + old_sigint_handler = signal.getsignal(signal.SIGINT) + + def sigint_handler(_signum: int, _frame: Any) -> None: + # Restore the default behavior, so if our shutdown doesn't work because of a bug in our code, the user + # can still press ^C again to terminate the program. (The default SIGINT handler is also available as + # signal.default_int_handler.) + signal.signal(signal.SIGINT, old_sigint_handler) + # The status line will typically be visible, so start a fresh line for this message. + print("\nShutting down") + shutdown_requested.set() + + old_sigint_handler = signal.signal(signal.SIGINT, sigint_handler) + + print("Starting video capture. Press Ctrl-C to stop.") + + if duration_secs is not None: + # Wait for up to the specified duration. If the pipeline shuts down for other reasons (such as an + # exception), then we'll recognize it sooner with this join. + stage_video_capture.join(timeout=duration_secs) + # Either the join timed out, or we processed a ^C and requested it exit.
Either way, it's safe to set + # the shutdown event again, and return to our normal processing loop. + shutdown_requested.set() + + stage_video_capture.join() + stage_video_process.join() + stage_video_encode.join() + stage_show_stats.join() + stage_mux.join() + + # PyAV may insert an implicit conversion step between the frames we provide and what the encoder actually + # accepts (pixel format, colorspace, etc.). When that happens, `video_stream.reformatter` gets set. + # + # This is useful to know for performance: those conversions are typically CPU-side work and can become a + # bottleneck. Hardware-accelerated encoders, such as `h264_nvenc`, often accept BGRx, and can perform the + # conversion using specialized hardware. + # + # We already know that libx264 doesn't accept RGB input, so we don't warn about that. (There is a + # libx264rgb, but that writes to a different H.264 format.) We just want to warn about other codecs, + # since some of them might have ways to use BGRx input, and the programmer might want to investigate. + # + # Note: `reformatter` is created lazily, so it may only be set after frames have been sent through the + # encoder, which is why we check it at the end. + if video_stream.reformatter is not None and codec != "libx264": + LOGGER.warning( + "PyAV inserted a CPU-side pixel-format/colorspace conversion step; this can reduce FPS. " + "Check the acceptable pix_fmts for this codec, and see if one of them can accept some " + "variation of BGRx input directly." + ) + + +if __name__ == "__main__": + main() diff --git a/docs/source/examples.rst b/docs/source/examples.rst index 437e0f3..7b636bb 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -209,3 +209,19 @@ Different possibilities to convert raw BGRA values to RGB:: ... .. versionadded:: 3.2.0 + + +Demos +===== + +In addition to these simple examples, there are full demos of more complex use cases in the ``demos/`` directory of the +source code. The demos are not installed with the package, but you can run them directly from the source tree after +cloning the repository (see the example below). + +These are complete, working programs that use MSS for screen capture as a key part of their functionality. They show +not only how to invoke MSS, but also some of the techniques for using the captured frames efficiently, in real-world +scenarios. + +These include: + +- MP4 video capture with encoding using PyAV (FFmpeg bindings) +- Live streaming to a TinyTV as MJPEG
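+
+For example, from the repository root, the video capture demo can be run like this (a sketch: install the
+dependencies listed in the demo's header comments first, and see ``--help`` for the full set of flags)::
+
+    python -m pip install mss av numpy si-prefix
+    python demos/video-capture.py --fps 30 --output capture.mp4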