Documentation Index: fetch the complete documentation index at https://visionagents.ai/llms.txt
Use this file to discover all available pages before exploring further.
Processors analyze, transform, or publish audio and video streams in real-time. Build custom processors or use built-in ones like YOLO pose detection. For lip-synced avatars, use the LiveAvatar integration via the agent’s avatar parameter.
How Video Processing Works
When a participant publishes video, the agent creates a VideoForwarder that reads frames from the track. This forwarder is shared with all video processors via the process_video() method. Each processor registers a frame handler with its desired FPS.
Participant Video Track
           │
           ▼
VideoForwarder (shared)
           │
      ┌────┼────┐
      ▼    ▼    ▼
    Proc1 Proc2 Proc3   (each at independent FPS)
Building a Custom Processor
A minimal video processor that logs frames:
import aiortc
import av
from typing import Optional
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder
class FrameLogger(VideoProcessor):
    """Minimal video processor that counts incoming frames and prints their size.

    The processor registers ``_log_frame`` on the forwarder handed to
    ``process_video`` and removes it again in ``stop_processing`` / ``close``.
    """

    name = "frame_logger"

    def __init__(self, fps: int = 5):
        """Create the logger.

        Args:
            fps: Frame rate requested when the frame handler is registered.
        """
        self.fps = fps
        # Number of frames delivered to ``_log_frame`` so far.
        self.frame_count = 0
        # Forwarder the handler is currently registered on; kept so the
        # handler can be removed later.
        self._forwarder: Optional[VideoForwarder] = None

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        """Start logging frames delivered by ``shared_forwarder``.

        Args:
            track: The participant's video track (not read directly here).
            participant_id: Identifier of the publishing participant, if known.
            shared_forwarder: Forwarder to register the frame handler on.
                When ``None`` there is nothing to subscribe to, so the call
                only detaches from any previous forwarder.
        """
        # Remove the handler from the previous forwarder when switching
        # tracks, otherwise the old registration would linger.
        if self._forwarder is not None:
            await self._forwarder.remove_frame_handler(self._log_frame)

        self._forwarder = shared_forwarder
        if shared_forwarder is None:
            # The parameter is optional; calling a method on None would
            # raise AttributeError.
            return

        shared_forwarder.add_frame_handler(
            self._log_frame,
            fps=float(self.fps),
            name="frame_logger",
        )

    async def _log_frame(self, frame: av.VideoFrame):
        """Frame handler: bump the counter and print the frame dimensions."""
        self.frame_count += 1
        print(f"Frame {self.frame_count} ({frame.width}x{frame.height})")

    async def stop_processing(self) -> None:
        """Unregister the frame handler and forget the forwarder."""
        if self._forwarder is not None:
            await self._forwarder.remove_frame_handler(self._log_frame)
            self._forwarder = None

    async def close(self) -> None:
        """Release resources; for this processor that means stop processing."""
        await self.stop_processing()
