The landscape of AI voice assistants has evolved dramatically, moving from simple command-response systems to sophisticated agents that can access and reason over vast knowledge bases. This is where Retrieval-Augmented Generation (RAG) comes in: a technique that lets an LLM retrieve and use external, private knowledge at query time. Today, we'll explore how to build a production-ready voice AI agent using LiveKit's real-time framework combined with Supabase's vector database capabilities.
What Makes This Stack Powerful
LiveKit provides a real-time framework for production-grade multimodal and voice AI agents. Supabase offers pgvector, a Postgres extension for storing embeddings and running vector similarity search; their philosophy is that "the best vector database is the database you already have." Together, they form a robust foundation for building intelligent voice assistants that can access your organization's specific knowledge.
The key advantage of this approach is that, because pgvector lives inside Postgres, you can apply fine-grained access control to your vector data using Row Level Security (RLS), making it a strong fit for enterprise applications where data security is paramount.
Understanding the RAG Architecture
How RAG Works in Voice Applications
A voice AI agent with RAG capabilities processes user speech in real time and retrieves relevant information from your knowledge base before answering. In practice, such agents often rely on tools like LlamaIndex for the retrieval step, answering questions about specific topics by looking them up against documentation sites or other indexed sources.
The typical flow works like this (a minimal sketch of the retrieval steps follows the list):
- User speaks a question
- Speech is transcribed to text
- The query is converted to embeddings
- Vector similarity search finds relevant knowledge
- Retrieved context is fed to the LLM
- Response is generated and spoken back
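To make the middle of that flow concrete, here is a minimal sketch of the embed-search-generate steps in isolation; speech capture and playback are handled by LiveKit in the full agent later. It assumes the documents schema and match_documents function defined in the following sections, OpenAI and Supabase credentials in the environment, and gpt-4o-mini purely as a placeholder chat model:

```python
import os

from openai import AsyncOpenAI
from supabase import create_client

openai_client = AsyncOpenAI()
supabase_client = create_client(
    os.environ["SUPABASE_URL"],
    os.environ["SUPABASE_SERVICE_ROLE_KEY"],
)


async def answer_from_knowledge_base(transcript: str) -> str:
    # 1. Embed the transcribed question
    embedding = (
        await openai_client.embeddings.create(
            input=transcript, model="text-embedding-3-small"
        )
    ).data[0].embedding

    # 2. Vector similarity search via the match_documents function (defined below)
    rows = supabase_client.rpc(
        "match_documents",
        {
            "query_embedding": embedding,
            "match_threshold": 0.78,
            "match_count": 5,
        },
    ).execute().data

    # 3. Feed the retrieved context to the LLM
    context = "\n".join(row["content"] for row in rows)
    completion = await openai_client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder model; use whatever your stack standardizes on
        messages=[
            {"role": "system", "content": f"Answer using this context:\n{context}"},
            {"role": "user", "content": transcript},
        ],
    )
    return completion.choices[0].message.content
```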
Because the retrieval step can introduce noticeable delays, voice assistants also need strategies for keeping users engaged while a RAG lookup is in progress; we cover these in the performance section below.
Setting Up the Supabase Vector Database
First, let's establish our vector storage. Vector support in Supabase is provided by pgvector, a Postgres extension for storing and querying vectors such as embeddings.
```sql
-- Enable the vector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Create a table for our knowledge base
CREATE TABLE documents (
  id BIGSERIAL PRIMARY KEY,
  title TEXT NOT NULL,
  content TEXT NOT NULL,
  embedding VECTOR(1536), -- OpenAI embeddings dimension
  metadata JSONB,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Create an index for vector similarity search
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
```
The embedding column uses the VECTOR data type with a size that specifies the number of dimensions each vector holds; it must match the output dimensions of the embedding model you'll use (1536 for OpenAI's text-embedding-3-small).
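If you're on pgvector 0.5.0 or newer, an HNSW index is worth considering instead of IVFFlat: it generally offers a better recall-versus-speed trade-off and doesn't require choosing a lists value up front. A minimal alternative to the index above:

```sql
-- HNSW index for cosine distance (requires pgvector >= 0.5.0)
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);
```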
Implementing the LiveKit Agent
Now let's build the core agent that connects LiveKit with our Supabase vector database:
```python
import asyncio
import logging

from livekit.agents import (
    AutoSubscribe,
    JobContext,
    WorkerOptions,
    cli,
    llm,
)
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import openai, silero
from openai import AsyncOpenAI
from supabase import create_client

logger = logging.getLogger("voice-assistant")

# Client for generating query embeddings (reads OPENAI_API_KEY from the environment)
openai_client = AsyncOpenAI()


class SupabaseRAG:
    def __init__(self, supabase_url: str, supabase_key: str):
        self.client = create_client(supabase_url, supabase_key)

    async def similarity_search(self, query: str, limit: int = 5) -> list[str]:
        """Perform vector similarity search in Supabase."""
        # Generate an embedding for the query
        embedding_response = await openai_client.embeddings.create(
            input=query,
            model="text-embedding-3-small",
        )
        query_embedding = embedding_response.data[0].embedding

        # Call the match_documents function defined in the next section
        response = self.client.rpc(
            "match_documents",
            {
                "query_embedding": query_embedding,
                "match_threshold": 0.78,
                "match_count": limit,
            },
        ).execute()

        return [doc["content"] for doc in response.data]


async def entrypoint(ctx: JobContext):
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant powered by LiveKit. Your interface with users will be voice. "
            "You can access a knowledge base to answer questions accurately. "
            "Be concise and conversational in your responses."
        ),
    )

    # Initialize the RAG system
    rag_system = SupabaseRAG(
        supabase_url="your-supabase-url",
        supabase_key="your-supabase-key",
    )

    # Create the assistant
    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=openai.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    async def enrich_with_rag(user_text: str):
        # Retrieve relevant documents and add them to the chat context
        relevant_docs = await rag_system.similarity_search(user_text)
        if relevant_docs:
            context = "\n".join(relevant_docs)
            assistant.chat_ctx.append(
                role="system",
                text=f"Relevant information from knowledge base:\n{context}",
            )

    # Event handlers are synchronous; schedule the async lookup as a task
    @assistant.on("user_speech_committed")
    def on_user_speech_committed(msg: llm.ChatMessage):
        if isinstance(msg.content, str):
            asyncio.create_task(enrich_with_rag(msg.content))

    assistant.start(ctx.room)
    await asyncio.sleep(1)
    await assistant.say(
        "Hello! I'm your AI assistant. I can help you with questions using my knowledge base.",
        allow_interruptions=True,
    )
```
Creating the Vector Search Function
We need to create a PostgreSQL function for efficient similarity search:
```sql
CREATE OR REPLACE FUNCTION match_documents (
  query_embedding VECTOR(1536),
  match_threshold FLOAT,
  match_count INT
)
RETURNS TABLE (
  id BIGINT,
  title TEXT,
  content TEXT,
  similarity FLOAT
)
LANGUAGE SQL STABLE
AS $$
  SELECT
    documents.id,
    documents.title,
    documents.content,
    1 - (documents.embedding <=> query_embedding) AS similarity
  FROM documents
  WHERE 1 - (documents.embedding <=> query_embedding) > match_threshold
  ORDER BY documents.embedding <=> query_embedding
  LIMIT match_count;
$$;
```
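The <=> operator is pgvector's cosine distance, so 1 - (embedding <=> query_embedding) gives cosine similarity; ordering by the raw distance expression lets Postgres use the vector index for the search.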
Handling Knowledge Base Ingestion
Document Processing Pipeline
To populate your knowledge base, you'll need a robust document processing pipeline:
```python
import asyncio
from pathlib import Path

import tiktoken
from openai import AsyncOpenAI
from supabase import create_client

openai_client = AsyncOpenAI()


class DocumentProcessor:
    def __init__(self, supabase_url: str, supabase_key: str):
        self.client = create_client(supabase_url, supabase_key)
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def chunk_text(self, text: str, max_tokens: int = 500, overlap: int = 50) -> list[str]:
        """Split text into overlapping chunks."""
        tokens = self.tokenizer.encode(text)
        chunks = []
        for i in range(0, len(tokens), max_tokens - overlap):
            chunk_tokens = tokens[i:i + max_tokens]
            chunks.append(self.tokenizer.decode(chunk_tokens))
        return chunks

    async def process_document(self, file_path: Path, metadata: dict = None):
        """Process a document and store its chunks in Supabase."""
        with open(file_path, "r", encoding="utf-8") as file:
            content = file.read()

        chunks = self.chunk_text(content)

        for i, chunk in enumerate(chunks):
            # Generate the embedding for this chunk
            response = await openai_client.embeddings.create(
                input=chunk,
                model="text-embedding-3-small",
            )
            embedding = response.data[0].embedding

            # Store the chunk in Supabase
            self.client.table("documents").insert({
                "title": f"{file_path.stem} - Chunk {i + 1}",
                "content": chunk,
                "embedding": embedding,
                "metadata": {
                    **(metadata or {}),
                    "source_file": str(file_path),
                    "chunk_index": i,
                },
            }).execute()
```
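A quick usage sketch; the credentials, file path, and category label here are placeholders for illustration:

```python
import asyncio
from pathlib import Path


async def main():
    processor = DocumentProcessor(
        supabase_url="your-supabase-url",
        supabase_key="your-supabase-key",
    )
    # "docs/getting-started.txt" is a hypothetical file path
    await processor.process_document(
        Path("docs/getting-started.txt"),
        metadata={"category": "docs"},
    )


asyncio.run(main())
```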
Batch Processing Multiple Documents
For larger knowledge bases, you'll want to process documents in batches:
```python
async def process_knowledge_base(processor: DocumentProcessor, docs_dir: Path):
    """Process all documents in a directory."""
    tasks = []
    for file_path in docs_dir.glob("**/*.txt"):
        task = processor.process_document(
            file_path,
            metadata={"category": file_path.parent.name},
        )
        tasks.append(task)

    # Process in batches to avoid rate limits
    batch_size = 10
    for i in range(0, len(tasks), batch_size):
        batch = tasks[i:i + batch_size]
        await asyncio.gather(*batch)
        await asyncio.sleep(1)  # Rate limiting
```
Optimizing for Real-Time Performance
Managing RAG Delays in Voice Applications
When a user asks a question, the system needs time to search the knowledge base and generate a response. To keep the conversation from going silent during that gap, you can give the user immediate verbal feedback while retrieval runs in the background:
```python
import asyncio
import random


class OptimizedVoiceAssistant(VoiceAssistant):
    def __init__(self, *args, rag_system: SupabaseRAG, **kwargs):
        super().__init__(*args, **kwargs)
        self.rag_system = rag_system
        self.thinking_phrases = [
            "Let me search for that information...",
            "I'm looking that up for you...",
            "One moment while I check my knowledge base...",
        ]

    async def handle_rag_delay(self, query: str) -> list[str]:
        """Provide verbal feedback while the RAG search runs in the background."""
        # Start the search in the background
        search_task = asyncio.create_task(
            self.rag_system.similarity_search(query)
        )

        # Provide immediate feedback
        thinking_phrase = random.choice(self.thinking_phrases)
        await self.say(thinking_phrase, allow_interruptions=False)

        # Wait for the search results
        try:
            return await asyncio.wait_for(search_task, timeout=5.0)
        except asyncio.TimeoutError:
            await self.say("I'm having trouble accessing that information right now.")
            return []
```
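The filler phrase is spoken with allow_interruptions=False so it isn't cut off mid-sentence, and the five-second timeout keeps a slow lookup from stalling the conversation indefinitely.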
Caching and Performance Optimization
Implement intelligent caching to reduce database calls:
```python
import hashlib


class CachedRAGSystem(SupabaseRAG):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query_cache = {}

    def _hash_query(self, query: str) -> str:
        # Normalize and hash the query to build a compact cache key
        return hashlib.md5(query.lower().encode()).hexdigest()

    async def similarity_search(self, query: str, limit: int = 5) -> list[str]:
        # Check the cache first
        cache_key = f"{self._hash_query(query)}_{limit}"
        if cache_key in self.query_cache:
            return self.query_cache[cache_key]

        # Perform the search and cache the results
        results = await super().similarity_search(query, limit)
        self.query_cache[cache_key] = results
        return results
```
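Note that this in-memory dictionary grows without bound and only helps on exact repeats of a query; in production you would typically cap it (for example with an LRU policy or a TTL) or move it to a shared store such as Redis so multiple agent workers benefit from the same cache.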
Advanced Features and Security
Implementing Row-Level Security
With Row Level Security (RLS) you can apply fine-grained access control to your vector data, restricting a similarity search to return only the documents the requesting user is allowed to see.
```sql
-- Enable RLS on the documents table
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- Create a policy for user-specific access
CREATE POLICY "Users can only see their documents"
ON documents FOR SELECT
USING (
  metadata->>'user_id' = auth.uid()::TEXT
  OR metadata->>'public' = 'true'
);

-- Create a function that filters matches by user permissions
CREATE OR REPLACE FUNCTION match_documents_secure (
  query_embedding VECTOR(1536),
  match_threshold FLOAT,
  match_count INT,
  user_id TEXT
)
RETURNS TABLE (
  id BIGINT,
  title TEXT,
  content TEXT,
  similarity FLOAT
)
LANGUAGE SQL STABLE SECURITY DEFINER
AS $$
  SELECT
    documents.id,
    documents.title,
    documents.content,
    1 - (documents.embedding <=> query_embedding) AS similarity
  FROM documents
  WHERE 1 - (documents.embedding <=> query_embedding) > match_threshold
    AND (
      documents.metadata->>'user_id' = user_id
      OR documents.metadata->>'public' = 'true'
    )
  ORDER BY documents.embedding <=> query_embedding
  LIMIT match_count;
$$;
```
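On the agent side, a per-user retriever might look like the following sketch; the user_id is an assumption here and would come from your own auth or session logic:

```python
from openai import AsyncOpenAI
from supabase import create_client

openai_client = AsyncOpenAI()


class SecureSupabaseRAG:
    """Retrieval routed through match_documents_secure for per-user filtering."""

    def __init__(self, supabase_url: str, supabase_key: str, user_id: str):
        self.client = create_client(supabase_url, supabase_key)
        self.user_id = user_id  # assumed to come from your auth/session layer

    async def similarity_search(self, query: str, limit: int = 5) -> list[str]:
        # Embed the query, then search only the documents this user may see
        query_embedding = (
            await openai_client.embeddings.create(
                input=query, model="text-embedding-3-small"
            )
        ).data[0].embedding

        response = self.client.rpc(
            "match_documents_secure",
            {
                "query_embedding": query_embedding,
                "match_threshold": 0.78,
                "match_count": limit,
                "user_id": self.user_id,
            },
        ).execute()
        return [row["content"] for row in response.data]
```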
Monitoring and Analytics
Track usage and performance with built-in analytics:
```python
import time


class AnalyticsRAGSystem(SupabaseRAG):
    async def similarity_search(self, query: str, limit: int = 5) -> list[str]:
        start_time = time.time()

        try:
            results = await super().similarity_search(query, limit)

            # Log the successful search (created_at is filled in by the table default)
            self.client.table("search_analytics").insert({
                "query": query,
                "results_count": len(results),
                "response_time": time.time() - start_time,
                "status": "success",
            }).execute()

            return results
        except Exception as e:
            # Log the failed search
            self.client.table("search_analytics").insert({
                "query": query,
                "results_count": 0,
                "response_time": time.time() - start_time,
                "status": "error",
                "error_message": str(e),
            }).execute()
            raise
```
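The search_analytics table isn't part of the earlier schema; here is one possible shape for it, with column names chosen to match the inserts above:

```sql
CREATE TABLE search_analytics (
  id BIGSERIAL PRIMARY KEY,
  query TEXT NOT NULL,
  results_count INT,
  response_time FLOAT,
  status TEXT,
  error_message TEXT,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
```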
Deployment and Production Considerations
Environment Configuration
Set up your production environment with proper configuration:
```python
import os
from dataclasses import dataclass


@dataclass
class Config:
    supabase_url: str = os.getenv("SUPABASE_URL")
    supabase_key: str = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
    openai_api_key: str = os.getenv("OPENAI_API_KEY")
    livekit_url: str = os.getenv("LIVEKIT_URL")
    livekit_api_key: str = os.getenv("LIVEKIT_API_KEY")
    livekit_api_secret: str = os.getenv("LIVEKIT_API_SECRET")


def load_production_config() -> Config:
    config = Config()

    # Validate the configuration before starting the worker
    required_vars = [
        config.supabase_url,
        config.supabase_key,
        config.openai_api_key,
        config.livekit_url,
    ]
    if not all(required_vars):
        raise ValueError("Missing required environment variables")

    # The assistant itself is constructed inside the entrypoint, where the room
    # context is available; this config just supplies its credentials.
    return config
```
Running the Agent
Finally, here's how to run your production agent:
```python
if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
```
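Depending on your livekit-agents version, the CLI generated by cli.run_app typically exposes subcommands such as dev for local testing and start for production, e.g. `python agent.py dev`; check `python agent.py --help` for the exact options available in your install.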
Conclusion
Building a RAG-powered voice assistant with LiveKit and Supabase creates a powerful foundation for intelligent, context-aware AI agents. The combination of LiveKit's real-time capabilities with Supabase's vector database provides both performance and scalability while maintaining the flexibility to implement advanced features like access control and analytics.
Supabase Vector, powered by pgvector, keeps the storage layer simple and efficient; companies have reported storing over 1.6 million embeddings in production. This architecture scales from prototype to production, making it an ideal choice for organizations looking to deploy sophisticated voice AI solutions.
The key to success lies in optimizing for real-time performance, implementing proper security measures, and maintaining a clean separation between your knowledge base, vector search, and voice interface components. With these foundations in place, you can build voice assistants that truly understand and respond to your users' needs with accurate, contextual information from your organization's knowledge base.