VideoReader

High-level video reader with array-style access.

Source code in acvr/reader.py
class VideoReader:
    """High-level video reader with array-style access."""

    def __init__(
        self,
        path: str,
        video_stream_index: int = 0,
        *,
        build_index: bool = False,
        decoded_frame_cache_size: int = 0,
        scrub_bucket_ms: int = 100,
        scrub_bucket_lru_size: int = 4096,
    ) -> None:
        """Create a reader for the given video path."""

        self._backend = PyAVVideoBackend(
            path,
            video_stream_index=video_stream_index,
            build_index=build_index,
            decoded_frame_cache_size=decoded_frame_cache_size,
            scrub_bucket_ms=scrub_bucket_ms,
            scrub_bucket_lru_size=scrub_bucket_lru_size,
        )

    def close(self) -> None:
        """Close the underlying video resources."""

        self._backend.close()

    def __enter__(self) -> "VideoReader":
        """Return self for context manager usage."""

        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the reader when leaving a context manager."""

        self.close()

    def __len__(self) -> int:
        """Return the number of frames in the video."""

        return self.number_of_frames

    def __getitem__(self, key: IndexKey) -> Union[np.ndarray, List[np.ndarray]]:
        """Return a frame or list of frames for the given index or slice."""

        if isinstance(key, slice):
            start, stop, step = key.indices(self.number_of_frames)
            return [self._backend.frame_at_index(i) for i in range(start, stop, step)]
        return self._backend.frame_at_index(int(key))

    def __iter__(self) -> Iterator[np.ndarray]:
        """Iterate over all frames in the video."""

        return iter(self[:])

    @property
    def frame_height(self) -> int:
        """Return the frame height in pixels."""

        return self._backend.frame_height

    @property
    def frame_width(self) -> int:
        """Return the frame width in pixels."""

        return self._backend.frame_width

    @property
    def frame_rate(self) -> float:
        """Return the video frame rate."""

        return self._backend.frame_rate

    @property
    def fourcc(self) -> int:
        """Return the fourcc codec identifier."""

        return self._backend.fourcc

    @property
    def frame_format(self) -> int:
        """Return the pixel format identifier."""

        return self._backend.frame_format

    @property
    def number_of_frames(self) -> int:
        """Return the total number of frames."""

        return self._backend.number_of_frames

    @property
    def frame_shape(self) -> tuple:
        """Return the expected frame shape (H, W, C)."""

        return self._backend.frame_shape

    @property
    def current_frame_pos(self) -> float:
        """Return the last accessed frame index."""

        return self._backend.current_frame_pos

    def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
        """Build a keyframe index for faster random access."""

        return self._backend.build_keyframe_index(max_packets=max_packets)

    def read_keyframe_at(
        self,
        t_s: float,
        *,
        mode: str = "previous",
        decode_rgb: bool = True,
    ) -> DecodedFrame:
        """Return a nearby keyframe for fast scrubbing."""

        return self._backend.read_keyframe_at(t_s, mode=mode, decode_rgb=decode_rgb)

    def read_frame_at(
        self,
        t_s: float,
        *,
        return_first_after: bool = True,
        max_decode_frames: int = 10_000,
        use_index: bool = True,
    ) -> DecodedFrame:
        """Return a frame at a timestamp with accurate seeking."""

        return self._backend.read_frame_at(
            t_s,
            return_first_after=return_first_after,
            max_decode_frames=max_decode_frames,
            use_index=use_index,
        )

    def read_frame_fast(
        self,
        *,
        index: Optional[int] = None,
        t_s: Optional[float] = None,
        decode_rgb: bool = False,
    ) -> DecodedFrame:
        """Return a fast, approximate frame for an index or timestamp."""

        return self._backend.read_frame_fast(
            index=index,
            t_s=t_s,
            decode_rgb=decode_rgb,
        )

    def read_frame(
        self,
        *,
        index: Optional[int] = None,
        t_s: Optional[float] = None,
        mode: str = "accurate",
        decode_rgb: bool = False,
        keyframe_mode: str = "previous",
    ) -> DecodedFrame:
        """Read a frame using a selectable access mode."""

        if mode not in {"accurate", "fast", "scrub"}:
            raise ValueError("mode must be one of: 'accurate', 'fast', 'scrub'")
        if index is None and t_s is None:
            raise ValueError("Provide either index or t_s")
        if index is not None and t_s is not None:
            raise ValueError("Provide only one of index or t_s")

        if mode == "accurate":
            if index is not None:
                return self._backend.frame_at_index(int(index))
            assert t_s is not None
            return self._backend.read_frame_at(float(t_s))

        if mode == "scrub":
            if t_s is None:
                fps = self.frame_rate or 1.0
                t_s = float(index) / fps
            return self._backend.read_keyframe_at(float(t_s), mode=keyframe_mode, decode_rgb=decode_rgb)

        return self._backend.read_frame_fast(
            index=index,
            t_s=t_s,
            decode_rgb=decode_rgb,
        )

_backend = PyAVVideoBackend(path, video_stream_index=video_stream_index, build_index=build_index, decoded_frame_cache_size=decoded_frame_cache_size, scrub_bucket_ms=scrub_bucket_ms, scrub_bucket_lru_size=scrub_bucket_lru_size) instance-attribute

current_frame_pos property

Return the last accessed frame index.

fourcc property

Return the fourcc codec identifier.

frame_format property

Return the pixel format identifier.

frame_height property

Return the frame height in pixels.

frame_rate property

Return the video frame rate.

frame_shape property

Return the expected frame shape (H, W, C).

frame_width property

Return the frame width in pixels.

number_of_frames property

Return the total number of frames.

__enter__()

Return self for context manager usage.

Source code in acvr/reader.py
def __enter__(self) -> "VideoReader":
    """Return self for context manager usage."""

    return self

__exit__(exc_type, exc, tb)

Close the reader when leaving a context manager.

Source code in acvr/reader.py
def __exit__(self, exc_type, exc, tb) -> None:
    """Close the reader when leaving a context manager."""

    self.close()

__getitem__(key)

Return a frame or list of frames for the given index or slice.

Source code in acvr/reader.py
def __getitem__(self, key: IndexKey) -> Union[np.ndarray, List[np.ndarray]]:
    """Return a frame or list of frames for the given index or slice."""

    if isinstance(key, slice):
        start, stop, step = key.indices(self.number_of_frames)
        return [self._backend.frame_at_index(i) for i in range(start, stop, step)]
    return self._backend.frame_at_index(int(key))

__init__(path, video_stream_index=0, *, build_index=False, decoded_frame_cache_size=0, scrub_bucket_ms=100, scrub_bucket_lru_size=4096)

Create a reader for the given video path.

Source code in acvr/reader.py
def __init__(
    self,
    path: str,
    video_stream_index: int = 0,
    *,
    build_index: bool = False,
    decoded_frame_cache_size: int = 0,
    scrub_bucket_ms: int = 100,
    scrub_bucket_lru_size: int = 4096,
) -> None:
    """Create a reader for the given video path."""

    self._backend = PyAVVideoBackend(
        path,
        video_stream_index=video_stream_index,
        build_index=build_index,
        decoded_frame_cache_size=decoded_frame_cache_size,
        scrub_bucket_ms=scrub_bucket_ms,
        scrub_bucket_lru_size=scrub_bucket_lru_size,
    )

__iter__()

Iterate over all frames in the video.

Source code in acvr/reader.py
def __iter__(self) -> Iterator[np.ndarray]:
    """Iterate over all frames in the video."""

    return iter(self[:])

__len__()

Return the number of frames in the video.

Source code in acvr/reader.py
def __len__(self) -> int:
    """Return the number of frames in the video."""

    return self.number_of_frames

build_keyframe_index(*, max_packets=None)

Build a keyframe index for faster random access.

Source code in acvr/reader.py
def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
    """Build a keyframe index for faster random access."""

    return self._backend.build_keyframe_index(max_packets=max_packets)

close()

Close the underlying video resources.

Source code in acvr/reader.py
def close(self) -> None:
    """Close the underlying video resources."""

    self._backend.close()

read_frame(*, index=None, t_s=None, mode='accurate', decode_rgb=False, keyframe_mode='previous')

Read a frame using a selectable access mode.

Source code in acvr/reader.py
def read_frame(
    self,
    *,
    index: Optional[int] = None,
    t_s: Optional[float] = None,
    mode: str = "accurate",
    decode_rgb: bool = False,
    keyframe_mode: str = "previous",
) -> DecodedFrame:
    """Read a frame using a selectable access mode."""

    if mode not in {"accurate", "fast", "scrub"}:
        raise ValueError("mode must be one of: 'accurate', 'fast', 'scrub'")
    if index is None and t_s is None:
        raise ValueError("Provide either index or t_s")
    if index is not None and t_s is not None:
        raise ValueError("Provide only one of index or t_s")

    if mode == "accurate":
        if index is not None:
            return self._backend.frame_at_index(int(index))
        assert t_s is not None
        return self._backend.read_frame_at(float(t_s))

    if mode == "scrub":
        if t_s is None:
            fps = self.frame_rate or 1.0
            t_s = float(index) / fps
        return self._backend.read_keyframe_at(float(t_s), mode=keyframe_mode, decode_rgb=decode_rgb)

    return self._backend.read_frame_fast(
        index=index,
        t_s=t_s,
        decode_rgb=decode_rgb,
    )

read_frame_at(t_s, *, return_first_after=True, max_decode_frames=10000, use_index=True)

Return a frame at a timestamp with accurate seeking.

Source code in acvr/reader.py
def read_frame_at(
    self,
    t_s: float,
    *,
    return_first_after: bool = True,
    max_decode_frames: int = 10_000,
    use_index: bool = True,
) -> DecodedFrame:
    """Return a frame at a timestamp with accurate seeking."""

    return self._backend.read_frame_at(
        t_s,
        return_first_after=return_first_after,
        max_decode_frames=max_decode_frames,
        use_index=use_index,
    )

read_frame_fast(*, index=None, t_s=None, decode_rgb=False)

Return a fast, approximate frame for an index or timestamp.

Source code in acvr/reader.py
def read_frame_fast(
    self,
    *,
    index: Optional[int] = None,
    t_s: Optional[float] = None,
    decode_rgb: bool = False,
) -> DecodedFrame:
    """Return a fast, approximate frame for an index or timestamp."""

    return self._backend.read_frame_fast(
        index=index,
        t_s=t_s,
        decode_rgb=decode_rgb,
    )

read_keyframe_at(t_s, *, mode='previous', decode_rgb=True)

Return a nearby keyframe for fast scrubbing.

Source code in acvr/reader.py
def read_keyframe_at(
    self,
    t_s: float,
    *,
    mode: str = "previous",
    decode_rgb: bool = True,
) -> DecodedFrame:
    """Return a nearby keyframe for fast scrubbing."""

    return self._backend.read_keyframe_at(t_s, mode=mode, decode_rgb=decode_rgb)

Backend implementation

Frame-accurate seeking with keyframe index and scrub acceleration.

Source code in acvr/_pyav_backend.py
class PyAVVideoBackend:
    """Frame-accurate seeking with keyframe index and scrub acceleration."""

    def __init__(
        self,
        path: str,
        video_stream_index: int = 0,
        *,
        build_index: bool = False,
        decoded_frame_cache_size: int = 0,
        scrub_bucket_ms: int = 100,
        scrub_bucket_lru_size: int = 4096,
    ) -> None:
        """Initialize the PyAV-backed decoder."""

        self._path = path
        self._container = av.open(path)
        self._stream = self._container.streams.video[video_stream_index]
        self._codec_ctx = self._stream.codec_context
        self._fast_container: Optional[av.container.InputContainer] = None
        self._fast_stream: Optional[av.video.stream.VideoStream] = None
        self._fast_first_frame_number: Optional[int] = None

        self._time_base: Fraction = self._stream.time_base
        self._start_pts: int = self._stream.start_time if self._stream.start_time is not None else 0

        self._keyframes: List[KeyframeEntry] = []
        self._index_built: bool = False

        self._frame_pts: Optional[List[int]] = None
        self._frame_count: int = int(self._stream.frames or 0)
        self._current_frame_pos: float = 0.0

        self._frame_cache = _LRU(decoded_frame_cache_size)

        self._scrub_bucket_ms = max(1, int(scrub_bucket_ms))
        self._bucket_to_kfidx = _LRU(scrub_bucket_lru_size)

        if build_index:
            self.build_keyframe_index()

        self._frame_height = int(self._stream.height or 0)
        self._frame_width = int(self._stream.width or 0)
        self._frame_shape = (self._frame_height, self._frame_width, 3)
        self._frame_rate = self._compute_frame_rate()
        self._fourcc = self._compute_fourcc()
        self._frame_format = 0

    def close(self) -> None:
        """Close the underlying PyAV container."""

        self._container.close()
        if self._fast_container is not None:
            self._fast_container.close()
            self._fast_container = None
            self._fast_stream = None
        self._fast_first_frame_number = None

    def __enter__(self) -> "PyAVVideoBackend":
        """Return self for context manager usage."""

        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the backend on exit from a context manager."""

        self.close()

    def _secs_to_pts(self, t_s: float) -> int:
        """Convert seconds to presentation timestamp units."""

        ticks = int(round(t_s / float(self._time_base)))
        return self._start_pts + ticks

    def _pts_to_secs(self, pts: int) -> float:
        """Convert presentation timestamp units to seconds."""

        return float((pts - self._start_pts) * self._time_base)

    def _pts_to_frame_number(self, pts: Optional[int], fps: float) -> Optional[int]:
        """Convert a PTS value to a rounded frame number."""

        if pts is None:
            return None
        return int(round(self._pts_to_secs(int(pts)) * fps))

    def _frame_time_s(self, pts: Optional[int]) -> float:
        """Return the timestamp for a frame PTS."""

        return float("nan") if pts is None else self._pts_to_secs(pts)

    def _flush_decoder(self) -> None:
        """Flush decoder buffers if supported."""

        try:
            self._codec_ctx.flush_buffers()
        except Exception:
            pass

    def _compute_frame_rate(self) -> float:
        """Compute the stream frame rate in frames per second."""

        rate = self._stream.average_rate or self._stream.base_rate
        return float(rate) if rate is not None else 0.0

    def _compute_fourcc(self) -> int:
        """Compute a fourcc code from the stream codec tag."""

        tag = self._stream.codec_context.codec_tag
        if isinstance(tag, str) and len(tag) >= 4:
            tag = tag[:4]
            return (
                ord(tag[0])
                | (ord(tag[1]) << 8)
                | (ord(tag[2]) << 16)
                | (ord(tag[3]) << 24)
            )
        return 0

    def _ensure_frame_pts(self) -> None:
        """Decode the stream once to collect frame PTS values."""

        if self._frame_pts is not None:
            return
        idx_container = av.open(self._path)
        idx_stream = idx_container.streams.video[self._stream.index]
        pts_list: List[int] = []
        for frame in idx_container.decode(idx_stream):
            pts = frame.pts if frame.pts is not None else frame.dts
            if pts is None:
                pts = self._start_pts + len(pts_list)
            pts_list.append(int(pts))
        idx_container.close()
        self._frame_pts = pts_list
        self._frame_count = len(pts_list)

    def _read_frame_by_pts(self, target_pts: int) -> DecodedFrame:
        """Decode the first frame at or after a target PTS."""

        cached = self._frame_cache.get(target_pts)
        if cached is not None:
            return cached  # type: ignore[return-value]

        if self._index_built:
            idx = self._keyframe_index_at_or_before_pts(target_pts)
            seek_pts = self._keyframes[idx].pts
        else:
            seek_pts = target_pts

        container = av.open(self._path)
        stream = container.streams.video[self._stream.index]
        try:
            container.seek(seek_pts, stream=stream, backward=True, any_frame=False)
            try:
                stream.codec_context.flush_buffers()
            except Exception:
                pass

            last: Optional[DecodedFrame] = None
            for packet in container.demux(stream):
                for frame in packet.decode():
                    pts = frame.pts
                    cur = DecodedFrame(
                        image=frame.to_rgb().to_ndarray(),
                        pts=pts,
                        time_s=self._frame_time_s(pts),
                        key_frame=bool(getattr(frame, "key_frame", False)),
                    )
                    if pts is not None:
                        self._frame_cache.put(int(pts), cur)
                    if pts is None:
                        last = cur
                        continue
                    if pts >= target_pts:
                        return cur
                    last = cur
        finally:
            container.close()

        if last is not None:
            return last
        raise RuntimeError("Could not decode any frames after seeking.")

    def frame_at_index(self, index: int) -> np.ndarray:
        """Return the decoded frame at a zero-based index."""

        self._ensure_frame_pts()
        assert self._frame_pts is not None
        if index < 0:
            index += self._frame_count
        if index < 0 or index >= self._frame_count:
            raise IndexError("frame index out of range")
        target_pts = self._frame_pts[index]
        decoded = self._read_frame_by_pts(target_pts)
        self._current_frame_pos = float(index)
        return decoded.image

    @property
    def frame_height(self) -> int:
        """Return the video frame height."""

        return self._frame_height

    @property
    def frame_width(self) -> int:
        """Return the video frame width."""

        return self._frame_width

    @property
    def frame_rate(self) -> float:
        """Return the reported frame rate in frames per second."""

        return self._frame_rate

    @property
    def fourcc(self) -> int:
        """Return the fourcc codec identifier."""

        return self._fourcc

    @property
    def frame_format(self) -> int:
        """Return the frame format identifier."""

        return self._frame_format

    @property
    def number_of_frames(self) -> int:
        """Return the total number of frames, decoding if needed."""

        if self._frame_count <= 0:
            self._ensure_frame_pts()
        return self._frame_count

    @property
    def frame_shape(self) -> tuple:
        """Return the expected frame shape (H, W, C)."""

        return self._frame_shape

    @property
    def current_frame_pos(self) -> float:
        """Return the last frame index accessed."""

        return self._current_frame_pos

    def _seek_to_pts(self, pts: int, *, backward: bool) -> None:
        """Seek to a timestamp in the stream."""

        self._container.seek(pts, stream=self._stream, backward=backward, any_frame=False)
        self._flush_decoder()

    def _ensure_fast_container(self) -> None:
        """Initialize the fast-seek container if needed."""

        if self._fast_container is not None:
            return
        self._fast_container = av.open(self._path)
        self._fast_stream = self._fast_container.streams.video[self._stream.index]
        try:
            codec_ctx = self._fast_stream.codec_context
            codec_ctx.thread_type = "AUTO"
            codec_ctx.thread_count = 0
        except Exception:
            pass

    def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
        """Scan packets and store keyframe pts/time."""

        path = self._container.name
        idx_container = av.open(path)
        idx_stream = idx_container.streams.video[self._stream.index]

        key_pts: List[int] = []
        n = 0
        for packet in idx_container.demux(idx_stream):
            if packet.dts is None and packet.pts is None:
                continue
            if packet.is_keyframe:
                pts = packet.pts if packet.pts is not None else packet.dts
                if pts is not None:
                    key_pts.append(int(pts))
            n += 1
            if max_packets is not None and n >= max_packets:
                break

        idx_container.close()

        key_pts = sorted(set(key_pts))
        if not key_pts:
            key_pts = [self._start_pts]

        self._keyframes = [KeyframeEntry(pts=p, time_s=self._pts_to_secs(p)) for p in key_pts]
        self._index_built = True

        self._bucket_to_kfidx.clear()
        return self._keyframes

    def _keyframe_index_at_or_before_pts(self, target_pts: int) -> int:
        """Return keyframe index at or before the target PTS."""

        kf = self._keyframes
        if not self._index_built or not kf:
            return 0
        if target_pts <= kf[0].pts:
            return 0
        if target_pts >= kf[-1].pts:
            return len(kf) - 1

        lo, hi = 0, len(kf) - 1
        while lo <= hi:
            mid = (lo + hi) // 2
            m = kf[mid].pts
            if m == target_pts:
                return mid
            if m < target_pts:
                lo = mid + 1
            else:
                hi = mid - 1
        return hi

    def _keyframe_index_nearest_pts(self, target_pts: int) -> int:
        """Return nearest keyframe index to the target PTS."""

        kf = self._keyframes
        if not self._index_built or not kf:
            return 0
        i0 = self._keyframe_index_at_or_before_pts(target_pts)
        i1 = min(i0 + 1, len(kf) - 1)
        if i0 == i1:
            return i0
        d0 = abs(kf[i0].pts - target_pts)
        d1 = abs(kf[i1].pts - target_pts)
        return i0 if d0 <= d1 else i1

    def _bucket_key(self, t_s: float) -> int:
        """Return a bucket key for the scrub acceleration cache."""

        return int(round(t_s * 1000.0 / self._scrub_bucket_ms))

    def _keyframe_index_for_time_fast(self, t_s: float, mode: str) -> int:
        """Return a keyframe index using cached time buckets."""

        if not self._index_built:
            raise RuntimeError("Keyframe index not built. Call build_keyframe_index() first.")

        b = self._bucket_key(t_s)

        mode_tag = {"previous": 0, "nearest": 1, "next": 2}.get(mode)
        if mode_tag is None:
            raise ValueError("mode must be one of: 'previous', 'nearest', 'next'")
        cache_key = (b << 2) | mode_tag

        cached = self._bucket_to_kfidx.get(cache_key)
        if cached is not None:
            return int(cached)

        target_pts = self._secs_to_pts(t_s)

        if mode == "previous":
            idx = self._keyframe_index_at_or_before_pts(target_pts)
        elif mode == "nearest":
            idx = self._keyframe_index_nearest_pts(target_pts)
        else:
            i_prev = self._keyframe_index_at_or_before_pts(target_pts)
            if self._keyframes[i_prev].pts >= target_pts:
                idx = i_prev
            else:
                idx = min(i_prev + 1, len(self._keyframes) - 1)

        self._bucket_to_kfidx.put(cache_key, idx)
        return idx

    def read_keyframe_at(
        self,
        t_s: Number,
        *,
        mode: str = "previous",
        decode_rgb: bool = True,
    ) -> DecodedFrame:
        """Return a nearby keyframe without GOP forward decoding."""

        t_s = float(t_s)
        idx = self._keyframe_index_for_time_fast(t_s, mode)
        key_pts = self._keyframes[idx].pts

        cached = self._frame_cache.get(key_pts)
        if cached is not None:
            return cached  # type: ignore[return-value]

        self._seek_to_pts(key_pts, backward=False)

        for packet in self._container.demux(self._stream):
            for frame in packet.decode():
                pts = frame.pts
                img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray()
                cur = DecodedFrame(
                    image=img,
                    pts=pts,
                    time_s=self._frame_time_s(pts),
                    key_frame=bool(getattr(frame, "key_frame", False)),
                )
                if pts is not None:
                    self._frame_cache.put(int(pts), cur)
                return cur

        raise RuntimeError("Failed to decode a frame after keyframe seek.")

    def read_frame_at(
        self,
        t_s: Number,
        *,
        return_first_after: bool = True,
        max_decode_frames: int = 10_000,
        use_index: bool = True,
    ) -> DecodedFrame:
        """Decode a frame near a timestamp with accurate seeking."""

        t_s = float(t_s)
        target_pts = self._secs_to_pts(t_s)

        cached = self._frame_cache.get(target_pts)
        if cached is not None:
            return cached  # type: ignore[return-value]

        if use_index and self._index_built:
            idx = self._keyframe_index_at_or_before_pts(target_pts)
            anchor_pts = self._keyframes[idx].pts
            self._seek_to_pts(anchor_pts, backward=False)
        else:
            self._seek_to_pts(target_pts, backward=True)

        last: Optional[DecodedFrame] = None
        decoded = 0

        for packet in self._container.demux(self._stream):
            for frame in packet.decode():
                decoded += 1
                if decoded > max_decode_frames:
                    raise RuntimeError(
                        "Exceeded max_decode_frames while seeking; timestamps may be broken."
                    )

                pts = frame.pts
                cur = DecodedFrame(
                    image=frame.to_rgb().to_ndarray(),
                    pts=pts,
                    time_s=self._frame_time_s(pts),
                    key_frame=bool(getattr(frame, "key_frame", False)),
                )
                if pts is not None:
                    self._frame_cache.put(int(pts), cur)

                if pts is None:
                    last = cur
                    continue

                if return_first_after:
                    if pts >= target_pts:
                        return cur
                    last = cur
                else:
                    if pts <= target_pts:
                        last = cur
                    elif last is not None:
                        return last

        if last is not None:
            return last
        raise RuntimeError("Could not decode any frames after seeking.")

    def _read_frame_fast_simple(self, target_pts: int, *, decode_rgb: bool) -> DecodedFrame:
        """Fallback fast seek: seek and decode first frame after PTS."""

        self._ensure_fast_container()
        assert self._fast_container is not None
        assert self._fast_stream is not None

        def grab_frame(container: av.container.InputContainer, stream: av.video.stream.VideoStream) -> Optional[DecodedFrame]:
            for frame in container.decode(stream):
                pts = frame.pts if frame.pts is not None else frame.dts
                if pts is None:
                    target_reached = True
                else:
                    target_reached = pts >= target_pts
                if not target_reached:
                    continue
                if decode_rgb:
                    img = frame.to_rgb().to_ndarray()
                else:
                    img = frame.to_ndarray(format="bgr24")
                cur = DecodedFrame(
                    image=img,
                    pts=pts,
                    time_s=self._frame_time_s(pts),
                    key_frame=bool(getattr(frame, "key_frame", False)),
                )
                if pts is not None:
                    self._frame_cache.put(int(pts), cur)
                return cur
            return None

        self._fast_container.seek(
            target_pts,
            stream=self._fast_stream,
            backward=True,
            any_frame=True,
        )
        try:
            self._fast_stream.codec_context.flush_buffers()
        except Exception:
            pass
        grabbed = grab_frame(self._fast_container, self._fast_stream)
        if grabbed is not None:
            return grabbed

        self._fast_container.seek(
            target_pts,
            stream=self._fast_stream,
            backward=True,
            any_frame=False,
        )
        try:
            self._fast_stream.codec_context.flush_buffers()
        except Exception:
            pass
        grabbed = grab_frame(self._fast_container, self._fast_stream)
        if grabbed is not None:
            return grabbed

        raise RuntimeError("Failed to decode a frame after fast seek.")

    def _read_frame_fast_opencv_pyav(self, target_frame: int, *, decode_rgb: bool) -> DecodedFrame:
        """Approximate OpenCV seek behavior using PyAV."""

        fps = self._frame_rate or 1.0
        self._ensure_fast_container()
        assert self._fast_container is not None
        assert self._fast_stream is not None

        def seek_to_frame(frame_index: int) -> None:
            target_pts = self._secs_to_pts(frame_index / fps)
            self._fast_container.seek(
                target_pts,
                stream=self._fast_stream,
                backward=True,
                any_frame=False,
            )
            try:
                self._fast_stream.codec_context.flush_buffers()
            except Exception:
                pass

        def frame_number_from_pts(pts: Optional[int]) -> Optional[int]:
            num = self._pts_to_frame_number(pts, fps)
            if num is None:
                return None
            if self._fast_first_frame_number is None:
                return num
            return num - self._fast_first_frame_number

        first_frame = None
        if self._fast_first_frame_number is None:
            seek_to_frame(0)
            for frame in self._fast_container.decode(self._fast_stream):
                pts = frame.pts if frame.pts is not None else frame.dts
                self._fast_first_frame_number = self._pts_to_frame_number(pts, fps) or 0
                first_frame = frame
                break

        if target_frame <= 0:
            if first_frame is None:
                seek_to_frame(0)
                for frame in self._fast_container.decode(self._fast_stream):
                    first_frame = frame
                    break
            if first_frame is None:
                raise RuntimeError("Failed to decode a frame after fast seek.")
            pts = first_frame.pts if first_frame.pts is not None else first_frame.dts
            img = first_frame.to_rgb().to_ndarray() if decode_rgb else first_frame.to_ndarray(format="bgr24")
            cur = DecodedFrame(
                image=img,
                pts=pts,
                time_s=self._frame_time_s(pts),
                key_frame=bool(getattr(first_frame, "key_frame", False)),
            )
            if pts is not None:
                self._frame_cache.put(int(pts), cur)
            return cur

        delta = 16
        attempts = 0
        while True:
            start_frame = max(target_frame - delta, 0)
            seek_to_frame(start_frame)
            decoder = self._fast_container.decode(self._fast_stream)
            try:
                frame = next(decoder)
            except StopIteration:
                break

            pts = frame.pts if frame.pts is not None else frame.dts
            frame_number = frame_number_from_pts(pts)
            if frame_number is None:
                frame_number = start_frame

            if frame_number < 0 or frame_number > target_frame:
                if start_frame == 0 or delta >= 1 << 30 or attempts > 20:
                    break
                delta = delta * 2 if delta < 16 else int(delta * 1.5)
                attempts += 1
                continue

            while frame_number < target_frame:
                try:
                    frame = next(decoder)
                except StopIteration:
                    frame = None
                    break
                pts = frame.pts if frame.pts is not None else frame.dts
                frame_number = frame_number_from_pts(pts)
                if frame_number is None:
                    frame_number = target_frame

            if frame is None:
                break

            pts = frame.pts if frame.pts is not None else frame.dts
            img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
            cur = DecodedFrame(
                image=img,
                pts=pts,
                time_s=self._frame_time_s(pts),
                key_frame=bool(getattr(frame, "key_frame", False)),
            )
            if pts is not None:
                self._frame_cache.put(int(pts), cur)
            return cur

        target_pts = self._secs_to_pts(target_frame / fps)
        return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)


    def read_frame_fast(
        self,
        *,
        index: Optional[int] = None,
        t_s: Optional[Number] = None,
        decode_rgb: bool = False,
    ) -> DecodedFrame:
        """Return a fast, approximate frame for an index or timestamp."""

        if index is None and t_s is None:
            raise ValueError("Provide either index or t_s")
        if index is not None and t_s is not None:
            raise ValueError("Provide only one of index or t_s")

        if t_s is None:
            if index is None:
                raise ValueError("Provide either index or t_s")
            if index < 0:
                index += self.number_of_frames
            target_index = int(index)
        else:
            t_s = float(t_s)
            target_index = int(round(t_s * (self._frame_rate or 1.0)))

        fps = self._frame_rate or 1.0
        target_pts = self._secs_to_pts(target_index / fps)

        cached = self._frame_cache.get(target_pts)
        if cached is not None:
            return cached  # type: ignore[return-value]

        return self._read_frame_fast_opencv_pyav(target_index, decode_rgb=decode_rgb)