Key patterns:
Set name as a class attribute
Store a reference to shared_forwarder to remove handlers later
Register handlers with add_frame_handler(callback, fps, name)
Clean up in stop_processing() (called when tracks are removed) and close()
Use VideoProcessorPublisher with QueuedVideoTrack to publish transformed video back to the call:
import aiortc
import av
import cv2
from typing import Optional
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.utils.video_track import QueuedVideoTrack
from vision_agents.core.utils.video_forwarder import VideoForwarder
class GrayscaleProcessor(VideoProcessorPublisher):
    """Video processor that republishes incoming frames converted to greyscale.

    Frames arrive through a frame handler registered on the forwarder passed
    to ``process_video``; converted frames are queued on a
    ``QueuedVideoTrack`` that ``publish_video_track`` exposes.
    """

    name = "grayscale"

    def __init__(self, fps: int = 30):
        """Create the processor.

        Args:
            fps: Frame rate requested when the frame handler is registered.
        """
        self.fps = fps
        # Forwarder the handler is currently registered on; kept so the
        # handler can be removed later.
        self._forwarder: Optional[VideoForwarder] = None
        # Writable track that receives the converted frames.
        self._video_track = QueuedVideoTrack()

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        """Start converting frames delivered by ``shared_forwarder``.

        Args:
            track: The participant's video track (not read directly here).
            participant_id: Identifier of the publishing participant, if known.
            shared_forwarder: Forwarder to register the frame handler on.
                When ``None`` there is nothing to subscribe to, so the call
                only detaches from any previous forwarder.
        """
        # Remove existing handler if switching tracks.
        if self._forwarder is not None:
            await self._forwarder.remove_frame_handler(self._process_frame)

        self._forwarder = shared_forwarder
        if shared_forwarder is None:
            # The parameter is optional; calling a method on None would
            # raise AttributeError.
            return

        shared_forwarder.add_frame_handler(
            self._process_frame,
            fps=float(self.fps),
            name="grayscale",
        )

    async def _process_frame(self, frame: av.VideoFrame):
        """Frame handler: convert one frame to greyscale and queue it."""
        img = frame.to_ndarray(format="rgb24")
        # Collapse to a single channel, then expand back to three channels so
        # the outgoing frame is still rgb24.
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
        new_frame = av.VideoFrame.from_ndarray(rgb, format="rgb24")
        await self._video_track.add_frame(new_frame)

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        """Return the track carrying the converted frames."""
        return self._video_track

    async def stop_processing(self) -> None:
        """Unregister the frame handler and forget the forwarder."""
        if self._forwarder is not None:
            await self._forwarder.remove_frame_handler(self._process_frame)
            self._forwarder = None

    async def close(self) -> None:
        """Stop processing and stop the outgoing video track."""
        await self.stop_processing()
        self._video_track.stop()
Key utilities:
QueuedVideoTrack — Writable video track. Call add_frame() to queue frames for publishing.
VideoForwarder — Distributes incoming frames to handlers at independent FPS rates.
Emitting Custom Events
Use attach_agent() to register custom events that other parts of your application can subscribe to:
from dataclasses import dataclass
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.events import Event
@dataclass
class ObjectDetectedEvent(Event):
    """Custom event describing the objects detected in one video frame."""

    # Detected objects, each represented as a string.
    objects: list[str]
    # Frame counter value supplied by the emitting processor.
    frame_number: int
class DetectionProcessor(VideoProcessorPublisher):
    """Processor excerpt that emits an ``ObjectDetectedEvent`` per frame.

    ``_detect`` is not defined in this excerpt; supply your own detection
    routine that takes a frame and returns the list of detected objects.
    """

    name = "detection"
    # Frames handled so far. Declared at class level so the counter exists
    # even though this excerpt has no ``__init__``; without it the first
    # frame would raise AttributeError.
    frame_count = 0

    def attach_agent(self, agent):
        """Keep a handle on the agent's event manager and register the event type."""
        self._events = agent.events
        self._events.register(ObjectDetectedEvent)

    async def _process_frame(self, frame):
        """Run detection on ``frame`` and emit the result as a custom event."""
        self.frame_count += 1
        objects = self._detect(frame)
        await self._events.emit(
            ObjectDetectedEvent(
                objects=objects,
                frame_number=self.frame_count,
            )
        )
YOLO Pose Detection
The Ultralytics plugin provides YOLOPoseProcessor for real-time pose detection with skeleton overlays:
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini, ultralytics
# Agent that coaches a golf swing: a realtime Gemini model plus a YOLO pose
# processor attached to the agent's processor list.
agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Golf Coach", id="agent"),
    instructions="Analyze the user's golf swing.",
    # The realtime LLM is configured with fps=10 ...
    llm=gemini.Realtime(fps=10),
    processors=[
        # ... while the pose processor is configured with fps=30.
        ultralytics.YOLOPoseProcessor(
            model_path="yolo11n-pose.pt",
            fps=30,
            conf_threshold=0.5,
        ),
    ],
)
Use cases: Golf coaching, fitness form checking, dance instruction, physical therapy.
See Ultralytics for parameters and model options.
LiveAvatar
The LiveAvatar plugin provides liveavatar.Avatar() for lip-syncing AI avatars that speak agent responses.
Use cases: Virtual presenters, customer service avatars, interactive tutors.
See LiveAvatar for setup and examples.
Next Steps
Processors Class — Base class API reference
Ultralytics — YOLO pose detection reference
LiveAvatar — Avatar integration reference
Event System — Custom events reference