API
VideoReader
High-level video reader with array-style access.
Source code in acvr/reader.py
class VideoReader:
    """High-level video reader with array-style access."""

    def __init__(
        self,
        path: str,
        video_stream_index: int = 0,
        *,
        build_index: bool = True,
        decoded_frame_cache_size: int = 0,
        scrub_bucket_ms: int = 25,
        scrub_bucket_lru_size: int = 4096,
        threading: bool = True,
        thread_count: int = 0,
        index_policy: str = "decode",
    ) -> None:
        """Create a reader for the given video path.

        Args:
            path: Path to the video file to open.
            video_stream_index: Video stream index to decode.
            build_index: Whether to build a keyframe index on initialization
                (default True); can speed up accurate random seeks but adds
                upfront cost.
            decoded_frame_cache_size: Number of decoded frames to keep in an
                in-memory LRU cache; helpful for repeated access to nearby frames.
            scrub_bucket_ms: Bucket size (milliseconds) used to group timestamps
                for fast scrub queries.
            scrub_bucket_lru_size: LRU size for the scrub bucket cache.
            threading: Whether to enable threaded decoding in the backend.
            thread_count: Number of decoding threads (0 lets backend decide).
            index_policy: Indexing policy, either ``"decode"`` for decode-order
                frames or ``"timeline"`` for timestamp-based access.

        Raises:
            ValueError: If ``index_policy`` is not ``"decode"`` or ``"timeline"``.
        """
        # Validate before constructing the backend: the backend opens the
        # container on construction, so raising afterwards would leak it.
        if index_policy not in {"decode", "timeline"}:
            raise ValueError("index_policy must be 'decode' or 'timeline'")
        self._backend = PyAVVideoBackend(
            path,
            video_stream_index=video_stream_index,
            build_index=build_index,
            decoded_frame_cache_size=decoded_frame_cache_size,
            scrub_bucket_ms=scrub_bucket_ms,
            scrub_bucket_lru_size=scrub_bucket_lru_size,
            threading=threading,
            thread_count=thread_count,
        )
        self._index_policy = index_policy

    def close(self) -> None:
        """Close the underlying video resources."""
        self._backend.close()

    def __enter__(self) -> "VideoReader":
        """Return self for context manager usage."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the reader when leaving a context manager.

        Args:
            exc_type: Exception type, if any.
            exc: Exception instance, if any.
            tb: Traceback, if any.
        """
        self.close()

    def __len__(self) -> int:
        """Return the number of frames in the video."""
        return self.number_of_frames

    def __getitem__(self, key: IndexKey) -> Union[np.ndarray, List[np.ndarray]]:
        """Return a frame or list of frames for the given index or slice.

        Indexing semantics are controlled by `index_policy`:
        - 'decode' (default): index refers to decode-order frame number.
        - 'timeline': index is mapped to timestamp using nominal FPS, and an
          accurate timestamp seek is performed.

        Args:
            key: Frame index or slice to retrieve.

        Returns:
            A single frame array or list of frame arrays.
        """
        if isinstance(key, slice):
            start, stop, step = key.indices(self.number_of_frames)
            return [self[i] for i in range(start, stop, step)]
        i = int(key)
        if self._index_policy == "decode":
            return self._backend.frame_at_index(i)
        # timeline policy: map index -> seconds via nominal FPS (1.0 guards
        # against streams that report no rate at all).
        fps = self.nominal_frame_rate or self.frame_rate or 1.0
        t_s = float(i) / fps
        return self._backend.read_frame_at(t_s).image

    def __iter__(self) -> Iterator[np.ndarray]:
        """Iterate over all frames in the video."""
        return self.iter_frames()

    @property
    def frame_height(self) -> int:
        """Return the frame height in pixels."""
        return self._backend.frame_height

    @property
    def frame_width(self) -> int:
        """Return the frame width in pixels."""
        return self._backend.frame_width

    @property
    def frame_rate(self) -> float:
        """Return the video frame rate."""
        return self._backend.frame_rate

    @property
    def nominal_frame_rate(self) -> float:
        """Return the nominal video frame rate (guessed_rate when available)."""
        return self._backend.nominal_frame_rate

    @property
    def fourcc(self) -> int:
        """Return the fourcc codec identifier."""
        return self._backend.fourcc

    @property
    def frame_format(self) -> int:
        """Return the pixel format identifier."""
        return self._backend.frame_format

    @property
    def number_of_frames(self) -> int:
        """Return the total number of frames."""
        return self._backend.number_of_frames

    @property
    def frame_shape(self) -> tuple:
        """Return the expected frame shape (H, W, C)."""
        return self._backend.frame_shape

    @property
    def current_frame_pos(self) -> float:
        """Return the last accessed frame index."""
        return self._backend.current_frame_pos

    def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
        """Build a keyframe index for faster random access.

        Args:
            max_packets: Optional cap on packets to inspect.

        Returns:
            A list of keyframe entries.
        """
        return self._backend.build_keyframe_index(max_packets=max_packets)

    def read_keyframe_at(
        self,
        t_s: float,
        *,
        mode: str = "nearest",
        decode_rgb: bool = True,
    ) -> DecodedFrame:
        """Return a nearby keyframe for fast scrubbing.

        Args:
            t_s: Timestamp in seconds to seek around.
            mode: Selection mode (``"nearest"``, ``"before"``, or ``"after"``).
            decode_rgb: Whether to decode into RGB arrays.

        Returns:
            The decoded keyframe.
        """
        return self._backend.read_keyframe_at(t_s, mode=mode, decode_rgb=decode_rgb)

    def read_frame_at(
        self,
        t_s: float,
        *,
        return_first_after: bool = True,
        max_decode_frames: int = 10_000,
        use_index: bool = True,
    ) -> DecodedFrame:
        """Return a frame at a timestamp with accurate seeking.

        Args:
            t_s: Timestamp in seconds to seek to.
            return_first_after: Return the first frame after the timestamp.
            max_decode_frames: Cap on frames to decode while seeking.
            use_index: Whether to use the keyframe index if available.

        Returns:
            The decoded frame at the target timestamp.
        """
        return self._backend.read_frame_at(
            t_s,
            return_first_after=return_first_after,
            max_decode_frames=max_decode_frames,
            use_index=use_index,
        )

    def read_frame_fast(
        self,
        *,
        index: Optional[int] = None,
        t_s: Optional[float] = None,
        decode_rgb: bool = True,
        use_sequential: bool = True,
    ) -> DecodedFrame:
        """Return a fast, approximate frame for an index or timestamp.

        Args:
            index: Decode-order frame index to seek to.
            t_s: Timestamp in seconds to seek to.
            decode_rgb: Whether to decode into RGB arrays.
            use_sequential: Allow sequential decoding when available.

        Returns:
            The decoded frame closest to the request.
        """
        return self._backend.read_frame_fast(
            index=index,
            t_s=t_s,
            decode_rgb=decode_rgb,
            use_sequential=use_sequential,
        )

    def read_next(self, *, decode_rgb: bool = True) -> np.ndarray:
        """Return the next frame using sequential decoding.

        Args:
            decode_rgb: Whether to decode into RGB arrays.

        Returns:
            The next decoded frame image.
        """
        return self._backend.read_next_frame(decode_rgb=decode_rgb).image

    def iter_frames(self, *, decode_rgb: bool = True) -> Iterator[np.ndarray]:
        """Iterate frames sequentially without seeking."""
        for frame in self._backend.iter_frames(decode_rgb=decode_rgb):
            yield frame.image

    # Public PTS/time helpers
    def pts_at_index(self, index: int) -> Optional[int]:
        """Return the PTS for a given frame index."""
        return self._backend.pts_at_index(int(index))

    def time_at_index(self, index: int) -> float:
        """Return the timestamp (seconds) for a given frame index."""
        return self._backend.time_at_index(int(index))

    def index_from_pts(self, pts: int) -> int:
        """Return the nearest frame index for a PTS value."""
        return self._backend.index_from_pts(int(pts))

    def index_from_time(self, t_s: float) -> int:
        """Return the nearest frame index for a timestamp in seconds."""
        return self._backend.index_from_time(float(t_s))

    def read_frame(
        self,
        *,
        index: Optional[int] = None,
        t_s: Optional[float] = None,
        mode: str = "accurate",
        decode_rgb: bool = True,
        keyframe_mode: str = "nearest",
        use_sequential: bool = True,
    ) -> DecodedFrame:
        """Read a frame using a selectable access mode."""
        return self._backend.read_frame(
            index=index,
            t_s=t_s,
            mode=mode,
            decode_rgb=decode_rgb,
            keyframe_mode=keyframe_mode,
            use_sequential=use_sequential,
        )
