Build custom plugins to connect Vision Agents to any AI provider. Plugins wrap provider APIs with a consistent interface, enabling seamless integration with the agent framework.
Vision Agents requires a Stream account for real-time transport.
Before You Build
Many providers support OpenAI-compatible APIs. Before writing a custom plugin, check if you can use existing plugins with a custom base_url:
from vision_agents.plugins import openai
# OpenAI-compatible LLM (Ollama, LM Studio, vLLM, etc.)
llm = openai.ChatCompletionsLLM(
    model="llama-3.1-8b",
    base_url="http://localhost:11434/v1",
    api_key="ollama",  # Some servers require a placeholder
)

# OpenAI-compatible VLM
vlm = openai.ChatCompletionsVLM(
    model="llava-1.5-7b",
    base_url="http://localhost:8000/v1",
    fps=1,
)

# OpenAI-compatible Realtime
realtime = openai.Realtime(
    model="gpt-realtime",
    base_url="wss://custom-endpoint.com/v1/realtime",
)
Build a custom plugin when:
The provider uses a proprietary API format
You need provider-specific features not exposed through Chat Completions
The provider requires custom authentication or connection handling
Plugin Categories
| Category | Base Class | Abstract Methods | Example |
| --- | --- | --- | --- |
| STT | `STT` | `process_audio()` | Deepgram |
| TTS | `TTS` | `stream_audio()`, `stop_audio()` | ElevenLabs |
| LLM | `LLM` | `simple_response()` | OpenRouter |
| VLM | `VideoLLM` | `watch_video_track()`, `stop_watching_video_track()` | NVIDIA |
| Realtime | `Realtime` | `connect()`, `simple_audio_response()` | Gemini Live |
| Turn Detection | `TurnDetector` | `process_audio()` | SmartTurn |
| Processor | `Processor` / `VideoProcessor` / `AudioProcessor` | `close()` | Ultralytics |
The TTS and Realtime base classes also provide a built-in interrupt() method and epoch property for barge-in handling. You do not need to override these — the Agent calls interrupt() automatically when a user interruption is detected.
Quickstart Template
Create your plugin in plugins/acme/:
plugins/acme/
├── pyproject.toml
├── README.md
├── vision_agents/
│ └── plugins/
│ └── acme/
│ ├── __init__.py
│ ├── stt.py
│ └── events.py # Optional custom events
└── tests/
└── test_stt.py
pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "vision-agents-plugins-acme"
version = "0.1.0"
description = "Acme STT integration for Vision Agents"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "vision-agents",
    "acme-sdk>=1.0",
]

[tool.hatch.build.targets.wheel]
packages = [".", "vision_agents"]

[tool.uv.sources]
vision-agents = { workspace = true }

[dependency-groups]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=1.0",
]
__init__.py
# Public package surface: re-export the plugin class so users can write
# `from vision_agents.plugins.acme import STT`.
from .stt import STT
__all__ = [ "STT" ]
stt.py
from vision_agents.core import stt
from getstream.video.rtc.track_util import PcmData
import acme_sdk
class STT(stt.STT):
    """Acme speech-to-text integration.

    Wraps the Acme SDK behind the framework's STT interface: call audio
    arrives as ``PcmData`` and transcripts are emitted through the base
    class's event helper.
    """

    def __init__(self, api_key: str | None = None, model: str = "default"):
        """Create the Acme client.

        Args:
            api_key: Acme API key; ``None`` lets the SDK fall back to its
                own credential discovery (e.g. environment variables).
            model: Acme model identifier used for transcription.
        """
        super().__init__()
        self.client = acme_sdk.Client(api_key=api_key)
        self.model = model

    async def process_audio(self, pcm: PcmData, participant=None):
        """Transcribe one chunk of call audio and emit a transcript event."""
        # Acme expects 16 kHz mono; resample whatever the call delivers.
        resampled = pcm.resample(16000, 1)
        result = await self.client.transcribe(
            audio=resampled.data,
            sample_rate=resampled.sample_rate,
            model=self.model,
        )
        self._emit_transcript_event(
            text=result.text,
            participant=participant,
            response=result,
        )

    async def close(self):
        """Release the underlying Acme client connection."""
        await self.client.close()