_bucket_to_kfidx = _LRU(scrub_bucket_lru_size) instance-attribute

_codec_ctx = self._stream.codec_context instance-attribute

_container = av.open(path) instance-attribute

_current_frame_pos = 0.0 instance-attribute

_fast_container = None instance-attribute

_fast_first_frame_number = None instance-attribute

_fast_stream = None instance-attribute

_fourcc = self._compute_fourcc() instance-attribute

_frame_cache = _LRU(decoded_frame_cache_size) instance-attribute

_frame_count = int(self._stream.frames or 0) instance-attribute

_frame_format = 0 instance-attribute

_frame_height = int(self._stream.height or 0) instance-attribute

_frame_pts = None instance-attribute

_frame_rate = self._compute_frame_rate() instance-attribute

_frame_shape = (self._frame_height, self._frame_width, 3) instance-attribute

_frame_width = int(self._stream.width or 0) instance-attribute

_index_built = False instance-attribute

_keyframes = [] instance-attribute

_path = path instance-attribute

_scrub_bucket_ms = max(1, int(scrub_bucket_ms)) instance-attribute

_start_pts = self._stream.start_time if self._stream.start_time is not None else 0 instance-attribute

_stream = self._container.streams.video[video_stream_index] instance-attribute

_time_base = self._stream.time_base instance-attribute