_backend = PyAVVideoBackend(path, video_stream_index=video_stream_index, build_index=build_index, decoded_frame_cache_size=decoded_frame_cache_size, scrub_bucket_ms=scrub_bucket_ms, scrub_bucket_lru_size=scrub_bucket_lru_size, threading=threading, thread_count=thread_count)
instance-attribute
_index_policy = index_policy
instance-attribute
current_frame_pos
property
Return the last accessed frame index.
fourcc
property
Return the fourcc codec identifier.
frame_format
property
Return the pixel format identifier.
frame_height
property
Return the frame height in pixels.
frame_rate
property
Return the video frame rate.
frame_shape
property
Return the expected frame shape (H, W, C).
frame_width
property
Return the frame width in pixels.
nominal_frame_rate
property
Return the nominal video frame rate (guessed_rate when available).
number_of_frames
property
Return the total number of frames.
__enter__()
Return self for context manager usage.
Source code in acvr/reader.py
def __enter__(self) -> "VideoReader":
"""Return self for context manager usage."""
return self
__exit__(exc_type, exc, tb)
Close the reader when leaving a context manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
exc_type
|
Exception type, if any. |
required | |
exc
|
Exception instance, if any. |
required | |
tb
|
Traceback, if any. |
required |
Source code in acvr/reader.py
def __exit__(self, exc_type, exc, tb) -> None:
"""Close the reader when leaving a context manager.
Args:
exc_type: Exception type, if any.
exc: Exception instance, if any.
tb: Traceback, if any.
"""
self.close()
__getitem__(key)
Return a frame or list of frames for the given index or slice.
Indexing semantics are controlled by index_policy:
- 'decode' (default): index refers to decode-order frame number.
- 'timeline': index is mapped to timestamp using nominal FPS, and an accurate
timestamp seek is performed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
IndexKey
|
Frame index or slice to retrieve. |
required |
Returns:
| Type | Description |
|---|---|
Union[ndarray, List[ndarray]]
|
A single frame array or list of frame arrays. |
Source code in acvr/reader.py
def __getitem__(self, key: IndexKey) -> Union[np.ndarray, List[np.ndarray]]:
"""Return a frame or list of frames for the given index or slice.
Indexing semantics are controlled by `index_policy`:
- 'decode' (default): index refers to decode-order frame number.
- 'timeline': index is mapped to timestamp using nominal FPS, and an accurate
timestamp seek is performed.
Args:
key: Frame index or slice to retrieve.
Returns:
A single frame array or list of frame arrays.
"""
if isinstance(key, slice):
start, stop, step = key.indices(self.number_of_frames)
return [self[i] for i in range(start, stop, step)]
i = int(key)
if self._index_policy == "decode":
return self._backend.frame_at_index(i)
# timeline policy
fps = self.nominal_frame_rate or self.frame_rate or 1.0
t_s = float(i) / fps
return self._backend.read_frame_at(t_s).image
__init__(path, video_stream_index=0, *, build_index=True, decoded_frame_cache_size=0, scrub_bucket_ms=25, scrub_bucket_lru_size=4096, threading=True, thread_count=0, index_policy='decode')
Create a reader for the given video path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
Path to the video file to open. |
required |
video_stream_index
|
int
|
Video stream index to decode. |
0
|
build_index
|
bool
|
Whether to build a keyframe index on initialization (default True); can speed up accurate random seeks but adds upfront cost. |
True
|
decoded_frame_cache_size
|
int
|
Number of decoded frames to keep in an in-memory LRU cache; helpful for repeated access to nearby frames. |
0
|
scrub_bucket_ms
|
int
|
Bucket size (milliseconds) used to group timestamps for fast scrub queries. |
25
|
scrub_bucket_lru_size
|
int
|
LRU size for the scrub bucket cache. |
4096
|
threading
|
bool
|
Whether to enable threaded decoding in the backend. |
True
|
thread_count
|
int
|
Number of decoding threads (0 lets backend decide). |
0
|
index_policy
|
str
|
Indexing policy, either "decode" for decode-order frames or "timeline" for timestamp-based access. |
'decode'
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If index_policy is not "decode" or "timeline". |
Source code in acvr/reader.py
def __init__(
self,
path: str,
video_stream_index: int = 0,
*,
build_index: bool = True,
decoded_frame_cache_size: int = 0,
scrub_bucket_ms: int = 25,
scrub_bucket_lru_size: int = 4096,
threading: bool = True,
thread_count: int = 0,
index_policy: str = "decode",
) -> None:
"""Create a reader for the given video path.
Args:
path: Path to the video file to open.
video_stream_index: Video stream index to decode.
build_index: Whether to build a keyframe index on initialization (default
True); can speed up accurate random seeks but adds upfront cost.
decoded_frame_cache_size: Number of decoded frames to keep in an
in-memory LRU cache; helpful for repeated access to nearby frames.
scrub_bucket_ms: Bucket size (milliseconds) used to group timestamps
for fast scrub queries.
scrub_bucket_lru_size: LRU size for the scrub bucket cache.
threading: Whether to enable threaded decoding in the backend.
thread_count: Number of decoding threads (0 lets backend decide).
index_policy: Indexing policy, either ``"decode"`` for decode-order
frames or ``"timeline"`` for timestamp-based access.
Raises:
ValueError: If ``index_policy`` is not ``"decode"`` or ``"timeline"``.
"""
self._backend = PyAVVideoBackend(
path,
video_stream_index=video_stream_index,
build_index=build_index,
decoded_frame_cache_size=decoded_frame_cache_size,
scrub_bucket_ms=scrub_bucket_ms,
scrub_bucket_lru_size=scrub_bucket_lru_size,
threading=threading,
thread_count=thread_count,
)
if index_policy not in {"decode", "timeline"}:
raise ValueError("index_policy must be 'decode' or 'timeline'")
self._index_policy = index_policy
__iter__()
Iterate over all frames in the video.
Source code in acvr/reader.py
def __iter__(self) -> Iterator[np.ndarray]:
"""Iterate over all frames in the video."""
return self.iter_frames()
__len__()
Return the number of frames in the video.
Source code in acvr/reader.py
def __len__(self) -> int:
"""Return the number of frames in the video."""
return self.number_of_frames
build_keyframe_index(*, max_packets=None)
Build a keyframe index for faster random access.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_packets
|
Optional[int]
|
Optional cap on packets to inspect. |
None
|
Returns:
| Type | Description |
|---|---|
List[KeyframeEntry]
|
A list of keyframe entries. |
Source code in acvr/reader.py
def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
"""Build a keyframe index for faster random access.
Args:
max_packets: Optional cap on packets to inspect.
Returns:
A list of keyframe entries.
"""
return self._backend.build_keyframe_index(max_packets=max_packets)
close()
Close the underlying video resources.
Source code in acvr/reader.py
def close(self) -> None:
"""Close the underlying video resources."""
self._backend.close()
index_from_pts(pts)
Return the nearest frame index for a PTS value.
Source code in acvr/reader.py
def index_from_pts(self, pts: int) -> int:
"""Return the nearest frame index for a PTS value."""
return self._backend.index_from_pts(int(pts))
index_from_time(t_s)
Return the nearest frame index for a timestamp in seconds.
Source code in acvr/reader.py
def index_from_time(self, t_s: float) -> int:
"""Return the nearest frame index for a timestamp in seconds."""
return self._backend.index_from_time(float(t_s))
iter_frames(*, decode_rgb=True)
Iterate frames sequentially without seeking.
Source code in acvr/reader.py
def iter_frames(self, *, decode_rgb: bool = True) -> Iterator[np.ndarray]:
"""Iterate frames sequentially without seeking."""
for frame in self._backend.iter_frames(decode_rgb=decode_rgb):
yield frame.image
pts_at_index(index)
Return the PTS for a given frame index.
Source code in acvr/reader.py
def pts_at_index(self, index: int) -> Optional[int]:
"""Return the PTS for a given frame index."""
return self._backend.pts_at_index(int(index))
read_frame(*, index=None, t_s=None, mode='accurate', decode_rgb=True, keyframe_mode='nearest', use_sequential=True)
Read a frame using a selectable access mode.
Source code in acvr/reader.py
def read_frame(
self,
*,
index: Optional[int] = None,
t_s: Optional[float] = None,
mode: str = "accurate",
decode_rgb: bool = True,
keyframe_mode: str = "nearest",
use_sequential: bool = True,
) -> DecodedFrame:
"""Read a frame using a selectable access mode."""
return self._backend.read_frame(
index=index,
t_s=t_s,
mode=mode,
decode_rgb=decode_rgb,
keyframe_mode=keyframe_mode,
use_sequential=use_sequential,
)
read_frame_at(t_s, *, return_first_after=True, max_decode_frames=10000, use_index=True)
Return a frame at a timestamp with accurate seeking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
t_s
|
float
|
Timestamp in seconds to seek to. |
required |
return_first_after
|
bool
|
Return the first frame after the timestamp. |
True
|
max_decode_frames
|
int
|
Cap on frames to decode while seeking. |
10000
|
use_index
|
bool
|
Whether to use the keyframe index if available. |
True
|
Returns:
| Type | Description |
|---|---|
DecodedFrame
|
The decoded frame at the target timestamp. |
Source code in acvr/reader.py
def read_frame_at(
self,
t_s: float,
*,
return_first_after: bool = True,
max_decode_frames: int = 10_000,
use_index: bool = True,
) -> DecodedFrame:
"""Return a frame at a timestamp with accurate seeking.
Args:
t_s: Timestamp in seconds to seek to.
return_first_after: Return the first frame after the timestamp.
max_decode_frames: Cap on frames to decode while seeking.
use_index: Whether to use the keyframe index if available.
Returns:
The decoded frame at the target timestamp.
"""
return self._backend.read_frame_at(
t_s,
return_first_after=return_first_after,
max_decode_frames=max_decode_frames,
use_index=use_index,
)
read_frame_fast(*, index=None, t_s=None, decode_rgb=True, use_sequential=True)
Return a fast, approximate frame for an index or timestamp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
index
|
Optional[int]
|
Decode-order frame index to seek to. |
None
|
t_s
|
Optional[float]
|
Timestamp in seconds to seek to. |
None
|
decode_rgb
|
bool
|
Whether to decode into RGB arrays. |
True
|
use_sequential
|
bool
|
Allow sequential decoding when available. |
True
|
Returns:
| Type | Description |
|---|---|
DecodedFrame
|
The decoded frame closest to the request. |
Source code in acvr/reader.py
def read_frame_fast(
self,
*,
index: Optional[int] = None,
t_s: Optional[float] = None,
decode_rgb: bool = True,
use_sequential: bool = True,
) -> DecodedFrame:
"""Return a fast, approximate frame for an index or timestamp.
Args:
index: Decode-order frame index to seek to.
t_s: Timestamp in seconds to seek to.
decode_rgb: Whether to decode into RGB arrays.
use_sequential: Allow sequential decoding when available.
Returns:
The decoded frame closest to the request.
"""
return self._backend.read_frame_fast(
index=index,
t_s=t_s,
decode_rgb=decode_rgb,
use_sequential=use_sequential,
)
read_keyframe_at(t_s, *, mode='nearest', decode_rgb=True)
Return a nearby keyframe for fast scrubbing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
t_s
|
float
|
Timestamp in seconds to seek around. |
required |
mode
|
str
|
Selection mode ("nearest", "before", or "after"). |
'nearest'
|
decode_rgb
|
bool
|
Whether to decode into RGB arrays. |
True
|
Returns:
| Type | Description |
|---|---|
DecodedFrame
|
The decoded keyframe. |
Source code in acvr/reader.py
def read_keyframe_at(
self,
t_s: float,
*,
mode: str = "nearest",
decode_rgb: bool = True,
) -> DecodedFrame:
"""Return a nearby keyframe for fast scrubbing.
Args:
t_s: Timestamp in seconds to seek around.
mode: Selection mode (``"nearest"``, ``"before"``, or ``"after"``).
decode_rgb: Whether to decode into RGB arrays.
Returns:
The decoded keyframe.
"""
return self._backend.read_keyframe_at(t_s, mode=mode, decode_rgb=decode_rgb)
read_next(*, decode_rgb=True)
Return the next frame using sequential decoding.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
decode_rgb
|
bool
|
Whether to decode into RGB arrays. |
True
|
Returns:
| Type | Description |
|---|---|
ndarray
|
The next decoded frame image. |
Source code in acvr/reader.py
def read_next(self, *, decode_rgb: bool = True) -> np.ndarray:
"""Return the next frame using sequential decoding.
Args:
decode_rgb: Whether to decode into RGB arrays.
Returns:
The next decoded frame image.
"""
return self._backend.read_next_frame(decode_rgb=decode_rgb).image
time_at_index(index)
Return the timestamp (seconds) for a given frame index.
Source code in acvr/reader.py
def time_at_index(self, index: int) -> float:
"""Return the timestamp (seconds) for a given frame index."""
return self._backend.time_at_index(int(index))
Backend implementation
Frame-accurate seeking with keyframe index and scrub acceleration.
Source code in acvr/_pyav_backend.py
class PyAVVideoBackend:
"""Frame-accurate seeking with keyframe index and scrub acceleration."""
    def __init__(
        self,
        path: str,
        video_stream_index: int = 0,
        *,
        build_index: bool = False,
        decoded_frame_cache_size: int = 0,
        scrub_bucket_ms: int = 25,
        scrub_bucket_lru_size: int = 4096,
        threading: bool = True,
        thread_count: int = 0,
    ) -> None:
        """Initialize the PyAV-backed decoder.

        Opens the container, selects the requested video stream, and sets up
        bookkeeping for three decode paths: the main container, a lazily
        opened fast-seek container, and a sequential-read container.

        Args:
            path: Path of the media file to open.
            video_stream_index: Which video stream of the container to decode.
            build_index: Whether to build the keyframe index up front.
            decoded_frame_cache_size: Capacity of the decoded-frame LRU cache.
            scrub_bucket_ms: Bucket width (milliseconds) for scrub grouping.
            scrub_bucket_lru_size: Capacity of the scrub-bucket LRU cache.
            threading: Whether to enable threaded decoding.
            thread_count: Decoder thread count (0 lets the codec decide).
        """
        self._path = path
        self._container = av.open(path)
        self._stream = self._container.streams.video[video_stream_index]
        self._codec_ctx = self._stream.codec_context
        # Fast-seek path state: a second container opened on demand so fast
        # reads do not disturb the main container's position.
        self._fast_container: Optional[av.container.InputContainer] = None
        self._fast_stream: Optional[av.video.stream.VideoStream] = None
        self._fast_first_frame_number: Optional[int] = None
        self._fast_decoder = None
        self._fast_last_pts: Optional[int] = None
        # Sequential-read path state (also a separate container).
        self._seq_container: Optional[av.container.InputContainer] = None
        self._seq_stream: Optional[av.video.stream.VideoStream] = None
        self._seq_decoder = None
        self._seq_frame_index: int = 0
        self._last_index: Optional[int] = None
        self._last_fast_index: Optional[int] = None
        # Timebase and start offset used by all PTS<->seconds conversions.
        self._time_base: Fraction = self._stream.time_base
        self._start_pts: int = self._stream.start_time if self._stream.start_time is not None else 0
        self._keyframes: List[KeyframeEntry] = []
        self._index_built: bool = False
        # Per-frame PTS table, filled lazily by _ensure_frame_pts().
        self._frame_pts: Optional[List[int]] = None
        # Container-reported frame count; 0 means unknown until a PTS scan.
        self._frame_count: int = int(self._stream.frames or 0)
        self._current_frame_pos: float = 0.0
        self._frame_cache = _LRU(decoded_frame_cache_size)
        self._scrub_bucket_ms = max(1, int(scrub_bucket_ms))
        self._bucket_to_kfidx = _LRU(scrub_bucket_lru_size)
        self._threading = bool(threading)
        self._thread_count = int(thread_count)
        if build_index:
            self.build_keyframe_index()
        self._frame_height = int(self._stream.height or 0)
        self._frame_width = int(self._stream.width or 0)
        self._frame_shape = (self._frame_height, self._frame_width, 3)
        self._frame_rate = self._compute_frame_rate()
        self._nominal_frame_rate = self._compute_nominal_frame_rate()
        self._fourcc = self._compute_fourcc()
        self._frame_format = 0
def close(self) -> None:
"""Close the underlying PyAV container."""
self._container.close()
if self._fast_container is not None:
self._fast_container.close()
self._fast_container = None
self._fast_stream = None
self._fast_decoder = None
self._fast_last_pts = None
self._fast_first_frame_number = None
if self._seq_container is not None:
self._seq_container.close()
self._seq_container = None
self._seq_stream = None
self._seq_decoder = None
self._seq_frame_index = 0
self._last_index = None
self._last_fast_index = None
def __enter__(self) -> "PyAVVideoBackend":
"""Return self for context manager usage."""
return self
def __exit__(self, exc_type, exc, tb) -> None:
"""Close the backend on exit from a context manager."""
self.close()
def _secs_to_pts(self, t_s: float) -> int:
"""Convert seconds to presentation timestamp units."""
ticks = int(round(t_s / float(self._time_base)))
return self._start_pts + ticks
def _pts_to_secs(self, pts: int) -> float:
"""Convert presentation timestamp units to seconds."""
return float((pts - self._start_pts) * self._time_base)
def _pts_to_frame_number(self, pts: Optional[int], fps: float) -> Optional[int]:
"""Convert a PTS value to a rounded frame number."""
if pts is None:
return None
return int(round(self._pts_to_secs(int(pts)) * fps))
def _frame_time_s(self, pts: Optional[int]) -> float:
"""Return the timestamp for a frame PTS."""
return float("nan") if pts is None else self._pts_to_secs(pts)
def _flush_decoder(self) -> None:
"""Flush decoder buffers if supported."""
try:
self._codec_ctx.flush_buffers()
except Exception:
pass
def _compute_frame_rate(self) -> float:
"""Compute the stream frame rate in frames per second."""
rate = self._stream.average_rate or self._stream.base_rate
return float(rate) if rate is not None else 0.0
def _compute_nominal_frame_rate(self) -> float:
"""Compute a nominal frame rate preferring guessed_rate (useful for VFR)."""
rate = getattr(self._stream, "guessed_rate", None) or self._stream.average_rate or self._stream.base_rate
return float(rate) if rate is not None else 0.0
@property
def nominal_frame_rate(self) -> float:
"""Return the nominal frame rate (guessed_rate when available)."""
return self._nominal_frame_rate
def _compute_fourcc(self) -> int:
"""Compute a fourcc code from the stream codec tag."""
tag = self._stream.codec_context.codec_tag
if isinstance(tag, str) and len(tag) >= 4:
tag = tag[:4]
return (
ord(tag[0])
| (ord(tag[1]) << 8)
| (ord(tag[2]) << 16)
| (ord(tag[3]) << 24)
)
return 0
def _ensure_frame_pts(self) -> None:
"""Decode the stream once to collect frame PTS values."""
if self._frame_pts is not None:
return
idx_container = av.open(self._path)
idx_stream = idx_container.streams.video[self._stream.index]
self._configure_codec_context(idx_stream)
self._configure_codec_context(idx_stream)
pts_list: List[int] = []
for frame in idx_container.decode(idx_stream):
pts = frame.pts if frame.pts is not None else frame.dts
if pts is None:
pts = self._start_pts + len(pts_list)
pts_list.append(int(pts))
idx_container.close()
self._frame_pts = pts_list
self._frame_count = len(pts_list)
    def _read_frame_by_pts(self, target_pts: int, *, decode_rgb: bool = True) -> DecodedFrame:
        """Decode the first frame at or after a target PTS.

        Opens a throwaway container, seeks backward to a keyframe (using the
        keyframe index when built, otherwise the raw target), then decodes
        forward until a frame with ``pts >= target_pts`` appears. Decoded
        frames are inserted into the frame cache along the way.

        Args:
            target_pts: Target presentation timestamp in stream time-base units.
            decode_rgb: Decode to RGB when True, otherwise to "bgr24".

        Returns:
            The first decoded frame with ``pts >= target_pts``; falls back to
            the last decoded frame if no such frame exists.

        Raises:
            RuntimeError: If no frame at all could be decoded after seeking.
        """
        # Cache hit: skip decoding entirely.
        cached = self._frame_cache.get(target_pts)
        if cached is not None:
            return cached  # type: ignore[return-value]
        if self._index_built:
            # Seek to the keyframe at or before the target for exactness.
            idx = self._keyframe_index_at_or_before_pts(target_pts)
            seek_pts = self._keyframes[idx].pts
        else:
            seek_pts = target_pts
        # A dedicated container so the main container's position is untouched.
        container = av.open(self._path)
        stream = container.streams.video[self._stream.index]
        self._configure_codec_context(stream)
        try:
            container.seek(seek_pts, stream=stream, backward=True, any_frame=False)
            try:
                stream.codec_context.flush_buffers()
            except Exception:
                pass
            last: Optional[DecodedFrame] = None
            for packet in container.demux(stream):
                for frame in packet.decode():
                    pts = frame.pts
                    image = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
                    cur = DecodedFrame(
                        image=image,
                        pts=pts,
                        time_s=self._frame_time_s(pts),
                        key_frame=bool(getattr(frame, "key_frame", False)),
                    )
                    if pts is not None:
                        self._frame_cache.put(int(pts), cur)
                    if pts is None:
                        # Untimed frame: remember it as a fallback, keep going.
                        last = cur
                        continue
                    if pts >= target_pts:
                        return cur
                    last = cur
        finally:
            container.close()
        if last is not None:
            return last
        raise RuntimeError("Could not decode any frames after seeking.")
    def _read_frame_at_index(
        self,
        index: int,
        *,
        decode_rgb: bool = True,
        use_sequential: bool = True,
    ) -> DecodedFrame:
        """Return the decoded frame at a zero-based index.

        Prefers the cheap sequential decoder when the request continues the
        current sequence; otherwise falls back to a PTS-based random-access
        decode. Negative indices count from the end.

        Args:
            index: Zero-based (or negative) decode-order frame index.
            decode_rgb: Decode to RGB when True.
            use_sequential: Allow the sequential fast paths.

        Returns:
            The decoded frame.

        Raises:
            IndexError: If the normalized index is out of range.
        """
        # Fast path: the request continues (or starts) the sequential decode.
        # NOTE(review): this early return does not update _current_frame_pos /
        # _last_index the way the slow path below does — confirm intentional.
        if use_sequential and index >= 0:
            if self._seq_decoder is not None and index == self._seq_frame_index:
                return self.read_next_frame(decode_rgb=decode_rgb)
            if self._seq_decoder is None and index == 0:
                self.reset_sequence()
                return self.read_next_frame(decode_rgb=decode_rgb)
        self._ensure_frame_pts()
        assert self._frame_pts is not None
        if index < 0:
            index += self._frame_count
        if index < 0 or index >= self._frame_count:
            raise IndexError("frame index out of range")
        target_pts = self._frame_pts[index]
        if use_sequential and self._seq_decoder is not None and index == self._seq_frame_index:
            try:
                decoded = self.read_next_frame(decode_rgb=decode_rgb)
            except StopIteration:
                # Sequential decoder exhausted; re-seek it to the target.
                decoded = self._seek_seq_to_pts(target_pts, target_index=index, decode_rgb=decode_rgb)
        elif use_sequential and self._last_index is not None and index == self._last_index + 1:
            # Consecutive request: steer the sequential decoder to the target.
            decoded = self._seek_seq_to_pts(target_pts, target_index=index, decode_rgb=decode_rgb)
        else:
            # Random access: full PTS-based seek-and-decode.
            decoded = self._read_frame_by_pts(target_pts, decode_rgb=decode_rgb)
        self._current_frame_pos = float(index)
        self._last_index = index
        return decoded