Base Class Interfaces
STT
from vision_agents.core import stt
from getstream.video.rtc.track_util import PcmData
class MySTT(stt.STT):
    """Example STT plugin: transcribe incoming call audio."""

    async def process_audio(self, pcm: PcmData, participant=None):
        """Process audio and emit transcripts."""
        # Resample to the provider's expected format (16 kHz mono).
        audio = pcm.resample(16000, 1)
        transcription = await self.provider.transcribe(audio.data)
        # Hand the transcript back to the framework via the base-class helper.
        self._emit_transcript_event(
            text=transcription.text,
            participant=participant,
            response=transcription,
        )

    async def close(self):
        """Cleanup connections."""
        await self.provider.close()
TTS
from vision_agents.core import tts
class MyTTS(tts.TTS):
    """Example TTS plugin: turn text into streamed audio."""

    async def stream_audio(self, text: str, **kwargs):
        """Convert text to audio; yields bytes or PcmData chunks."""
        async for audio_chunk in self.provider.synthesize(text):
            yield audio_chunk

    async def stop_audio(self):
        """Abort the synthesis currently in flight."""
        await self.provider.cancel()

    async def close(self):
        """Release the provider connection."""
        await self.provider.close()
LLM
from vision_agents.core import llm
class MyLLM(llm.LLM):
    """Example LLM plugin: text in, response event out."""

    async def simple_response(self, text: str, processors=None, participant=None):
        """Generate a response to text input, running tools when requested."""
        messages = self._build_messages(text)
        # Advertise registered functions if the provider supports tool use.
        tools = self._convert_tools_to_provider_format(self.get_available_functions())
        response = await self.provider.chat(messages, tools=tools)
        # Run any tool calls the model asked for before finishing the turn.
        requested_calls = self._extract_tool_calls_from_response(response)
        if requested_calls:
            results = await self._execute_tools(requested_calls)
            # Continue conversation with tool results...
        return self._create_response_event(response)
VLM (Video Language Model)
VLM plugins process video frames alongside text. The framework provides VideoForwarder for frame management.
from vision_agents.core.llm import VideoLLM
from vision_agents.core.utils import VideoForwarder
from collections import deque
import base64
from PIL import Image
import io
class MyVLM(VideoLLM):
    """Example VLM plugin: buffer recent video frames and send them with prompts."""

    def __init__(self, model: str, fps: int = 1, frame_buffer_seconds: int = 10):
        """
        Args:
            model: Provider model identifier.
            fps: Frames per second to sample from the video track.
            frame_buffer_seconds: How many seconds of frames to keep for context.
        """
        super().__init__()
        self.model = model
        self.fps = fps
        # Ring buffer: old frames fall off automatically once full.
        self._frame_buffer = deque(maxlen=fps * frame_buffer_seconds)
        self._forwarder = None
        self._handler_id = None
        # True only when this plugin created the forwarder and must stop it.
        self._owns_forwarder = False

    async def watch_video_track(self, track, shared_forwarder=None):
        """Subscribe to video frames from the call."""
        # Reuse a shared forwarder when given one (avoids duplicate frame
        # processing); otherwise create and own a private forwarder.
        if shared_forwarder:
            self._forwarder = shared_forwarder
            self._owns_forwarder = False
        else:
            self._forwarder = VideoForwarder(track)
            self._owns_forwarder = True
            await self._forwarder.start()
        # Register frame handler at the FPS this model needs.
        self._handler_id = self._forwarder.add_frame_handler(
            on_frame=self._on_frame,
            fps=self.fps,
            name="my_vlm",
        )

    def _on_frame(self, frame):
        """Called for each video frame: convert to PIL and buffer it."""
        img = frame.to_image()
        self._frame_buffer.append(img)

    async def stop_watching_video_track(self):
        """Unsubscribe from video frames (safe to call more than once)."""
        if self._forwarder and self._handler_id:
            self._forwarder.remove_frame_handler(self._handler_id)
            # Clear the id so a second call is a no-op.
            self._handler_id = None

    async def simple_response(self, text: str, processors=None, participant=None):
        """Generate a response using the prompt plus buffered frames."""
        # Encode buffered frames as base64 JPEG for the provider API.
        images = []
        for img in self._frame_buffer:
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG", quality=85)
            images.append(base64.b64encode(buffer.getvalue()).decode())
        response = await self.provider.chat(
            messages=[{"role": "user", "content": text}],
            images=images,
        )
        return self._create_response_event(response)

    async def close(self):
        await self.stop_watching_video_track()
        # Only stop a forwarder this plugin created; a shared forwarder is
        # owned (and stopped) by whoever provided it. This replaces poking
        # at the forwarder's private `_shared` attribute.
        if self._forwarder and self._owns_forwarder:
            await self._forwarder.stop()
        self._forwarder = None