current_frame_pos property

Return the last frame index accessed.

fourcc property

Return the fourcc codec identifier.

frame_format property

Return the frame format identifier.

frame_height property

Return the video frame height.

frame_rate property

Return the reported frame rate in frames per second.

frame_shape property

Return the expected frame shape (H, W, C).

frame_width property

Return the video frame width.

number_of_frames property

Return the total number of frames, decoding if needed.

__enter__()

Return self for context manager usage.

Source code in acvr/_pyav_backend.py
def __enter__(self) -> "PyAVVideoBackend":
    """Return self for context manager usage."""

    return self

__exit__(exc_type, exc, tb)

Close the backend on exit from a context manager.

Source code in acvr/_pyav_backend.py
def __exit__(self, exc_type, exc, tb) -> None:
    """Close the backend on exit from a context manager."""

    self.close()

__init__(path, video_stream_index=0, *, build_index=False, decoded_frame_cache_size=0, scrub_bucket_ms=100, scrub_bucket_lru_size=4096)

Initialize the PyAV-backed decoder.

Source code in acvr/_pyav_backend.py
def __init__(
    self,
    path: str,
    video_stream_index: int = 0,
    *,
    build_index: bool = False,
    decoded_frame_cache_size: int = 0,
    scrub_bucket_ms: int = 100,
    scrub_bucket_lru_size: int = 4096,
) -> None:
    """Initialize the PyAV-backed decoder."""

    self._path = path
    self._container = av.open(path)
    self._stream = self._container.streams.video[video_stream_index]
    self._codec_ctx = self._stream.codec_context
    self._fast_container: Optional[av.container.InputContainer] = None
    self._fast_stream: Optional[av.video.stream.VideoStream] = None
    self._fast_first_frame_number: Optional[int] = None

    self._time_base: Fraction = self._stream.time_base
    self._start_pts: int = self._stream.start_time if self._stream.start_time is not None else 0

    self._keyframes: List[KeyframeEntry] = []
    self._index_built: bool = False

    self._frame_pts: Optional[List[int]] = None
    self._frame_count: int = int(self._stream.frames or 0)
    self._current_frame_pos: float = 0.0

    self._frame_cache = _LRU(decoded_frame_cache_size)

    self._scrub_bucket_ms = max(1, int(scrub_bucket_ms))
    self._bucket_to_kfidx = _LRU(scrub_bucket_lru_size)

    if build_index:
        self.build_keyframe_index()

    self._frame_height = int(self._stream.height or 0)
    self._frame_width = int(self._stream.width or 0)
    self._frame_shape = (self._frame_height, self._frame_width, 3)
    self._frame_rate = self._compute_frame_rate()
    self._fourcc = self._compute_fourcc()
    self._frame_format = 0

