Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

How to read video from an IStreamReader? #1150

Closed
Labels
@horizon86

Description

In the OpenCV type hints (.../miniforge3/envs/main/lib/python3.11/site-packages/cv2/__init__.pyi), the VideoCapture class is defined to accept an IStreamReader object via its constructor:

# Type-stub excerpt: the stream type that the VideoCapture overload below
# accepts as its `source` argument.
class IStreamReader:
 # Functions
 # Declarations only (bodies are `...`); both are annotated to return int.
 def read(self, buffer: str, size: int) -> int: ...
 def seek(self, offset: int, origin: int) -> int: ...
# Type-stub excerpt of VideoCapture showing only the stream-based constructor.
class VideoCapture:
 # Functions
 # NOTE(review): the bare `...` below presumably stands for members left out
 # of this excerpt -- confirm against the full stub file.
 ...
 # Overload taking a stream source, an API preference, and a sequence of
 # int parameters (all three are declared without defaults).
 @_typing.overload
 def __init__(self, source: IStreamReader, apiPreference: int, params: _typing.Sequence[int]) -> None: ...

However, when I create a subclass of IStreamReader and implement the read() and seek() methods, I receive the following error when initializing VideoCapture:

cv2.error: OpenCV(4.12.0) :-1: error: (-5:Bad argument) in function 'VideoCapture'
> Overload resolution failed:
> - VideoCapture() takes at most 2 arguments (3 given)
> - Expected 'filename' to be a str or path-like object
> - VideoCapture() takes at most 2 arguments (3 given)
> - Argument 'index' is required to be an integer
> - Input stream should be derived from io.BufferedIOBase

This error persists even when using BytesIO instead of a subclass of IStreamReader.

You can reproduce the issue with the following code (using a valid URL to download a video):

