Processors extend the agent’s capabilities by analyzing and transforming audio/video streams in real time. They can access incoming media, provide state to the LLM, and publish transformed streams back to the call. Common use cases:
  • Video analysis — Pose detection, object recognition, scene understanding
  • Media transformation — Video effects, avatars, filters
  • State injection — Feed detection results or external data to the LLM

Class Hierarchy

All processors inherit from the abstract Processor base class:
Class                      Purpose
Processor                  Abstract base class with name, close(), and attach_agent()
VideoProcessor             Receives video tracks via process_video()
VideoPublisher             Outputs video via publish_video_track()
VideoProcessorPublisher    Receives and outputs video (e.g., annotated frames)
AudioProcessor             Receives audio via process_audio()
AudioPublisher             Outputs audio via publish_audio_track()
AudioProcessorPublisher    Receives and outputs audio

Base Processor

All processors must implement name and close(). The attach_agent() method is optional.
from vision_agents.core.processors import Processor

class MyProcessor(Processor):
    name = "my_processor"

    async def close(self) -> None:
        """Clean up resources when the agent stops."""
        pass

    def attach_agent(self, agent: "Agent") -> None:
        """Called once when the processor is registered with an agent.

        Use this to access agent.events for custom event registration.
        """
        self._agent = agent
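
As an illustrative sketch of the lifecycle (the thread pool and the HeavyComputeProcessor name are examples, not part of the framework), a processor that owns a blocking resource can create it in __init__ and release it in close():
from concurrent.futures import ThreadPoolExecutor

from vision_agents.core.processors import Processor

class HeavyComputeProcessor(Processor):
    name = "heavy_compute"

    def __init__(self):
        # Illustrative: a worker pool for offloading blocking model inference.
        self._pool = ThreadPoolExecutor(max_workers=1)

    async def close(self) -> None:
        # Release the pool when the agent shuts down.
        self._pool.shutdown(wait=False)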

Video Processor

Receives video tracks from participants. The agent provides a shared VideoForwarder that distributes frames to all processors.
from typing import Optional
import aiortc
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder

class MyVideoProcessor(VideoProcessor):
    name = "my_video_processor"

    def __init__(self):
        self._forwarder: Optional[VideoForwarder] = None

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        """Called when a participant publishes video."""
        if shared_forwarder is None:
            return
        self._forwarder = shared_forwarder
        self._forwarder.add_frame_handler(
            self._on_frame, fps=5.0, name="my_handler"
        )

    async def _on_frame(self, frame):
        # Process the frame
        pass

    async def stop_processing(self) -> None:
        """Called when video tracks are removed."""
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._on_frame)
            self._forwarder = None

    async def close(self) -> None:
        await self.stop_processing()
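
Frames arrive in the handler as av.VideoFrame objects. A minimal sketch of an _on_frame() body that converts each frame to a NumPy array for analysis (the brightness logging is purely illustrative):
import logging

import av

logger = logging.getLogger(__name__)

async def _on_frame(self, frame: av.VideoFrame) -> None:
    # Convert the frame to an RGB NumPy array for downstream analysis.
    image = frame.to_ndarray(format="rgb24")
    # Illustrative only: log the frame size and mean brightness.
    logger.debug(
        "frame %dx%d, mean brightness %.1f",
        frame.width, frame.height, image.mean(),
    )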

Video Publisher

Outputs a video track to the call (e.g., AI-generated video or avatars).
import aiortc
from vision_agents.core.processors import VideoPublisher
from vision_agents.core.utils.video_track import QueuedVideoTrack

class MyVideoPublisher(VideoPublisher):
    name = "my_video_publisher"

    def __init__(self):
        self._track = QueuedVideoTrack(width=1280, height=720, fps=30)

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        """Return the track to publish."""
        return self._track

    async def close(self) -> None:
        self._track.stop()
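
A publisher typically feeds frames into its track from a background task (for example, started with asyncio.create_task() in __init__ and cancelled in close()). A rough sketch, assuming QueuedVideoTrack.add_frame() accepts av.VideoFrame objects as in the annotation example below:
import asyncio

import av
import numpy as np

async def _generate_frames(self) -> None:
    # Illustrative loop: push a solid gray 1280x720 frame roughly 30 times
    # per second until the task is cancelled.
    while True:
        image = np.full((720, 1280, 3), 128, dtype=np.uint8)
        frame = av.VideoFrame.from_ndarray(image, format="rgb24")
        await self._track.add_frame(frame)
        await asyncio.sleep(1 / 30)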

Video Processor + Publisher

For processors that receive video and output transformed frames (e.g., object detection with annotations).
import av
from typing import Optional
import aiortc
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.utils.video_forwarder import VideoForwarder
from vision_agents.core.utils.video_track import QueuedVideoTrack

class AnnotationProcessor(VideoProcessorPublisher):
    name = "annotation_processor"

    def __init__(self, fps: int = 30):
        self.fps = fps
        self._forwarder: Optional[VideoForwarder] = None
        self._track = QueuedVideoTrack()

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        if shared_forwarder is None:
            return
        self._forwarder = shared_forwarder
        self._forwarder.add_frame_handler(
            self._process_frame, fps=float(self.fps), name="annotator"
        )

    async def _process_frame(self, frame: av.VideoFrame):
        # Transform the frame
        annotated = self._annotate(frame)
        await self._track.add_frame(annotated)

    def _annotate(self, frame: av.VideoFrame) -> av.VideoFrame:
        # Add annotations to frame
        return frame

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        return self._track

    async def stop_processing(self) -> None:
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._process_frame)
            self._forwarder = None

    async def close(self) -> None:
        await self.stop_processing()
        self._track.stop()
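
The _annotate() hook above is where the drawing happens. A minimal sketch using only NumPy (painting a fixed red border; in practice you would draw detection results instead):
import av
import numpy as np

def _annotate(self, frame: av.VideoFrame) -> av.VideoFrame:
    image = frame.to_ndarray(format="rgb24")
    # Illustrative: paint a 10-pixel red border around the frame.
    image[:10, :] = (255, 0, 0)
    image[-10:, :] = (255, 0, 0)
    image[:, :10] = (255, 0, 0)
    image[:, -10:] = (255, 0, 0)
    annotated = av.VideoFrame.from_ndarray(image, format="rgb24")
    # Preserve timing information so downstream consumers stay in sync.
    annotated.pts = frame.pts
    annotated.time_base = frame.time_base
    return annotated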

Audio Processor

Receives audio data from participants. Audio is delivered as PcmData chunks.
from getstream.video.rtc import PcmData
from vision_agents.core.processors import AudioProcessor

class MyAudioProcessor(AudioProcessor):
    name = "my_audio_processor"

    async def process_audio(self, audio_data: PcmData) -> None:
        """Process incoming audio.

        Args:
            audio_data: Contains samples, sample_rate, channels, and participant info.
        """
        samples = audio_data.samples
        participant = audio_data.participant
        # Process audio...

    async def close(self) -> None:
        pass
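
As an illustrative process_audio() body, assuming audio_data.samples arrives as a 16-bit PCM NumPy array (adapt if your data differs), you could compute a rough loudness estimate:
import numpy as np

async def process_audio(self, audio_data: PcmData) -> None:
    # Assumption: samples are array-like 16-bit PCM values.
    samples = np.asarray(audio_data.samples, dtype=np.float32)
    rms = float(np.sqrt(np.mean(np.square(samples)))) if samples.size else 0.0
    # Illustrative only: keep the latest level around for use elsewhere
    # (e.g., feeding state to the LLM); _latest_level is a hypothetical attribute.
    self._latest_level = rms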

Audio Publisher

Outputs an audio track to the call.
import aiortc
from vision_agents.core.processors import AudioPublisher

class MyAudioPublisher(AudioPublisher):
    name = "my_audio_publisher"

    def __init__(self, track: aiortc.AudioStreamTrack):
        # The track this processor will publish (e.g., from a TTS or audio-generation pipeline).
        self._audio_track = track

    def publish_audio_track(self) -> aiortc.AudioStreamTrack:
        """Return the audio track to publish."""
        return self._audio_track

    async def close(self) -> None:
        pass
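
If you do not already have a track to publish, one can be built directly on aiortc. A minimal sketch of a silent audio track (SilenceAudioTrack is illustrative and not part of vision_agents; it subclasses aiortc's MediaStreamTrack):
import asyncio
import fractions

import av
import numpy as np
from aiortc.mediastreams import MediaStreamTrack

class SilenceAudioTrack(MediaStreamTrack):
    kind = "audio"

    def __init__(self, sample_rate: int = 48000, samples_per_frame: int = 960):
        super().__init__()
        self.sample_rate = sample_rate
        self.samples_per_frame = samples_per_frame
        self._pts = 0

    async def recv(self) -> av.AudioFrame:
        # Pace output at roughly real time (20 ms per frame with the defaults above).
        await asyncio.sleep(self.samples_per_frame / self.sample_rate)
        data = np.zeros((1, self.samples_per_frame), dtype=np.int16)
        frame = av.AudioFrame.from_ndarray(data, format="s16", layout="mono")
        frame.sample_rate = self.sample_rate
        frame.pts = self._pts
        frame.time_base = fractions.Fraction(1, self.sample_rate)
        self._pts += self.samples_per_frame
        return frame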

Usage

Pass processors to the agent at initialization:
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, openai

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="AI Assistant", id="agent"),
    llm=openai.Realtime(),
    processors=[your_processor],
)
For complete examples including YOLO pose detection and object detection, see Building Video Processors.