_bucket_key(t_s)

Return a bucket key for the scrub acceleration cache.

Source code in acvr/_pyav_backend.py
def _bucket_key(self, t_s: float) -> int:
    """Return a bucket key for the scrub acceleration cache."""

    return int(round(t_s * 1000.0 / self._scrub_bucket_ms))

_compute_fourcc()

Compute a fourcc code from the stream codec tag.

Source code in acvr/_pyav_backend.py
def _compute_fourcc(self) -> int:
    """Compute a fourcc code from the stream codec tag."""

    tag = self._stream.codec_context.codec_tag
    if isinstance(tag, str) and len(tag) >= 4:
        tag = tag[:4]
        return (
            ord(tag[0])
            | (ord(tag[1]) << 8)
            | (ord(tag[2]) << 16)
            | (ord(tag[3]) << 24)
        )
    return 0

_compute_frame_rate()

Compute the stream frame rate in frames per second.

Source code in acvr/_pyav_backend.py
def _compute_frame_rate(self) -> float:
    """Compute the stream frame rate in frames per second."""

    rate = self._stream.average_rate or self._stream.base_rate
    return float(rate) if rate is not None else 0.0

_ensure_fast_container()

Initialize the fast-seek container if needed.

Source code in acvr/_pyav_backend.py
def _ensure_fast_container(self) -> None:
    """Initialize the fast-seek container if needed."""

    if self._fast_container is not None:
        return
    self._fast_container = av.open(self._path)
    self._fast_stream = self._fast_container.streams.video[self._stream.index]
    try:
        codec_ctx = self._fast_stream.codec_context
        codec_ctx.thread_type = "AUTO"
        codec_ctx.thread_count = 0
    except Exception:
        pass

