Documentation Index
Fetch the complete documentation index at: https://visionagents.ai/llms.txt
Use this file to discover all available pages before exploring further.
Processors extend the agent’s capabilities by analyzing and transforming audio/video streams in real-time. They can access incoming media, provide state to the LLM, and publish transformed streams back to the call.
Common use cases:
- Video analysis — Pose detection, object recognition, scene understanding
- Media transformation — Video effects, avatars, filters
- State injection — Feed detection results or external data to the LLM
Class Hierarchy
All processors inherit from the abstract Processor base class:
| Class | Purpose |
|---|
Processor | Abstract base class with name, close(), and attach_agent() |
VideoProcessor | Receives video tracks via process_video() |
VideoPublisher | Outputs video via publish_video_track() |
VideoProcessorPublisher | Receives and outputs video (e.g., annotated frames) |
AudioProcessor | Receives audio via process_audio() |
AudioPublisher | Outputs audio via publish_audio_track() |
AudioProcessorPublisher | Receives and outputs audio |
Base Processor
All processors must implement name and close(). The attach_agent() method is optional.
from vision_agents.core.processors import Processor
class MyProcessor(Processor):
name = "my_processor"
async def close(self) -> None:
"""Clean up resources when the agent stops."""
pass
def attach_agent(self, agent: "Agent") -> None:
"""Called once when the processor is registered with an agent.
Use this to access agent.events for custom event registration.
"""
self._agent = agent
Video Processor
Receives video tracks from participants. The agent provides a shared VideoForwarder that distributes frames to all processors.
from typing import Optional
import aiortc
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder
class MyVideoProcessor(VideoProcessor):
name = "my_video_processor"
def __init__(self):
self._forwarder: Optional[VideoForwarder] = None
async def process_video(
self,
track: aiortc.VideoStreamTrack,
participant_id: Optional[str],
shared_forwarder: Optional[VideoForwarder] = None,
) -> None:
"""Called when a participant publishes video."""
self._forwarder = shared_forwarder
self._forwarder.add_frame_handler(
self._on_frame, fps=5.0, name="my_handler"
)
async def _on_frame(self, frame):
# Process the frame
pass
async def stop_processing(self) -> None:
"""Called when video tracks are removed."""
if self._forwarder:
await self._forwarder.remove_frame_handler(self._on_frame)
self._forwarder = None
async def close(self) -> None:
await self.stop_processing()
Video Publisher
Outputs a video track to the call (e.g., AI-generated video or avatars).
import aiortc
from vision_agents.core.processors import VideoPublisher
from vision_agents.core.utils.video_track import QueuedVideoTrack
class MyVideoPublisher(VideoPublisher):
name = "my_video_publisher"
def __init__(self):
self._track = QueuedVideoTrack(width=1280, height=720, fps=30)
def publish_video_track(self) -> aiortc.VideoStreamTrack:
"""Return the track to publish."""
return self._track
async def close(self) -> None:
self._track.stop()
Video Processor + Publisher
For processors that receive video and output transformed frames (e.g., object detection with annotations).
import av
from typing import Optional
import aiortc
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.utils.video_forwarder import VideoForwarder
from vision_agents.core.utils.video_track import QueuedVideoTrack
class AnnotationProcessor(VideoProcessorPublisher):
name = "annotation_processor"
def __init__(self, fps: int = 30):
self.fps = fps
self._forwarder: Optional[VideoForwarder] = None
self._track = QueuedVideoTrack()
async def process_video(
self,
track: aiortc.VideoStreamTrack,
participant_id: Optional[str],
shared_forwarder: Optional[VideoForwarder] = None,
) -> None:
self._forwarder = shared_forwarder
self._forwarder.add_frame_handler(
self._process_frame, fps=float(self.fps), name="annotator"
)
async def _process_frame(self, frame: av.VideoFrame):
# Transform the frame
annotated = self._annotate(frame)
await self._track.add_frame(annotated)
def _annotate(self, frame: av.VideoFrame) -> av.VideoFrame:
# Add annotations to frame
return frame
def publish_video_track(self) -> aiortc.VideoStreamTrack:
return self._track
async def stop_processing(self) -> None:
if self._forwarder:
await self._forwarder.remove_frame_handler(self._process_frame)
self._forwarder = None
async def close(self) -> None:
await self.stop_processing()
self._track.stop()
Audio Processor
Receives audio data from participants. Audio is delivered as PcmData chunks.
from getstream.video.rtc import PcmData
from vision_agents.core.processors import AudioProcessor
class MyAudioProcessor(AudioProcessor):
name = "my_audio_processor"
async def process_audio(self, audio_data: PcmData) -> None:
"""Process incoming audio.
Args:
audio_data: Contains samples, sample_rate, channels, and participant info.
"""
samples = audio_data.samples
participant = audio_data.participant
# Process audio...
async def close(self) -> None:
pass
Audio Publisher
Outputs an audio track to the call.
import aiortc
from vision_agents.core.processors import AudioPublisher
class MyAudioPublisher(AudioPublisher):
name = "my_audio_publisher"
def publish_audio_track(self) -> aiortc.AudioStreamTrack:
"""Return the audio track to publish."""
return self._audio_track
async def close(self) -> None:
pass
Usage
Pass processors to the agent at initialization:
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, openai
agent = Agent(
edge=getstream.Edge(),
agent_user=User(name="AI Assistant", id="agent"),
llm=openai.Realtime(),
processors=[your_processor],
)