Processors analyze, transform, or publish audio and video streams in real time. Build custom processors or use built-in ones like YOLO pose detection and HeyGen avatars.
For the base class API reference, see Processors Class.

How Video Processing Works

When a participant publishes video, the agent creates a VideoForwarder that reads frames from the track. This forwarder is shared with all video processors via the process_video() method. Each processor registers a frame handler with its desired FPS.

Participant Video Track
           │
           ▼
  VideoForwarder (shared)
           │
      ┌────┼────┐
      ▼    ▼    ▼
   Proc1 Proc2 Proc3   (each at an independent FPS)
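
The agent performs this fan-out for you and builds the forwarder itself. The sketch below is purely illustrative of what the wiring amounts to; fan_out is not a framework function:
import aiortc
from typing import Iterable, Optional
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder

async def fan_out(
    track: aiortc.VideoStreamTrack,
    processors: Iterable[VideoProcessor],
    participant_id: Optional[str],
    forwarder: VideoForwarder,
) -> None:
    # Every processor receives the same shared forwarder and registers its own
    # frame handler at its own FPS inside process_video().
    for processor in processors:
        await processor.process_video(
            track,
            participant_id,
            shared_forwarder=forwarder,
        )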

Building a Custom Processor

A minimal video processor that logs frames:
import aiortc
import av
from typing import Optional
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder

class FrameLogger(VideoProcessor):
    name = "frame_logger"

    def __init__(self, fps: int = 5):
        self.fps = fps
        self.frame_count = 0
        self._forwarder: Optional[VideoForwarder] = None

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        # The agent supplies a shared VideoForwarder for the participant's track;
        # without one there is nothing to subscribe to.
        if shared_forwarder is None:
            return

        self._forwarder = shared_forwarder
        self._forwarder.add_frame_handler(
            self._log_frame,
            fps=float(self.fps),
            name="frame_logger",
        )

    async def _log_frame(self, frame: av.VideoFrame):
        self.frame_count += 1
        print(f"Frame {self.frame_count} ({frame.width}x{frame.height})")

    async def stop_processing(self) -> None:
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._log_frame)
            self._forwarder = None

    async def close(self) -> None:
        await self.stop_processing()

Key patterns:
  • Set name as a class attribute
  • Store a reference to shared_forwarder to remove handlers later
  • Register handlers with add_frame_handler(callback, fps, name)
  • Clean up in stop_processing() (called when tracks are removed) and close()
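
Once defined, the processor is passed to the agent like any built-in processor. A minimal sketch, mirroring the YOLO example later on this page; the getstream and gemini plugin choices are placeholders:
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Frame Logger Bot", id="agent"),
    instructions="Describe what you see.",
    llm=gemini.Realtime(fps=5),
    processors=[FrameLogger(fps=5)],  # the custom processor defined above
)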

Publishing Transformed Video

Use VideoProcessorPublisher with QueuedVideoTrack to publish transformed video back to the call:
import aiortc
import av
import cv2
from typing import Optional
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.utils.video_track import QueuedVideoTrack
from vision_agents.core.utils.video_forwarder import VideoForwarder

class GrayscaleProcessor(VideoProcessorPublisher):
    name = "grayscale"

    def __init__(self, fps: int = 30):
        self.fps = fps
        self._forwarder: Optional[VideoForwarder] = None
        self._video_track = QueuedVideoTrack()

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        # The agent supplies a shared VideoForwarder for the participant's track.
        if shared_forwarder is None:
            return

        # Remove the existing handler if switching tracks
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._process_frame)

        self._forwarder = shared_forwarder
        self._forwarder.add_frame_handler(
            self._process_frame,
            fps=float(self.fps),
            name="grayscale",
        )

    async def _process_frame(self, frame: av.VideoFrame):
        # Convert to grayscale, then back to three channels so the published
        # frame is still rgb24.
        img = frame.to_ndarray(format="rgb24")
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

        new_frame = av.VideoFrame.from_ndarray(rgb, format="rgb24")
        await self._video_track.add_frame(new_frame)

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        return self._video_track

    async def stop_processing(self) -> None:
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._process_frame)
            self._forwarder = None

    async def close(self) -> None:
        await self.stop_processing()
        self._video_track.stop()

Key utilities:
  • QueuedVideoTrack — Writable video track. Call add_frame() to queue frames for publishing.
  • VideoForwarder — Distributes incoming frames to handlers at independent FPS rates.
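
QueuedVideoTrack can also be exercised on its own. The sketch below queues a single synthetic frame, which is what a frame handler does on every callback; the solid-gray image is purely illustrative:
import asyncio

import av
import numpy as np

from vision_agents.core.utils.video_track import QueuedVideoTrack

async def main() -> None:
    track = QueuedVideoTrack()

    # One mid-gray 640x480 frame; a real processor queues frames like this
    # from inside its frame handler (see GrayscaleProcessor above).
    img = np.full((480, 640, 3), 128, dtype=np.uint8)
    await track.add_frame(av.VideoFrame.from_ndarray(img, format="rgb24"))

    track.stop()

asyncio.run(main())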

Emitting Custom Events

Use attach_agent() to register custom events that other parts of your application can subscribe to:
from dataclasses import dataclass
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.events import Event

@dataclass
class ObjectDetectedEvent(Event):
    objects: list[str]
    frame_number: int

class DetectionProcessor(VideoProcessorPublisher):
    name = "detection"

    def __init__(self):
        self.frame_count = 0

    def attach_agent(self, agent):
        # Keep a handle on the agent's event bus and register the custom event type.
        self._events = agent.events
        self._events.register(ObjectDetectedEvent)

    async def _process_frame(self, frame):
        self.frame_count += 1
        objects = self._detect(frame)  # your detection logic (not shown here)
        await self._events.emit(ObjectDetectedEvent(
            objects=objects,
            frame_number=self.frame_count,
        ))
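
Elsewhere in your application you can then listen for these events. The decorator-style subscribe() below is an assumption about the event manager's API; check vision_agents.core.events for the exact method in your version:
# Assumed subscription API: a subscribe() decorator dispatching on the
# event type annotation. Verify against the events module.
@agent.events.subscribe
async def on_object_detected(event: ObjectDetectedEvent):
    print(f"Frame {event.frame_number}: detected {', '.join(event.objects)}")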

YOLO Pose Detection

The Ultralytics plugin provides YOLOPoseProcessor for real-time pose detection with skeleton overlays:
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini, ultralytics

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Golf Coach", id="agent"),
    instructions="Analyze the user's golf swing.",
    llm=gemini.Realtime(fps=10),
    processors=[
        ultralytics.YOLOPoseProcessor(
            model_path="yolo11n-pose.pt",
            fps=30,
            conf_threshold=0.5,
        )
    ],
)

Use cases: Golf coaching, fitness form checking, dance instruction, physical therapy. See Ultralytics for parameters and model options.

HeyGen Avatars

The HeyGen plugin provides AvatarPublisher for lip-syncing AI avatars that speak agent responses. Use cases: Virtual presenters, customer service avatars, interactive tutors. See HeyGen for setup and examples.
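
A minimal sketch of an agent that publishes a HeyGen avatar; the heygen module path and the avatar_id argument are assumptions, so check the HeyGen plugin documentation for the actual constructor options:
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini, heygen

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Virtual Presenter", id="agent"),
    instructions="Answer customer questions about our product.",
    llm=gemini.Realtime(),
    processors=[
        heygen.AvatarPublisher(avatar_id="default"),  # avatar_id is illustrative
    ],
)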

Next Steps