22. Voice Assistant Project
Chapter 22 of 22 · 30 min
This chapter integrates the concepts from the previous chapters into a complete voice assistant that implements wake-word detection, speech recognition, LLM processing, and speech synthesis.
Project Structure
voice_assistant/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── pipeline/
│ │ ├── __init__.py
│ │ ├── wake_word.py
│ │ ├── asr.py
│ │ ├── llm.py
│ │ ├── tts.py
│ │ └── orchestrator.py
│ ├── api/
│ │ ├── __init__.py
│ │ └── routes.py
│ └── utils/
│ ├── audio.py
│ └── logging.py
├── tests/
├── models/
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
Configuration
# app/config.py
from dataclasses import dataclass
from pathlib import Path
@dataclass
class VoiceAssistantConfig:
    """Settings shared by every stage of the voice-assistant pipeline.

    Every field has a default, so ``VoiceAssistantConfig()`` is usable as-is.
    :meth:`from_env` overrides a subset of the fields from environment
    variables.
    """

    # Paths
    model_cache_dir: Path = Path("./models")

    # Wake word
    # NOTE(review): this identifier looks garbled -- confirm the intended value.
    wake_word_model: str = "por models/wake-word"
    wake_word_threshold: float = 0.5

    # ASR
    asr_model: str = "openai/whisper-small"
    asr_language: str = "en"

    # LLM
    # The "-hf" repository holds the Transformers-format checkpoint that
    # AutoModelForCausalLM.from_pretrained expects.
    llm_model: str = "meta-llama/Llama-2-7b-chat-hf"
    llm_quantization: str = "int8"
    max_context_tokens: int = 2048

    # TTS
    # Coqui model ids have four parts: <type>/<language>/<dataset>/<model>.
    tts_model: str = "tts_models/en/ljspeech/tacotron2-DDC"
    tts_sample_rate: int = 22050

    # Audio
    audio_sample_rate: int = 16000
    audio_chunk_size: int = 1024

    # Performance
    target_latency_ms: int = 500
    enable_noise_reduction: bool = True

    @classmethod
    def from_env(cls):
        """Build a config, reading overrides from the environment.

        Recognised variables are ``MODEL_CACHE_DIR``, ``LLM_MODEL`` and
        ``TTS_MODEL``; any that is unset falls back to the field default.

        Returns:
            A new ``VoiceAssistantConfig`` instance.
        """
        # Imported here because the module header does not import ``os``.
        import os

        return cls(
            model_cache_dir=Path(os.getenv("MODEL_CACHE_DIR", "./models")),
            llm_model=os.getenv("LLM_MODEL", cls.llm_model),
            tts_model=os.getenv("TTS_MODEL", cls.tts_model),
        )
Wake Word Detection
# app/pipeline/wake_word.py
import torch
import pvporcupine
class WakeWordDetector:
    """Decides whether a short window of audio should wake the assistant.

    With a Picovoice ``access_key`` the detector uses Porcupine. Without one it
    loads the Silero VAD model and treats a speech probability above 0.5 as the
    trigger.
    """

    # Samples analysed per detection and the hop between windows (50% overlap).
    _WINDOW = 512
    _HOP = 256

    def __init__(self, access_key: "str | None" = None, keywords: "list[str] | None" = None):
        """Create the detector.

        Args:
            access_key: Picovoice access key; ``None`` selects the Silero path.
            keywords: Porcupine keywords; defaults to ``["picovoice"]``.
        """
        if access_key is None:
            # Use open-source alternative
            self.model = self._load_silero_wake_word()
        else:
            self.porcupine = pvporcupine.create(
                access_key=access_key,
                keywords=keywords or ["picovoice"],
            )

    def _load_silero_wake_word(self):
        """Load the Silero VAD model, store it on the instance and return it."""
        import silero_vad

        self.model, self.utils = silero_vad.load_silero_vad()
        return self.model

    @torch.no_grad()
    def detect(self, audio: torch.Tensor) -> bool:
        """Return True when ``audio`` triggers the active backend.

        Args:
            audio: One window of samples.
                NOTE(review): Porcupine is handed int16 PCM while the Silero
                path receives the tensor unchanged -- confirm the scaling each
                backend expects.
        """
        if hasattr(self, "porcupine"):
            # Imported here because the module header does not import numpy.
            import numpy as np

            pcm = audio.numpy().astype(np.int16)
            return self.porcupine.process(pcm) >= 0
        speech_prob = self.model(audio, 16000).item()
        return speech_prob > 0.5

    async def stream_detect(self, audio_stream) -> "AsyncIterator[bool]":
        """Yield one detection result per overlapping window of the stream.

        Args:
            audio_stream: Iterable or async iterable of NumPy sample chunks.

        Yields:
            The result of :meth:`detect` for each 512-sample window, advancing
            256 samples at a time so every sample is analysed.
        """
        buffer = torch.zeros(0)
        async for chunk in self._iterate(audio_stream):
            samples = torch.from_numpy(chunk).to(torch.float32)
            buffer = torch.cat([buffer, samples])
            # Drain every complete window; the tail is kept for the next chunk.
            while len(buffer) >= self._WINDOW:
                yield self.detect(buffer[: self._WINDOW])
                buffer = buffer[self._HOP:]

    @staticmethod
    async def _iterate(audio_stream):
        """Adapt a plain or async iterable of chunks to an async iterator."""
        if hasattr(audio_stream, "__aiter__"):
            async for chunk in audio_stream:
                yield chunk
        else:
            for chunk in audio_stream:
                yield chunk
