API
VideoReader
High-level video reader with array-style access.
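A minimal usage sketch; the file name is hypothetical, and the import path follows the acvr/reader.py location shown below (the package may also re-export the class at its top level):

from acvr.reader import VideoReader

with VideoReader("example.mp4") as reader:   # hypothetical path
    print(len(reader), reader.frame_rate, reader.frame_shape)
    first = reader[0]    # decoded RGB frame as a numpy array of shape (H, W, 3)
    last = reader[-1]    # negative indices count from the end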
Source code in acvr/reader.py
class VideoReader:
"""High-level video reader with array-style access."""
def __init__(
self,
path: str,
video_stream_index: int = 0,
*,
build_index: bool = False,
decoded_frame_cache_size: int = 0,
scrub_bucket_ms: int = 100,
scrub_bucket_lru_size: int = 4096,
) -> None:
"""Create a reader for the given video path."""
self._backend = PyAVVideoBackend(
path,
video_stream_index=video_stream_index,
build_index=build_index,
decoded_frame_cache_size=decoded_frame_cache_size,
scrub_bucket_ms=scrub_bucket_ms,
scrub_bucket_lru_size=scrub_bucket_lru_size,
)
def close(self) -> None:
"""Close the underlying video resources."""
self._backend.close()
def __enter__(self) -> "VideoReader":
"""Return self for context manager usage."""
return self
def __exit__(self, exc_type, exc, tb) -> None:
"""Close the reader when leaving a context manager."""
self.close()
def __len__(self) -> int:
"""Return the number of frames in the video."""
return self.number_of_frames
def __getitem__(self, key: IndexKey) -> Union[np.ndarray, List[np.ndarray]]:
"""Return a frame or list of frames for the given index or slice."""
if isinstance(key, slice):
start, stop, step = key.indices(self.number_of_frames)
return [self._backend.frame_at_index(i) for i in range(start, stop, step)]
return self._backend.frame_at_index(int(key))
def __iter__(self) -> Iterator[np.ndarray]:
"""Iterate over all frames in the video."""
return iter(self[:])
@property
def frame_height(self) -> int:
"""Return the frame height in pixels."""
return self._backend.frame_height
@property
def frame_width(self) -> int:
"""Return the frame width in pixels."""
return self._backend.frame_width
@property
def frame_rate(self) -> float:
"""Return the video frame rate."""
return self._backend.frame_rate
@property
def fourcc(self) -> int:
"""Return the fourcc codec identifier."""
return self._backend.fourcc
@property
def frame_format(self) -> int:
"""Return the pixel format identifier."""
return self._backend.frame_format
@property
def number_of_frames(self) -> int:
"""Return the total number of frames."""
return self._backend.number_of_frames
@property
def frame_shape(self) -> tuple:
"""Return the expected frame shape (H, W, C)."""
return self._backend.frame_shape
@property
def current_frame_pos(self) -> float:
"""Return the last accessed frame index."""
return self._backend.current_frame_pos
def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
"""Build a keyframe index for faster random access."""
return self._backend.build_keyframe_index(max_packets=max_packets)
def read_keyframe_at(
self,
t_s: float,
*,
mode: str = "previous",
decode_rgb: bool = True,
) -> DecodedFrame:
"""Return a nearby keyframe for fast scrubbing."""
return self._backend.read_keyframe_at(t_s, mode=mode, decode_rgb=decode_rgb)
def read_frame_at(
self,
t_s: float,
*,
return_first_after: bool = True,
max_decode_frames: int = 10_000,
use_index: bool = True,
) -> DecodedFrame:
"""Return a frame at a timestamp with accurate seeking."""
return self._backend.read_frame_at(
t_s,
return_first_after=return_first_after,
max_decode_frames=max_decode_frames,
use_index=use_index,
)
def read_frame_fast(
self,
*,
index: Optional[int] = None,
t_s: Optional[float] = None,
decode_rgb: bool = False,
) -> DecodedFrame:
"""Return a fast, approximate frame for an index or timestamp."""
return self._backend.read_frame_fast(
index=index,
t_s=t_s,
decode_rgb=decode_rgb,
)
def read_frame(
self,
*,
index: Optional[int] = None,
t_s: Optional[float] = None,
mode: str = "accurate",
decode_rgb: bool = False,
keyframe_mode: str = "previous",
) -> DecodedFrame:
"""Read a frame using a selectable access mode."""
if mode not in {"accurate", "fast", "scrub"}:
raise ValueError("mode must be one of: 'accurate', 'fast', 'scrub'")
if index is None and t_s is None:
raise ValueError("Provide either index or t_s")
if index is not None and t_s is not None:
raise ValueError("Provide only one of index or t_s")
if mode == "accurate":
if index is not None:
return self._backend.frame_at_index(int(index))
assert t_s is not None
return self._backend.read_frame_at(float(t_s))
if mode == "scrub":
if t_s is None:
fps = self.frame_rate or 1.0
t_s = float(index) / fps
return self._backend.read_keyframe_at(float(t_s), mode=keyframe_mode, decode_rgb=decode_rgb)
return self._backend.read_frame_fast(
index=index,
t_s=t_s,
decode_rgb=decode_rgb,
)
current_frame_pos
property
Return the last accessed frame index.
fourcc
property
Return the fourcc codec identifier.
frame_format
property
Return the pixel format identifier.
frame_height
property
Return the frame height in pixels.
frame_rate
property
Return the video frame rate.
frame_shape
property
Return the expected frame shape (H, W, C).
frame_width
property
Return the frame width in pixels.
number_of_frames
property
Return the total number of frames.
__enter__()
Return self for context manager usage.
__exit__(exc_type, exc, tb)
Close the reader when leaving a context manager.
__getitem__(key)
Return a frame or list of frames for the given index or slice.
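For example, assuming reader is an open VideoReader:

frame = reader[120]       # single frame as an ndarray
clip = reader[0:60:10]    # frames 0, 10, ..., 50 (clamped to the video length) as a list of ndarrays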
__init__(path, video_stream_index=0, *, build_index=False, decoded_frame_cache_size=0, scrub_bucket_ms=100, scrub_bucket_lru_size=4096)
Create a reader for the given video path.
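A sketch of the keyword options; the values are illustrative rather than tuned recommendations, and the path is hypothetical:

reader = VideoReader(
    "example.mp4",                 # hypothetical path
    build_index=True,              # scan keyframes up front for faster seeking
    decoded_frame_cache_size=64,   # LRU cache size for decoded frames (default 0)
    scrub_bucket_ms=100,           # time-bucket width for the scrub keyframe cache
)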
__iter__()
Iterate over all frames in the video.
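Note that iteration is implemented as iter(self[:]), so every frame is decoded into memory before the loop starts; a quick sketch, assuming an open reader:

count = 0
for frame in reader:    # all frames are decoded into a list first
    count += 1
print("iterated", count, "frames")

For long videos, indexing one frame at a time with reader[i] avoids holding the whole clip in memory.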
__len__()
Return the number of frames in the video.
build_keyframe_index(*, max_packets=None)
Build a keyframe index for faster random access.
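For example, assuming an open reader; each returned KeyframeEntry carries the keyframe's pts and time_s:

entries = reader.build_keyframe_index()
print("keyframes:", len(entries))
print("first keyframe at", entries[0].time_s, "s (pts", entries[0].pts, ")")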
close()
Close the underlying video resources.
read_frame(*, index=None, t_s=None, mode='accurate', decode_rgb=False, keyframe_mode='previous')
Read a frame using a selectable access mode.
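A sketch of the three modes, assuming an open reader. "scrub" requires a keyframe index (build_index=True or an explicit build_keyframe_index() call), and "accurate" with index= returns the raw ndarray from frame_at_index rather than a DecodedFrame:

exact = reader.read_frame(t_s=12.5, mode="accurate")              # DecodedFrame, frame-accurate
quick = reader.read_frame(index=300, mode="fast")                  # approximate positioning
kf = reader.read_frame(t_s=12.5, mode="scrub", decode_rgb=True)    # keyframe at or before 12.5 s
print(exact.time_s, kf.key_frame)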
read_frame_at(t_s, *, return_first_after=True, max_decode_frames=10000, use_index=True)
Return a frame at a timestamp with accurate seeking.
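For example, assuming an open reader; return_first_after chooses between the first frame at or after the timestamp and the last frame at or before it:

after = reader.read_frame_at(3.2)                              # first frame with pts >= 3.2 s
before = reader.read_frame_at(3.2, return_first_after=False)   # last frame with pts <= 3.2 s
print(after.time_s, after.key_frame, after.image.shape)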
read_frame_fast(*, index=None, t_s=None, decode_rgb=False)
Return a fast, approximate frame for an index or timestamp.
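For example, assuming an open reader; note that with decode_rgb=False (the default) the backend converts frames with format="bgr24", so the returned image is BGR:

approx = reader.read_frame_fast(index=500)                      # BGR image by default
approx_rgb = reader.read_frame_fast(t_s=20.0, decode_rgb=True)  # RGB image
print(approx.pts, approx.time_s, approx.image.shape)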
read_keyframe_at(t_s, *, mode='previous', decode_rgb=True)
Return a nearby keyframe for fast scrubbing.
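For example, assuming an open reader; the keyframe index must exist first, otherwise the backend raises RuntimeError:

reader.build_keyframe_index()
kf = reader.read_keyframe_at(42.0, mode="nearest")   # also: "previous" (default) or "next"
print(kf.time_s, kf.key_frame)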
Backend implementation
PyAVVideoBackend
Frame-accurate seeking with keyframe index and scrub acceleration.
Source code in acvr/_pyav_backend.py
class PyAVVideoBackend:
"""Frame-accurate seeking with keyframe index and scrub acceleration."""
def __init__(
self,
path: str,
video_stream_index: int = 0,
*,
build_index: bool = False,
decoded_frame_cache_size: int = 0,
scrub_bucket_ms: int = 100,
scrub_bucket_lru_size: int = 4096,
) -> None:
"""Initialize the PyAV-backed decoder."""
self._path = path
self._container = av.open(path)
self._stream = self._container.streams.video[video_stream_index]
self._codec_ctx = self._stream.codec_context
self._fast_container: Optional[av.container.InputContainer] = None
self._fast_stream: Optional[av.video.stream.VideoStream] = None
self._fast_first_frame_number: Optional[int] = None
self._time_base: Fraction = self._stream.time_base
self._start_pts: int = self._stream.start_time if self._stream.start_time is not None else 0
self._keyframes: List[KeyframeEntry] = []
self._index_built: bool = False
self._frame_pts: Optional[List[int]] = None
self._frame_count: int = int(self._stream.frames or 0)
self._current_frame_pos: float = 0.0
self._frame_cache = _LRU(decoded_frame_cache_size)
self._scrub_bucket_ms = max(1, int(scrub_bucket_ms))
self._bucket_to_kfidx = _LRU(scrub_bucket_lru_size)
if build_index:
self.build_keyframe_index()
self._frame_height = int(self._stream.height or 0)
self._frame_width = int(self._stream.width or 0)
self._frame_shape = (self._frame_height, self._frame_width, 3)
self._frame_rate = self._compute_frame_rate()
self._fourcc = self._compute_fourcc()
self._frame_format = 0
def close(self) -> None:
"""Close the underlying PyAV container."""
self._container.close()
if self._fast_container is not None:
self._fast_container.close()
self._fast_container = None
self._fast_stream = None
self._fast_first_frame_number = None
def __enter__(self) -> "PyAVVideoBackend":
"""Return self for context manager usage."""
return self
def __exit__(self, exc_type, exc, tb) -> None:
"""Close the backend on exit from a context manager."""
self.close()
def _secs_to_pts(self, t_s: float) -> int:
"""Convert seconds to presentation timestamp units."""
ticks = int(round(t_s / float(self._time_base)))
return self._start_pts + ticks
def _pts_to_secs(self, pts: int) -> float:
"""Convert presentation timestamp units to seconds."""
return float((pts - self._start_pts) * self._time_base)
def _pts_to_frame_number(self, pts: Optional[int], fps: float) -> Optional[int]:
"""Convert a PTS value to a rounded frame number."""
if pts is None:
return None
return int(round(self._pts_to_secs(int(pts)) * fps))
def _frame_time_s(self, pts: Optional[int]) -> float:
"""Return the timestamp for a frame PTS."""
return float("nan") if pts is None else self._pts_to_secs(pts)
def _flush_decoder(self) -> None:
"""Flush decoder buffers if supported."""
try:
self._codec_ctx.flush_buffers()
except Exception:
pass
def _compute_frame_rate(self) -> float:
"""Compute the stream frame rate in frames per second."""
rate = self._stream.average_rate or self._stream.base_rate
return float(rate) if rate is not None else 0.0
def _compute_fourcc(self) -> int:
"""Compute a fourcc code from the stream codec tag."""
tag = self._stream.codec_context.codec_tag
if isinstance(tag, str) and len(tag) >= 4:
tag = tag[:4]
return (
ord(tag[0])
| (ord(tag[1]) << 8)
| (ord(tag[2]) << 16)
| (ord(tag[3]) << 24)
)
return 0
def _ensure_frame_pts(self) -> None:
"""Decode the stream once to collect frame PTS values."""
if self._frame_pts is not None:
return
idx_container = av.open(self._path)
idx_stream = idx_container.streams.video[self._stream.index]
pts_list: List[int] = []
for frame in idx_container.decode(idx_stream):
pts = frame.pts if frame.pts is not None else frame.dts
if pts is None:
pts = self._start_pts + len(pts_list)
pts_list.append(int(pts))
idx_container.close()
self._frame_pts = pts_list
self._frame_count = len(pts_list)
def _read_frame_by_pts(self, target_pts: int) -> DecodedFrame:
"""Decode the first frame at or after a target PTS."""
cached = self._frame_cache.get(target_pts)
if cached is not None:
return cached # type: ignore[return-value]
if self._index_built:
idx = self._keyframe_index_at_or_before_pts(target_pts)
seek_pts = self._keyframes[idx].pts
else:
seek_pts = target_pts
container = av.open(self._path)
stream = container.streams.video[self._stream.index]
try:
container.seek(seek_pts, stream=stream, backward=True, any_frame=False)
try:
stream.codec_context.flush_buffers()
except Exception:
pass
last: Optional[DecodedFrame] = None
for packet in container.demux(stream):
for frame in packet.decode():
pts = frame.pts
cur = DecodedFrame(
image=frame.to_rgb().to_ndarray(),
pts=pts,
time_s=self._frame_time_s(pts),
key_frame=bool(getattr(frame, "key_frame", False)),
)
if pts is not None:
self._frame_cache.put(int(pts), cur)
if pts is None:
last = cur
continue
if pts >= target_pts:
return cur
last = cur
finally:
container.close()
if last is not None:
return last
raise RuntimeError("Could not decode any frames after seeking.")
def frame_at_index(self, index: int) -> np.ndarray:
"""Return the decoded frame at a zero-based index."""
self._ensure_frame_pts()
assert self._frame_pts is not None
if index < 0:
index += self._frame_count
if index < 0 or index >= self._frame_count:
raise IndexError("frame index out of range")
target_pts = self._frame_pts[index]
decoded = self._read_frame_by_pts(target_pts)
self._current_frame_pos = float(index)
return decoded.image
@property
def frame_height(self) -> int:
"""Return the video frame height."""
return self._frame_height
@property
def frame_width(self) -> int:
"""Return the video frame width."""
return self._frame_width
@property
def frame_rate(self) -> float:
"""Return the reported frame rate in frames per second."""
return self._frame_rate
@property
def fourcc(self) -> int:
"""Return the fourcc codec identifier."""
return self._fourcc
@property
def frame_format(self) -> int:
"""Return the frame format identifier."""
return self._frame_format
@property
def number_of_frames(self) -> int:
"""Return the total number of frames, decoding if needed."""
if self._frame_count <= 0:
self._ensure_frame_pts()
return self._frame_count
@property
def frame_shape(self) -> tuple:
"""Return the expected frame shape (H, W, C)."""
return self._frame_shape
@property
def current_frame_pos(self) -> float:
"""Return the last frame index accessed."""
return self._current_frame_pos
def _seek_to_pts(self, pts: int, *, backward: bool) -> None:
"""Seek to a timestamp in the stream."""
self._container.seek(pts, stream=self._stream, backward=backward, any_frame=False)
self._flush_decoder()
def _ensure_fast_container(self) -> None:
"""Initialize the fast-seek container if needed."""
if self._fast_container is not None:
return
self._fast_container = av.open(self._path)
self._fast_stream = self._fast_container.streams.video[self._stream.index]
try:
codec_ctx = self._fast_stream.codec_context
codec_ctx.thread_type = "AUTO"
codec_ctx.thread_count = 0
except Exception:
pass
def build_keyframe_index(self, *, max_packets: Optional[int] = None) -> List[KeyframeEntry]:
"""Scan packets and store keyframe pts/time."""
path = self._container.name
idx_container = av.open(path)
idx_stream = idx_container.streams.video[self._stream.index]
key_pts: List[int] = []
n = 0
for packet in idx_container.demux(idx_stream):
if packet.dts is None and packet.pts is None:
continue
if packet.is_keyframe:
pts = packet.pts if packet.pts is not None else packet.dts
if pts is not None:
key_pts.append(int(pts))
n += 1
if max_packets is not None and n >= max_packets:
break
idx_container.close()
key_pts = sorted(set(key_pts))
if not key_pts:
key_pts = [self._start_pts]
self._keyframes = [KeyframeEntry(pts=p, time_s=self._pts_to_secs(p)) for p in key_pts]
self._index_built = True
self._bucket_to_kfidx.clear()
return self._keyframes
def _keyframe_index_at_or_before_pts(self, target_pts: int) -> int:
"""Return keyframe index at or before the target PTS."""
kf = self._keyframes
if not self._index_built or not kf:
return 0
if target_pts <= kf[0].pts:
return 0
if target_pts >= kf[-1].pts:
return len(kf) - 1
lo, hi = 0, len(kf) - 1
while lo <= hi:
mid = (lo + hi) // 2
m = kf[mid].pts
if m == target_pts:
return mid
if m < target_pts:
lo = mid + 1
else:
hi = mid - 1
return hi
def _keyframe_index_nearest_pts(self, target_pts: int) -> int:
"""Return nearest keyframe index to the target PTS."""
kf = self._keyframes
if not self._index_built or not kf:
return 0
i0 = self._keyframe_index_at_or_before_pts(target_pts)
i1 = min(i0 + 1, len(kf) - 1)
if i0 == i1:
return i0
d0 = abs(kf[i0].pts - target_pts)
d1 = abs(kf[i1].pts - target_pts)
return i0 if d0 <= d1 else i1
def _bucket_key(self, t_s: float) -> int:
"""Return a bucket key for the scrub acceleration cache."""
return int(round(t_s * 1000.0 / self._scrub_bucket_ms))
def _keyframe_index_for_time_fast(self, t_s: float, mode: str) -> int:
"""Return a keyframe index using cached time buckets."""
if not self._index_built:
raise RuntimeError("Keyframe index not built. Call build_keyframe_index() first.")
b = self._bucket_key(t_s)
mode_tag = {"previous": 0, "nearest": 1, "next": 2}.get(mode)
if mode_tag is None:
raise ValueError("mode must be one of: 'previous', 'nearest', 'next'")
cache_key = (b << 2) | mode_tag
cached = self._bucket_to_kfidx.get(cache_key)
if cached is not None:
return int(cached)
target_pts = self._secs_to_pts(t_s)
if mode == "previous":
idx = self._keyframe_index_at_or_before_pts(target_pts)
elif mode == "nearest":
idx = self._keyframe_index_nearest_pts(target_pts)
else:
i_prev = self._keyframe_index_at_or_before_pts(target_pts)
if self._keyframes[i_prev].pts >= target_pts:
idx = i_prev
else:
idx = min(i_prev + 1, len(self._keyframes) - 1)
self._bucket_to_kfidx.put(cache_key, idx)
return idx
def read_keyframe_at(
self,
t_s: Number,
*,
mode: str = "previous",
decode_rgb: bool = True,
) -> DecodedFrame:
"""Return a nearby keyframe without GOP forward decoding."""
t_s = float(t_s)
idx = self._keyframe_index_for_time_fast(t_s, mode)
key_pts = self._keyframes[idx].pts
cached = self._frame_cache.get(key_pts)
if cached is not None:
return cached # type: ignore[return-value]
self._seek_to_pts(key_pts, backward=False)
for packet in self._container.demux(self._stream):
for frame in packet.decode():
pts = frame.pts
img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray()
cur = DecodedFrame(
image=img,
pts=pts,
time_s=self._frame_time_s(pts),
key_frame=bool(getattr(frame, "key_frame", False)),
)
if pts is not None:
self._frame_cache.put(int(pts), cur)
return cur
raise RuntimeError("Failed to decode a frame after keyframe seek.")
def read_frame_at(
self,
t_s: Number,
*,
return_first_after: bool = True,
max_decode_frames: int = 10_000,
use_index: bool = True,
) -> DecodedFrame:
"""Decode a frame near a timestamp with accurate seeking."""
t_s = float(t_s)
target_pts = self._secs_to_pts(t_s)
cached = self._frame_cache.get(target_pts)
if cached is not None:
return cached # type: ignore[return-value]
if use_index and self._index_built:
idx = self._keyframe_index_at_or_before_pts(target_pts)
anchor_pts = self._keyframes[idx].pts
self._seek_to_pts(anchor_pts, backward=False)
else:
self._seek_to_pts(target_pts, backward=True)
last: Optional[DecodedFrame] = None
decoded = 0
for packet in self._container.demux(self._stream):
for frame in packet.decode():
decoded += 1
if decoded > max_decode_frames:
raise RuntimeError(
"Exceeded max_decode_frames while seeking; timestamps may be broken."
)
pts = frame.pts
cur = DecodedFrame(
image=frame.to_rgb().to_ndarray(),
pts=pts,
time_s=self._frame_time_s(pts),
key_frame=bool(getattr(frame, "key_frame", False)),
)
if pts is not None:
self._frame_cache.put(int(pts), cur)
if pts is None:
last = cur
continue
if return_first_after:
if pts >= target_pts:
return cur
last = cur
else:
if pts <= target_pts:
last = cur
elif last is not None:
return last
if last is not None:
return last
raise RuntimeError("Could not decode any frames after seeking.")
def _read_frame_fast_simple(self, target_pts: int, *, decode_rgb: bool) -> DecodedFrame:
"""Fallback fast seek: seek and decode first frame after PTS."""
self._ensure_fast_container()
assert self._fast_container is not None
assert self._fast_stream is not None
def grab_frame(container: av.container.InputContainer, stream: av.video.stream.VideoStream) -> Optional[DecodedFrame]:
for frame in container.decode(stream):
pts = frame.pts if frame.pts is not None else frame.dts
if pts is None:
target_reached = True
else:
target_reached = pts >= target_pts
if not target_reached:
continue
if decode_rgb:
img = frame.to_rgb().to_ndarray()
else:
img = frame.to_ndarray(format="bgr24")
cur = DecodedFrame(
image=img,
pts=pts,
time_s=self._frame_time_s(pts),
key_frame=bool(getattr(frame, "key_frame", False)),
)
if pts is not None:
self._frame_cache.put(int(pts), cur)
return cur
return None
self._fast_container.seek(
target_pts,
stream=self._fast_stream,
backward=True,
any_frame=True,
)
try:
self._fast_stream.codec_context.flush_buffers()
except Exception:
pass
grabbed = grab_frame(self._fast_container, self._fast_stream)
if grabbed is not None:
return grabbed
self._fast_container.seek(
target_pts,
stream=self._fast_stream,
backward=True,
any_frame=False,
)
try:
self._fast_stream.codec_context.flush_buffers()
except Exception:
pass
grabbed = grab_frame(self._fast_container, self._fast_stream)
if grabbed is not None:
return grabbed
raise RuntimeError("Failed to decode a frame after fast seek.")
def _read_frame_fast_opencv_pyav(self, target_frame: int, *, decode_rgb: bool) -> DecodedFrame:
"""Approximate OpenCV seek behavior using PyAV."""
fps = self._frame_rate or 1.0
self._ensure_fast_container()
assert self._fast_container is not None
assert self._fast_stream is not None
def seek_to_frame(frame_index: int) -> None:
target_pts = self._secs_to_pts(frame_index / fps)
self._fast_container.seek(
target_pts,
stream=self._fast_stream,
backward=True,
any_frame=False,
)
try:
self._fast_stream.codec_context.flush_buffers()
except Exception:
pass
def frame_number_from_pts(pts: Optional[int]) -> Optional[int]:
num = self._pts_to_frame_number(pts, fps)
if num is None:
return None
if self._fast_first_frame_number is None:
return num
return num - self._fast_first_frame_number
first_frame = None
if self._fast_first_frame_number is None:
seek_to_frame(0)
for frame in self._fast_container.decode(self._fast_stream):
pts = frame.pts if frame.pts is not None else frame.dts
self._fast_first_frame_number = self._pts_to_frame_number(pts, fps) or 0
first_frame = frame
break
if target_frame <= 0:
if first_frame is None:
seek_to_frame(0)
for frame in self._fast_container.decode(self._fast_stream):
first_frame = frame
break
if first_frame is None:
raise RuntimeError("Failed to decode a frame after fast seek.")
pts = first_frame.pts if first_frame.pts is not None else first_frame.dts
img = first_frame.to_rgb().to_ndarray() if decode_rgb else first_frame.to_ndarray(format="bgr24")
cur = DecodedFrame(
image=img,
pts=pts,
time_s=self._frame_time_s(pts),
key_frame=bool(getattr(first_frame, "key_frame", False)),
)
if pts is not None:
self._frame_cache.put(int(pts), cur)
return cur
delta = 16
attempts = 0
while True:
start_frame = max(target_frame - delta, 0)
seek_to_frame(start_frame)
decoder = self._fast_container.decode(self._fast_stream)
try:
frame = next(decoder)
except StopIteration:
break
pts = frame.pts if frame.pts is not None else frame.dts
frame_number = frame_number_from_pts(pts)
if frame_number is None:
frame_number = start_frame
if frame_number < 0 or frame_number > target_frame:
if start_frame == 0 or delta >= 1 << 30 or attempts > 20:
break
delta = delta * 2 if delta < 16 else int(delta * 1.5)
attempts += 1
continue
while frame_number < target_frame:
try:
frame = next(decoder)
except StopIteration:
frame = None
break
pts = frame.pts if frame.pts is not None else frame.dts
frame_number = frame_number_from_pts(pts)
if frame_number is None:
frame_number = target_frame
if frame is None:
break
pts = frame.pts if frame.pts is not None else frame.dts
img = frame.to_rgb().to_ndarray() if decode_rgb else frame.to_ndarray(format="bgr24")
cur = DecodedFrame(
image=img,
pts=pts,
time_s=self._frame_time_s(pts),
key_frame=bool(getattr(frame, "key_frame", False)),
)
if pts is not None:
self._frame_cache.put(int(pts), cur)
return cur
target_pts = self._secs_to_pts(target_frame / fps)
return self._read_frame_fast_simple(target_pts, decode_rgb=decode_rgb)
def read_frame_fast(
self,
*,
index: Optional[int] = None,
t_s: Optional[Number] = None,
decode_rgb: bool = False,
) -> DecodedFrame:
"""Return a fast, approximate frame for an index or timestamp."""
if index is None and t_s is None:
raise ValueError("Provide either index or t_s")
if index is not None and t_s is not None:
raise ValueError("Provide only one of index or t_s")
if t_s is None:
if index is None:
raise ValueError("Provide either index or t_s")
if index < 0:
index += self.number_of_frames
target_index = int(index)
else:
t_s = float(t_s)
target_index = int(round(t_s * (self._frame_rate or 1.0)))
fps = self._frame_rate or 1.0
target_pts = self._secs_to_pts(target_index / fps)
cached = self._frame_cache.get(target_pts)
if cached is not None:
return cached # type: ignore[return-value]
return self._read_frame_fast_opencv_pyav(target_index, decode_rgb=decode_rgb)
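The backend can also be driven directly, although VideoReader above is the intended entry point; a minimal sketch with a hypothetical path (the module name is private, so the import may change between releases):

from acvr._pyav_backend import PyAVVideoBackend

with PyAVVideoBackend("example.mp4", build_index=True) as backend:
    frame = backend.read_keyframe_at(5.0)    # DecodedFrame at or before the 5 s mark
    print(frame.time_s, frame.key_frame)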
current_frame_pos
property
Return the last frame index accessed.
fourcc
property
Return the fourcc codec identifier.
frame_format
property
Return the frame format identifier.
frame_height
property
Return the video frame height.
frame_rate
property
Return the reported frame rate in frames per second.
frame_shape
property
Return the expected frame shape (H, W, C).
frame_width
property
Return the video frame width.
number_of_frames
property
Return the total number of frames, decoding if needed.
__enter__()
Return self for context manager usage.
__exit__(exc_type, exc, tb)
Close the backend on exit from a context manager.
__init__(path, video_stream_index=0, *, build_index=False, decoded_frame_cache_size=0, scrub_bucket_ms=100, scrub_bucket_lru_size=4096)
Initialize the PyAV-backed decoder.
_bucket_key(t_s)
Return a bucket key for the scrub acceleration cache.
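A worked example with the default 100 ms bucket width; _keyframe_index_for_time_fast then packs a mode tag into the low two bits of the cache key:

scrub_bucket_ms = 100
t_s = 12.34
bucket = int(round(t_s * 1000.0 / scrub_bucket_ms))   # round(123.4) == 123
cache_key = (bucket << 2) | 0                         # mode tag 0 corresponds to "previous"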
_compute_fourcc()
Compute a fourcc code from the stream codec tag.
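The packed little-endian integer can be unpacked back into the four-character tag, for example, assuming an open reader:

fourcc = reader.fourcc
tag = "".join(chr((fourcc >> (8 * i)) & 0xFF) for i in range(4)) if fourcc else ""
print(tag)    # e.g. "avc1" for a typical H.264 stream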
_compute_frame_rate()
Compute the stream frame rate in frames per second.
_ensure_fast_container()
Initialize the fast-seek container if needed.
_ensure_frame_pts()
Decode the stream once to collect frame PTS values.
_flush_decoder()
Flush decoder buffers if supported.
_frame_time_s(pts)
Return the timestamp for a frame PTS.
_keyframe_index_at_or_before_pts(target_pts)
Return keyframe index at or before the target PTS.
_keyframe_index_for_time_fast(t_s, mode)
Return a keyframe index using cached time buckets.
_keyframe_index_nearest_pts(target_pts)
Return nearest keyframe index to the target PTS.
_pts_to_frame_number(pts, fps)
Convert a PTS value to a rounded frame number.
_pts_to_secs(pts)
Convert presentation timestamp units to seconds.
_read_frame_by_pts(target_pts)
Decode the first frame at or after a target PTS.
_read_frame_fast_opencv_pyav(target_frame, *, decode_rgb)
Approximate OpenCV seek behavior using PyAV.
_read_frame_fast_simple(target_pts, *, decode_rgb)
Fallback fast seek: seek and decode first frame after PTS.
_secs_to_pts(t_s)
Convert seconds to presentation timestamp units.
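A worked example of the two conversions, assuming a 1/90000 time base and a zero start PTS (both are stream-dependent):

from fractions import Fraction

time_base = Fraction(1, 90000)
start_pts = 0
t_s = 2.5
pts = start_pts + int(round(t_s / float(time_base)))   # 225000 ticks
back = float((pts - start_pts) * time_base)            # 2.5 s again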
_seek_to_pts(pts, *, backward)
Seek to a timestamp in the stream.
build_keyframe_index(*, max_packets=None)
Scan packets and store keyframe pts/time.
close()
Close the underlying PyAV container.
frame_at_index(index)
Return the decoded frame at a zero-based index.
read_frame_at(t_s, *, return_first_after=True, max_decode_frames=10000, use_index=True)
Decode a frame near a timestamp with accurate seeking.
read_frame_fast(*, index=None, t_s=None, decode_rgb=False)
Return a fast, approximate frame for an index or timestamp.
read_keyframe_at(t_s, *, mode='previous', decode_rgb=True)
Return a nearby keyframe without GOP forward decoding.