def frame_at_index(self, index: int) -> np.ndarray:
"""Return the decoded frame at a zero-based index."""
return self._read_frame_at_index(index, decode_rgb=True, use_sequential=True).image
#
# Public helpers for PTS/time mapping
#
def pts_at_index(self, index: int) -> Optional[int]:
"""Return the presentation timestamp (PTS) for a frame index."""
self._ensure_frame_pts()
assert self._frame_pts is not None
if index < 0:
index += self._frame_count
if index < 0 or index >= self._frame_count:
raise IndexError("frame index out of range")
return int(self._frame_pts[index])
def time_at_index(self, index: int) -> float:
"""Return the timestamp in seconds for a frame index."""
pts = self.pts_at_index(index)
return float("nan") if pts is None else self._pts_to_secs(int(pts))
def index_from_pts(self, pts: int) -> int:
"""Map a PTS value to the nearest frame index."""
self._ensure_frame_pts()
assert self._frame_pts is not None
fps = self._frame_pts
if not fps:
return 0
lo, hi = 0, len(fps) - 1
if pts <= fps[0]:
return 0
if pts >= fps[-1]:
return hi
while lo <= hi:
mid = (lo + hi) // 2
m = fps[mid]
if m == pts:
return mid
if m < pts:
lo = mid + 1
else:
hi = mid - 1
# choose nearest between hi and lo
if lo >= len(fps):
return hi
if hi < 0:
return lo
return lo if abs(fps[lo] - pts) < abs(fps[hi] - pts) else hi
def index_from_time(self, t_s: float) -> int:
"""Map a timestamp in seconds to the nearest frame index."""
pts = self._secs_to_pts(float(t_s))
return self.index_from_pts(int(pts))
@property
def frame_height(self) -> int:
"""Return the video frame height."""
return self._frame_height
@property
def frame_width(self) -> int:
"""Return the video frame width."""
return self._frame_width
@property
def frame_rate(self) -> float:
"""Return the reported frame rate in frames per second."""
return self._frame_rate
@property
def fourcc(self) -> int:
"""Return the fourcc codec identifier."""
return self._fourcc
@property
def frame_format(self) -> int:
"""Return the frame format identifier."""
return self._frame_format
@property
def number_of_frames(self) -> int:
"""Return the total number of frames, decoding if needed."""
if self._frame_count <= 0:
self._ensure_frame_pts()
return self._frame_count
@property
def frame_shape(self) -> tuple:
"""Return the expected frame shape (H, W, C)."""
return self._frame_shape
@property
def current_frame_pos(self) -> float:
"""Return the last frame index accessed."""
return self._current_frame_pos
def _seek_to_pts(self, pts: int, *, backward: bool) -> None:
"""Seek to a timestamp in the stream."""
self._container.seek(pts, stream=self._stream, backward=backward, any_frame=False)
self._flush_decoder()
def _ensure_fast_container(self) -> None:
"""Initialize the fast-seek container if needed."""
if self._fast_container is not None:
return
self._fast_container = av.open(self._path)
self._fast_stream = self._fast_container.streams.video[self._stream.index]
self._configure_codec_context(self._fast_stream)
def _configure_codec_context(self, stream: av.video.stream.VideoStream) -> None:
"""Apply conservative threading settings to a codec context."""
try:
codec_ctx = stream.codec_context
if self._threading:
codec_ctx.thread_type = "AUTO"
codec_ctx.thread_count = self._thread_count
else:
codec_ctx.thread_type = "NONE"
codec_ctx.thread_count = 1
except Exception:
pass
def _fast_rewind(self) -> None:
"""Rewind the fast-seek container and reset decoder state."""
self._ensure_fast_container()
assert self._fast_container is not None
assert self._fast_stream is not None
self._fast_container.seek(0)
self._fast_decoder = self._fast_container.decode(self._fast_stream)
self._fast_last_pts = None
def _fast_frame_to_pts(self, frame_index: int, fps: float) -> int:
"""Convert a frame index to expected PTS for fast reads."""
if frame_index <= 0:
return self._start_pts
return self._secs_to_pts(frame_index / fps)
def _read_frame_fast_like(self, target_frame: int, *, decode_rgb: bool) -> DecodedFrame:
    """Approximate FastVideoReader behavior for fast sequential reads.

    Strategy, in order of preference:
      1. ``target_frame <= 0``: rewind and return the first decodable frame.
      2. Sequential fast path: if the previous fast read ended exactly one
         frame earlier (``_fast_last_pts`` matches the predicted PTS), pull
         the next frame from the already-running decoder.
      3. Seek near the predicted target PTS and decode forward, falling back
         to ``_read_frame_fast_simple`` whenever decoding stalls.

    Args:
        target_frame: Zero-based frame number to fetch (approximate).
        decode_rgb: Decode to RGB when True, otherwise BGR (``bgr24``).

    Returns:
        The decoded frame at (approximately) ``target_frame``.
    """
    fps = self._nominal_frame_rate or self._frame_rate or 1.0
    # PTS ticks per frame; ``wiggle`` absorbs small timestamp jitter when
    # comparing decoded PTS against the predicted target PTS.
    pts_per_frame = 1.0 / (fps * float(self._time_base)) if fps else 1.0
    wiggle = pts_per_frame / 10.0
    if target_frame <= 0:
        # Case 1: rewind and return the very first decodable frame.
        self._fast_rewind()
        assert self._fast_decoder is not None
        frame = next(self._fast_decoder)
        pts = frame.pts if frame.pts is not None else frame.dts
        if pts is None:
            pts = self._fast_frame_to_pts(0, fps)
        img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        return cur
    # Case 2: continue the running decoder when the last fast read landed
    # exactly one frame before the target.
    expected_prev_pts = self._fast_frame_to_pts(target_frame - 1, fps)
    if self._fast_decoder is not None and self._fast_last_pts == expected_prev_pts:
        try:
            frame = next(self._fast_decoder)
        except StopIteration:
            frame = None
        if frame is not None:
            pts = frame.pts if frame.pts is not None else frame.dts
            if pts is None:
                pts = self._fast_frame_to_pts(target_frame, fps)
            img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
            cur = DecodedFrame(
                image=img,
                pts=pts,
                time_s=self._frame_time_s(pts),
                key_frame=bool(getattr(frame, "key_frame", False)),
            )
            if pts is not None:
                self._frame_cache.put(int(pts), cur)
            self._fast_last_pts = int(pts) if pts is not None else None
            return cur
    # Case 3: seek to the predicted target PTS and decode forward.
    self._ensure_fast_container()
    assert self._fast_container is not None
    assert self._fast_stream is not None
    target_pts = self._fast_frame_to_pts(target_frame, fps)
    self._fast_container.seek(
        target_pts,
        stream=self._fast_stream,
        backward=True,
        any_frame=False,
    )
    try:
        self._fast_stream.codec_context.flush_buffers()
    except Exception:
        pass
    self._fast_decoder = self._fast_container.decode(self._fast_stream)
    self._fast_last_pts = None
    try:
        frame = next(self._fast_decoder)
    except StopIteration:
        frame = None
    if frame is None:
        return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)
    cur_pts = frame.pts if frame.pts is not None else frame.dts
    if cur_pts is None:
        cur_pts = target_pts
    if cur_pts > target_pts:
        # Overshot the target: step back a fixed number of frames and retry.
        # NOTE(review): ``max(1, int(round(100)))`` is always 100 — looks like
        # a leftover constant (presumably meant to derive from fps); confirm.
        back = max(1, int(round(100)))
        back_pts = self._fast_frame_to_pts(max(0, target_frame - back), fps)
        self._fast_container.seek(
            back_pts,
            stream=self._fast_stream,
            backward=True,
            any_frame=False,
        )
        try:
            self._fast_stream.codec_context.flush_buffers()
        except Exception:
            pass
        self._fast_decoder = self._fast_container.decode(self._fast_stream)
        try:
            frame = next(self._fast_decoder)
        except StopIteration:
            frame = None
        if frame is None:
            return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)
        cur_pts = frame.pts if frame.pts is not None else frame.dts
        if cur_pts is None:
            cur_pts = target_pts
    # Decode forward until the frame PTS reaches the target (minus jitter).
    while float(cur_pts) < (float(target_pts) - wiggle):
        try:
            frame = next(self._fast_decoder)
        except StopIteration:
            frame = None
            break
        if frame is None:
            break
        cur_pts = frame.pts if frame.pts is not None else frame.dts
        if cur_pts is None:
            # Timestamp-less frame: accept it as the target and stop walking.
            cur_pts = target_pts
            break
    if frame is None:
        return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)
    img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
    cur = DecodedFrame(
        image=img,
        pts=cur_pts,
        time_s=self._frame_time_s(cur_pts),
        key_frame=bool(getattr(frame, "key_frame", False)),
    )
    if cur_pts is not None:
        self._frame_cache.put(int(cur_pts), cur)
        self._fast_last_pts = int(cur_pts)
    else:
        self._fast_last_pts = None
    return cur
def _ensure_seq_container(self) -> None:
    """Lazily create the dedicated sequential-decode container and decoder."""
    if self._seq_container is not None:
        return
    container = av.open(self._path)
    self._seq_container = container
    self._seq_stream = container.streams.video[self._stream.index]
    self._configure_codec_context(self._seq_stream)
    self._seq_decoder = container.decode(self._seq_stream)
    self._seq_frame_index = 0
def reset_sequence(self) -> None:
    """Restart sequential decoding from the first frame of the stream."""
    self._ensure_seq_container()
    container = self._seq_container
    stream = self._seq_stream
    assert container is not None
    assert stream is not None
    try:
        container.seek(0)
    except Exception:
        # Some formats refuse to seek; restarting the decoder is still useful.
        pass
    self._seq_decoder = container.decode(stream)
    self._seq_frame_index = 0