Key VLM concepts:
VideoForwarder : Manages frame buffering and distributes to multiple handlers at different FPS
Shared forwarder : Multiple plugins can share one forwarder to avoid duplicate frame processing
Frame buffer : Store recent frames for context (configurable size)
FPS control : Request frames at the rate your model needs (1-30 fps typical)
Realtime (Speech-to-Speech)
from vision_agents.core.llm import Realtime
class MyRealtime(Realtime):
    """Example speech-to-speech plugin over a persistent connection."""

    async def connect(self):
        """Establish WebSocket/WebRTC connection and announce it."""
        self.ws = await self.provider.connect()
        self._emit_connected_event()

    async def simple_audio_response(self, pcm, participant=None):
        """Forward incoming call audio to the provider."""
        await self.ws.send_audio(pcm.data)

    async def close(self):
        """Tear down the connection; safe even if connect() never ran."""
        # Guard: the agent may call close() before a successful connect(),
        # in which case `self.ws` was never set.
        ws = getattr(self, "ws", None)
        if ws is not None:
            await ws.close()
        self._emit_disconnected_event()
Audio Utilities
Vision Agents provides utilities to simplify audio handling in STT and TTS plugins.
PcmData Resampling
Most STT providers expect 16kHz mono audio. Use the built-in resampling:
async def process_audio(self, pcm: PcmData, participant=None):
    # Resample to 16 kHz mono — the de-facto standard for STT APIs.
    resampled = pcm.resample(target_sample_rate=16000, target_channels=1)
    # The resampled PcmData exposes the converted audio properties.
    audio_bytes = resampled.data
    sample_rate = resampled.sample_rate  # 16000
    channels = resampled.channels  # 1
The TTS base class handles output format conversion automatically:
class MyTTS(tts.TTS):
    """TTS example: the base class resamples output automatically."""

    def __init__(self):
        super().__init__()
        # Output format is configured by the framework based on call
        # requirements; default is 16 kHz mono PCM_S16.

    async def stream_audio(self, text: str, **kwargs):
        # Yield audio in the provider's native format — the base class
        # resamples each chunk to _desired_sample_rate before emission.
        async for chunk in self.provider.synthesize(text):
            yield chunk
AudioQueue for Buffering
For plugins that need to buffer audio (e.g., accumulating before processing):
from vision_agents.core.utils import AudioQueue
# Hold up to 5 seconds of audio before processing.
queue = AudioQueue(max_duration_ms=5000)

# Add audio
queue.put_nowait(pcm_chunk)

# Pull a fixed duration of audio (1 second here).
audio = queue.get_duration(duration_ms=1000)

# Or pull a fixed sample count (16k samples here).
audio = queue.get_samples(num_samples=16000)
Function Calling
To support function calling in your LLM plugin, override these methods:
class MyLLM(llm.LLM):
    """Function-calling hooks: translate between ToolSchema and the provider."""

    def _convert_tools_to_provider_format(self, tools):
        """Convert ToolSchema list to provider's format."""
        converted = []
        for schema in tools:
            converted.append(
                {
                    "name": schema["name"],
                    "description": schema.get("description", ""),
                    "parameters": schema["parameters_schema"],
                }
            )
        return converted

    def _extract_tool_calls_from_response(self, response):
        """Extract tool calls from provider response."""
        calls = []
        for call in response.tool_calls:
            calls.append(
                {
                    "type": "tool_call",
                    "name": call.function_name,
                    "arguments_json": call.arguments,
                    "id": call.id,
                }
            )
        return calls

    def _create_tool_result_message(self, tool_calls, results):
        """Format tool results for follow-up request."""
        messages = []
        for tc, result in zip(tool_calls, results):
            messages.append(
                {"role": "tool", "tool_call_id": tc["id"], "content": str(result)}
            )
        return messages
The base class handles:
Function registration via @llm.register_function()
Tool execution with _execute_tools() (concurrent, with timeout)
Tool call deduplication by (name, arguments)
Multi-round tool calling (configurable via max_tool_rounds)
Event Emission
Base classes provide helper methods for common events:
| Base Class | Helper Methods |
| --- | --- |
| STT | `_emit_transcript_event()`, `_emit_partial_transcript_event()`, `_emit_turn_started_event()`, `_emit_turn_ended_event()` |
| TTS | `_emit_chunk()` (called automatically by `send()`) |
| Realtime | `_emit_connected_event()`, `_emit_disconnected_event()`, `_emit_audio_output_event()` |
For custom events:
from dataclasses import dataclass
from vision_agents.core.events import PluginBaseEvent
@dataclass
class AcmeCustomEvent(PluginBaseEvent):
    """Plugin-specific event; `type` namespaces it under "acme.*"."""

    type: str = "acme.custom"
    custom_field: str = ""