ASR Module
# app/pipeline/asr.py
import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration
class ASRModule:
    """Speech-to-text stage backed by a Whisper checkpoint."""

    def __init__(self, model_name: str = "openai/whisper-small"):
        """Load the processor and model and move the model to GPU if available.

        Args:
            model_name: Hugging Face model id of the Whisper checkpoint.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.processor = WhisperProcessor.from_pretrained(model_name)
        self.model = WhisperForConditionalGeneration.from_pretrained(model_name)
        self.model.to(self.device)
        self.model.eval()

    async def transcribe(self, audio: "np.ndarray") -> str:
        """Transcribe one utterance.

        Args:
            audio: Mono samples, passed to the processor as 16 kHz audio.
                Integer PCM is scaled to floats in [-1, 1) first.

        Returns:
            The decoded text of the first (only) sequence.
        """
        # Imported here because the module header does not import numpy.
        import numpy as np

        samples = np.asarray(audio)
        if np.issubdtype(samples.dtype, np.integer):
            full_scale = float(np.iinfo(samples.dtype).max) + 1.0
            samples = samples.astype(np.float32) / full_scale
        else:
            samples = samples.astype(np.float32, copy=False)

        # A decorator on an ``async def`` only wraps creation of the coroutine,
        # so gradients are disabled inside the body instead.
        with torch.no_grad():
            inputs = self.processor(
                samples,
                sampling_rate=16000,
                return_tensors="pt",
            ).to(self.device)
            generated_ids = self.model.generate(
                inputs["input_features"],
                max_new_tokens=256,
            )
        transcription = self.processor.batch_decode(
            generated_ids,
            skip_special_tokens=True,
        )[0]
        return transcription
LLM Module
# app/pipeline/llm.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
class LLMModule:
    """Text-generation stage backed by a causal language model."""

    def __init__(self, model_name: str, quantization: str = "int8"):
        """Load the tokenizer and model.

        Args:
            model_name: Hugging Face model id.
            quantization: ``"int8"`` loads the weights in 8-bit via
                bitsandbytes; any other value loads them unquantized.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.tokenizer.pad_token = self.tokenizer.eos_token
        if quantization == "int8":
            config = BitsAndBytesConfig(load_in_8bit=True)
        else:
            config = None
        # SECURITY NOTE: trust_remote_code=True executes Python shipped in the
        # model repository. Only point ``model_name`` at repositories you trust.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=config,
            device_map="auto",
            trust_remote_code=True,
        )
        self.model.eval()

    def generate(self, prompt: str, max_new_tokens: int = 200) -> str:
        """Sample a completion for ``prompt`` and return only the new text.

        Args:
            prompt: Full prompt text.
            max_new_tokens: Upper bound on generated tokens.

        Returns:
            The generated continuation, stripped of surrounding whitespace.
        """
        # With device_map="auto" the model chooses its own placement, so the
        # inputs follow the model rather than ``self.device``.
        target_device = getattr(self.model, "device", self.device)
        inputs = self.tokenizer(prompt, return_tensors="pt").to(target_device)
        prompt_length = inputs["input_ids"].shape[1]
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
            )
        # Decode only the tokens after the prompt: a decode round-trip does not
        # always reproduce the prompt text exactly, so slicing the decoded
        # string by len(prompt) can cut the reply in the wrong place.
        new_tokens = outputs[0][prompt_length:]
        return self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

    def _extract_response(self, prompt: str, full_response: str) -> str:
        """Strip a leading ``prompt`` from ``full_response``.

        If the text does not start with the prompt it is returned whole
        (stripped) instead of being cut at an arbitrary offset.
        """
        if full_response.startswith(prompt):
            return full_response[len(prompt):].strip()
        return full_response.strip()