def _seek_seq_to_pts(
    self,
    target_pts: int,
    *,
    target_index: int,
    decode_rgb: bool,
    any_frame: bool = False,
) -> DecodedFrame:
    """Seek the sequential decoder to a PTS and return the first match.

    Args:
        target_pts: PTS (stream time base) to reach.
        target_index: Frame index corresponding to ``target_pts``; used to
            update the sequential bookkeeping after the seek.
        decode_rgb: Decode to RGB when True, otherwise BGR (``bgr24``).
        any_frame: Seek to any frame (not only keyframes) when True.

    Returns:
        The first decoded frame with ``pts >= target_pts``, or the last
        decoded frame if the stream ends before the target is reached.

    Raises:
        RuntimeError: If no frame at all can be decoded after seeking.
    """
    self._ensure_seq_container()
    assert self._seq_container is not None
    assert self._seq_stream is not None
    seek_pts = target_pts
    if self._keyframes and not any_frame:
        # Anchor the seek on the keyframe at/before the target so the
        # decoder has a valid GOP start to decode forward from.
        idx = self._keyframe_index_at_or_before_pts(target_pts)
        seek_pts = self._keyframes[idx].pts
    self._seq_container.seek(
        seek_pts,
        stream=self._seq_stream,
        backward=True,
        any_frame=any_frame,
    )
    try:
        self._seq_stream.codec_context.flush_buffers()
    except Exception:
        pass
    decoder = self._seq_container.decode(self._seq_stream)
    last: Optional[DecodedFrame] = None
    for frame in decoder:
        pts = frame.pts if frame.pts is not None else frame.dts
        img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        if pts is None:
            # Timestamp-less frame: remember it but keep scanning forward.
            last = cur
            continue
        if pts >= target_pts:
            # Adopt this decoder for subsequent sequential reads.
            self._seq_decoder = decoder
            self._seq_frame_index = target_index + 1
            self._current_frame_pos = float(target_index)
            return cur
        last = cur
    if last is not None:
        # Stream ended before the target PTS; return the closest frame seen.
        self._seq_decoder = decoder
        self._seq_frame_index = target_index + 1
        self._current_frame_pos = float(target_index)
        return last
    raise RuntimeError("Could not decode any frames after seeking.")
def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
    """Scan packets and store keyframe pts/time.

    Demuxes (without decoding) the video stream, collecting the PTS of
    every keyframe packet, then rebuilds ``self._keyframes`` sorted by PTS
    and invalidates the scrub-bucket cache.

    Args:
        max_packets: Optional cap on the number of timestamped packets to
            examine; ``None`` scans the whole stream.

    Returns:
        The list of ``KeyframeEntry`` records, sorted by PTS.
    """
    path = self._container.name
    idx_container = av.open(path)
    try:
        idx_stream = idx_container.streams.video[self._stream.index]
        key_pts: List[int] = []
        n = 0
        for packet in idx_container.demux(idx_stream):
            # Skip flush/sentinel packets that carry no timestamps at all.
            if packet.dts is None and packet.pts is None:
                continue
            if packet.is_keyframe:
                pts = packet.pts if packet.pts is not None else packet.dts
                if pts is not None:
                    key_pts.append(int(pts))
            n += 1
            if max_packets is not None and n >= max_packets:
                break
    finally:
        # BUGFIX: always release the probe container, even when demuxing
        # raises partway through (previously leaked on error).
        idx_container.close()
    key_pts = sorted(set(key_pts))
    if not key_pts:
        # Guarantee at least one entry so downstream bisects never see [].
        key_pts = [self._start_pts]
    self._keyframes = [KeyframeEntry(pts=p, time_s=self._pts_to_secs(p)) for p in key_pts]
    self._index_built = True
    self._bucket_to_kfidx.clear()
    return self._keyframes
def _keyframe_index_at_or_before_pts(self, target_pts: int) -> int:
"""Return keyframe index at or before the target PTS."""
kf = self._keyframes
if not self._index_built or not kf:
return 0
if target_pts <= kf[0].pts:
return 0
if target_pts >= kf[-1].pts:
return len(kf) - 1
lo, hi = 0, len(kf) - 1
while lo <= hi:
mid = (lo + hi) // 2
m = kf[mid].pts
if m == target_pts:
return mid
if m < target_pts:
lo = mid + 1
else:
hi = mid - 1
return hi
def _keyframe_index_nearest_pts(self, target_pts: int) -> int:
"""Return nearest keyframe index to the target PTS."""
kf = self._keyframes
if not self._index_built or not kf:
return 0
i0 = self._keyframe_index_at_or_before_pts(target_pts)
i1 = min(i0 + 1, len(kf) - 1)
if i0 == i1:
return i0
d0 = abs(kf[i0].pts - target_pts)
d1 = abs(kf[i1].pts - target_pts)
return i0 if d0 <= d1 else i1
def _bucket_key(self, t_s: float) -> int:
"""Return a bucket key for the scrub acceleration cache."""
return int(round(t_s * 1000.0 / self._scrub_bucket_ms))
def _keyframe_index_for_time_fast(self, t_s: float, mode: str) -> int:
"""Return a keyframe index using cached time buckets."""
if not self._index_built:
raise RuntimeError("Keyframe index not built. Call build_keyframe_index() first.")
b = self._bucket_key(t_s)
mode_tag = {"previous": 0, "nearest": 1, "next": 2}.get(mode)
if mode_tag is None:
raise ValueError("mode must be one of: 'previous', 'nearest', 'next'")
cache_key = (b << 2) | mode_tag
cached = self._bucket_to_kfidx.get(cache_key)
if cached is not None:
return int(cached)
target_pts = self._secs_to_pts(t_s)
if mode == "previous":
idx = self._keyframe_index_at_or_before_pts(target_pts)
elif mode == "nearest":
idx = self._keyframe_index_nearest_pts(target_pts)
else:
i_prev = self._keyframe_index_at_or_before_pts(target_pts)
if self._keyframes[i_prev].pts >= target_pts:
idx = i_prev
else:
idx = min(i_prev + 1, len(self._keyframes) - 1)
self._bucket_to_kfidx.put(cache_key, idx)
return idx
def read_keyframe_at(
    self,
    t_s: Number,
    *,
    mode: str = "nearest",
    decode_rgb: bool = True,
) -> DecodedFrame:
    """Return a nearby keyframe without GOP forward decoding.

    Args:
        t_s: Target timestamp in seconds.
        mode: Keyframe selection: ``"previous"``, ``"nearest"`` or ``"next"``.
        decode_rgb: Decode to RGB when True, otherwise BGR (``bgr24``).

    Returns:
        The decoded keyframe selected for ``t_s`` under ``mode``.

    Raises:
        RuntimeError: If no frame can be decoded after the keyframe seek.
    """
    t_s = float(t_s)
    idx = self._keyframe_index_for_time_fast(t_s, mode)
    key_pts = self._keyframes[idx].pts
    cached = self._frame_cache.get(key_pts)
    if cached is not None:
        return cached  # type: ignore[return-value]
    # Use a fresh container for reliable keyframe seek, avoiding stateful issues
    container = av.open(self._path)
    try:
        stream = container.streams.video[self._stream.index]
        self._configure_codec_context(stream)
        # Use backward seek to land on or before the requested keyframe PTS reliably
        container.seek(key_pts, stream=stream, backward=True, any_frame=False)
        try:
            stream.codec_context.flush_buffers()
        except Exception:
            pass
        for packet in container.demux(stream):
            for frame in packet.decode():
                pts = frame.pts
                # BUGFIX: the non-RGB path previously called to_ndarray() with
                # no format, yielding raw planar (e.g. YUV) data instead of the
                # bgr24 layout every other decode path in this class produces.
                img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
                cur = DecodedFrame(
                    image=img,
                    pts=pts,
                    time_s=self._frame_time_s(pts),
                    key_frame=bool(getattr(frame, "key_frame", False)),
                )
                if pts is not None:
                    self._frame_cache.put(int(pts), cur)
                return cur
    finally:
        container.close()
    raise RuntimeError("Failed to decode a frame after keyframe seek.")
def read_frame_at(
    self,
    t_s: Number,
    *,
    return_first_after: bool = True,
    max_decode_frames: int = 10_000,
    use_index: bool = True,
) -> DecodedFrame:
    """Decode a frame near a timestamp with accurate seeking.

    Simplified and robust: always uses a fresh container and backward
    keyframe seek, then decodes forward to the target.

    Args:
        t_s: Target timestamp in seconds.
        return_first_after: When True, return the first frame with
            ``pts >= target``; when False, return the last frame with
            ``pts <= target``.
        max_decode_frames: Safety cap on decoded frames per call.
        use_index: Anchor the seek on the prebuilt keyframe index when
            available.

    Returns:
        The decoded frame adjacent to ``t_s`` per ``return_first_after``.

    Raises:
        RuntimeError: If the cap is exceeded or nothing decodes.
    """
    t_s = float(t_s)
    target_pts = self._secs_to_pts(t_s)
    cached = self._frame_cache.get(target_pts)
    if cached is not None:
        return cached  # type: ignore[return-value]
    # Fresh container per call: no shared decoder state to corrupt.
    container = av.open(self._path)
    try:
        stream = container.streams.video[self._stream.index]
        self._configure_codec_context(stream)
        if use_index and self._index_built:
            # Anchor on the known keyframe at/before the target.
            idx = self._keyframe_index_at_or_before_pts(target_pts)
            anchor_pts = self._keyframes[idx].pts
            container.seek(anchor_pts, stream=stream, backward=True, any_frame=False)
        else:
            container.seek(target_pts, stream=stream, backward=True, any_frame=False)
        try:
            stream.codec_context.flush_buffers()
        except Exception:
            pass
        last: Optional[DecodedFrame] = None
        decoded = 0
        for packet in container.demux(stream):
            for frame in packet.decode():
                decoded += 1
                if decoded > max_decode_frames:
                    raise RuntimeError(
                        "Exceeded max_decode_frames while seeking; timestamps may be broken."
                    )
                pts = frame.pts
                cur = DecodedFrame(
                    image=frame.to_rgb().to_ndarray(),
                    pts=pts,
                    time_s=self._frame_time_s(pts),
                    key_frame=bool(getattr(frame, "key_frame", False)),
                )
                if pts is not None:
                    self._frame_cache.put(int(pts), cur)
                if pts is None:
                    # Timestamp-less frame: remember it, keep scanning.
                    last = cur
                    continue
                if return_first_after:
                    if pts >= target_pts:
                        return cur
                    last = cur
                else:
                    if pts <= target_pts:
                        last = cur
                    elif last is not None:
                        # First frame past the target: the previous one wins.
                        return last
        if last is not None:
            # Stream ended before/around the target; best candidate seen.
            return last
        raise RuntimeError("Could not decode any frames after seeking.")
    finally:
        container.close()
def _read_frame_fast_simple(self, target_pts: int, *, decode_rgb: bool) -> DecodedFrame:
    """Fallback fast seek: seek and decode first frame after PTS.

    Tries an ``any_frame`` seek first (may land mid-GOP), then a
    conventional keyframe-backward seek if that yields nothing at or past
    ``target_pts``.

    Raises:
        RuntimeError: If neither attempt decodes a usable frame.
    """
    self._ensure_fast_container()
    assert self._fast_container is not None
    assert self._fast_stream is not None
    def grab_frame(container: av.container.InputContainer, stream: av.video.stream.VideoStream) -> Optional[DecodedFrame]:
        # Decode until a frame at or after ``target_pts`` appears; frames
        # with no timestamp at all are accepted immediately.
        for frame in container.decode(stream):
            pts = frame.pts if frame.pts is not None else frame.dts
            if pts is None:
                target_reached = True
            else:
                target_reached = pts >= target_pts
            if not target_reached:
                continue
            if decode_rgb:
                img = frame.to_rgb().to_ndarray()
            else:
                img = frame.to_ndarray(format="bgr24")
            cur = DecodedFrame(
                image=img,
                pts=pts,
                time_s=self._frame_time_s(pts),
                key_frame=bool(getattr(frame, "key_frame", False)),
            )
            if pts is not None:
                self._frame_cache.put(int(pts), cur)
            return cur
        return None
    # Attempt 1: any_frame seek — fastest, may start on a non-keyframe.
    self._fast_container.seek(
        target_pts,
        stream=self._fast_stream,
        backward=True,
        any_frame=True,
    )
    try:
        self._fast_stream.codec_context.flush_buffers()
    except Exception:
        pass
    grabbed = grab_frame(self._fast_container, self._fast_stream)
    if grabbed is not None:
        return grabbed
    # Attempt 2: conventional keyframe-backward seek.
    self._fast_container.seek(
        target_pts,
        stream=self._fast_stream,
        backward=True,
        any_frame=False,
    )
    try:
        self._fast_stream.codec_context.flush_buffers()
    except Exception:
        pass
    grabbed = grab_frame(self._fast_container, self._fast_stream)
    if grabbed is not None:
        return grabbed
    raise RuntimeError("Failed to decode a frame after fast seek.")
def _read_frame_fast_opencv_pyav(self, target_frame: int, *, decode_rgb: bool) -> DecodedFrame:
    """Approximate OpenCV seek behavior using PyAV.

    Seeks a few frames before ``target_frame`` and walks forward, widening
    the back-step when the first decoded frame lands past the target.
    Falls back to ``_read_frame_fast_simple`` when the search fails.
    """
    fps = self._nominal_frame_rate or self._frame_rate or 1.0
    self._ensure_fast_container()
    assert self._fast_container is not None
    assert self._fast_stream is not None
    def seek_to_frame(frame_index: int) -> None:
        # Keyframe-backward seek to the PTS predicted for ``frame_index``.
        target_pts = self._secs_to_pts(frame_index / fps)
        self._fast_container.seek(
            target_pts,
            stream=self._fast_stream,
            backward=True,
            any_frame=False,
        )
        try:
            self._fast_stream.codec_context.flush_buffers()
        except Exception:
            pass
    def frame_number_from_pts(pts: Optional[int]) -> Optional[int]:
        # Frame number relative to the first decoded frame of this stream.
        num = self._pts_to_frame_number(pts, fps)
        if num is None:
            return None
        if self._fast_first_frame_number is None:
            return num
        return num - self._fast_first_frame_number
    first_frame = None
    if self._fast_first_frame_number is None:
        # Lazily learn the stream's first frame number (may not start at 0).
        seek_to_frame(0)
        for frame in self._fast_container.decode(self._fast_stream):
            pts = frame.pts if frame.pts is not None else frame.dts
            self._fast_first_frame_number = self._pts_to_frame_number(pts, fps) or 0
            first_frame = frame
            break
    if target_frame <= 0:
        # Target is the first frame: reuse the probe frame if we have one.
        if first_frame is None:
            seek_to_frame(0)
            for frame in self._fast_container.decode(self._fast_stream):
                first_frame = frame
                break
        if first_frame is None:
            raise RuntimeError("Failed to decode a frame after fast seek.")
        pts = first_frame.pts if first_frame.pts is not None else first_frame.dts
        img = first_frame.to_rgb().to_ndarray() if decode_rgb else first_frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(first_frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        return cur
    # Start a few frames early; widen the back-step until the first decoded
    # frame lands at or before the target frame number.
    delta = 16
    attempts = 0
    while True:
        start_frame = max(target_frame - delta, 0)
        seek_to_frame(start_frame)
        decoder = self._fast_container.decode(self._fast_stream)
        try:
            frame = next(decoder)
        except StopIteration:
            break
        pts = frame.pts if frame.pts is not None else frame.dts
        frame_number = frame_number_from_pts(pts)
        if frame_number is None:
            frame_number = start_frame
        if frame_number < 0 or frame_number > target_frame:
            # Landed past the target: give up at sane limits, else back off.
            if start_frame == 0 or delta >= 1 << 30 or attempts > 20:
                break
            # NOTE(review): ``delta`` starts at 16, so the ``delta * 2`` branch
            # is unreachable; growth is effectively 1.5x per attempt — confirm intent.
            delta = delta * 2 if delta < 16 else int(delta * 1.5)
            attempts += 1
            continue
        # Walk forward frame by frame until the target number is reached.
        while frame_number < target_frame:
            try:
                frame = next(decoder)
            except StopIteration:
                frame = None
                break
            pts = frame.pts if frame.pts is not None else frame.dts
            frame_number = frame_number_from_pts(pts)
            if frame_number is None:
                # Timestamp-less frame: treat it as the target and stop.
                frame_number = target_frame
        if frame is None:
            break
        pts = frame.pts if frame.pts is not None else frame.dts
        img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        return cur
    # Search failed: fall back to a plain PTS-based fast seek.
    target_pts = self._secs_to_pts(target_frame / fps)
    return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)