_ensure_frame_pts()

Decode the stream once to collect frame PTS values.

Source code in acvr/_pyav_backend.py
def _ensure_frame_pts(self) -> None:
    """Decode the stream once to collect frame PTS values."""

    if self._frame_pts is not None:
        return
    idx_container = av.open(self._path)
    idx_stream = idx_container.streams.video[self._stream.index]
    pts_list: List[int] = []
    for frame in idx_container.decode(idx_stream):
        pts = frame.pts if frame.pts is not None else frame.dts
        if pts is None:
            pts = self._start_pts + len(pts_list)
        pts_list.append(int(pts))
    idx_container.close()
    self._frame_pts = pts_list
    self._frame_count = len(pts_list)

_flush_decoder()

Flush decoder buffers if supported.

Source code in acvr/_pyav_backend.py
def _flush_decoder(self) -> None:
    """Flush decoder buffers if supported."""

    try:
        self._codec_ctx.flush_buffers()
    except Exception:
        pass

_frame_time_s(pts)

Return the timestamp for a frame PTS.

Source code in acvr/_pyav_backend.py
def _frame_time_s(self, pts: Optional[int]) -> float:
    """Return the timestamp for a frame PTS."""

    return float("nan") if pts is None else self._pts_to_secs(pts)

_keyframe_index_at_or_before_pts(target_pts)