TTS Module
# app/pipeline/tts.py
import numpy as np
from TTS.api import TTS
class TTSModule:
    """Text-to-speech stage backed by a Coqui TTS model."""

    def __init__(self, model_name: str = "tts_models/en/ljspeech/tacotron2-DDC"):
        """Load the TTS model.

        Args:
            model_name: Coqui model id in the form
                ``<type>/<language>/<dataset>/<model>``.
        """
        self.tts = TTS(model_name)
        # Prefer the rate reported by the loaded synthesizer; fall back to the
        # LJSpeech rate when it is not exposed.
        synthesizer = getattr(self.tts, "synthesizer", None)
        self.sample_rate = getattr(synthesizer, "output_sample_rate", None) or 22050

    def synthesize(self, text: str) -> np.ndarray:
        """Synthesize ``text`` and return the waveform as a NumPy array."""
        wav = self.tts.tts(text)
        return np.array(wav)

    def synthesize_streaming(self, text: str, chunk_duration_ms: int = 100):
        """Yield the synthesized waveform in fixed-duration chunks.

        The whole utterance is synthesized first and then sliced; for true
        streaming, a streaming TTS model would be needed.

        Args:
            text: Text to speak.
            chunk_duration_ms: Duration of each chunk in milliseconds.

        Yields:
            Consecutive slices of the waveform; the last one may be shorter.

        Raises:
            ValueError: If ``chunk_duration_ms`` is not positive.
        """
        if chunk_duration_ms <= 0:
            raise ValueError("chunk_duration_ms must be positive")
        full_audio = self.synthesize(text)
        # Never let rounding produce a zero step, which ``range`` rejects.
        chunk_size = max(1, int(self.sample_rate * chunk_duration_ms / 1000))
        for start in range(0, len(full_audio), chunk_size):
            yield full_audio[start:start + chunk_size]
Pipeline Orchestrator
# app/pipeline/orchestrator.py
import asyncio
from dataclasses import dataclass, field

import numpy as np

from app.config import VoiceAssistantConfig
from app.pipeline.asr import ASRModule
from app.pipeline.llm import LLMModule
from app.pipeline.tts import TTSModule
from app.pipeline.wake_word import WakeWordDetector
@dataclass
class ConversationContext:
    """Rolling record of the conversation so far."""

    # Completed turns, oldest first; every instance gets its own list.
    history: list[dict] = field(default_factory=list)
    # Upper bound on the number of turns the owner should retain.
    max_history: int = 10