def read_frame_fast(
    self,
    *,
    index: Optional[int] = None,
    t_s: Optional[Number] = None,
    decode_rgb: bool = True,
    use_sequential: bool = True,
) -> DecodedFrame:
    """Return a fast, approximate frame for an index or timestamp.

    Args:
        index: Zero-based frame index; negative values count from the end.
        t_s: Timestamp in seconds; mutually exclusive with ``index``.
        decode_rgb: Decode to RGB when True, otherwise BGR (``bgr24``).
        use_sequential: Prefer the sequential fast decoder when True;
            otherwise use the OpenCV-like seek emulation.

    Returns:
        The decoded frame nearest the requested position (approximate).

    Raises:
        ValueError: If neither or both of ``index`` and ``t_s`` are given.
    """
    if index is None and t_s is None:
        raise ValueError("Provide either index or t_s")
    if index is not None and t_s is not None:
        raise ValueError("Provide only one of index or t_s")
    if t_s is None:
        # The two guards above ensure exactly one of index/t_s is set, so
        # index cannot be None here; the previous duplicate ValueError
        # re-check was unreachable dead code.
        assert index is not None
        if index < 0:
            index += self.number_of_frames
        target_frame = int(index)
    else:
        t_s = float(t_s)
        target_frame = int(round(t_s * (self._nominal_frame_rate or self._frame_rate or 1.0)))
    if use_sequential:
        decoded = self._read_frame_fast_like(target_frame, decode_rgb=decode_rgb)
        self._last_fast_index = target_frame
        return decoded
    fps = self._nominal_frame_rate or self._frame_rate or 1.0
    target_pts = self._secs_to_pts(target_frame / fps)
    cached = self._frame_cache.get(target_pts)
    if cached is not None:
        self._last_fast_index = target_frame
        return cached  # type: ignore[return-value]
    decoded = self._read_frame_fast_opencv_pyav(target_frame, decode_rgb=decode_rgb)
    self._last_fast_index = target_frame
    return decoded
def read_frame(
    self,
    *,
    index: Optional[int] = None,
    t_s: Optional[Number] = None,
    mode: str = "accurate",
    decode_rgb: bool = True,
    keyframe_mode: str = "nearest",
    use_sequential: bool = True,
) -> DecodedFrame:
    """Read a frame using a selectable access mode.

    ``mode`` picks the speed/accuracy trade-off: ``"accurate"`` decodes
    exactly, ``"accurate_timeline"`` seeks by timestamp, ``"scrub"`` snaps
    to a keyframe, and ``"fast"`` uses approximate decoding.

    Raises:
        ValueError: If ``mode`` is unknown, or if neither or both of
            ``index`` and ``t_s`` are supplied.
    """
    if mode not in {"accurate", "accurate_timeline", "fast", "scrub"}:
        raise ValueError("mode must be one of: 'accurate', 'accurate_timeline', 'fast', 'scrub'")
    if index is None and t_s is None:
        raise ValueError("Provide either index or t_s")
    if index is not None and t_s is not None:
        raise ValueError("Provide only one of index or t_s")
    if mode == "accurate":
        if index is None:
            assert t_s is not None
            return self.read_frame_at(float(t_s), use_index=False)
        return self._read_frame_at_index(
            int(index),
            decode_rgb=decode_rgb,
            use_sequential=use_sequential,
        )
    if mode == "accurate_timeline":
        if t_s is None:
            # Convert the index to timeline seconds via the nominal rate.
            fps = self._nominal_frame_rate or self._frame_rate or 1.0
            t_s = float(index) / fps
        return self.read_frame_at(float(t_s))
    if mode == "scrub":
        if t_s is None:
            fps = self._frame_rate or 1.0
            t_s = float(index) / fps
        return self.read_keyframe_at(float(t_s), mode=keyframe_mode, decode_rgb=decode_rgb)
    # Remaining mode: "fast".
    return self.read_frame_fast(
        index=index,
        t_s=t_s,
        decode_rgb=decode_rgb,
        use_sequential=use_sequential,
    )
def read_next_frame(self, *, decode_rgb: bool = True) -> DecodedFrame:
    """Return the next frame using sequential decoding.

    Args:
        decode_rgb: Decode to RGB when True, otherwise BGR (``bgr24``).

    Returns:
        The next decoded frame in stream order.

    Raises:
        StopIteration: When the sequential decoder is exhausted.
    """
    self._ensure_seq_container()
    assert self._seq_decoder is not None
    # StopIteration from an exhausted decoder propagates to the caller; the
    # previous try/except that merely re-raised it was redundant.
    frame = next(self._seq_decoder)
    pts = frame.pts if frame.pts is not None else frame.dts
    img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
    cur = DecodedFrame(
        image=img,
        pts=pts,
        time_s=self._frame_time_s(pts),
        key_frame=bool(getattr(frame, "key_frame", False)),
    )
    if pts is not None:
        self._frame_cache.put(int(pts), cur)
    # Bookkeeping: trackers reflect the frame just returned, then advance.
    self._current_frame_pos = float(self._seq_frame_index)
    self._last_index = int(self._seq_frame_index)
    self._last_fast_index = int(self._seq_frame_index)
    self._seq_frame_index += 1
    return cur
def iter_frames(self, *, decode_rgb: bool = True) -> Iterator[DecodedFrame]:
    """Yield every frame in order, restarting from the beginning of the stream."""
    self.reset_sequence()
    assert self._seq_decoder is not None
    for frame in self._seq_decoder:
        pts = frame.pts if frame.pts is not None else frame.dts
        if decode_rgb:
            img = frame.to_rgb().to_ndarray()
        else:
            img = frame.to_ndarray(format="bgr24")
        decoded = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), decoded)
        # Track position before advancing, mirroring read_next_frame.
        self._current_frame_pos = float(self._seq_frame_index)
        self._seq_frame_index += 1
        yield decoded
