Processors analyze, transform, or publish audio and video streams in real time. Build custom processors or use built-in ones like YOLO pose detection and HeyGen avatars.
For the base class API reference, see Processors Class.

How Video Processing Works

When a participant publishes video, the agent creates a VideoForwarder that reads frames from the track. The agent passes this forwarder to every video processor through the shared_forwarder argument of process_video(), and each processor registers its own frame handler at its desired FPS.
Participant Video Track
         │
         ▼
   VideoForwarder (shared)
         │
    ┌────┼────┐
    ▼    ▼    ▼
 Proc1 Proc2 Proc3  (each at independent FPS)
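
To make the fan-out concrete, here is a minimal sketch of how one shared source could serve several handlers at different rates. ToyForwarder, its push() method, and simulate() are hypothetical names invented for this illustration; this is not the library's VideoForwarder, and the real class may throttle frames differently.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class _Handler:
    callback: Callable[[int], None]
    min_interval: float              # seconds between deliveries (1 / fps)
    last_sent: float = float("-inf")


class ToyForwarder:
    """Hypothetical stand-in for a shared forwarder; not the library class."""

    def __init__(self) -> None:
        self._handlers: List[_Handler] = []

    def add_frame_handler(self, callback: Callable[[int], None], fps: float) -> None:
        self._handlers.append(_Handler(callback, 1.0 / fps))

    def push(self, frame: int, timestamp: float) -> None:
        # Offer the frame to every handler; each one is throttled independently.
        for handler in self._handlers:
            # Small epsilon absorbs floating-point error in the timestamps.
            if timestamp - handler.last_sent >= handler.min_interval - 1e-9:
                handler.last_sent = timestamp
                handler.callback(frame)


def simulate(source_fps: int = 30, seconds: int = 1) -> Dict[str, int]:
    """Feed a synthetic source into three handlers and count deliveries."""
    counts = {"proc1": 0, "proc2": 0, "proc3": 0}
    forwarder = ToyForwarder()
    for name, fps in (("proc1", 30), ("proc2", 10), ("proc3", 5)):
        def handler(frame: int, name: str = name) -> None:
            counts[name] += 1
        forwarder.add_frame_handler(handler, fps=fps)
    for i in range(source_fps * seconds):
        forwarder.push(i, i / source_fps)
    return counts
```

With a 30 FPS source, one simulated second delivers 30, 10, and 5 frames to the three handlers, so a slow processor never holds back a fast one.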

Building a Custom Processor

A minimal video processor that logs frames:
import aiortc
import av
from typing import Optional
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder

class FrameLogger(VideoProcessor):
    name = "frame_logger"

    def __init__(self, fps: int = 5):
        self.fps = fps
        self.frame_count = 0
        self._forwarder: Optional[VideoForwarder] = None

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
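        # shared_forwarder is Optional; skip registration if none was provided.
        if shared_forwarder is None:
            return
        self._forwarder = shared_forwarder
        self._forwarder.add_frame_handler(
            self._log_frame,
            fps=float(self.fps),
            name="frame_logger",
        )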

    async def _log_frame(self, frame: av.VideoFrame):
        self.frame_count += 1
        print(f"Frame {self.frame_count} ({frame.width}x{frame.height})")

    async def stop_processing(self) -> None:
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._log_frame)
            self._forwarder = None

    async def close(self) -> None:
        await self.stop_processing()
Key patterns:
  • Set name as a class attribute
  • Store a reference to shared_forwarder to remove handlers later
  • Register handlers with add_frame_handler(callback, fps, name)
  • Clean up in stop_processing() (called when tracks are removed) and close()
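
One way to check the register and clean-up pattern without a live call is to drive it with a test double. The sketch below is an illustration only: StubForwarder and LifecycleSketch are hypothetical classes that mimic just the two forwarder calls used above (add_frame_handler and remove_frame_handler) and do not import the library.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

FrameHandler = Callable[[object], Awaitable[None]]


class StubForwarder:
    """Hypothetical test double; records which handlers are registered."""

    def __init__(self) -> None:
        self.handlers: List[FrameHandler] = []

    def add_frame_handler(self, callback: FrameHandler, fps: float, name: str = "") -> None:
        self.handlers.append(callback)

    async def remove_frame_handler(self, callback: FrameHandler) -> None:
        self.handlers.remove(callback)


class LifecycleSketch:
    """Mirrors the store-reference / register / remove pattern."""

    def __init__(self) -> None:
        self._forwarder: Optional[StubForwarder] = None

    async def process_video(self, shared_forwarder: StubForwarder) -> None:
        # Detach from the previous forwarder before attaching to the new one.
        await self.stop_processing()
        self._forwarder = shared_forwarder
        shared_forwarder.add_frame_handler(self._on_frame, fps=5.0, name="sketch")

    async def _on_frame(self, frame: object) -> None:
        pass

    async def stop_processing(self) -> None:
        if self._forwarder is not None:
            await self._forwarder.remove_frame_handler(self._on_frame)
            self._forwarder = None


async def run_lifecycle() -> Tuple[Tuple[int, int], int]:
    first, second = StubForwarder(), StubForwarder()
    proc = LifecycleSketch()
    await proc.process_video(first)
    await proc.process_video(second)      # simulates a track switch
    after_switch = (len(first.handlers), len(second.handlers))
    await proc.stop_processing()
    await proc.stop_processing()          # a second call is a harmless no-op
    return after_switch, len(second.handlers)
```

Running asyncio.run(run_lifecycle()) shows that the old forwarder holds no handler after the switch and that the new one is emptied after stop_processing().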

Publishing Transformed Video

Use VideoProcessorPublisher with QueuedVideoTrack to publish transformed video back to the call:
import aiortc
import av
import cv2
from typing import Optional
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.utils.video_track import QueuedVideoTrack
from vision_agents.core.utils.video_forwarder import VideoForwarder

class GrayscaleProcessor(VideoProcessorPublisher):
    name = "grayscale"

    def __init__(self, fps: int = 30):
        self.fps = fps
        self._forwarder: Optional[VideoForwarder] = None
        self._video_track = QueuedVideoTrack()

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
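        # Remove existing handler if switching tracks
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._process_frame)
            self._forwarder = None

        # shared_forwarder is Optional; skip registration if none was provided.
        if shared_forwarder is None:
            return

        self._forwarder = shared_forwarder
        self._forwarder.add_frame_handler(
            self._process_frame,
            fps=float(self.fps),
            name="grayscale",
        )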

    async def _process_frame(self, frame: av.VideoFrame):
        img = frame.to_ndarray(format="rgb24")
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

        new_frame = av.VideoFrame.from_ndarray(rgb, format="rgb24")
        await self._video_track.add_frame(new_frame)

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        return self._video_track

    async def stop_processing(self) -> None:
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._process_frame)
            self._forwarder = None

    async def close(self) -> None:
        await self.stop_processing()
        self._video_track.stop()
Key utilities:
  • QueuedVideoTrack — Writable video track. Call add_frame() to queue frames for publishing.
  • VideoForwarder — Distributes incoming frames to handlers at independent FPS rates.
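
As a rough mental model of a queue-backed writable track, consider the sketch below. ToyQueuedTrack and its max_queue_size parameter are hypothetical, and the drop-oldest policy is an assumption chosen for illustration; the source does not describe how QueuedVideoTrack behaves when its queue fills up.

```python
import asyncio
from typing import Any, List


class ToyQueuedTrack:
    """Hypothetical model of a queue-backed track; not QueuedVideoTrack itself."""

    def __init__(self, max_queue_size: int = 3) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)

    async def add_frame(self, frame: Any) -> None:
        # Producer side. Assumption: when full, discard the oldest frame so
        # the consumer always sees recent frames and latency stays bounded.
        if self._queue.full():
            self._queue.get_nowait()
        self._queue.put_nowait(frame)

    async def recv(self) -> Any:
        # Consumer side: wait until a frame is available, then hand it over.
        return await self._queue.get()


async def demo() -> List[int]:
    track = ToyQueuedTrack(max_queue_size=3)
    for i in range(5):                 # producer outruns the consumer
        await track.add_frame(i)
    return [await track.recv() for _ in range(3)]
```

Here the producer queues five frames before anything is read, so the consumer receives only the three most recent ones (2, 3, 4).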

Emitting Custom Events

Use attach_agent() to register custom events that other parts of your application can subscribe to:
from dataclasses import dataclass
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.events import Event

@dataclass
class ObjectDetectedEvent(Event):
    objects: list[str]
    frame_number: int

class DetectionProcessor(VideoProcessorPublisher):
    name = "detection"

    def attach_agent(self, agent):
        self._events = agent.events
        self._events.register(ObjectDetectedEvent)

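    async def _process_frame(self, frame):
        # self._detect() and self.frame_count are placeholders for your own
        # detection logic and frame counter; they are not defined in this snippet.
        objects = self._detect(frame)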
        await self._events.emit(ObjectDetectedEvent(
            objects=objects,
            frame_number=self.frame_count,
        ))
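
The register, subscribe, and emit flow can be pictured with a small stand-alone model. ToyEventBus, its subscribe() method, and ObjectDetected are hypothetical names for this sketch; the library's event manager has its own subscription API, which may look different, so treat this only as an illustration of the idea.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List


@dataclass
class ObjectDetected:
    objects: List[str]
    frame_number: int


class ToyEventBus:
    """Hypothetical event bus; not the library's event manager."""

    def __init__(self) -> None:
        self._subscribers: Dict[type, List[Callable[[object], Awaitable[None]]]] = {}

    def register(self, event_type: type) -> None:
        # Declare an event type so that handlers can subscribe to it.
        self._subscribers.setdefault(event_type, [])

    def subscribe(self, event_type: type, handler: Callable[[object], Awaitable[None]]) -> None:
        if event_type not in self._subscribers:
            raise KeyError(f"{event_type.__name__} is not registered")
        self._subscribers[event_type].append(handler)

    async def emit(self, event: object) -> None:
        # Deliver the event to every handler subscribed to its exact type.
        for handler in self._subscribers.get(type(event), []):
            await handler(event)


async def demo_events() -> List[str]:
    bus = ToyEventBus()
    bus.register(ObjectDetected)
    seen: List[str] = []

    async def on_detected(event: ObjectDetected) -> None:
        seen.append(f"frame {event.frame_number}: {', '.join(event.objects)}")

    bus.subscribe(ObjectDetected, on_detected)
    await bus.emit(ObjectDetected(objects=["person", "ball"], frame_number=7))
    return seen
```

The processor only needs to know the event type; subscribers elsewhere in the application decide what to do with each detection.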

YOLO Pose Detection

The Ultralytics plugin provides YOLOPoseProcessor for real-time pose detection with skeleton overlays:
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini, ultralytics

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Golf Coach", id="agent"),
    instructions="Analyze the user's golf swing.",
    llm=gemini.Realtime(fps=10),
    processors=[
        ultralytics.YOLOPoseProcessor(
            model_path="yolo11n-pose.pt",
            fps=30,
            conf_threshold=0.5,
        )
    ],
)
Use cases: Golf coaching, fitness form checking, dance instruction, physical therapy. See Ultralytics for parameters and model options.

HeyGen Avatars

The HeyGen plugin provides AvatarPublisher for lip-syncing AI avatars that speak agent responses. Use cases: Virtual presenters, customer service avatars, interactive tutors. See HeyGen for setup and examples.

Next Steps

  • Processors Class: Base class API reference
  • Ultralytics YOLO: Pose detection reference
  • HeyGen Avatars: Avatar publisher reference
  • Event System: Custom events reference