class VoicePipelineOrchestrator:
    """Coordinates wake-word detection, ASR, the LLM and TTS.

    ``run`` drives the microphone loop. ``process`` and ``process_text`` expose
    the same pipeline to callers that already hold audio or text.
    """

    # Mean absolute amplitude (samples scaled to [-1, 1]) below which a chunk
    # counts as silence.
    _SILENCE_THRESHOLD = 0.01
    # Consecutive silent chunks that end an utterance (~1 s at 16 kHz / 1024).
    _END_SILENCE_CHUNKS = 16
    # Hard cap on utterance length (~30 s at 16 kHz / 1024).
    _MAX_UTTERANCE_CHUNKS = 470

    def __init__(self, config: "VoiceAssistantConfig"):
        """Instantiate every pipeline stage from ``config``."""
        self.config = config
        self.wake_word = WakeWordDetector()
        self.asr = ASRModule(config.asr_model)
        self.llm = LLMModule(config.llm_model, config.llm_quantization)
        self.tts = TTSModule(config.tts_model)
        self.context = ConversationContext()
        self._running = False

    async def run(self):
        """Listen on the microphone until :meth:`stop` is called.

        Each chunk is checked for the wake word; on a hit, the same stream is
        handed to :meth:`_handle_interaction` so only one audio device is open.
        """
        self._running = True
        audio_stream = self._get_audio_stream()
        try:
            async for chunk in audio_stream:
                if await self._heard_wake_word(chunk):
                    await self._handle_interaction(audio_stream)
        finally:
            # Closing the generator runs its cleanup and releases the device.
            await audio_stream.aclose()

    async def _heard_wake_word(self, chunk) -> bool:
        """Return True if any detection window within ``chunk`` fires.

        The detector's ``stream_detect`` is fed one chunk at a time, because
        the microphone stream itself is asynchronous. Both plain and async
        generators are accepted as its result.
        """
        detections = self.wake_word.stream_detect([chunk])
        if hasattr(detections, "__aiter__"):
            hits = [hit async for hit in detections]
        else:
            hits = list(detections)
        return any(hits)

    async def _handle_interaction(self, audio_stream=None):
        """Capture one utterance, answer it and play the reply.

        Args:
            audio_stream: Already-open microphone stream to read from; when
                omitted, a temporary one is opened for the capture.
        """
        audio_data = await self._capture_utterance(audio_stream)
        if np.asarray(audio_data).size == 0:
            return
        result = await self.process(audio_data)
        if not result["response"]:
            return
        await self._play_audio(result["audio"])

    async def process(self, audio, language=None) -> dict:
        """Run ASR, the LLM and TTS on one utterance.

        Args:
            audio: Samples as a NumPy array, or raw bytes that are interpreted
                as 16-bit PCM.
            language: Accepted for API compatibility; not forwarded, because
                ``transcribe`` is called with the audio only.

        Returns:
            Dict with ``transcription``, ``response`` and ``audio`` keys.
            ``response`` is empty and ``audio`` is an empty array when nothing
            was transcribed.
        """
        if isinstance(audio, (bytes, bytearray, memoryview)):
            audio = np.frombuffer(audio, dtype=np.int16)
        empty = {
            "transcription": "",
            "response": "",
            "audio": np.array([], dtype=np.float32),
        }
        if np.asarray(audio).size == 0:
            return empty
        transcription = await self.asr.transcribe(audio)
        if not transcription.strip():
            return empty
        reply = await self.process_text(transcription)
        return {"transcription": transcription, **reply}

    async def process_text(self, text: str) -> dict:
        """Generate and synthesize a reply to ``text``, then record the turn.

        Returns:
            Dict with ``response`` (reply text) and ``audio`` (waveform).
        """
        prompt = self._build_prompt(text)
        # The model calls block, so they run in worker threads to keep the
        # event loop responsive.
        response = await asyncio.to_thread(self.llm.generate, prompt)
        audio = await asyncio.to_thread(self.tts.synthesize, response)
        self._remember(text, response)
        return {"response": response, "audio": audio}

    def _remember(self, user_text: str, response: str) -> None:
        """Append a turn and drop the oldest ones beyond ``max_history``."""
        history = self.context.history
        history.append({"user": user_text, "assistant": response})
        excess = len(history) - self.context.max_history
        if excess > 0:
            del history[:excess]

    def _build_prompt(self, transcription: str) -> str:
        """Compose the prompt from the system text, the last three turns and
        the new user utterance."""
        system_prompt = "You are a helpful voice assistant. Keep responses concise and natural."
        history_text = "".join(
            f"User: {turn['user']}\nAssistant: {turn['assistant']}\n"
            for turn in self.context.history[-3:]
        )
        return f"{system_prompt}\n\n{history_text}User: {transcription}\nAssistant:"

    async def _capture_utterance(self, audio_stream=None) -> np.ndarray:
        """Collect chunks until about a second of silence or the length cap.

        Args:
            audio_stream: Async iterator of sample chunks. When omitted, a
                stream is opened for this call and closed afterwards.

        Returns:
            The concatenated chunks, or an empty array if none were read.
        """
        owns_stream = audio_stream is None
        if owns_stream:
            audio_stream = self._get_audio_stream()
        chunks = []
        silent_run = 0
        try:
            async for chunk in audio_stream:
                chunks.append(chunk)
                silent_run = silent_run + 1 if self._is_speech_end(chunk) else 0
                if silent_run >= self._END_SILENCE_CHUNKS:
                    break
                if len(chunks) >= self._MAX_UTTERANCE_CHUNKS:
                    break
        finally:
            if owns_stream:
                await audio_stream.aclose()
        return np.concatenate(chunks) if chunks else np.array([])

    def _is_speech_end(self, chunk: np.ndarray) -> bool:
        """Return True when ``chunk`` is silent by mean absolute amplitude.

        Integer PCM is scaled to [-1, 1] before the comparison, so the
        threshold means the same thing for int16 and float input.
        """
        samples = np.asarray(chunk)
        if samples.size == 0:
            return True
        if np.issubdtype(samples.dtype, np.integer):
            samples = samples.astype(np.float32) / float(np.iinfo(samples.dtype).max)
        energy = np.mean(np.abs(samples))
        return bool(energy < self._SILENCE_THRESHOLD)

    async def _play_audio(self, audio: np.ndarray):
        """Play ``audio``; left as a platform-specific stub in this example."""
        # Play audio using platform-specific implementation
        # Simplified for this example
        pass

    async def _get_audio_stream(self):
        """Yield 1024-sample int16 chunks from the default input device.

        The loop runs while ``self._running`` is true. The device is released
        when the generator finishes or is closed.
        """
        # Platform-specific audio capture
        import pyaudio

        audio_api = pyaudio.PyAudio()
        stream = audio_api.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=16000,
            input=True,
            frames_per_buffer=1024,
        )
        try:
            while self._running:
                # The read blocks, so it runs off the event loop.
                data = await asyncio.to_thread(
                    stream.read, 1024, exception_on_overflow=False
                )
                yield np.frombuffer(data, dtype=np.int16)
        finally:
            stream.stop_stream()
            stream.close()
            audio_api.terminate()

    def stop(self):
        """Ask the capture loop to finish after the current chunk."""
        self._running = False