_bucket_to_kfidx = _LRU(scrub_bucket_lru_size)
instance-attribute
_codec_ctx = self._stream.codec_context
instance-attribute
_container = av.open(path)
instance-attribute
_current_frame_pos = 0.0
instance-attribute
_fast_container = None
instance-attribute
_fast_decoder = None
instance-attribute
_fast_first_frame_number = None
instance-attribute
_fast_last_pts = None
instance-attribute
_fast_stream = None
instance-attribute
_fourcc = self._compute_fourcc()
instance-attribute
_frame_cache = _LRU(decoded_frame_cache_size)
instance-attribute
_frame_count = int(self._stream.frames or 0)
instance-attribute
_frame_format = 0
instance-attribute
_frame_height = int(self._stream.height or 0)
instance-attribute
_frame_pts = None
instance-attribute
_frame_rate = self._compute_frame_rate()
instance-attribute
_frame_shape = (self._frame_height, self._frame_width, 3)
instance-attribute
_frame_width = int(self._stream.width or 0)
instance-attribute
_index_built = False
instance-attribute
_keyframes = []
instance-attribute
_last_fast_index = None
instance-attribute
_last_index = None
instance-attribute
_nominal_frame_rate = self._compute_nominal_frame_rate()
instance-attribute
_path = path
instance-attribute
_scrub_bucket_ms = max(1, int(scrub_bucket_ms))
instance-attribute
_seq_container = None
instance-attribute
_seq_decoder = None
instance-attribute
_seq_frame_index = 0
instance-attribute
_seq_stream = None
instance-attribute
_start_pts = self._stream.start_time if self._stream.start_time is not None else 0
instance-attribute
_stream = self._container.streams.video[video_stream_index]
instance-attribute
_thread_count = int(thread_count)
instance-attribute
_threading = bool(threading)
instance-attribute
_time_base = self._stream.time_base
instance-attribute
current_frame_pos
property
Return the last frame index accessed.
fourcc
property
Return the fourcc codec identifier.
frame_format
property
Return the frame format identifier.
frame_height
property
Return the video frame height.
frame_rate
property
Return the reported frame rate in frames per second.
frame_shape
property
Return the expected frame shape (H, W, C).
frame_width
property
Return the video frame width.
nominal_frame_rate
property
Return the nominal frame rate (guessed_rate when available).
number_of_frames
property
Return the total number of frames, decoding if needed.
__enter__()
Return self for context manager usage.
Source code in acvr/_pyav_backend.py
def __enter__(self) -> "PyAVVideoBackend":
"""Return self for context manager usage."""
return self
__exit__(exc_type, exc, tb)
Close the backend on exit from a context manager.
Source code in acvr/_pyav_backend.py
def __exit__(self, exc_type, exc, tb) -> None:
"""Close the backend on exit from a context manager."""
self.close()
__init__(path, video_stream_index=0, *, build_index=False, decoded_frame_cache_size=0, scrub_bucket_ms=25, scrub_bucket_lru_size=4096, threading=True, thread_count=0)
Initialize the PyAV-backed decoder.
Source code in acvr/_pyav_backend.py
def __init__(
self,
path: str,
video_stream_index: int = 0,
*,
build_index: bool = False,
decoded_frame_cache_size: int = 0,
scrub_bucket_ms: int = 25,
scrub_bucket_lru_size: int = 4096,
threading: bool = True,
thread_count: int = 0,
) -> None:
"""Initialize the PyAV-backed decoder."""
self._path = path
self._container = av.open(path)
self._stream = self._container.streams.video[video_stream_index]
self._codec_ctx = self._stream.codec_context
self._fast_container: Optional[av.container.InputContainer] = None
self._fast_stream: Optional[av.video.stream.VideoStream] = None
self._fast_first_frame_number: Optional[int] = None
self._fast_decoder = None
self._fast_last_pts: Optional[int] = None
self._seq_container: Optional[av.container.InputContainer] = None
self._seq_stream: Optional[av.video.stream.VideoStream] = None
self._seq_decoder = None
self._seq_frame_index: int = 0
self._last_index: Optional[int] = None
self._last_fast_index: Optional[int] = None
self._time_base: Fraction = self._stream.time_base
self._start_pts: int = self._stream.start_time if self._stream.start_time is not None else 0
self._keyframes: List[KeyframeEntry] = []
self._index_built: bool = False
self._frame_pts: Optional[List[int]] = None
self._frame_count: int = int(self._stream.frames or 0)
self._current_frame_pos: float = 0.0
self._frame_cache = _LRU(decoded_frame_cache_size)
self._scrub_bucket_ms = max(1, int(scrub_bucket_ms))
self._bucket_to_kfidx = _LRU(scrub_bucket_lru_size)
self._threading = bool(threading)
self._thread_count = int(thread_count)
if build_index:
self.build_keyframe_index()
self._frame_height = int(self._stream.height or 0)
self._frame_width = int(self._stream.width or 0)
self._frame_shape = (self._frame_height, self._frame_width, 3)
self._frame_rate = self._compute_frame_rate()
self._nominal_frame_rate = self._compute_nominal_frame_rate()
self._fourcc = self._compute_fourcc()
self._frame_format = 0
_bucket_key(t_s)
Return a bucket key for the scrub acceleration cache.
Source code in acvr/_pyav_backend.py
def _bucket_key(self, t_s: float) -> int:
"""Return a bucket key for the scrub acceleration cache."""
return int(round(t_s * 1000.0 / self._scrub_bucket_ms))
_compute_fourcc()
Compute a fourcc code from the stream codec tag.
Source code in acvr/_pyav_backend.py
def _compute_fourcc(self) -> int:
"""Compute a fourcc code from the stream codec tag."""
tag = self._stream.codec_context.codec_tag
if isinstance(tag, str) and len(tag) >= 4:
tag = tag[:4]
return (
ord(tag[0])
| (ord(tag[1]) << 8)
| (ord(tag[2]) << 16)
| (ord(tag[3]) << 24)
)
return 0
_compute_frame_rate()
Compute the stream frame rate in frames per second.
Source code in acvr/_pyav_backend.py
def _compute_frame_rate(self) -> float:
"""Compute the stream frame rate in frames per second."""
rate = self._stream.average_rate or self._stream.base_rate
return float(rate) if rate is not None else 0.0
_compute_nominal_frame_rate()
Compute a nominal frame rate preferring guessed_rate (useful for VFR).
Source code in acvr/_pyav_backend.py
def _compute_nominal_frame_rate(self) -> float:
"""Compute a nominal frame rate preferring guessed_rate (useful for VFR)."""
rate = getattr(self._stream, "guessed_rate", None) or self._stream.average_rate or self._stream.base_rate
return float(rate) if rate is not None else 0.0
_configure_codec_context(stream)
Apply conservative threading settings to a codec context.
Source code in acvr/_pyav_backend.py
def _configure_codec_context(self, stream: av.video.stream.VideoStream) -> None:
"""Apply conservative threading settings to a codec context."""
try:
codec_ctx = stream.codec_context
if self._threading:
codec_ctx.thread_type = "AUTO"
codec_ctx.thread_count = self._thread_count
else:
codec_ctx.thread_type = "NONE"
codec_ctx.thread_count = 1
except Exception:
pass
_ensure_fast_container()
Initialize the fast-seek container if needed.
Source code in acvr/_pyav_backend.py
def _ensure_fast_container(self) -> None:
"""Initialize the fast-seek container if needed."""
if self._fast_container is not None:
return
self._fast_container = av.open(self._path)
self._fast_stream = self._fast_container.streams.video[self._stream.index]
self._configure_codec_context(self._fast_stream)
_ensure_frame_pts()
Decode the stream once to collect frame PTS values.
Source code in acvr/_pyav_backend.py
def _ensure_frame_pts(self) -> None:
"""Decode the stream once to collect frame PTS values."""
if self._frame_pts is not None:
return
idx_container = av.open(self._path)
idx_stream = idx_container.streams.video[self._stream.index]
self._configure_codec_context(idx_stream)
pts_list: List[int] = []
for frame in idx_container.decode(idx_stream):
pts = frame.pts if frame.pts is not None else frame.dts
if pts is None:
pts = self._start_pts + len(pts_list)
pts_list.append(int(pts))
idx_container.close()
self._frame_pts = pts_list
self._frame_count = len(pts_list)
_ensure_seq_container()
Initialize a sequential decode container if needed.
Source code in acvr/_pyav_backend.py
def _ensure_seq_container(self) -> None:
"""Initialize a sequential decode container if needed."""
if self._seq_container is not None:
return
self._seq_container = av.open(self._path)
self._seq_stream = self._seq_container.streams.video[self._stream.index]
self._configure_codec_context(self._seq_stream)
self._seq_decoder = self._seq_container.decode(self._seq_stream)
self._seq_frame_index = 0
_fast_frame_to_pts(frame_index, fps)
Convert a frame index to expected PTS for fast reads.
Source code in acvr/_pyav_backend.py
def _fast_frame_to_pts(self, frame_index: int, fps: float) -> int:
"""Convert a frame index to expected PTS for fast reads."""
if frame_index <= 0:
return self._start_pts
return self._secs_to_pts(frame_index / fps)
_fast_rewind()
Rewind the fast-seek container and reset decoder state.
Source code in acvr/_pyav_backend.py
def _fast_rewind(self) -> None:
"""Rewind the fast-seek container and reset decoder state."""
self._ensure_fast_container()
assert self._fast_container is not None
assert self._fast_stream is not None
self._fast_container.seek(0)
self._fast_decoder = self._fast_container.decode(self._fast_stream)
self._fast_last_pts = None
_flush_decoder()
Flush decoder buffers if supported.
Source code in acvr/_pyav_backend.py
def _flush_decoder(self) -> None:
"""Flush decoder buffers if supported."""
try:
self._codec_ctx.flush_buffers()
except Exception:
pass
_frame_time_s(pts)
Return the timestamp for a frame PTS.
Source code in acvr/_pyav_backend.py
def _frame_time_s(self, pts: Optional[int]) -> float:
"""Return the timestamp for a frame PTS."""
return float("nan") if pts is None else self._pts_to_secs(pts)
_keyframe_index_at_or_before_pts(target_pts)
Return keyframe index at or before the target PTS.
Source code in acvr/_pyav_backend.py
def _keyframe_index_at_or_before_pts(self, target_pts: int) -> int:
"""Return keyframe index at or before the target PTS."""
kf = self._keyframes
if not self._index_built or not kf:
return 0
if target_pts <= kf[0].pts:
return 0
if target_pts >= kf[-1].pts:
return len(kf) - 1
lo, hi = 0, len(kf) - 1
while lo <= hi:
mid = (lo + hi) // 2
m = kf[mid].pts
if m == target_pts:
return mid
if m < target_pts:
lo = mid + 1
else:
hi = mid - 1
return hi
_keyframe_index_for_time_fast(t_s, mode)
Return a keyframe index using cached time buckets.
Source code in acvr/_pyav_backend.py
def _keyframe_index_for_time_fast(self, t_s: float, mode: str) -> int:
"""Return a keyframe index using cached time buckets."""
if not self._index_built:
raise RuntimeError("Keyframe index not built. Call build_keyframe_index() first.")
b = self._bucket_key(t_s)
mode_tag = {"previous": 0, "nearest": 1, "next": 2}.get(mode)
if mode_tag is None:
raise ValueError("mode must be one of: 'previous', 'nearest', 'next'")
cache_key = (b << 2) | mode_tag
cached = self._bucket_to_kfidx.get(cache_key)
if cached is not None:
return int(cached)
target_pts = self._secs_to_pts(t_s)
if mode == "previous":
idx = self._keyframe_index_at_or_before_pts(target_pts)
elif mode == "nearest":
idx = self._keyframe_index_nearest_pts(target_pts)
else:
i_prev = self._keyframe_index_at_or_before_pts(target_pts)
if self._keyframes[i_prev].pts >= target_pts:
idx = i_prev
else:
idx = min(i_prev + 1, len(self._keyframes) - 1)
self._bucket_to_kfidx.put(cache_key, idx)
return idx
_keyframe_index_nearest_pts(target_pts)
Return nearest keyframe index to the target PTS.
Source code in acvr/_pyav_backend.py
def _keyframe_index_nearest_pts(self, target_pts: int) -> int:
"""Return nearest keyframe index to the target PTS."""
kf = self._keyframes
if not self._index_built or not kf:
return 0
i0 = self._keyframe_index_at_or_before_pts(target_pts)
i1 = min(i0 + 1, len(kf) - 1)
if i0 == i1:
return i0
d0 = abs(kf[i0].pts - target_pts)
d1 = abs(kf[i1].pts - target_pts)
return i0 if d0 <= d1 else i1
_pts_to_frame_number(pts, fps)
Convert a PTS value to a rounded frame number.
Source code in acvr/_pyav_backend.py
def _pts_to_frame_number(self, pts: Optional[int], fps: float) -> Optional[int]:
"""Convert a PTS value to a rounded frame number."""
if pts is None:
return None
return int(round(self._pts_to_secs(int(pts)) * fps))
_pts_to_secs(pts)
Convert presentation timestamp units to seconds.
Source code in acvr/_pyav_backend.py
def _pts_to_secs(self, pts: int) -> float:
"""Convert presentation timestamp units to seconds."""
return float((pts - self._start_pts) * self._time_base)
_read_frame_at_index(index, *, decode_rgb=True, use_sequential=True)
Return the decoded frame at a zero-based index.
Source code in acvr/_pyav_backend.py
def _read_frame_at_index(
    self,
    index: int,
    *,
    decode_rgb: bool = True,
    use_sequential: bool = True,
) -> DecodedFrame:
    """Return the decoded frame at a zero-based index.

    Prefers the cheap sequential decoder when the request continues the
    current read position; otherwise falls back to a PTS-based seek.

    Args:
        index: Zero-based frame index; negative values count from the end.
        decode_rgb: Decode to RGB when True, otherwise BGR24.
        use_sequential: Allow reuse of the sequential decoder state.

    Returns:
        The decoded frame.

    Raises:
        IndexError: If the (normalized) index is out of range.
    """
    if use_sequential and index >= 0:
        # Fast path: the request is exactly the next sequential frame.
        if self._seq_decoder is not None and index == self._seq_frame_index:
            return self.read_next_frame(decode_rgb=decode_rgb)
        # First frame with no decoder yet: start a fresh sequence.
        if self._seq_decoder is None and index == 0:
            self.reset_sequence()
            return self.read_next_frame(decode_rgb=decode_rgb)
    self._ensure_frame_pts()
    assert self._frame_pts is not None
    if index < 0:
        # Normalize negative indices against the total frame count.
        index += self._frame_count
    if index < 0 or index >= self._frame_count:
        raise IndexError("frame index out of range")
    target_pts = self._frame_pts[index]
    if use_sequential and self._seq_decoder is not None and index == self._seq_frame_index:
        try:
            decoded = self.read_next_frame(decode_rgb=decode_rgb)
        except StopIteration:
            # Sequential decoder exhausted; re-seek it to the target PTS.
            decoded = self._seek_seq_to_pts(target_pts, target_index=index, decode_rgb=decode_rgb)
    elif use_sequential and self._last_index is not None and index == self._last_index + 1:
        # Request immediately follows the last delivered frame: reposition
        # the sequential decoder rather than doing a cold random access.
        decoded = self._seek_seq_to_pts(target_pts, target_index=index, decode_rgb=decode_rgb)
    else:
        # True random access: decode via a keyframe-anchored PTS seek.
        decoded = self._read_frame_by_pts(target_pts, decode_rgb=decode_rgb)
    self._current_frame_pos = float(index)
    self._last_index = index
    return decoded
_read_frame_by_pts(target_pts, *, decode_rgb=True)
Decode the first frame at or after a target PTS.
Source code in acvr/_pyav_backend.py
def _read_frame_by_pts(self, target_pts: int, *, decode_rgb: bool = True) -> DecodedFrame:
    """Decode and return the first frame whose PTS is >= ``target_pts``.

    Uses a fresh container so the sequential decoder state is untouched,
    anchoring the seek at an indexed keyframe when one is available. Falls
    back to the last decoded frame when the target lies past the stream end.

    Args:
        target_pts: Target presentation timestamp in stream time-base units.
        decode_rgb: Decode to RGB when True, otherwise BGR24.

    Returns:
        The decoded frame at or after ``target_pts``.

    Raises:
        RuntimeError: If no frame could be decoded after seeking.
    """
    cached = self._frame_cache.get(target_pts)
    if cached is not None:
        return cached  # type: ignore[return-value]
    if self._index_built:
        # Seek to the keyframe at/before the target so decoding starts on a
        # clean GOP boundary.
        idx = self._keyframe_index_at_or_before_pts(target_pts)
        seek_pts = self._keyframes[idx].pts
    else:
        seek_pts = target_pts
    container = av.open(self._path)
    try:
        # Stream lookup and codec configuration run inside the try so the
        # container is closed even if they raise (the original leaked here).
        stream = container.streams.video[self._stream.index]
        self._configure_codec_context(stream)
        container.seek(seek_pts, stream=stream, backward=True, any_frame=False)
        try:
            stream.codec_context.flush_buffers()
        except Exception:
            # flush_buffers may be unavailable on some PyAV versions; best effort.
            pass
        last: Optional[DecodedFrame] = None
        for packet in container.demux(stream):
            for frame in packet.decode():
                pts = frame.pts
                image = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
                cur = DecodedFrame(
                    image=image,
                    pts=pts,
                    time_s=self._frame_time_s(pts),
                    key_frame=bool(getattr(frame, "key_frame", False)),
                )
                if pts is not None:
                    self._frame_cache.put(int(pts), cur)
                if pts is None:
                    # Timestamp-less frame: remember it only as a fallback.
                    last = cur
                    continue
                if pts >= target_pts:
                    return cur
                last = cur
    finally:
        container.close()
    if last is not None:
        return last
    raise RuntimeError("Could not decode any frames after seeking.")
_read_frame_fast_like(target_frame, *, decode_rgb)
Approximate FastVideoReader behavior for fast sequential reads.
Source code in acvr/_pyav_backend.py
def _read_frame_fast_like(self, target_frame: int, *, decode_rgb: bool) -> DecodedFrame:
    """Approximate FastVideoReader behavior for fast sequential reads.

    Reuses the dedicated fast decoder when ``target_frame`` directly follows
    the last frame it produced; otherwise seeks near the target and decodes
    forward until the PTS is within tolerance.

    Args:
        target_frame: Desired (approximate) frame number.
        decode_rgb: Decode to RGB when True, otherwise BGR24.

    Returns:
        A decoded frame at approximately the requested position.
    """
    fps = self._nominal_frame_rate or self._frame_rate or 1.0
    # Expected PTS distance between consecutive frames, in time-base ticks.
    pts_per_frame = 1.0 / (fps * float(self._time_base)) if fps else 1.0
    # Tolerance (1/10 of a frame) for "close enough" PTS comparisons below.
    wiggle = pts_per_frame / 10.0
    if target_frame <= 0:
        # Frame 0 (negatives clamp here): rewind and decode the first frame.
        self._fast_rewind()
        assert self._fast_decoder is not None
        frame = next(self._fast_decoder)
        pts = frame.pts if frame.pts is not None else frame.dts
        if pts is None:
            # Synthesize a PTS from the frame number when the stream omits it.
            pts = self._fast_frame_to_pts(0, fps)
        img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        return cur
    # Sequential fast path: if the decoder's previous frame was target-1,
    # simply pull the next frame from it.
    expected_prev_pts = self._fast_frame_to_pts(target_frame - 1, fps)
    if self._fast_decoder is not None and self._fast_last_pts == expected_prev_pts:
        try:
            frame = next(self._fast_decoder)
        except StopIteration:
            frame = None
        if frame is not None:
            pts = frame.pts if frame.pts is not None else frame.dts
            if pts is None:
                pts = self._fast_frame_to_pts(target_frame, fps)
            img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
            cur = DecodedFrame(
                image=img,
                pts=pts,
                time_s=self._frame_time_s(pts),
                key_frame=bool(getattr(frame, "key_frame", False)),
            )
            if pts is not None:
                self._frame_cache.put(int(pts), cur)
            self._fast_last_pts = int(pts) if pts is not None else None
            return cur
    # Random access: seek the fast container near the target PTS.
    self._ensure_fast_container()
    assert self._fast_container is not None
    assert self._fast_stream is not None
    target_pts = self._fast_frame_to_pts(target_frame, fps)
    self._fast_container.seek(
        target_pts,
        stream=self._fast_stream,
        backward=True,
        any_frame=False,
    )
    try:
        self._fast_stream.codec_context.flush_buffers()
    except Exception:
        # flush_buffers may be unavailable on some PyAV versions; best effort.
        pass
    self._fast_decoder = self._fast_container.decode(self._fast_stream)
    self._fast_last_pts = None
    try:
        frame = next(self._fast_decoder)
    except StopIteration:
        frame = None
    if frame is None:
        # Seek produced nothing; fall back to the simple strategy.
        return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)
    cur_pts = frame.pts if frame.pts is not None else frame.dts
    if cur_pts is None:
        cur_pts = target_pts
    if cur_pts > target_pts:
        # Overshot the target: back up a fixed number of frames and retry.
        # NOTE(review): int(round(100)) is just the constant 100 — looks like
        # it was meant to scale with fps or GOP length; confirm.
        back = max(1, int(round(100)))
        back_pts = self._fast_frame_to_pts(max(0, target_frame - back), fps)
        self._fast_container.seek(
            back_pts,
            stream=self._fast_stream,
            backward=True,
            any_frame=False,
        )
        try:
            self._fast_stream.codec_context.flush_buffers()
        except Exception:
            pass
        self._fast_decoder = self._fast_container.decode(self._fast_stream)
        try:
            frame = next(self._fast_decoder)
        except StopIteration:
            frame = None
        if frame is None:
            return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)
        cur_pts = frame.pts if frame.pts is not None else frame.dts
        if cur_pts is None:
            cur_pts = target_pts
    # Decode forward until the PTS reaches the target (within tolerance).
    while float(cur_pts) < (float(target_pts) - wiggle):
        try:
            frame = next(self._fast_decoder)
        except StopIteration:
            frame = None
            break
        if frame is None:
            break
        cur_pts = frame.pts if frame.pts is not None else frame.dts
        if cur_pts is None:
            cur_pts = target_pts
            break
    if frame is None:
        return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)
    img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
    cur = DecodedFrame(
        image=img,
        pts=cur_pts,
        time_s=self._frame_time_s(cur_pts),
        key_frame=bool(getattr(frame, "key_frame", False)),
    )
    if cur_pts is not None:
        self._frame_cache.put(int(cur_pts), cur)
        self._fast_last_pts = int(cur_pts)
    else:
        self._fast_last_pts = None
    return cur
