Build custom plugins to connect Vision Agents to any AI provider. Plugins wrap provider APIs with a consistent interface, enabling seamless integration with the agent framework.
Vision Agents requires a Stream account for real-time transport.

Before You Build

Many providers support OpenAI-compatible APIs. Before writing a custom plugin, check if you can use existing plugins with a custom base_url:
from vision_agents.plugins import openai

# OpenAI-compatible LLM (Ollama, LM Studio, vLLM, etc.)
llm = openai.ChatCompletionsLLM(
    model="llama-3.1-8b",
    base_url="http://localhost:11434/v1",
    api_key="ollama",  # Some servers require a placeholder
)

# OpenAI-compatible VLM
vlm = openai.ChatCompletionsVLM(
    model="llava-1.5-7b",
    base_url="http://localhost:8000/v1",
    fps=1,
)

# OpenAI-compatible Realtime
realtime = openai.Realtime(
    model="gpt-4o-realtime",
    base_url="wss://custom-endpoint.com/v1/realtime",
)
Build a custom plugin when:
  • The provider uses a proprietary API format
  • You need provider-specific features not exposed through Chat Completions
  • The provider requires custom authentication or connection handling

Plugin Categories

Category        | Base Class                                  | Abstract Methods                                   | Example
STT             | STT                                         | process_audio()                                    | Deepgram
TTS             | TTS                                         | stream_audio(), stop_audio()                       | ElevenLabs
LLM             | LLM                                         | simple_response()                                  | OpenRouter
VLM             | VideoLLM                                    | watch_video_track(), stop_watching_video_track()   | NVIDIA
Realtime        | Realtime                                    | connect(), simple_audio_response()                 | Gemini Live
Turn Detection  | TurnDetector                                | process_audio()                                    | SmartTurn
Processor       | Processor / VideoProcessor / AudioProcessor | close()                                            | Ultralytics

Quickstart Template

Create your plugin in plugins/acme/:
plugins/acme/
├── pyproject.toml
├── README.md
├── vision_agents/
│   └── plugins/
│       └── acme/
│           ├── __init__.py
│           ├── stt.py
│           └── events.py  # Optional custom events
└── tests/
    └── test_stt.py

pyproject.toml

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "vision-agents-plugins-acme"
version = "0.1.0"
description = "Acme STT integration for Vision Agents"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "vision-agents",
    "acme-sdk>=1.0",
]

[tool.hatch.build.targets.wheel]
packages = [".", "vision_agents"]

[tool.uv.sources]
vision-agents = { workspace = true }

[dependency-groups]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=1.0",
]

__init__.py

from .stt import STT

__all__ = ["STT"]

stt.py

from vision_agents.core import stt
from getstream.video.rtc.track_util import PcmData
import acme_sdk

class STT(stt.STT):
    """Acme speech-to-text integration."""

    def __init__(self, api_key: str | None = None, model: str = "default"):
        super().__init__()
        self.client = acme_sdk.Client(api_key=api_key)
        self.model = model

    async def process_audio(self, pcm: PcmData, participant=None):
        resampled = pcm.resample(16000, 1)
        result = await self.client.transcribe(
            audio=resampled.data,
            sample_rate=resampled.sample_rate,
            model=self.model,
        )

        self._emit_transcript_event(
            text=result.text,
            participant=participant,
            response=result,
        )

    async def close(self):
        await self.client.close()
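
With the namespace package layout above, the plugin is importable alongside the built-in plugins — a minimal usage sketch:
from vision_agents.plugins import acme

stt = acme.STT(api_key="...", model="default")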

Base Class Interfaces

STT

from vision_agents.core import stt
from getstream.video.rtc.track_util import PcmData

class MySTT(stt.STT):
    async def process_audio(self, pcm: PcmData, participant=None):
        """Process audio and emit transcripts."""
        # Resample to provider's expected format
        resampled = pcm.resample(16000, 1)

        result = await self.provider.transcribe(resampled.data)

        # Emit transcript using helper method
        self._emit_transcript_event(
            text=result.text,
            participant=participant,
            response=result,
        )

    async def close(self):
        """Cleanup connections."""
        await self.provider.close()
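
Providers that stream interim results can also emit partials before the final transcript via _emit_partial_transcript_event() (listed in the event table below). A sketch, assuming a hypothetical streaming provider API and that the partial helper mirrors _emit_transcript_event():
class MyStreamingSTT(stt.STT):
    async def process_audio(self, pcm: PcmData, participant=None):
        resampled = pcm.resample(16000, 1)

        # Hypothetical streaming API yielding interim and final updates
        async for update in self.provider.stream_transcribe(resampled.data):
            if update.is_final:
                self._emit_transcript_event(
                    text=update.text, participant=participant, response=update,
                )
            else:
                self._emit_partial_transcript_event(
                    text=update.text, participant=participant, response=update,
                )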

TTS

from vision_agents.core import tts

class MyTTS(tts.TTS):
    async def stream_audio(self, text: str, **kwargs):
        """Convert text to audio. Return bytes, PcmData, or async iterator."""
        async for chunk in self.provider.synthesize(text):
            yield chunk  # bytes or PcmData

    async def stop_audio(self):
        """Stop current synthesis."""
        await self.provider.cancel()

    async def close(self):
        await self.provider.close()

LLM

from vision_agents.core import llm

class MyLLM(llm.LLM):
    async def simple_response(self, text: str, processors=None, participant=None):
        """Generate response to text input."""
        messages = self._build_messages(text)

        # Include tools if function calling is supported
        tools = self._convert_tools_to_provider_format(self.get_available_functions())

        response = await self.provider.chat(messages, tools=tools)

        # Handle tool calls if present
        tool_calls = self._extract_tool_calls_from_response(response)
        if tool_calls:
            results = await self._execute_tools(tool_calls)
            # Continue conversation with tool results...

        return self._create_response_event(response)

VLM (Video Language Model)

VLM plugins process video frames alongside text. The framework provides VideoForwarder for frame management.
from vision_agents.core.llm import VideoLLM
from vision_agents.core.utils import VideoForwarder
from collections import deque
import base64
from PIL import Image
import io

class MyVLM(VideoLLM):
    def __init__(self, model: str, fps: int = 1, frame_buffer_seconds: int = 10):
        super().__init__()
        self.model = model
        self.fps = fps
        self._frame_buffer = deque(maxlen=fps * frame_buffer_seconds)
        self._forwarder = None
        self._owns_forwarder = False
        self._handler_id = None

    async def watch_video_track(self, track, shared_forwarder=None):
        """Subscribe to video frames from the call."""
        # Use shared forwarder if provided, otherwise create and own one
        if shared_forwarder:
            self._forwarder = shared_forwarder
            self._owns_forwarder = False
        else:
            self._forwarder = VideoForwarder(track)
            await self._forwarder.start()
            self._owns_forwarder = True

        # Register frame handler at desired FPS
        self._handler_id = self._forwarder.add_frame_handler(
            on_frame=self._on_frame,
            fps=self.fps,
            name="my_vlm",
        )

    def _on_frame(self, frame):
        """Called for each video frame."""
        # Convert to PIL Image
        img = frame.to_image()
        self._frame_buffer.append(img)

    async def stop_watching_video_track(self):
        """Unsubscribe from video frames."""
        if self._forwarder and self._handler_id:
            self._forwarder.remove_frame_handler(self._handler_id)

    async def simple_response(self, text: str, processors=None, participant=None):
        """Generate response using buffered frames."""
        # Encode frames as base64 for API
        images = []
        for img in self._frame_buffer:
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG", quality=85)
            images.append(base64.b64encode(buffer.getvalue()).decode())

        # Call provider with images
        response = await self.provider.chat(
            messages=[{"role": "user", "content": text}],
            images=images,
        )
        return self._create_response_event(response)

    async def close(self):
        await self.stop_watching_video_track()
        # Only stop the forwarder if this plugin created it
        if self._forwarder and self._owns_forwarder:
            await self._forwarder.stop()
Key VLM concepts:
  • VideoForwarder: Manages frame buffering and distributes to multiple handlers at different FPS
  • Shared forwarder: Multiple plugins can share one forwarder to avoid duplicate frame processing
  • Frame buffer: Store recent frames for context (configurable size)
  • FPS control: Request frames at the rate your model needs (1-30 fps typical)

Realtime (Speech-to-Speech)

from vision_agents.core.llm import Realtime

class MyRealtime(Realtime):
    async def connect(self):
        """Establish WebSocket/WebRTC connection."""
        self.ws = await self.provider.connect()
        self._emit_connected_event()

    async def simple_audio_response(self, pcm, participant=None):
        """Process incoming audio."""
        await self.ws.send_audio(pcm.data)

    async def close(self):
        await self.ws.close()
        self._emit_disconnected_event()
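
Most realtime providers also push audio back over the same connection. The base class provides _emit_audio_output_event() (see the event table below) for forwarding it to the call. A sketch of a background receive loop, with the provider's event shape and the helper's argument assumed:
import asyncio

class MyRealtime(Realtime):
    async def connect(self):
        self.ws = await self.provider.connect()
        self._emit_connected_event()
        # Pump provider audio back to the call in the background
        self._receive_task = asyncio.create_task(self._receive_loop())

    async def _receive_loop(self):
        async for event in self.ws:  # hypothetical provider event stream
            if event.type == "audio":
                self._emit_audio_output_event(event.data)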

Audio Utilities

Vision Agents provides utilities to simplify audio handling in STT and TTS plugins.

PcmData Resampling

Most STT providers expect 16kHz mono audio. Use the built-in resampling:
async def process_audio(self, pcm: PcmData, participant=None):
    # Resample to 16kHz mono (standard for most STT APIs)
    resampled = pcm.resample(target_sample_rate=16000, target_channels=1)

    # Access audio properties
    audio_bytes = resampled.data
    sample_rate = resampled.sample_rate  # 16000
    channels = resampled.channels         # 1

TTS Output Format

The TTS base class handles output format conversion automatically:
class MyTTS(tts.TTS):
    def __init__(self):
        super().__init__()
        # Configure output format (called by framework based on call requirements)
        # Default: 16kHz mono PCM_S16

    async def stream_audio(self, text: str, **kwargs):
        # Return audio in any format - base class resamples automatically
        async for chunk in self.provider.synthesize(text):
            yield chunk  # Resampled to _desired_sample_rate before emission

AudioQueue for Buffering

For plugins that need to buffer audio (e.g., accumulating before processing):
from vision_agents.core.utils import AudioQueue

queue = AudioQueue(max_duration_ms=5000)  # 5 second buffer

# Add audio
queue.put_nowait(pcm_chunk)

# Get specific duration
audio = queue.get_duration(duration_ms=1000)  # Get 1 second

# Get specific sample count
audio = queue.get_samples(num_samples=16000)  # Get 16k samples

Function Calling

To support function calling in your LLM plugin, override these methods:
class MyLLM(llm.LLM):
    def _convert_tools_to_provider_format(self, tools):
        """Convert ToolSchema list to provider's format."""
        return [
            {
                "name": t["name"],
                "description": t.get("description", ""),
                "parameters": t["parameters_schema"],
            }
            for t in tools
        ]

    def _extract_tool_calls_from_response(self, response):
        """Extract tool calls from provider response."""
        return [
            {
                "type": "tool_call",
                "name": call.function_name,
                "arguments_json": call.arguments,
                "id": call.id,
            }
            for call in response.tool_calls
        ]

    def _create_tool_result_message(self, tool_calls, results):
        """Format tool results for follow-up request."""
        return [
            {"role": "tool", "tool_call_id": tc["id"], "content": str(result)}
            for tc, result in zip(tool_calls, results)
        ]
The base class handles:
  • Function registration via @llm.register_function()
  • Tool execution with _execute_tools() (concurrent, with timeout)
  • Tool call deduplication by (name, arguments)
  • Multi-round tool calling (configurable via max_tool_rounds)
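Putting these together, a multi-round loop built from the helpers above might look like the sketch below. It is illustrative only — the base class may already drive this loop for you — and it assumes max_tool_rounds is available as an attribute and that self.provider.chat() is your provider call:
class MyLLM(llm.LLM):
    async def simple_response(self, text: str, processors=None, participant=None):
        messages = self._build_messages(text)
        tools = self._convert_tools_to_provider_format(self.get_available_functions())

        for _ in range(self.max_tool_rounds):
            response = await self.provider.chat(messages, tools=tools)
            tool_calls = self._extract_tool_calls_from_response(response)
            if not tool_calls:
                break
            # Run the tools and feed their results back for another round
            results = await self._execute_tools(tool_calls)
            messages += self._create_tool_result_message(tool_calls, results)

        return self._create_response_event(response)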

Event Emission

Base classes provide helper methods for common events:
Base Class | Helper Methods
STT        | _emit_transcript_event(), _emit_partial_transcript_event(), _emit_turn_started_event(), _emit_turn_ended_event()
TTS        | _emit_chunk() (called automatically by send())
Realtime   | _emit_connected_event(), _emit_disconnected_event(), _emit_audio_output_event()
For custom events:
from dataclasses import dataclass
from vision_agents.core.events import PluginBaseEvent

@dataclass
class AcmeCustomEvent(PluginBaseEvent):
    type: str = "acme.custom"
    custom_field: str = ""

# In your plugin:
self.events.send(AcmeCustomEvent(custom_field="value"))

Gotchas & Best Practices

Connection Lifecycle

Owned vs shared clients: Track whether your plugin created the client or received it:
def __init__(self, client=None):
    self._own_client = client is None
    self._client = client or aiohttp.ClientSession()

async def close(self):
    if self._own_client and self._client:
        await self._client.close()
Connection timeouts: Always use timeouts for connection setup:
self.connection = await asyncio.wait_for(
    self.provider.connect(),
    timeout=10.0
)

Cleanup Order

Follow this order in close() to prevent deadlocks:
async def close(self):
    # 1. Set closed flag first (prevents new work)
    self.closed = True

    # 2. Cancel background tasks
    if self._task:
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass

    # 3. Close connections in try-finally
    if self.connection:
        try:
            await self.connection.close()
        finally:
            self.connection = None

Error Handling

Temporary errors (network timeouts, transient API errors): Emit and continue:
except TimeoutError as e:
    self._emit_error_event(e, context="transcription timeout")
    # Plugin continues operating
Permanent errors (invalid API key, unsupported model): Raise directly:
if not api_key:
    raise ValueError("API key required")
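Putting both patterns together inside a provider call, a sketch (the timeout value, exception types, and transcribe() call are illustrative):
import asyncio

async def process_audio(self, pcm, participant=None):
    try:
        result = await asyncio.wait_for(
            self.client.transcribe(pcm.resample(16000, 1).data),
            timeout=10.0,
        )
    except (asyncio.TimeoutError, ConnectionError) as e:
        # Temporary failure: report it and keep the plugin running
        self._emit_error_event(e, context="transcription failed")
        return

    self._emit_transcript_event(text=result.text, participant=participant, response=result)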

Threading for Blocking Operations

Some SDKs have blocking calls. Use a thread pool:
import asyncio
from concurrent.futures import ThreadPoolExecutor

class MySTT(stt.STT):
    def __init__(self):
        super().__init__()
        self._executor = ThreadPoolExecutor(max_workers=1)

    async def process_audio(self, pcm, participant=None):
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            self._executor,
            self.blocking_transcribe,
            pcm.data
        )
        self._emit_transcript_event(text=result.text, participant=participant, response=result)

    async def close(self):
        self._executor.shutdown(wait=False)

Concurrency Control

Prevent concurrent processing when your provider doesn’t support it:
def __init__(self):
    self._lock = asyncio.Lock()

async def process_audio(self, pcm, participant=None):
    async with self._lock:
        # Only one request at a time
        result = await self.provider.transcribe(pcm.data)

Sample Rate Requirements

Plugin Type | Expected Input      | Standard Rate
STT         | Resampled audio     | 16kHz mono
TTS         | Output configurable | 16-48kHz
Realtime    | Raw PCM             | 24kHz or 48kHz (provider-specific)

Reconnection with Backoff

For WebSocket-based plugins:
async def _reconnect(self):
    for attempt in range(3):
        try:
            await asyncio.sleep(2 ** attempt)  # 1, 2, 4 seconds
            await self.connect()
            return
        except Exception as e:
            logger.warning(f"Reconnect attempt {attempt + 1} failed: {e}")
    raise ConnectionError("Failed to reconnect after 3 attempts")

Testing

cd plugins/acme
uv sync
uv run pytest -v
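
A minimal test for the quickstart STT, with fakes standing in for the provider SDK and call audio (the fake classes and the monkeypatched emit helper are illustrative, not framework APIs):
# tests/test_stt.py
import pytest
from vision_agents.plugins.acme import STT

class FakeResult:
    text = "hello world"

class FakeClient:
    async def transcribe(self, audio, sample_rate, model):
        return FakeResult()

    async def close(self):
        pass

class FakePcm:
    def resample(self, sample_rate, channels):
        class Resampled:
            data = b"\x00" * 3200
            sample_rate = 16000
            channels = 1
        return Resampled()

@pytest.mark.asyncio
async def test_process_audio_emits_transcript(monkeypatch):
    plugin = STT(api_key="test")
    plugin.client = FakeClient()  # bypass the real Acme SDK

    captured = {}
    monkeypatch.setattr(
        plugin,
        "_emit_transcript_event",
        lambda text, participant, response: captured.update(text=text),
    )

    await plugin.process_audio(FakePcm())
    assert captured["text"] == "hello world"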

Contribution Checklist

  1. Implement required abstract methods
  2. Add tests with reasonable coverage
  3. Pass uv run pre-commit run --all-files
  4. Add README.md documenting usage and events
  5. Open a PR to the Vision Agents repo

Next Steps