import cv2
import requests
import time
from typing import Optional
from logging import Logger, getLogger
class URLStreamReader(cv2.IStreamReader):
    """
    In-memory video stream reader that inherits from cv2.IStreamReader.

    The entire video is downloaded into memory when the object is created;
    ``read()`` and ``seek()`` then serve that buffer through a file-like
    cursor (``self.position``).

    NOTE(review): the error reported for this reproduction includes
    "Input stream should be derived from io.BufferedIOBase", so the installed
    binding may not accept this base class -- confirm against the OpenCV
    version in use.
    """

    def __init__(self, video_url: str, logger: Optional[Logger] = None):
        """
        Initialize the reader and pre-download the video to memory.

        Args:
            video_url: URL address of the video.
            logger: Logger instance; ``getLogger(__name__)`` is used when
                none is provided.

        Raises:
            Exception: Any error raised during the download is logged and
                re-raised unchanged.
        """
        super().__init__()
        self.video_url = video_url
        self.video_data = b''
        self.position = 0
        self.logger = logger or getLogger(__name__)
        self._download_video()

    def _download_video(self):
        """
        Download ``self.video_url`` into ``self.video_data``.

        The response is used as a context manager so the underlying
        connection is released even when the download fails part-way
        (with ``stream=True`` it is otherwise left open).

        Raises:
            Exception: Any failure is logged at error level and re-raised.
        """
        self.logger.info(f"Starting video download: {self.video_url}")
        start_time = time.time()

        try:
            with requests.get(self.video_url, stream=True) as response:
                response.raise_for_status()

                # Skip empty chunks so they do not end up in the joined data.
                chunks = [
                    chunk
                    for chunk in response.iter_content(chunk_size=8192)
                    if chunk
                ]

            self.video_data = b''.join(chunks)
            total_size = len(self.video_data)
            download_time = time.time() - start_time
            self.logger.info(f"Video download completed, size: {total_size / 1024 / 1024:.2f}MB, time: {download_time:.2f}s")

        except Exception as e:
            self.logger.error(f"Video download failed: {e}")
            raise

    def read(self, buffer, size: int) -> int:
        """
        Read up to ``size`` bytes from the current position into ``buffer``.

        Args:
            buffer: Destination object. Sized, index-assignable objects
                (e.g. ``bytearray``, ``memoryview``) are filled in place
                starting at index 0; objects exposing ``write`` receive the
                bytes through that method.
            size: Maximum number of bytes to read.

        Returns:
            Number of bytes consumed from the stream. For in-place buffers
            this is the number of bytes actually copied, and the position
            advances by exactly that amount. Returns 0 at end of data.
        """
        remaining = len(self.video_data) - self.position
        wanted = min(size, remaining)
        if wanted <= 0:
            return 0

        start = self.position

        try:
            # Mutable, sized buffer (bytearray, memoryview, list, ...).
            # memoryview also lands here, so it needs no separate branch.
            if hasattr(buffer, '__setitem__') and hasattr(buffer, '__len__'):
                count = min(wanted, len(buffer))
                chunk = self.video_data[start:start + count]

                try:
                    # One bulk copy instead of a per-byte Python loop.
                    buffer[:count] = chunk
                except (TypeError, ValueError):
                    # Container without bytes slice assignment: copy
                    # element by element.
                    for index, byte in enumerate(chunk):
                        buffer[index] = byte

                # Advance only past what was delivered; a buffer shorter
                # than the request must not cause bytes to be skipped.
                self.position = start + count
                return count

            # File-like destination.
            if hasattr(buffer, 'write'):
                buffer.write(self.video_data[start:start + wanted])

            # Any other buffer type cannot be modified from here; the bytes
            # are still reported as consumed (deliberate best effort).

        except Exception as e:
            self.logger.warning(f"Cannot write to buffer: {e}, buffer type: {type(buffer)}")
            # Best effort: even if the write fails, report the bytes as
            # consumed and move on.

        self.position = start + wanted
        return wanted

    def seek(self, offset: int, origin: int) -> int:
        """
        Set the read position.

        Args:
            offset: Offset value, relative to ``origin``.
            origin: Starting point:
                0 (SEEK_SET): from the beginning of the data
                1 (SEEK_CUR): from the current position
                2 (SEEK_END): from the end of the data

        Returns:
            The new position, clamped to ``[0, len(self.video_data)]``.

        Raises:
            ValueError: If ``origin`` is not 0, 1 or 2.
        """
        if origin == 0:  # SEEK_SET
            base = 0
        elif origin == 1:  # SEEK_CUR
            base = self.position
        elif origin == 2:  # SEEK_END
            base = len(self.video_data)
        else:
            raise ValueError(f"Invalid origin value: {origin}")

        # Ensure position is within valid range
        self.position = max(0, min(base + offset, len(self.video_data)))

        return self.position

    def close(self) -> None:
        """
        Release the in-memory video data and reset the read position.

        Safe to call more than once; subsequent ``read()`` calls return 0.
        """
        self.video_data = b''
        self.position = 0
# Usage example
if __name__ == "__main__":

    # Example URL (please replace with actual video URL)
    video_url = "https://lf26-bot-platform-tos-sign.coze.cn/bot-studio-bot-platform/bot_files/353388233497496/video/mp4/7560697486898741257/upload?lk3s=50ccb0c5&x-expires=1760967080&x-signature=%2FHQ8qDwVP%2Ftoq2xPSKw5mP2mUdM%3D"
    url_reader = URLStreamReader(video_url)

    try:
        # The stream overload annotates `params` as a sequence of ints, so
        # pass an empty list rather than None.
        cap = cv2.VideoCapture(url_reader, cv2.CAP_FFMPEG, [])

        try:
            if cap.isOpened():
                print(f"Video opened successfully, total frames: {cap.get(cv2.CAP_PROP_FRAME_COUNT)}")
                print(f"Video FPS: {cap.get(cv2.CAP_PROP_FPS)}")
                ret, frame = cap.read()
                if ret:
                    print(f"Successfully read first frame, size: {frame.shape}")
        finally:
            # Release the capture even if reading a frame raises.
            cap.release()
    finally:
        # close() is optional on the reader: call it only when implemented,
        # so cleanup never fails with AttributeError.
        close = getattr(url_reader, "close", None)
        if callable(close):
            close()

Additional Question
I know that passing a URL directly works, but I’d like to know if there’s a way to read and process a video directly from memory (e.g., from a byte buffer or an in-memory stream) without writing it to disk.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

      Relationships

      None yet

      Development

      No branches or pull requests

      Issue actions

        AltStyle によって変換されたページ (->オリジナル) /