_read_frame_fast_opencv_pyav(target_frame, *, decode_rgb)
Approximate OpenCV seek behavior using PyAV.
Source code in acvr/_pyav_backend.py
def _read_frame_fast_opencv_pyav(self, target_frame: int, *, decode_rgb: bool) -> DecodedFrame:
    """Approximate OpenCV seek behavior using PyAV.

    Seeks a growing distance before the target, then decodes forward while
    tracking frame numbers derived from PTS values.

    Args:
        target_frame: Desired frame number.
        decode_rgb: Decode to RGB when True, otherwise BGR24.

    Returns:
        A decoded frame at approximately ``target_frame``.

    Raises:
        RuntimeError: If no frame can be decoded at all.
    """
    fps = self._nominal_frame_rate or self._frame_rate or 1.0
    self._ensure_fast_container()
    assert self._fast_container is not None
    assert self._fast_stream is not None

    def seek_to_frame(frame_index: int) -> None:
        # Backward keyframe seek to the nominal timestamp of frame_index.
        target_pts = self._secs_to_pts(frame_index / fps)
        self._fast_container.seek(
            target_pts,
            stream=self._fast_stream,
            backward=True,
            any_frame=False,
        )
        try:
            self._fast_stream.codec_context.flush_buffers()
        except Exception:
            # flush_buffers may be unavailable; best effort only.
            pass

    def frame_number_from_pts(pts: Optional[int]) -> Optional[int]:
        # Translate a PTS into a frame number relative to the first frame.
        num = self._pts_to_frame_number(pts, fps)
        if num is None:
            return None
        if self._fast_first_frame_number is None:
            return num
        return num - self._fast_first_frame_number

    first_frame = None
    if self._fast_first_frame_number is None:
        # Lazily determine the stream's first frame number — streams do not
        # necessarily start at PTS-derived frame 0.
        seek_to_frame(0)
        for frame in self._fast_container.decode(self._fast_stream):
            pts = frame.pts if frame.pts is not None else frame.dts
            self._fast_first_frame_number = self._pts_to_frame_number(pts, fps) or 0
            first_frame = frame
            break
    if target_frame <= 0:
        # Frame 0 request: reuse the probe frame if we just decoded it.
        if first_frame is None:
            seek_to_frame(0)
            for frame in self._fast_container.decode(self._fast_stream):
                first_frame = frame
                break
        if first_frame is None:
            raise RuntimeError("Failed to decode a frame after fast seek.")
        pts = first_frame.pts if first_frame.pts is not None else first_frame.dts
        img = first_frame.to_rgb().to_ndarray() if decode_rgb else first_frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(first_frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        return cur
    # Seek `delta` frames before the target; widen the margin whenever the
    # decoder still lands past the target (e.g. sparse keyframes).
    delta = 16
    attempts = 0
    while True:
        start_frame = max(target_frame - delta, 0)
        seek_to_frame(start_frame)
        decoder = self._fast_container.decode(self._fast_stream)
        try:
            frame = next(decoder)
        except StopIteration:
            break
        pts = frame.pts if frame.pts is not None else frame.dts
        frame_number = frame_number_from_pts(pts)
        if frame_number is None:
            frame_number = start_frame
        if frame_number < 0 or frame_number > target_frame:
            # Landed past the target (or at a nonsense position): give up
            # once we are at the stream start or out of attempts.
            if start_frame == 0 or delta >= 1 << 30 or attempts > 20:
                break
            # NOTE(review): delta starts at 16, so the `delta * 2` branch is
            # unreachable — growth is always x1.5; confirm intent.
            delta = delta * 2 if delta < 16 else int(delta * 1.5)
            attempts += 1
            continue
        # Walk forward frame by frame until the target number is reached.
        while frame_number < target_frame:
            try:
                frame = next(decoder)
            except StopIteration:
                frame = None
                break
            pts = frame.pts if frame.pts is not None else frame.dts
            frame_number = frame_number_from_pts(pts)
            if frame_number is None:
                # Unnumbered frame: treat it as the target to stop walking.
                frame_number = target_frame
        if frame is None:
            break
        pts = frame.pts if frame.pts is not None else frame.dts
        img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        return cur
    # All attempts failed: fall back to a plain timestamp seek.
    target_pts = self._secs_to_pts(target_frame / fps)
    return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)
_read_frame_fast_simple(target_pts, *, decode_rgb)
Fallback fast seek: seek and decode first frame after PTS.
Source code in acvr/_pyav_backend.py
def _read_frame_fast_simple(self, target_pts: int, *, decode_rgb: bool) -> DecodedFrame:
    """Fallback fast seek: seek and decode first frame after PTS.

    Tries an any-frame seek first (fastest, may land mid-GOP), then retries
    with a keyframe-bound seek if that produced nothing usable.

    Args:
        target_pts: Target presentation timestamp in stream time-base units.
        decode_rgb: Decode to RGB when True, otherwise BGR24.

    Returns:
        The first decoded frame at or after ``target_pts``.

    Raises:
        RuntimeError: If neither seek strategy yields a frame.
    """
    self._ensure_fast_container()
    assert self._fast_container is not None
    assert self._fast_stream is not None

    def grab_frame(container: av.container.InputContainer, stream: av.video.stream.VideoStream) -> Optional[DecodedFrame]:
        # Decode forward until a frame at/after target_pts appears.
        for frame in container.decode(stream):
            pts = frame.pts if frame.pts is not None else frame.dts
            if pts is None:
                # No timestamp at all: accept the frame as-is.
                target_reached = True
            else:
                target_reached = pts >= target_pts
            if not target_reached:
                continue
            if decode_rgb:
                img = frame.to_rgb().to_ndarray()
            else:
                img = frame.to_ndarray(format="bgr24")
            cur = DecodedFrame(
                image=img,
                pts=pts,
                time_s=self._frame_time_s(pts),
                key_frame=bool(getattr(frame, "key_frame", False)),
            )
            if pts is not None:
                self._frame_cache.put(int(pts), cur)
            return cur
        return None

    # First attempt: any_frame seek (may land on a non-key frame).
    self._fast_container.seek(
        target_pts,
        stream=self._fast_stream,
        backward=True,
        any_frame=True,
    )
    try:
        self._fast_stream.codec_context.flush_buffers()
    except Exception:
        # flush_buffers may be unavailable; best effort only.
        pass
    grabbed = grab_frame(self._fast_container, self._fast_stream)
    if grabbed is not None:
        return grabbed
    # Second attempt: conventional keyframe-bound seek.
    self._fast_container.seek(
        target_pts,
        stream=self._fast_stream,
        backward=True,
        any_frame=False,
    )
    try:
        self._fast_stream.codec_context.flush_buffers()
    except Exception:
        pass
    grabbed = grab_frame(self._fast_container, self._fast_stream)
    if grabbed is not None:
        return grabbed
    raise RuntimeError("Failed to decode a frame after fast seek.")
_secs_to_pts(t_s)
Convert seconds to presentation timestamp units.
Source code in acvr/_pyav_backend.py
def _secs_to_pts(self, t_s: float) -> int:
"""Convert seconds to presentation timestamp units."""
ticks = int(round(t_s / float(self._time_base)))
return self._start_pts + ticks
_seek_seq_to_pts(target_pts, *, target_index, decode_rgb, any_frame=False)
Seek the sequential decoder to a PTS and return the first match.
Source code in acvr/_pyav_backend.py
def _seek_seq_to_pts(
    self,
    target_pts: int,
    *,
    target_index: int,
    decode_rgb: bool,
    any_frame: bool = False,
) -> DecodedFrame:
    """Seek the sequential decoder to a PTS and return the first match.

    On success the sequential decoder is left positioned just after the
    returned frame so subsequent sequential reads continue from there.

    Args:
        target_pts: Target presentation timestamp in stream time-base units.
        target_index: Frame index the caller associates with ``target_pts``;
            used to update the sequential position counters.
        decode_rgb: Decode to RGB when True, otherwise BGR24.
        any_frame: Allow seeking to non-key frames (skips keyframe anchoring).

    Returns:
        The first decoded frame at or after ``target_pts``.

    Raises:
        RuntimeError: If no frame could be decoded after seeking.
    """
    self._ensure_seq_container()
    assert self._seq_container is not None
    assert self._seq_stream is not None
    seek_pts = target_pts
    if self._keyframes and not any_frame:
        # Anchor the seek at the keyframe before the target for clean decoding.
        idx = self._keyframe_index_at_or_before_pts(target_pts)
        seek_pts = self._keyframes[idx].pts
    self._seq_container.seek(
        seek_pts,
        stream=self._seq_stream,
        backward=True,
        any_frame=any_frame,
    )
    try:
        self._seq_stream.codec_context.flush_buffers()
    except Exception:
        # flush_buffers may be unavailable on some PyAV versions; best effort.
        pass
    decoder = self._seq_container.decode(self._seq_stream)
    last: Optional[DecodedFrame] = None
    for frame in decoder:
        pts = frame.pts if frame.pts is not None else frame.dts
        img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        if pts is None:
            # Timestamp-less frame: keep as fallback and continue scanning.
            last = cur
            continue
        if pts >= target_pts:
            # Adopt this decoder as the new sequential state.
            self._seq_decoder = decoder
            self._seq_frame_index = target_index + 1
            self._current_frame_pos = float(target_index)
            return cur
        last = cur
    if last is not None:
        # Target lies past the last frame: return the final decoded frame.
        self._seq_decoder = decoder
        self._seq_frame_index = target_index + 1
        self._current_frame_pos = float(target_index)
        return last
    raise RuntimeError("Could not decode any frames after seeking.")
_seek_to_pts(pts, *, backward)
Seek to a timestamp in the stream.
Source code in acvr/_pyav_backend.py
def _seek_to_pts(self, pts: int, *, backward: bool) -> None:
"""Seek to a timestamp in the stream."""
self._container.seek(pts, stream=self._stream, backward=backward, any_frame=False)
self._flush_decoder()
build_keyframe_index(*, max_packets=None)
Scan packets and store keyframe pts/time.
Source code in acvr/_pyav_backend.py
def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
    """Scan packets and store keyframe pts/time.

    Opens a secondary container for the scan so the main decoding state is
    untouched.

    Args:
        max_packets: Optional cap on the number of timestamped packets to
            scan; None scans the whole stream.

    Returns:
        The sorted list of keyframe entries (also stored on ``self``).
    """
    path = self._container.name
    idx_container = av.open(path)
    key_pts: List[int] = []
    try:
        # try/finally ensures the scan container is closed even if demuxing
        # raises (the original leaked it on error).
        idx_stream = idx_container.streams.video[self._stream.index]
        n = 0
        for packet in idx_container.demux(idx_stream):
            # Flush/padding packets carry no timestamps; skip them.
            if packet.dts is None and packet.pts is None:
                continue
            if packet.is_keyframe:
                pts = packet.pts if packet.pts is not None else packet.dts
                if pts is not None:
                    key_pts.append(int(pts))
            n += 1
            if max_packets is not None and n >= max_packets:
                break
    finally:
        idx_container.close()
    key_pts = sorted(set(key_pts))
    if not key_pts:
        # Guarantee at least one entry so seek logic always has an anchor.
        key_pts = [self._start_pts]
    self._keyframes = [KeyframeEntry(pts=p, time_s=self._pts_to_secs(p)) for p in key_pts]
    self._index_built = True
    # Cached bucket-to-keyframe lookups are now stale; reset them.
    self._bucket_to_kfidx.clear()
    return self._keyframes
close()
Close the underlying PyAV container.
Source code in acvr/_pyav_backend.py
def close(self) -> None:
    """Release the main, fast-seek, and sequential containers and reset state."""
    self._container.close()
    fast = self._fast_container
    if fast is not None:
        fast.close()
        # Drop all fast-path state so a stale decoder is never reused.
        self._fast_container = None
        self._fast_stream = None
        self._fast_decoder = None
        self._fast_last_pts = None
        self._fast_first_frame_number = None
    seq = self._seq_container
    if seq is not None:
        seq.close()
        # Likewise clear the sequential decoder bookkeeping.
        self._seq_container = None
        self._seq_stream = None
        self._seq_decoder = None
        self._seq_frame_index = 0
    self._last_index = None
    self._last_fast_index = None
frame_at_index(index)
Return the decoded frame at a zero-based index.
Source code in acvr/_pyav_backend.py
def frame_at_index(self, index: int) -> np.ndarray:
    """Return the RGB image array for the frame at ``index``."""
    decoded = self._read_frame_at_index(index, decode_rgb=True, use_sequential=True)
    return decoded.image
index_from_pts(pts)
Map a PTS value to the nearest frame index.
Source code in acvr/_pyav_backend.py
def index_from_pts(self, pts: int) -> int:
    """Return the index of the frame whose PTS is closest to ``pts``."""
    self._ensure_frame_pts()
    assert self._frame_pts is not None
    table = self._frame_pts
    if not table:
        return 0
    last = len(table) - 1
    # Clamp queries outside the covered PTS range.
    if pts <= table[0]:
        return 0
    if pts >= table[-1]:
        return last
    low, high = 0, last
    while low <= high:
        middle = (low + high) // 2
        value = table[middle]
        if value == pts:
            return middle
        if value < pts:
            low = middle + 1
        else:
            high = middle - 1
    # low/high now bracket pts; low may have run off either end.
    if low > last:
        return high
    if high < 0:
        return low
    # Ties resolve to the earlier frame.
    return low if abs(table[low] - pts) < abs(table[high] - pts) else high