API Routes
# app/api/routes.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import base64
# FastAPI instance that the route decorators in this module register on.
app = FastAPI()
# Pipeline orchestrator; stays None until the startup hook assigns it.
orchestrator = None
class AudioRequest(BaseModel):
    """Request body for ``POST /process_audio``."""

    # Audio payload, Base64 encoded.
    audio: str
    # Passed to the orchestrator as ``language``; "en" when omitted.
    language: Optional[str] = "en"
class TextRequest(BaseModel):
    """Request body for ``POST /process_text``."""

    # Text that is sent through the pipeline.
    text: str
@app.on_event("startup")
async def startup():
    """Create the module-level orchestrator when the server starts."""
    global orchestrator
    # Imported inside the hook rather than at module level.
    from app.config import VoiceAssistantConfig
    from app.pipeline.orchestrator import VoicePipelineOrchestrator

    orchestrator = VoicePipelineOrchestrator(VoiceAssistantConfig.from_env())
@app.post("/process_audio")
async def process_audio(request: AudioRequest):
    """Run a Base64-encoded audio clip through the full pipeline.

    Returns:
        Dict with the transcription, the text response and the synthesized
        audio encoded as Base64.

    Raises:
        HTTPException: 503 while the orchestrator is not initialised, 400 when
            ``audio`` is not valid Base64.
    """
    if orchestrator is None:
        raise HTTPException(status_code=503, detail="Voice pipeline is not initialised")
    try:
        audio_data = base64.b64decode(request.audio)
    except ValueError as err:  # binascii.Error is a ValueError subclass
        raise HTTPException(status_code=400, detail="audio must be valid Base64") from err
    result = await orchestrator.process(audio_data, language=request.language)
    return {
        "transcription": result["transcription"],
        "response": result["response"],
        "audio": base64.b64encode(result["audio"]).decode(),
    }
@app.post("/process_text")
async def process_text(request: TextRequest):
    """Send text through the orchestrator and return the text response.

    Raises:
        HTTPException: 503 while the orchestrator is not initialised.
    """
    if orchestrator is None:
        raise HTTPException(status_code=503, detail="Voice pipeline is not initialised")
    result = await orchestrator.process_text(request.text)
    return {"response": result["response"]}
