from vision_agents.core.llm import VideoLLM
from vision_agents.core.utils import VideoForwarder
from collections import deque
import base64
from PIL import Image
import io
class MyVLM(VideoLLM):
    """Vision-language model that keeps a rolling buffer of recent video
    frames and attaches them (JPEG, base64-encoded) to each chat request.

    Frames arrive via a ``VideoForwarder`` — either a shared one supplied by
    the call, or a private one this instance creates and owns.
    """

    def __init__(self, model: str, fps: int = 1, frame_buffer_seconds: int = 10):
        """Initialize the VLM.

        Args:
            model: Provider model identifier.
            fps: Frame sampling rate requested from the forwarder.
            frame_buffer_seconds: How many seconds of frames to retain;
                the buffer holds at most ``fps * frame_buffer_seconds`` frames.
        """
        super().__init__()
        self.model = model
        self.fps = fps
        # Rolling window: oldest frames are evicted automatically by maxlen.
        self._frame_buffer: deque = deque(maxlen=fps * frame_buffer_seconds)
        self._forwarder = None
        self._handler_id = None
        # True only when we created the forwarder ourselves and must stop it
        # on close; a shared forwarder belongs to the call, not to us.
        self._owns_forwarder: bool = False

    async def watch_video_track(self, track, shared_forwarder=None):
        """Subscribe to video frames from the call.

        Args:
            track: The video track to watch (used only when no shared
                forwarder is provided).
            shared_forwarder: Optional ``VideoForwarder`` shared across
                consumers; when given, we subscribe to it instead of
                creating (and starting) our own.
        """
        # Guard against double-subscription if called more than once:
        # drop any previous handler registration first.
        await self.stop_watching_video_track()
        if shared_forwarder is not None:
            self._forwarder = shared_forwarder
            self._owns_forwarder = False
        else:
            self._forwarder = VideoForwarder(track)
            self._owns_forwarder = True
            # Only a forwarder we created needs starting; a shared one is
            # already running.
            await self._forwarder.start()
        # Register frame handler at the desired sampling rate.
        self._handler_id = self._forwarder.add_frame_handler(
            on_frame=self._on_frame,
            fps=self.fps,
            name="my_vlm",
        )

    def _on_frame(self, frame):
        """Forwarder callback: convert the frame to a PIL image and buffer it."""
        img = frame.to_image()
        self._frame_buffer.append(img)

    async def stop_watching_video_track(self):
        """Unsubscribe from video frames. Idempotent — safe to call twice."""
        if self._forwarder is not None and self._handler_id is not None:
            self._forwarder.remove_frame_handler(self._handler_id)
            # Reset so a later call (e.g. from close()) doesn't try to
            # remove the same handler again with a stale id.
            self._handler_id = None

    async def simple_response(self, text: str, processors=None, participant=None):
        """Generate a response using the buffered frames as visual context.

        Args:
            text: The user's message.
            processors: Passed through unchanged (unused here).
            participant: Passed through unchanged (unused here).

        Returns:
            The response event produced by ``_create_response_event``.
        """
        # Snapshot the deque before iterating: _on_frame may append
        # concurrently, and mutating a deque during iteration raises
        # RuntimeError.
        frames = list(self._frame_buffer)
        # Encode frames as base64 JPEG for the provider API.
        images = []
        for img in frames:
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG", quality=85)
            images.append(base64.b64encode(buffer.getvalue()).decode())
        response = await self.provider.chat(
            messages=[{"role": "user", "content": text}],
            images=images,
        )
        return self._create_response_event(response)

    async def close(self):
        """Tear down the frame subscription and any forwarder we own."""
        await self.stop_watching_video_track()
        # Stop only a privately-owned forwarder; never touch a shared one.
        # (Tracking ownership ourselves avoids poking at the forwarder's
        # private internals.)
        if self._forwarder is not None and self._owns_forwarder:
            await self._forwarder.stop()
        self._forwarder = None