Return keyframe index at or before the target PTS.

Source code in acvr/_pyav_backend.py
def _keyframe_index_at_or_before_pts(self, target_pts: int) -> int:
    """Return keyframe index at or before the target PTS."""

    kf = self._keyframes
    if not self._index_built or not kf:
        return 0
    if target_pts <= kf[0].pts:
        return 0
    if target_pts >= kf[-1].pts:
        return len(kf) - 1

    lo, hi = 0, len(kf) - 1
    while lo <= hi:
        mid = (lo + hi) // 2
        m = kf[mid].pts
        if m == target_pts:
            return mid
        if m < target_pts:
            lo = mid + 1
        else:
            hi = mid - 1
    return hi

_keyframe_index_for_time_fast(t_s, mode)

Return a keyframe index using cached time buckets.

Source code in acvr/_pyav_backend.py
def _keyframe_index_for_time_fast(self, t_s: float, mode: str) -> int:
    """Return a keyframe index using cached time buckets."""

    if not self._index_built:
        raise RuntimeError("Keyframe index not built. Call build_keyframe_index() first.")

    b = self._bucket_key(t_s)

    mode_tag = {"previous": 0, "nearest": 1, "next": 2}.get(mode)
    if mode_tag is None:
        raise ValueError("mode must be one of: 'previous', 'nearest', 'next'")
    cache_key = (b << 2) | mode_tag

    cached = self._bucket_to_kfidx.get(cache_key)
    if cached is not None:
        return int(cached)

    target_pts = self._secs_to_pts(t_s)

    if mode == "previous":
        idx = self._keyframe_index_at_or_before_pts(target_pts)
    elif mode == "nearest":
        idx = self._keyframe_index_nearest_pts(target_pts)
    else:
        i_prev = self._keyframe_index_at_or_before_pts(target_pts)
        if self._keyframes[i_prev].pts >= target_pts:
            idx = i_prev
        else:
            idx = min(i_prev + 1, len(self._keyframes) - 1)

    self._bucket_to_kfidx.put(cache_key, idx)
    return idx

_keyframe_index_nearest_pts(target_pts)

Return nearest keyframe index to the target PTS.

Source code in acvr/_pyav_backend.py
def _keyframe_index_nearest_pts(self, target_pts: int) -> int:
    """Return nearest keyframe index to the target PTS."""

    kf = self._keyframes
    if not self._index_built or not kf:
        return 0
    i0 = self._keyframe_index_at_or_before_pts(target_pts)
    i1 = min(i0 + 1, len(kf) - 1)
    if i0 == i1:
        return i0
    d0 = abs(kf[i0].pts - target_pts)
    d1 = abs(kf[i1].pts - target_pts)
    return i0 if d0 <= d1 else i1

_pts_to_frame_number(pts, fps)

Convert a PTS value to a rounded frame number.