# In your plugin:
self.events.send(AcmeCustomEvent(custom_field="value"))
Gotchas & Best Practices
Connection Lifecycle
Owned vs shared clients : Track whether your plugin created the client or received it:
def __init__(self, client=None):
    # Remember whether this plugin created the client: injected (shared)
    # clients are closed by their owner, not by us.
    self._own_client = client is None
    self._client = client or aiohttp.ClientSession()

async def close(self):
    if self._own_client and self._client:
        await self._client.close()
Connection timeouts : Always use timeouts for connection setup:
# Never connect without a deadline — a hung provider must not stall the agent.
self .connection = await asyncio.wait_for(
self .provider.connect(),
timeout = 10.0
)
Cleanup Order
Follow this order in close() to prevent deadlocks:
async def close ( self ):
"""Tear down in flag -> tasks -> connections order to avoid deadlocks."""
# 1. Set closed flag first (prevents new work)
self .closed = True
# 2. Cancel background tasks
if self ._task:
self ._task.cancel()
try :
await self ._task
except asyncio.CancelledError:
# Expected: awaiting a just-cancelled task re-raises CancelledError.
pass
# 3. Close connections in try-finally
if self .connection:
try :
await self .connection.close()
finally :
# Drop the reference even if close() raised, so nothing reuses it.
self .connection = None
Error Handling
Temporary errors (network timeouts, transient API errors): Emit and continue:
# Transient failure: report it via the event bus and keep the plugin alive.
except TimeoutError as e:
self ._emit_error_event(e, context = "transcription timeout" )
# Plugin continues operating
Permanent errors (invalid API key, unsupported model): Raise directly:
# Misconfiguration is permanent — fail fast instead of limping along.
if not api_key:
raise ValueError ( "API key required" )
Threading for Blocking Operations
Some SDKs have blocking calls. Use a thread pool:
from concurrent.futures import ThreadPoolExecutor
class MySTT(stt.STT):
    """STT wrapper for an SDK whose transcribe call blocks the thread."""

    def __init__(self):
        super().__init__()
        # One worker: most blocking SDKs are not safe to call concurrently.
        self._executor = ThreadPoolExecutor(max_workers=1)

    async def process_audio(self, pcm, participant=None):
        """Run the blocking transcription off the event loop thread."""
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            self._executor,
            self.blocking_transcribe,
            pcm.data,
        )
        # Keyword arguments match the STT base helper's convention
        # (text= / participant= / response=) used elsewhere in the plugin.
        self._emit_transcript_event(
            text=result.text,
            participant=participant,
            response=result,
        )

    async def close(self):
        # wait=False: don't block shutdown on an in-flight transcription.
        self._executor.shutdown(wait=False)
Concurrency Control
Prevent concurrent processing when your provider doesn’t support it:
def __init__(self):
    # Single lock serialises calls into a provider that can't multiplex.
    self._lock = asyncio.Lock()

async def process_audio(self, pcm, participant=None):
    # Hold the lock for the whole round-trip: only one request at a time.
    async with self._lock:
        transcript = await self.provider.transcribe(pcm.data)
Sample Rate Requirements
| Plugin Type | Expected Input | Standard Rate |
| --- | --- | --- |
| STT | Resampled audio | 16kHz mono |
| TTS | Output configurable | 16–48kHz |
| Realtime | Raw PCM | 24kHz or 48kHz (provider-specific) |
Reconnection with Backoff
For WebSocket-based plugins:
async def _reconnect(self):
    """Retry connect() up to three times with exponential backoff."""
    for attempt in range(3):
        try:
            # Back off 1, 2, then 4 seconds before each try.
            await asyncio.sleep(2 ** attempt)
            await self.connect()
            return
        except Exception as e:
            logger.warning(f"Reconnect attempt {attempt + 1} failed: {e}")
    raise ConnectionError("Failed to reconnect after 3 attempts")
Testing
cd plugins/acme
uv sync
uv run pytest -v
Contribution Checklist
Implement required abstract methods
Add tests with reasonable coverage
Pass uv run pre-commit run --all-files
Add README.md documenting usage and events
Open a PR to the Vision Agents repo
Next Steps
Event System Learn about events
Function Calling Add tool support