Add this skill:

    npx mdskills install sickn33/voice-ai-engine-development

Exceptionally detailed async worker pipeline architecture with streaming, interrupts, and multi-provider support.
---
name: voice-ai-engine-development
description: "Build real-time conversational AI voice engines using async worker pipelines, streaming transcription, LLM agents, and TTS synthesis with interrupt handling and multi-provider support"
---

# Voice AI Engine Development

## Overview

This skill guides you through building production-ready voice AI engines with real-time conversation capabilities. Voice AI engines enable natural, bidirectional conversations between users and AI agents through streaming audio processing, speech-to-text transcription, LLM-powered responses, and text-to-speech synthesis.

The core architecture uses an async queue-based worker pipeline where each component runs independently and communicates via `asyncio.Queue` objects, enabling concurrent processing, interrupt handling, and real-time streaming at every stage.

## When to Use This Skill

Use this skill when:
- Building real-time voice conversation systems
- Implementing voice assistants or chatbots
- Creating voice-enabled customer service agents
- Developing voice AI applications with interrupt capabilities
- Integrating multiple transcription, LLM, or TTS providers
- Working with streaming audio processing pipelines
- The user mentions Vocode, voice engines, or conversational AI

## Core Architecture Principles

### The Worker Pipeline Pattern

Every voice AI engine follows this pipeline:

```
Audio In → Transcriber → Agent → Synthesizer → Audio Out
           (Worker 1)  (Worker 2)  (Worker 3)
```

**Key Benefits:**
- **Decoupling**: Workers only know about their input/output queues
- **Concurrency**: All workers run simultaneously via asyncio
- **Backpressure**: Queues automatically handle rate differences
- **Interruptibility**: Everything can be stopped mid-stream

### Base Worker Pattern

Every worker follows this pattern:

```python
import asyncio

class BaseWorker:
    def __init__(self, input_queue, output_queue):
        self.input_queue = input_queue    # asyncio.Queue to consume from
        self.output_queue = output_queue  # asyncio.Queue to produce to
        self.active = False

    def start(self):
        """Start the worker's processing loop"""
        self.active = True
        asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        """Main processing loop - runs until terminated"""
        while self.active:
            item = await self.input_queue.get()  # Block until an item arrives
            await self.process(item)             # Process the item

    async def process(self, item):
        """Override this - does the actual work"""
        raise NotImplementedError

    def terminate(self):
        """Stop the worker"""
        self.active = False
```
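As a sanity check, the pattern can be exercised end to end with a toy stage. `DoublingWorker` and the queue wiring below are illustrative, not part of any library; `BaseWorker` is restated (with a task handle added so `terminate` can cancel it) so the snippet runs standalone:

```python
import asyncio

class BaseWorker:
    """Restated from the pattern above so this snippet is self-contained."""
    def __init__(self, input_queue, output_queue):
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.active = False

    def start(self):
        self.active = True
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        while self.active:
            item = await self.input_queue.get()
            await self.process(item)

    async def process(self, item):
        raise NotImplementedError

    def terminate(self):
        self.active = False
        self._task.cancel()

class DoublingWorker(BaseWorker):
    """Toy stage: doubles each number (stands in for a real pipeline stage)."""
    async def process(self, item):
        self.output_queue.put_nowait(item * 2)

async def demo():
    inq, outq = asyncio.Queue(), asyncio.Queue()
    worker = DoublingWorker(inq, outq)
    worker.start()
    for n in (1, 2, 3):
        inq.put_nowait(n)
    # Awaiting the output queue yields control so the worker task can run
    results = [await outq.get() for _ in range(3)]
    worker.terminate()
    return results

results = asyncio.run(demo())
```

The same producer/consumer handshake scales to the real transcriber, agent, and synthesizer stages.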
## Component Implementation Guide

### 1. Transcriber (Audio → Text)

**Purpose**: Converts incoming audio chunks to text transcriptions

**Interface Requirements**:
```python
class BaseTranscriber:
    def __init__(self, transcriber_config):
        self.input_queue = asyncio.Queue()   # Audio chunks (bytes)
        self.output_queue = asyncio.Queue()  # Transcriptions
        self.is_muted = False

    def send_audio(self, chunk: bytes):
        """Client calls this to send audio"""
        if not self.is_muted:
            self.input_queue.put_nowait(chunk)
        else:
            # Send silence instead (prevents echo during bot speech)
            self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))

    def mute(self):
        """Called when bot starts speaking (prevents echo)"""
        self.is_muted = True

    def unmute(self):
        """Called when bot stops speaking"""
        self.is_muted = False
```

**Output Format**:
```python
class Transcription:
    message: str        # "Hello, how are you?"
    confidence: float   # 0.95
    is_final: bool      # True = complete sentence, False = partial
    is_interrupt: bool  # Set by TranscriptionsWorker
```

**Supported Providers**:
- **Deepgram** - Fast, accurate, streaming
- **AssemblyAI** - High accuracy, good for accents
- **Azure Speech** - Enterprise-grade
- **Google Cloud Speech** - Multi-language support

**Critical Implementation Details**:
- Use WebSocket for bidirectional streaming
- Run sender and receiver tasks concurrently with `asyncio.gather()`
- Mute the transcriber when the bot speaks to prevent echo/feedback loops
- Handle both final and partial transcriptions
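The sender/receiver split from the bullets above can be sketched without a real provider. Everything here is a stand-in: `FakeStreamingTranscriber` fakes the provider socket with a queue and emits one partial hypothesis per chunk plus a final result, which is roughly the shape a streaming STT API produces:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Transcription:
    message: str
    confidence: float
    is_final: bool
    is_interrupt: bool = False

class FakeStreamingTranscriber:
    """Illustrative stand-in: sender and receiver run concurrently,
    as they would over a real provider WebSocket."""
    def __init__(self):
        self.input_queue = asyncio.Queue()   # audio chunks in
        self.output_queue = asyncio.Queue()  # Transcriptions out
        self._wire = asyncio.Queue()         # pretend network socket

    async def _sender(self, n_chunks):
        for _ in range(n_chunks):
            chunk = await self.input_queue.get()
            await self._wire.put(chunk)      # "send" to the provider
        await self._wire.put(None)           # end of stream

    async def _receiver(self):
        words = []
        while (chunk := await self._wire.get()) is not None:
            words.append(f"w{len(chunk)}")
            # a partial hypothesis for every chunk...
            self.output_queue.put_nowait(
                Transcription(" ".join(words), 0.5, is_final=False))
        # ...and one final transcription at end of stream
        self.output_queue.put_nowait(
            Transcription(" ".join(words), 0.95, is_final=True))

    async def run(self, chunks):
        for c in chunks:
            self.input_queue.put_nowait(c)
        # run both directions concurrently, as the bullets above advise
        await asyncio.gather(self._sender(len(chunks)), self._receiver())

t = FakeStreamingTranscriber()
asyncio.run(t.run([b"\x00" * 3, b"\x00" * 5]))
outputs = []
while not t.output_queue.empty():
    outputs.append(t.output_queue.get_nowait())
```

A real implementation would replace `_wire` with the provider's WebSocket, but the concurrency structure is the same.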
### 2. Agent (Text → Response)

**Purpose**: Processes user input and generates conversational responses

**Interface Requirements**:
```python
class BaseAgent:
    def __init__(self, agent_config):
        self.input_queue = asyncio.Queue()   # TranscriptionAgentInput
        self.output_queue = asyncio.Queue()  # AgentResponse
        self.transcript = None               # Conversation history

    async def generate_response(self, human_input, is_interrupt, conversation_id):
        """Override this - returns an AsyncGenerator of responses"""
        raise NotImplementedError
```

**Why Streaming Responses?**
- **Lower latency**: Start speaking as soon as the first sentence is ready
- **Better interrupts**: Can stop mid-response
- **Sentence-by-sentence**: More natural conversation flow

**Supported Providers**:
- **OpenAI** (GPT-4, GPT-3.5) - High quality, fast
- **Google Gemini** - Multimodal, cost-effective
- **Anthropic Claude** - Long context, nuanced responses

**Critical Implementation Details**:
- Maintain conversation history in a `Transcript` object
- Stream responses using `AsyncGenerator`
- **IMPORTANT**: Buffer the entire LLM response before yielding to the synthesizer (prevents audio jumping)
- Handle interrupts by canceling the current generation task
- Update conversation history with partial messages on interrupt
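One common sentence-splitting strategy for streaming responses, sketched against a fake token stream (both function names are hypothetical):

```python
import asyncio

async def fake_llm_stream():
    # stand-in for a provider token stream
    for tok in ["Hello", " there", ".", " How", " can", " I", " help", "?"]:
        yield tok

async def generate_response(token_stream):
    """Yield complete sentences as soon as they are ready,
    instead of waiting for the full response."""
    buffer = ""
    async for token in token_stream:
        buffer += token
        if buffer.endswith((".", "?", "!")):
            yield buffer.strip()
            buffer = ""
    if buffer.strip():          # flush any trailing fragment
        yield buffer.strip()

async def collect():
    return [s async for s in generate_response(fake_llm_stream())]

sentences = asyncio.run(collect())
```

Whether to yield per sentence or buffer the whole response (as the pitfalls section recommends for some TTS providers) is a latency/stability trade-off.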
### 3. Synthesizer (Text → Audio)

**Purpose**: Converts agent text responses to speech audio

**Interface Requirements**:
```python
class BaseSynthesizer:
    async def create_speech(self, message: BaseMessage, chunk_size: int) -> SynthesisResult:
        """
        Returns a SynthesisResult containing:
        - chunk_generator: AsyncGenerator that yields audio chunks
        - get_message_up_to: Function to get partial text (for interrupts)
        """
        raise NotImplementedError
```

**SynthesisResult Structure**:
```python
class SynthesisResult:
    chunk_generator: AsyncGenerator[ChunkResult, None]
    get_message_up_to: Callable[[float], str]  # seconds → partial text

    class ChunkResult:
        chunk: bytes  # Raw PCM audio
        is_last_chunk: bool
```

**Supported Providers**:
- **ElevenLabs** - Most natural voices, streaming
- **Azure TTS** - Enterprise-grade, many languages
- **Google Cloud TTS** - Cost-effective, good quality
- **Amazon Polly** - AWS integration
- **Play.ht** - Voice cloning

**Critical Implementation Details**:
- Stream audio chunks as they're generated
- Convert audio to LINEAR16 PCM format (16kHz sample rate)
- Implement `get_message_up_to()` for interrupt handling
- Handle audio format conversion (MP3 → PCM)
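A crude `get_message_up_to` can be built from an assumed constant speaking rate. Real synthesizers derive this from per-chunk timing; the 2.5 words-per-second figure below is purely an illustrative assumption:

```python
def make_get_message_up_to(text, words_per_second=2.5):
    """Return a seconds -> partial-text function based on an assumed
    constant speaking rate (illustrative only)."""
    words = text.split()

    def get_message_up_to(seconds):
        if seconds <= 0:
            return ""
        # clamp so overly large times return the whole message
        n = min(len(words), int(seconds * words_per_second))
        return " ".join(words[:n])

    return get_message_up_to

f = make_get_message_up_to("I think the weather will be nice today and tomorrow")
partial = f(2.0)  # ~5 words at the assumed 2.5 words/second
```

Providers that report per-chunk durations let you replace the rate assumption with actual playback timing.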
### 4. Output Device (Audio → Client)

**Purpose**: Sends synthesized audio back to the client

**CRITICAL: Rate Limiting for Interrupts**

```python
async def send_speech_to_output(self, message, synthesis_result,
                                stop_event, seconds_per_chunk):
    chunk_idx = 0
    async for chunk_result in synthesis_result.chunk_generator:
        # Check for interrupt
        if stop_event.is_set():
            logger.debug(f"Interrupted after {chunk_idx} chunks")
            message_sent = synthesis_result.get_message_up_to(
                chunk_idx * seconds_per_chunk
            )
            return message_sent, True  # cut_off = True

        start_time = time.time()

        # Send chunk to output device
        self.output_device.consume_nonblocking(chunk_result.chunk)

        # CRITICAL: Wait for the chunk to play before sending the next one
        # This is what makes interrupts work!
        speech_length = seconds_per_chunk
        processing_time = time.time() - start_time
        await asyncio.sleep(max(speech_length - processing_time, 0))

        chunk_idx += 1

    return message, False  # cut_off = False
```

**Why Rate Limiting?**
Without rate limiting, all audio chunks would be sent immediately, which would:
- Buffer the entire message on the client side
- Make interrupts impossible (all audio already sent)
- Cause timing issues

By sending one chunk every N seconds:
- Real-time playback is maintained
- Interrupts can stop mid-sentence
- Natural conversation flow is preserved
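The `seconds_per_chunk` value used for pacing follows directly from the audio format. For raw LINEAR16 PCM (16-bit mono at 16 kHz), one second of audio is 32,000 bytes:

```python
def chunk_duration_seconds(chunk_size_bytes, sample_rate=16000, bytes_per_sample=2):
    """Playback duration of one raw PCM chunk: bytes / (rate * sample width)."""
    return chunk_size_bytes / (sample_rate * bytes_per_sample)

# one second of LINEAR16 16 kHz mono audio is 16000 * 2 = 32,000 bytes
d = chunk_duration_seconds(32000)
half = chunk_duration_seconds(16000)
```

For compressed formats (MP3, Opus) the duration must come from the decoder instead, since byte count no longer maps linearly to time.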
## The Interrupt System

The interrupt system is critical for natural conversations.

### How Interrupts Work

**Scenario**: The bot is saying "I think the weather will be nice today and tomorrow and—" when the user interrupts with "Stop".

**Step 1: User starts speaking**
```python
# TranscriptionsWorker detects a new transcription while the bot is speaking
async def process(self, transcription):
    if not self.conversation.is_human_speaking:  # Bot was speaking!
        # Broadcast interrupt to all in-flight events
        interrupted = self.conversation.broadcast_interrupt()
        transcription.is_interrupt = interrupted
```

**Step 2: broadcast_interrupt() stops everything**
```python
def broadcast_interrupt(self):
    num_interrupts = 0
    # Interrupt all queued events
    while True:
        try:
            interruptible_event = self.interruptible_events.get_nowait()
            if interruptible_event.interrupt():  # Sets interruption_event
                num_interrupts += 1
        except queue.Empty:
            break

    # Cancel current tasks
    self.agent.cancel_current_task()                   # Stop generating text
    self.agent_responses_worker.cancel_current_task()  # Stop synthesizing
    return num_interrupts > 0
```

**Step 3: SynthesisResultsWorker detects the interrupt**
```python
async def send_speech_to_output(self, synthesis_result, stop_event, ...):
    async for chunk_result in synthesis_result.chunk_generator:
        # Check stop_event (this is the interruption_event)
        if stop_event.is_set():
            logger.debug("Interrupted! Stopping speech.")
            # Calculate what was actually spoken
            seconds_spoken = chunk_idx * seconds_per_chunk
            partial_message = synthesis_result.get_message_up_to(seconds_spoken)
            # e.g., "I think the weather will be nice today"
            return partial_message, True  # cut_off = True
```

**Step 4: Agent updates history**
```python
if cut_off:
    # Update conversation history with the partial message
    self.agent.update_last_bot_message_on_cut_off(message_sent)
    # History now shows:
    # Bot: "I think the weather will be nice today" (incomplete)
```

### InterruptibleEvent Pattern

Every event in the pipeline is wrapped in an `InterruptibleEvent`:

```python
class InterruptibleEvent:
    def __init__(self, payload, is_interruptible=True):
        self.payload = payload
        self.is_interruptible = is_interruptible
        self.interruption_event = threading.Event()  # Initially not set
        self.interrupted = False

    def interrupt(self) -> bool:
        """Interrupt this event"""
        if not self.is_interruptible:
            return False
        if not self.interrupted:
            self.interruption_event.set()  # Signal to stop!
            self.interrupted = True
            return True
        return False

    def is_interrupted(self) -> bool:
        return self.interruption_event.is_set()
```
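A quick demonstration of those semantics: interrupting succeeds once, is idempotent thereafter, and is refused for non-interruptible events (the class is repeated so the snippet runs standalone):

```python
import threading

class InterruptibleEvent:
    """Repeated from the pattern above for a self-contained demo."""
    def __init__(self, payload, is_interruptible=True):
        self.payload = payload
        self.is_interruptible = is_interruptible
        self.interruption_event = threading.Event()
        self.interrupted = False

    def interrupt(self):
        if not self.is_interruptible:
            return False
        if not self.interrupted:
            self.interruption_event.set()  # signal consumers to stop
            self.interrupted = True
            return True
        return False

    def is_interrupted(self):
        return self.interruption_event.is_set()

speech = InterruptibleEvent(payload="bot response")
filler = InterruptibleEvent(payload="initial greeting", is_interruptible=False)

first = speech.interrupt()    # succeeds and sets the event
second = speech.interrupt()   # already interrupted, reports False
blocked = filler.interrupt()  # non-interruptible, refused
```

Marking events like greetings `is_interruptible=False` is how a pipeline protects audio that should always play to completion.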
## Multi-Provider Factory Pattern

Support multiple providers with a factory pattern:

```python
class VoiceHandler:
    """Multi-provider factory for voice components"""

    def create_transcriber(self, agent_config: Dict):
        """Create transcriber based on transcriberProvider"""
        provider = agent_config.get("transcriberProvider", "deepgram")

        if provider == "deepgram":
            return self._create_deepgram_transcriber(agent_config)
        elif provider == "assemblyai":
            return self._create_assemblyai_transcriber(agent_config)
        elif provider == "azure":
            return self._create_azure_transcriber(agent_config)
        elif provider == "google":
            return self._create_google_transcriber(agent_config)
        else:
            raise ValueError(f"Unknown transcriber provider: {provider}")

    def create_agent(self, agent_config: Dict):
        """Create LLM agent based on llmProvider"""
        provider = agent_config.get("llmProvider", "openai")

        if provider == "openai":
            return self._create_openai_agent(agent_config)
        elif provider == "gemini":
            return self._create_gemini_agent(agent_config)
        else:
            raise ValueError(f"Unknown LLM provider: {provider}")

    def create_synthesizer(self, agent_config: Dict):
        """Create voice synthesizer based on voiceProvider"""
        provider = agent_config.get("voiceProvider", "elevenlabs")

        if provider == "elevenlabs":
            return self._create_elevenlabs_synthesizer(agent_config)
        elif provider == "azure":
            return self._create_azure_synthesizer(agent_config)
        elif provider == "google":
            return self._create_google_synthesizer(agent_config)
        elif provider == "polly":
            return self._create_polly_synthesizer(agent_config)
        elif provider == "playht":
            return self._create_playht_synthesizer(agent_config)
        else:
            raise ValueError(f"Unknown voice provider: {provider}")
```
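The if/elif chains can equally be table-driven, which makes adding a provider a one-line change. The stub constructors below are hypothetical stand-ins for the real `_create_*` methods:

```python
class VoiceHandler:
    """Table-driven variant of the factory; stubs stand in for real constructors."""
    TRANSCRIBERS = {
        "deepgram":   lambda cfg: ("DeepgramTranscriber", cfg),
        "assemblyai": lambda cfg: ("AssemblyAITranscriber", cfg),
        "azure":      lambda cfg: ("AzureTranscriber", cfg),
        "google":     lambda cfg: ("GoogleTranscriber", cfg),
    }

    def create_transcriber(self, agent_config):
        provider = agent_config.get("transcriberProvider", "deepgram")
        try:
            return self.TRANSCRIBERS[provider](agent_config)
        except KeyError:
            raise ValueError(f"Unknown transcriber provider: {provider}") from None

handler = VoiceHandler()
name, _ = handler.create_transcriber({"transcriberProvider": "azure"})
default_name, _ = handler.create_transcriber({})  # falls back to the default
```

Either style works; the dictionary version simply moves the provider list into data.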
## WebSocket Integration

Voice AI engines typically use WebSocket for bidirectional audio streaming:

```python
@app.websocket("/conversation")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Create voice components
    voice_handler = VoiceHandler()
    transcriber = voice_handler.create_transcriber(agent_config)
    agent = voice_handler.create_agent(agent_config)
    synthesizer = voice_handler.create_synthesizer(agent_config)

    # Create output device
    output_device = WebsocketOutputDevice(
        ws=websocket,
        sampling_rate=16000,
        audio_encoding=AudioEncoding.LINEAR16
    )

    # Create conversation orchestrator
    conversation = StreamingConversation(
        output_device=output_device,
        transcriber=transcriber,
        agent=agent,
        synthesizer=synthesizer
    )

    # Start all workers
    await conversation.start()

    try:
        # Receive audio from the client
        async for message in websocket.iter_bytes():
            conversation.receive_audio(message)
    except WebSocketDisconnect:
        logger.info("Client disconnected")
    finally:
        await conversation.terminate()
```

## Common Pitfalls and Solutions

### 1. Audio Jumping/Cutting Off

**Problem**: The bot's audio jumps or cuts off mid-response.

**Cause**: Sending text to the synthesizer in small chunks causes multiple TTS calls.

**Solution**: Buffer the entire LLM response before sending it to the synthesizer:

```python
# ❌ Bad: Yields sentence-by-sentence
async for sentence in llm_stream:
    yield GeneratedResponse(message=BaseMessage(text=sentence))

# ✅ Good: Buffer the entire response
full_response = ""
async for chunk in llm_stream:
    full_response += chunk
yield GeneratedResponse(message=BaseMessage(text=full_response))
```
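The buffering fix can be verified in isolation; the stream below is a fake stand-in for an LLM token stream:

```python
import asyncio

async def fake_llm_stream():
    # stand-in for a provider token stream
    for chunk in ["I think ", "the weather ", "will be nice."]:
        yield chunk

async def buffered_response(stream):
    """Accumulate every chunk, then hand one complete string to TTS."""
    full_response = ""
    async for chunk in stream:
        full_response += chunk
    return full_response

full = asyncio.run(buffered_response(fake_llm_stream()))
```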
### 2. Echo/Feedback Loop

**Problem**: The bot hears itself speaking and responds to its own audio.

**Cause**: The transcriber is not muted during bot speech.

**Solution**: Mute the transcriber when the bot starts speaking:

```python
# Before sending audio to output
self.transcriber.mute()
# After audio playback is complete
self.transcriber.unmute()
```

### 3. Interrupts Not Working

**Problem**: The user can't interrupt the bot mid-sentence.

**Cause**: All audio chunks are sent at once instead of rate-limited.

**Solution**: Rate-limit audio chunks to match real-time playback:

```python
async for chunk in synthesis_result.chunk_generator:
    start_time = time.time()

    # Send chunk
    output_device.consume_nonblocking(chunk)

    # Wait for the chunk duration before sending the next one
    processing_time = time.time() - start_time
    await asyncio.sleep(max(seconds_per_chunk - processing_time, 0))
```

### 4. Memory Leaks from Unclosed Streams

**Problem**: Memory usage grows over time.

**Cause**: WebSocket connections or API streams are not properly closed.

**Solution**: Always use context managers and clean up:

```python
try:
    async with websockets.connect(url) as ws:
        # Use the websocket
        pass
finally:
    # Cleanup
    await conversation.terminate()
    await transcriber.terminate()
```

## Production Considerations

### 1. Error Handling

```python
async def _run_loop(self):
    while self.active:
        try:
            item = await self.input_queue.get()
            await self.process(item)
        except Exception as e:
            logger.error(f"Worker error: {e}", exc_info=True)
            # Don't crash the worker; continue processing
```

### 2. Graceful Shutdown

```python
async def terminate(self):
    """Gracefully shut down all workers"""
    self.active = False

    # Stop all workers
    self.transcriber.terminate()
    self.agent.terminate()
    self.synthesizer.terminate()

    # Wait for queues to drain
    await asyncio.sleep(0.5)

    # Close connections
    if self.websocket:
        await self.websocket.close()
```

### 3. Monitoring and Logging

```python
# Log key events
logger.info(f"🎤 [TRANSCRIBER] Received: '{transcription.message}'")
logger.info(f"🤖 [AGENT] Generating response...")
logger.info(f"🔊 [SYNTHESIZER] Synthesizing {len(text)} characters")
logger.info(f"⚠️ [INTERRUPT] User interrupted bot")

# Track metrics
metrics.increment("transcriptions.count")
metrics.timing("agent.response_time", duration)
metrics.gauge("active_conversations", count)
```

### 4. Rate Limiting and Quotas

```python
# Implement rate limiting for API calls
from aiolimiter import AsyncLimiter

rate_limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 calls/second

async def call_api(self, data):
    async with rate_limiter:
        return await self.client.post(data)
```

## Key Design Patterns

### 1. Producer-Consumer with Queues

```python
# Producer
async def producer(queue):
    while True:
        item = await generate_item()
        queue.put_nowait(item)

# Consumer
async def consumer(queue):
    while True:
        item = await queue.get()
        await process_item(item)
```

### 2. Streaming Generators

Instead of returning complete results, stream chunks as they arrive:

```python
# ❌ Bad: Wait for the entire response
async def generate_response(prompt):
    response = await openai.complete(prompt)  # 5 seconds
    return response

# ✅ Good: Stream chunks as they arrive
async def generate_response(prompt):
    async for chunk in openai.complete(prompt, stream=True):
        yield chunk  # Yield after 0.1s, 0.2s, etc.
```
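A runnable variant of the producer-consumer pattern: the bounded queue demonstrates the backpressure mentioned earlier, and the sentinel-based shutdown is one common convention (an assumption here, not something the pipeline above mandates):

```python
import asyncio

STOP = object()  # sentinel signalling shutdown

async def producer(queue, items):
    for item in items:
        await queue.put(item)  # blocks when the queue is full (backpressure)
    await queue.put(STOP)      # tell the consumer to drain and exit

async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is STOP:
            break
        results.append(item * 10)  # stand-in for real per-item work

async def main():
    queue = asyncio.Queue(maxsize=2)  # bounded queue caps memory use
    results = []
    await asyncio.gather(producer(queue, [1, 2, 3]), consumer(queue, results))
    return results

results = asyncio.run(main())
```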
### 3. Conversation State Management

Maintain conversation history for context:

```python
class Transcript:
    def __init__(self):
        # instance attribute (a shared class-level list would leak
        # history across conversations)
        self.event_logs: List[Message] = []

    def add_human_message(self, text):
        self.event_logs.append(Message(sender=Sender.HUMAN, text=text))

    def add_bot_message(self, text):
        self.event_logs.append(Message(sender=Sender.BOT, text=text))

    def to_openai_messages(self):
        return [
            {"role": "user" if msg.sender == Sender.HUMAN else "assistant",
             "content": msg.text}
            for msg in self.event_logs
        ]
```

## Testing Strategies

### 1. Unit Test Workers in Isolation

```python
async def test_transcriber():
    transcriber = DeepgramTranscriber(config)

    # Mock audio input
    audio_chunk = b'\x00\x01\x02...'
    transcriber.send_audio(audio_chunk)

    # Check output
    transcription = await transcriber.output_queue.get()
    assert transcription.message == "expected text"
```

### 2. Integration Test the Pipeline

```python
async def test_full_pipeline():
    # Create all components
    conversation = create_test_conversation()

    # Send test audio
    conversation.receive_audio(test_audio_chunk)

    # Wait for a response
    response = await wait_for_audio_output(timeout=5)

    assert response is not None
```

### 3. Test Interrupts

```python
async def test_interrupt():
    conversation = create_test_conversation()

    # Start the bot speaking
    await conversation.agent.generate_response("Tell me a long story")

    # Interrupt mid-response
    await asyncio.sleep(1)  # Let it speak for 1 second
    conversation.broadcast_interrupt()

    # Verify the partial message in the transcript
    last_message = conversation.transcript.event_logs[-1]
    assert last_message.text != full_expected_message
```
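The `Transcript` structure from the state-management pattern can be unit-tested in the same spirit; the `Message`/`Sender` definitions below are assumed stand-ins:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List

class Sender(Enum):
    HUMAN = "human"
    BOT = "bot"

@dataclass
class Message:
    sender: Sender
    text: str

@dataclass
class Transcript:
    # default_factory avoids a shared mutable default
    event_logs: List[Message] = field(default_factory=list)

    def add_human_message(self, text):
        self.event_logs.append(Message(Sender.HUMAN, text))

    def add_bot_message(self, text):
        self.event_logs.append(Message(Sender.BOT, text))

    def to_openai_messages(self):
        return [
            {"role": "user" if m.sender == Sender.HUMAN else "assistant",
             "content": m.text}
            for m in self.event_logs
        ]

t = Transcript()
t.add_human_message("Hello")
t.add_bot_message("Hi! How can I help?")
messages = t.to_openai_messages()
```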
## Implementation Workflow

When implementing a voice AI engine:

1. **Start with Base Workers**: Implement the base worker pattern first
2. **Add Transcriber**: Choose a provider and implement streaming transcription
3. **Add Agent**: Implement LLM integration with streaming responses
4. **Add Synthesizer**: Implement TTS with audio streaming
5. **Connect Pipeline**: Wire all workers together with queues
6. **Add Interrupts**: Implement the interrupt system
7. **Add WebSocket**: Create a WebSocket endpoint for client communication
8. **Test Components**: Unit test each worker in isolation
9. **Test Integration**: Test the full pipeline end-to-end
10. **Add Error Handling**: Implement robust error handling and logging
11. **Optimize**: Add rate limiting, monitoring, and performance optimizations

## Related Skills

- `@websocket-patterns` - For WebSocket implementation details
- `@async-python` - For asyncio and async patterns
- `@streaming-apis` - For streaming API integration
- `@audio-processing` - For audio format conversion and processing
- `@systematic-debugging` - For debugging complex async pipelines

## Resources

**Libraries**:
- `asyncio` - Async programming
- `websockets` - WebSocket client/server
- `FastAPI` - WebSocket server framework
- `pydub` - Audio manipulation
- `numpy` - Audio data processing

**API Providers**:
- Transcription: Deepgram, AssemblyAI, Azure Speech, Google Cloud Speech
- LLM: OpenAI, Google Gemini, Anthropic Claude
- TTS: ElevenLabs, Azure TTS, Google Cloud TTS, Amazon Polly, Play.ht

## Summary

Building a voice AI engine requires:
- ✅ Async worker pipeline for concurrent processing
- ✅ Queue-based communication between components
- ✅ Streaming at every stage (transcription, LLM, synthesis)
- ✅ Interrupt system for natural conversations
- ✅ Rate limiting for real-time audio playback
- ✅ Multi-provider support for flexibility
- ✅ Proper error handling and graceful shutdown

**The key insight**: Everything must stream and everything must be interruptible for natural, real-time conversations.