Source code in acvr/_pyav_backend.py
def _pts_to_frame_number(self, pts: Optional[int], fps: float) -> Optional[int]:
    """Convert a PTS value to a rounded frame number."""

    if pts is None:
        return None
    return int(round(self._pts_to_secs(int(pts)) * fps))

_pts_to_secs(pts)

Convert presentation timestamp units to seconds.

Source code in acvr/_pyav_backend.py
def _pts_to_secs(self, pts: int) -> float:
    """Convert presentation timestamp units to seconds."""

    return float((pts - self._start_pts) * self._time_base)

_read_frame_by_pts(target_pts)

Decode the first frame at or after a target PTS.

Source code in acvr/_pyav_backend.py
def _read_frame_by_pts(self, target_pts: int) -> DecodedFrame:
    """Decode the first frame at or after a target PTS."""

    cached = self._frame_cache.get(target_pts)
    if cached is not None:
        return cached  # type: ignore[return-value]

    if self._index_built:
        idx = self._keyframe_index_at_or_before_pts(target_pts)
        seek_pts = self._keyframes[idx].pts
    else:
        seek_pts = target_pts

    container = av.open(self._path)
    stream = container.streams.video[self._stream.index]
    try:
        container.seek(seek_pts, stream=stream, backward=True, any_frame=False)
        try:
            stream.codec_context.flush_buffers()
        except Exception:
            pass

        last: Optional[DecodedFrame] = None
        for packet in container.demux(stream):
            for frame in packet.decode():
                pts = frame.pts
                cur = DecodedFrame(
                    image=frame.to_rgb().to_ndarray(),
                    pts=pts,
                    time_s=self._frame_time_s(pts),
                    key_frame=bool(getattr(frame, "key_frame", False)),
                )
                if pts is not None:
                    self._frame_cache.put(int(pts), cur)
                if pts is None:
                    last = cur
                    continue
                if pts >= target_pts:
                    return cur
                last = cur
    finally:
        container.close()

    if last is not None:
        return last
    raise RuntimeError("Could not decode any frames after seeking.")

_read_frame_fast_opencv_pyav(target_frame, *, decode_rgb)

Approximate OpenCV seek behavior using PyAV.

Source code in acvr/_pyav_backend.py
def _read_frame_fast_opencv_pyav(self, target_frame: int, *, decode_rgb: bool) -> DecodedFrame:
    """Approximate OpenCV seek behavior using PyAV."""

    fps = self._frame_rate or 1.0
    self._ensure_fast_container()
    assert self._fast_container is not None
    assert self._fast_stream is not None

    def seek_to_frame(frame_index: int) -> None:
        target_pts = self._secs_to_pts(frame_index / fps)
        self._fast_container.seek(
            target_pts,
            stream=self._fast_stream,
            backward=True,
            any_frame=False,
        )
        try:
            self._fast_stream.codec_context.flush_buffers()
        except Exception:
            pass

    def frame_number_from_pts(pts: Optional[int]) -> Optional[int]:
        num = self._pts_to_frame_number(pts, fps)
        if num is None:
            return None
        if self._fast_first_frame_number is None:
            return num
        return num - self._fast_first_frame_number

    first_frame = None
    if self._fast_first_frame_number is None:
        seek_to_frame(0)
        for frame in self._fast_container.decode(self._fast_stream):
            pts = frame.pts if frame.pts is not None else frame.dts
            self._fast_first_frame_number = self._pts_to_frame_number(pts, fps) or 0
            first_frame = frame
            break

    if target_frame <= 0:
        if first_frame is None:
            seek_to_frame(0)
            for frame in self._fast_container.decode(self._fast_stream):
                first_frame = frame
                break
        if first_frame is None:
            raise RuntimeError("Failed to decode a frame after fast seek.")
        pts = first_frame.pts if first_frame.pts is not None else first_frame.dts
        img = first_frame.to_rgb().to_ndarray() if decode_rgb else first_frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(first_frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        return cur

    delta = 16
    attempts = 0
    while True:
        start_frame = max(target_frame - delta, 0)
        seek_to_frame(start_frame)
        decoder = self._fast_container.decode(self._fast_stream)
        try:
            frame = next(decoder)
        except StopIteration:
            break

        pts = frame.pts if frame.pts is not None else frame.dts
        frame_number = frame_number_from_pts(pts)
        if frame_number is None:
            frame_number = start_frame

        if frame_number < 0 or frame_number > target_frame:
            if start_frame == 0 or delta >= 1 << 30 or attempts > 20:
                break
            delta = delta * 2 if delta < 16 else int(delta * 1.5)
            attempts += 1
            continue

        while frame_number < target_frame:
            try:
                frame = next(decoder)
            except StopIteration:
                frame = None
                break
            pts = frame.pts if frame.pts is not None else frame.dts
            frame_number = frame_number_from_pts(pts)
            if frame_number is None:
                frame_number = target_frame

        if frame is None:
            break

        pts = frame.pts if frame.pts is not None else frame.dts
        img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        return cur

    target_pts = self._secs_to_pts(target_frame / fps)
    return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)

_read_frame_fast_simple(target_pts, *, decode_rgb)

Fallback fast seek: seek and decode first frame after PTS.

Source code in acvr/_pyav_backend.py
def _read_frame_fast_simple(self, target_pts: int, *, decode_rgb: bool) -> DecodedFrame:
    """Fallback fast seek: seek and decode first frame after PTS."""

    self._ensure_fast_container()
    assert self._fast_container is not None
    assert self._fast_stream is not None

    def grab_frame(container: av.container.InputContainer, stream: av.video.stream.VideoStream) -> Optional[DecodedFrame]:
        for frame in container.decode(stream):
            pts = frame.pts if frame.pts is not None else frame.dts
            if pts is None:
                target_reached = True
            else:
                target_reached = pts >= target_pts
            if not target_reached:
                continue
            if decode_rgb:
                img = frame.to_rgb().to_ndarray()
            else:
                img = frame.to_ndarray(format="bgr24")
            cur = DecodedFrame(
                image=img,
                pts=pts,
                time_s=self._frame_time_s(pts),
                key_frame=bool(getattr(frame, "key_frame", False)),
            )
            if pts is not None:
                self._frame_cache.put(int(pts), cur)
            return cur
        return None

    self._fast_container.seek(
        target_pts,
        stream=self._fast_stream,
        backward=True,
        any_frame=True,
    )
    try:
        self._fast_stream.codec_context.flush_buffers()
    except Exception:
        pass
    grabbed = grab_frame(self._fast_container, self._fast_stream)
    if grabbed is not None:
        return grabbed

    self._fast_container.seek(
        target_pts,
        stream=self._fast_stream,
        backward=True,
        any_frame=False,
    )
    try:
        self._fast_stream.codec_context.flush_buffers()
    except Exception:
        pass
    grabbed = grab_frame(self._fast_container, self._fast_stream)
    if grabbed is not None:
        return grabbed

    raise RuntimeError("Failed to decode a frame after fast seek.")