@app.get("/health")
async def health():
    """Report whether the orchestrator has been created.

    ``model_loaded`` reflects the module-level orchestrator instead of being
    hard-coded. ``status`` is "healthy" once it exists and "starting" before.
    """
    loaded = orchestrator is not None
    return {"status": "healthy" if loaded else "starting", "model_loaded": loaded}
Main Entry Point
# app/main.py
import uvicorn
from fastapi import FastAPI
from app.config import VoiceAssistantConfig
# NOTE(review): this is a separate FastAPI instance from the one created in
# app/api/routes.py, and nothing shown here attaches those routes to it, so
# this entry point appears to serve only "/" -- confirm and wire them in.
app = FastAPI(title="Voice Assistant API")
@app.get("/")
async def root():
    """Return the service name and version."""
    service_info = {"message": "Voice Assistant API", "version": "1.0.0"}
    return service_info
if __name__ == "__main__":
    # Serve on all interfaces, port 8000, one worker, no auto-reload.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": False,
        "workers": 1,
    }
    uvicorn.run("app.main:app", **server_options)
Unit Tests
# tests/test_orchestrator.py
import pytest
from unittest.mock import MagicMock, AsyncMock
import numpy as np
class TestVoicePipeline:
    """Orchestrator tests that replace every model-backed stage with a mock."""

    @pytest.fixture
    def mock_config(self):
        """Config stand-in whose model ids are placeholders."""
        config = MagicMock()
        config.wake_word_model = "mock"
        config.asr_model = "mock"
        config.llm_model = "mock"
        config.tts_model = "mock"
        return config

    @pytest.fixture
    def orchestrator(self, mock_config):
        """Orchestrator built without ``__init__`` so no real model is loaded."""
        from app.pipeline.orchestrator import (
            ConversationContext,
            VoicePipelineOrchestrator,
        )

        orchestrator = VoicePipelineOrchestrator.__new__(VoicePipelineOrchestrator)
        orchestrator.config = mock_config
        orchestrator.wake_word = MagicMock()
        orchestrator.asr = MagicMock()
        orchestrator.asr.transcribe = AsyncMock(return_value="hello world")
        orchestrator.llm = MagicMock()
        orchestrator.llm.generate = MagicMock(return_value="Hi there!")
        orchestrator.tts = MagicMock()
        orchestrator.tts.synthesize = MagicMock(return_value=np.zeros(1000))
        orchestrator.context = ConversationContext()
        orchestrator._running = False
        # Skip the microphone and the speaker.
        orchestrator._capture_utterance = AsyncMock(return_value=np.zeros(16000))
        orchestrator._play_audio = AsyncMock()
        return orchestrator

    @pytest.mark.asyncio
    async def test_end_to_end_transcription(self, orchestrator):
        """One interaction records the transcription and the LLM response."""
        await orchestrator._handle_interaction()

        assert orchestrator.context.history[-1] == {
            "user": "hello world",
            "assistant": "Hi there!",
        }
        orchestrator.tts.synthesize.assert_called_once_with("Hi there!")
        orchestrator._play_audio.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_context_truncation(self, orchestrator):
        """A full history stays at ``max_history`` after another turn."""
        limit = orchestrator.context.max_history
        orchestrator.context.history = [
            {"user": f"msg{i}", "assistant": f"resp{i}"} for i in range(limit)
        ]

        # Add the new turn through the pipeline, which applies the truncation.
        await orchestrator._handle_interaction()

        assert len(orchestrator.context.history) == limit
        assert orchestrator.context.history[0]["user"] == "msg1"
        assert orchestrator.context.history[-1]["user"] == "hello world"
Running the Assistant
# Install dependencies
pip install -r requirements.txt
# Set environment variables
export LLM_MODEL="meta-llama/Llama-2-7b-chat-hf"
export MODEL_CACHE_DIR="./models"
# Run with Docker
docker-compose up --build
# Or run directly (as a module, so that the `app` package is importable)
python -m app.main
EXERCISE
Assemble the complete voice assistant from this chapter. Add a test that mocks the ASR and LLM modules and verifies the orchestrator correctly maintains conversation history and produces appropriate responses. Time: 15 minutes.