index_from_time(t_s)
Map a timestamp in seconds to the nearest frame index.
Source code in acvr/_pyav_backend.py
def index_from_time(self, t_s: float) -> int:
    """Return the frame index nearest to the timestamp ``t_s`` (seconds)."""
    target_pts = int(self._secs_to_pts(float(t_s)))
    return self.index_from_pts(target_pts)
iter_frames(*, decode_rgb=True)
Iterate through frames sequentially.
Source code in acvr/_pyav_backend.py
def iter_frames(self, *, decode_rgb: bool = True) -> Iterator[DecodedFrame]:
    """Iterate through frames sequentially from the beginning.

    Resets the sequential decoder, then yields every decoded frame in
    order while keeping the frame cache and position counters updated.

    Args:
        decode_rgb: Decode to RGB when True, otherwise BGR24.

    Yields:
        DecodedFrame objects in decode order.
    """
    self.reset_sequence()
    assert self._seq_decoder is not None
    for frame in self._seq_decoder:
        # Fall back to DTS for streams that omit PTS on some frames.
        pts = frame.pts if frame.pts is not None else frame.dts
        img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
        cur = DecodedFrame(
            image=img,
            pts=pts,
            time_s=self._frame_time_s(pts),
            key_frame=bool(getattr(frame, "key_frame", False)),
        )
        if pts is not None:
            self._frame_cache.put(int(pts), cur)
        # Advance the sequential position before handing the frame out.
        self._current_frame_pos = float(self._seq_frame_index)
        self._seq_frame_index += 1
        yield cur
pts_at_index(index)
Return the presentation timestamp (PTS) for a frame index.
Source code in acvr/_pyav_backend.py
def pts_at_index(self, index: int) -> Optional[int]:
    """Return the PTS for ``index``; negative indices count from the end."""
    self._ensure_frame_pts()
    assert self._frame_pts is not None
    position = index if index >= 0 else index + self._frame_count
    if not 0 <= position < self._frame_count:
        raise IndexError("frame index out of range")
    return int(self._frame_pts[position])
read_frame(*, index=None, t_s=None, mode='accurate', decode_rgb=True, keyframe_mode='nearest', use_sequential=True)
Read a frame using a selectable access mode.
Source code in acvr/_pyav_backend.py
def read_frame(
    self,
    *,
    index: Optional[int] = None,
    t_s: Optional[Number] = None,
    mode: str = "accurate",
    decode_rgb: bool = True,
    keyframe_mode: str = "nearest",
    use_sequential: bool = True,
) -> DecodedFrame:
    """Read a frame using a selectable access mode.

    Exactly one of ``index`` or ``t_s`` must be given.

    Args:
        index: Zero-based frame index.
        t_s: Timestamp in seconds.
        mode: One of 'accurate', 'accurate_timeline', 'fast', 'scrub'.
        decode_rgb: Decode to RGB when True, otherwise BGR24.
        keyframe_mode: Keyframe selection used by 'scrub'
            ('previous' / 'nearest' / 'next').
        use_sequential: Allow sequential-decoder fast paths.

    Returns:
        The decoded frame.

    Raises:
        ValueError: On an unknown mode or bad index/t_s combination.
    """
    if mode not in {"accurate", "accurate_timeline", "fast", "scrub"}:
        raise ValueError("mode must be one of: 'accurate', 'accurate_timeline', 'fast', 'scrub'")
    if index is None and t_s is None:
        raise ValueError("Provide either index or t_s")
    if index is not None and t_s is not None:
        raise ValueError("Provide only one of index or t_s")
    if mode == "accurate":
        if index is not None:
            return self._read_frame_at_index(
                int(index),
                decode_rgb=decode_rgb,
                use_sequential=use_sequential,
            )
        assert t_s is not None
        # NOTE(review): this path ignores decode_rgb — read_frame_at always
        # decodes RGB; confirm whether BGR output should be honored here.
        return self.read_frame_at(float(t_s), use_index=False)
    if mode == "accurate_timeline":
        if t_s is None:
            # Convert the index to a timestamp via the nominal frame rate.
            fps = self._nominal_frame_rate or self._frame_rate or 1.0
            t_s = float(index) / fps
        return self.read_frame_at(float(t_s))
    if mode == "scrub":
        if t_s is None:
            # NOTE(review): uses _frame_rate only, unlike the other modes
            # which prefer _nominal_frame_rate — confirm this is intended.
            fps = self._frame_rate or 1.0
            t_s = float(index) / fps
        return self.read_keyframe_at(float(t_s), mode=keyframe_mode, decode_rgb=decode_rgb)
    return self.read_frame_fast(
        index=index,
        t_s=t_s,
        decode_rgb=decode_rgb,
        use_sequential=use_sequential,
    )
read_frame_at(t_s, *, return_first_after=True, max_decode_frames=10000, use_index=True)
Decode a frame near a timestamp with accurate seeking.
Simplified and robust: always uses a fresh container and backward keyframe seek.
Source code in acvr/_pyav_backend.py
def read_frame_at(
    self,
    t_s: Number,
    *,
    return_first_after: bool = True,
    max_decode_frames: int = 10_000,
    use_index: bool = True,
) -> DecodedFrame:
    """Decode a frame near a timestamp with accurate seeking.

    Simplified and robust: always uses a fresh container and backward
    keyframe seek, then decodes forward to the target.

    Args:
        t_s: Target timestamp in seconds.
        return_first_after: Return the first frame with PTS >= target when
            True; otherwise the last frame with PTS <= target.
        max_decode_frames: Safety cap on frames decoded while scanning.
        use_index: Anchor the seek at an indexed keyframe when available.

    Returns:
        The decoded frame nearest the timestamp under the chosen policy.

    Raises:
        RuntimeError: If the decode cap is exceeded or no frame decodes.
    """
    t_s = float(t_s)
    target_pts = self._secs_to_pts(t_s)
    cached = self._frame_cache.get(target_pts)
    if cached is not None:
        return cached  # type: ignore[return-value]
    container = av.open(self._path)
    try:
        stream = container.streams.video[self._stream.index]
        self._configure_codec_context(stream)
        if use_index and self._index_built:
            # Anchor at the keyframe before the target for clean decoding.
            idx = self._keyframe_index_at_or_before_pts(target_pts)
            anchor_pts = self._keyframes[idx].pts
            container.seek(anchor_pts, stream=stream, backward=True, any_frame=False)
        else:
            container.seek(target_pts, stream=stream, backward=True, any_frame=False)
        try:
            stream.codec_context.flush_buffers()
        except Exception:
            # flush_buffers may be unavailable on some PyAV versions; best effort.
            pass
        last: Optional[DecodedFrame] = None
        decoded = 0
        for packet in container.demux(stream):
            for frame in packet.decode():
                decoded += 1
                if decoded > max_decode_frames:
                    # Guard against streams with broken timestamps that would
                    # otherwise make us scan forward forever.
                    raise RuntimeError(
                        "Exceeded max_decode_frames while seeking; timestamps may be broken."
                    )
                pts = frame.pts
                cur = DecodedFrame(
                    image=frame.to_rgb().to_ndarray(),
                    pts=pts,
                    time_s=self._frame_time_s(pts),
                    key_frame=bool(getattr(frame, "key_frame", False)),
                )
                if pts is not None:
                    self._frame_cache.put(int(pts), cur)
                if pts is None:
                    # Timestamp-less frame: keep only as a fallback result.
                    last = cur
                    continue
                if return_first_after:
                    if pts >= target_pts:
                        return cur
                    last = cur
                else:
                    # Track the last frame at/before the target; the first
                    # frame past it means the tracked one is the answer.
                    if pts <= target_pts:
                        last = cur
                    elif last is not None:
                        return last
        if last is not None:
            return last
        raise RuntimeError("Could not decode any frames after seeking.")
    finally:
        container.close()
read_frame_fast(*, index=None, t_s=None, decode_rgb=True, use_sequential=True)
Return a fast, approximate frame for an index or timestamp.
Source code in acvr/_pyav_backend.py
def read_frame_fast(
    self,
    *,
    index: Optional[int] = None,
    t_s: Optional[Number] = None,
    decode_rgb: bool = True,
    use_sequential: bool = True,
) -> DecodedFrame:
    """Return a fast, approximate frame for an index or timestamp.

    Exactly one of ``index`` or ``t_s`` must be given.

    Args:
        index: Frame index; negative values count from the end.
        t_s: Timestamp in seconds, converted to a frame number via the
            nominal frame rate.
        decode_rgb: Decode to RGB when True, otherwise BGR24.
        use_sequential: Prefer the sequential fast decoder over the
            OpenCV-style seek.

    Returns:
        The decoded (approximate) frame.

    Raises:
        ValueError: When neither or both of index/t_s are given.
    """
    if index is None and t_s is None:
        raise ValueError("Provide either index or t_s")
    if index is not None and t_s is not None:
        raise ValueError("Provide only one of index or t_s")
    if t_s is None:
        # The checks above guarantee index is not None here (the original
        # re-checked and re-raised redundantly).
        if index < 0:
            index += self.number_of_frames
        target_frame = int(index)
    else:
        t_s = float(t_s)
        target_frame = int(round(t_s * (self._nominal_frame_rate or self._frame_rate or 1.0)))
    if use_sequential:
        decoded = self._read_frame_fast_like(target_frame, decode_rgb=decode_rgb)
        self._last_fast_index = target_frame
        return decoded
    fps = self._nominal_frame_rate or self._frame_rate or 1.0
    target_pts = self._secs_to_pts(target_frame / fps)
    cached = self._frame_cache.get(target_pts)
    if cached is not None:
        self._last_fast_index = target_frame
        return cached  # type: ignore[return-value]
    decoded = self._read_frame_fast_opencv_pyav(target_frame, decode_rgb=decode_rgb)
    self._last_fast_index = target_frame
    return decoded
read_keyframe_at(t_s, *, mode='nearest', decode_rgb=True)
Return a nearby keyframe without GOP forward decoding.
Source code in acvr/_pyav_backend.py
def read_keyframe_at(
    self,
    t_s: Number,
    *,
    mode: str = "nearest",
    decode_rgb: bool = True,
) -> DecodedFrame:
    """Return a nearby keyframe without GOP forward decoding.

    Args:
        t_s: Target timestamp in seconds.
        mode: Keyframe selection: 'previous', 'nearest', or 'next'.
        decode_rgb: Decode to RGB when True, otherwise BGR24.

    Returns:
        The decoded keyframe chosen for ``t_s`` under ``mode``.

    Raises:
        RuntimeError: If no frame could be decoded after the keyframe seek.
    """
    t_s = float(t_s)
    idx = self._keyframe_index_for_time_fast(t_s, mode)
    key_pts = self._keyframes[idx].pts
    cached = self._frame_cache.get(key_pts)
    if cached is not None:
        return cached  # type: ignore[return-value]
    # Use a fresh container for reliable keyframe seek, avoiding stateful issues
    container = av.open(self._path)
    try:
        stream = container.streams.video[self._stream.index]
        self._configure_codec_context(stream)
        # Use backward seek to land on or before the requested keyframe PTS reliably
        container.seek(key_pts, stream=stream, backward=True, any_frame=False)
        try:
            stream.codec_context.flush_buffers()
        except Exception:
            pass
        for packet in container.demux(stream):
            for frame in packet.decode():
                pts = frame.pts
                # Explicit bgr24 for the non-RGB path, matching every other
                # decode path in this backend (the original omitted the
                # format here, yielding a source-format-dependent layout).
                img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
                cur = DecodedFrame(
                    image=img,
                    pts=pts,
                    time_s=self._frame_time_s(pts),
                    key_frame=bool(getattr(frame, "key_frame", False)),
                )
                if pts is not None:
                    self._frame_cache.put(int(pts), cur)
                return cur
    finally:
        container.close()
    raise RuntimeError("Failed to decode a frame after keyframe seek.")
read_next_frame(*, decode_rgb=True)
Return the next frame using sequential decoding.
Source code in acvr/_pyav_backend.py
def read_next_frame(self, *, decode_rgb: bool = True) -> DecodedFrame:
    """Return the next frame using sequential decoding.

    Args:
        decode_rgb: Decode to RGB when True, otherwise BGR24.

    Returns:
        The next decoded frame; position counters are advanced.

    Raises:
        StopIteration: When the sequential decoder is exhausted.
    """
    self._ensure_seq_container()
    assert self._seq_decoder is not None
    # StopIteration propagates naturally; the original wrapped this call in
    # a redundant try/except that only re-raised.
    frame = next(self._seq_decoder)
    # Fall back to DTS for streams that omit PTS on some frames.
    pts = frame.pts if frame.pts is not None else frame.dts
    img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
    cur = DecodedFrame(
        image=img,
        pts=pts,
        time_s=self._frame_time_s(pts),
        key_frame=bool(getattr(frame, "key_frame", False)),
    )
    if pts is not None:
        self._frame_cache.put(int(pts), cur)
    # Track both sequential and fast positions so mixed access stays coherent.
    self._current_frame_pos = float(self._seq_frame_index)
    self._last_index = int(self._seq_frame_index)
    self._last_fast_index = int(self._seq_frame_index)
    self._seq_frame_index += 1
    return cur
reset_sequence()
Reset sequential decoding to the first frame.
Source code in acvr/_pyav_backend.py
def reset_sequence(self) -> None:
    """Rewind the sequential decoder so iteration restarts at frame zero."""
    self._ensure_seq_container()
    assert self._seq_container is not None
    assert self._seq_stream is not None
    try:
        # Best-effort rewind; some containers reject seeking to 0.
        self._seq_container.seek(0)
    except Exception:
        pass
    self._seq_decoder = self._seq_container.decode(self._seq_stream)
    self._seq_frame_index = 0
time_at_index(index)
Return the timestamp in seconds for a frame index.
Source code in acvr/_pyav_backend.py
def time_at_index(self, index: int) -> float:
    """Return the timestamp (seconds) for ``index``; NaN when PTS is unknown."""
    pts = self.pts_at_index(index)
    if pts is None:
        return float("nan")
    return self._pts_to_secs(int(pts))