_secs_to_pts(t_s)

Convert seconds to presentation timestamp units.

Source code in acvr/_pyav_backend.py
def _secs_to_pts(self, t_s: float) -> int:
    """Convert seconds to presentation timestamp units."""

    ticks = int(round(t_s / float(self._time_base)))
    return self._start_pts + ticks

_seek_to_pts(pts, *, backward)

Seek to a timestamp in the stream.

Source code in acvr/_pyav_backend.py
def _seek_to_pts(self, pts: int, *, backward: bool) -> None:
    """Seek to a timestamp in the stream."""

    self._container.seek(pts, stream=self._stream, backward=backward, any_frame=False)
    self._flush_decoder()

build_keyframe_index(*, max_packets=None)

Scan packets and store keyframe pts/time.

Source code in acvr/_pyav_backend.py
def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
    """Scan packets and store keyframe pts/time."""

    path = self._container.name
    idx_container = av.open(path)
    idx_stream = idx_container.streams.video[self._stream.index]

    key_pts: List[int] = []
    n = 0
    for packet in idx_container.demux(idx_stream):
        if packet.dts is None and packet.pts is None:
            continue
        if packet.is_keyframe:
            pts = packet.pts if packet.pts is not None else packet.dts
            if pts is not None:
                key_pts.append(int(pts))
        n += 1
        if max_packets is not None and n >= max_packets:
            break

    idx_container.close()

    key_pts = sorted(set(key_pts))
    if not key_pts:
        key_pts = [self._start_pts]

    self._keyframes = [KeyframeEntry(pts=p, time_s=self._pts_to_secs(p)) for p in key_pts]
    self._index_built = True

    self._bucket_to_kfidx.clear()
    return self._keyframes

close()

Close the underlying PyAV container.

Source code in acvr/_pyav_backend.py
def close(self) -> None:
    """Close the underlying PyAV container."""

    self._container.close()
    if self._fast_container is not None:
        self._fast_container.close()
        self._fast_container = None
        self._fast_stream = None
    self._fast_first_frame_number = None

frame_at_index(index)

Return the decoded frame at a zero-based index.

Source code in acvr/_pyav_backend.py
def frame_at_index(self, index: int) -> np.ndarray:
    """Return the decoded frame at a zero-based index."""

    self._ensure_frame_pts()
    assert self._frame_pts is not None
    if index < 0:
        index += self._frame_count
    if index < 0 or index >= self._frame_count:
        raise IndexError("frame index out of range")
    target_pts = self._frame_pts[index]
    decoded = self._read_frame_by_pts(target_pts)
    self._current_frame_pos = float(index)
    return decoded.image

read_frame_at(t_s, *, return_first_after=True, max_decode_frames=10000, use_index=True)

Decode a frame near a timestamp with accurate seeking.

Source code in acvr/_pyav_backend.py
def read_frame_at(
    self,
    t_s: Number,
    *,
    return_first_after: bool = True,
    max_decode_frames: int = 10_000,
    use_index: bool = True,
) -> DecodedFrame:
    """Decode a frame near a timestamp with accurate seeking."""

    t_s = float(t_s)
    target_pts = self._secs_to_pts(t_s)

    cached = self._frame_cache.get(target_pts)
    if cached is not None:
        return cached  # type: ignore[return-value]

    if use_index and self._index_built:
        idx = self._keyframe_index_at_or_before_pts(target_pts)
        anchor_pts = self._keyframes[idx].pts
        self._seek_to_pts(anchor_pts, backward=False)
    else:
        self._seek_to_pts(target_pts, backward=True)

    last: Optional[DecodedFrame] = None
    decoded = 0

    for packet in self._container.demux(self._stream):
        for frame in packet.decode():
            decoded += 1
            if decoded > max_decode_frames:
                raise RuntimeError(
                    "Exceeded max_decode_frames while seeking; timestamps may be broken."
                )

            pts = frame.pts
            cur = DecodedFrame(
                image=frame.to_rgb().to_ndarray(),
                pts=pts,
                time_s=self._frame_time_s(pts),
                key_frame=bool(getattr(frame, "key_frame", False)),
            )
            if pts is not None:
                self._frame_cache.put(int(pts), cur)

            if pts is None:
                last = cur
                continue

            if return_first_after:
                if pts >= target_pts:
                    return cur
                last = cur
            else:
                if pts <= target_pts:
                    last = cur
                elif last is not None:
                    return last

    if last is not None:
        return last
    raise RuntimeError("Could not decode any frames after seeking.")

read_frame_fast(*, index=None, t_s=None, decode_rgb=False)

Return a fast, approximate frame for an index or timestamp.

Source code in acvr/_pyav_backend.py
def read_frame_fast(
    self,
    *,
    index: Optional[int] = None,
    t_s: Optional[Number] = None,
    decode_rgb: bool = False,
) -> DecodedFrame:
    """Return a fast, approximate frame for an index or timestamp."""

    if index is None and t_s is None:
        raise ValueError("Provide either index or t_s")
    if index is not None and t_s is not None:
        raise ValueError("Provide only one of index or t_s")

    if t_s is None:
        if index is None:
            raise ValueError("Provide either index or t_s")
        if index < 0:
            index += self.number_of_frames
        target_index = int(index)
    else:
        t_s = float(t_s)
        target_index = int(round(t_s * (self._frame_rate or 1.0)))

    fps = self._frame_rate or 1.0
    target_pts = self._secs_to_pts(target_index / fps)

    cached = self._frame_cache.get(target_pts)
    if cached is not None:
        return cached  # type: ignore[return-value]

    return self._read_frame_fast_opencv_pyav(target_index, decode_rgb=decode_rgb)

read_keyframe_at(t_s, *, mode='previous', decode_rgb=True)

Return a nearby keyframe without GOP forward decoding.

Source code in acvr/_pyav_backend.py
def read_keyframe_at(
    self,
    t_s: Number,
    *,
    mode: str = "previous",
    decode_rgb: bool = True,
) -> DecodedFrame:
    """Return a nearby keyframe without GOP forward decoding."""

    t_s = float(t_s)
    idx = self._keyframe_index_for_time_fast(t_s, mode)
    key_pts = self._keyframes[idx].pts

    cached = self._frame_cache.get(key_pts)
    if cached is not None:
        return cached  # type: ignore[return-value]

    self._seek_to_pts(key_pts, backward=False)

    for packet in self._container.demux(self._stream):
        for frame in packet.decode():
            pts = frame.pts
            img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray()
            cur = DecodedFrame(
                image=img,
                pts=pts,
                time_s=self._frame_time_s(pts),
                key_frame=bool(getattr(frame, "key_frame", False)),
            )
            if pts is not None:
                self._frame_cache.put(int(pts), cur)
            return cur

    raise RuntimeError("Failed to decode a frame after keyframe seek.")