22. Voice Assistant Project

Chapter 22 of 22 · 30 min

This chapter integrates previous concepts into a complete voice assistant implementing wake word detection, speech recognition, LLM processing, and speech synthesis.

Project Structure

voice_assistant/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── pipeline/
│   │   ├── __init__.py
│   │   ├── wake_word.py
│   │   ├── asr.py
│   │   ├── llm.py
│   │   ├── tts.py
│   │   └── orchestrator.py
│   ├── api/
│   │   ├── __init__.py
│   │   └── routes.py
│   └── utils/
│       ├── audio.py
│       └── logging.py
├── tests/
├── models/
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

Configuration

# app/config.py
from dataclasses import dataclass
from pathlib import Path

@dataclass
class VoiceAssistantConfig:
    """Settings shared by every stage of the voice assistant pipeline.

    Every field has a default, so ``VoiceAssistantConfig()`` is usable as-is.
    ``from_env`` builds an instance with a few fields overridden from
    environment variables.
    """

    # Paths
    model_cache_dir: Path = Path("./models")

    # Wake word
    # NOTE(review): this identifier looks garbled and no pipeline code shown in
    # this chapter reads it; confirm the intended value.
    wake_word_model: str = "por models/wake-word"
    wake_word_threshold: float = 0.5

    # ASR
    asr_model: str = "openai/whisper-small"
    asr_language: str = "en"

    # LLM
    # The "-hf" repository holds the Transformers-format checkpoint that
    # AutoModelForCausalLM.from_pretrained can load.
    llm_model: str = "meta-llama/Llama-2-7b-chat-hf"
    llm_quantization: str = "int8"
    max_context_tokens: int = 2048

    # TTS
    # Coqui model ids have four segments: type/language/dataset/model.
    tts_model: str = "tts_models/en/ljspeech/tacotron2-DDC"
    tts_sample_rate: int = 22050

    # Audio
    audio_sample_rate: int = 16000
    audio_chunk_size: int = 1024

    # Performance
    target_latency_ms: int = 500
    enable_noise_reduction: bool = True

    @classmethod
    def from_env(cls):
        """Build a config, overriding selected fields from the environment.

        Reads ``MODEL_CACHE_DIR``, ``LLM_MODEL`` and ``TTS_MODEL``; any
        variable that is unset falls back to the field's default. All other
        fields keep their defaults.

        Returns:
            A new ``VoiceAssistantConfig`` instance.
        """
        # Imported here so the snippet is self-contained: the module-level
        # imports only bring in ``dataclass`` and ``Path``.
        import os

        return cls(
            model_cache_dir=Path(os.getenv("MODEL_CACHE_DIR", "./models")),
            # Dataclass defaults stay available as class attributes.
            llm_model=os.getenv("LLM_MODEL", cls.llm_model),
            tts_model=os.getenv("TTS_MODEL", cls.tts_model),
        )

Wake Word Detection

# app/pipeline/wake_word.py
import torch
import pvporcupine

class WakeWordDetector:
    """Detect a trigger in short audio frames.

    With an ``access_key`` the detector uses Porcupine keyword spotting.
    Without one it falls back to the Silero VAD model, which reports speech
    activity rather than a specific keyword.
    """

    def __init__(self, access_key: str = None, keywords: list[str] = None):
        """Create the detector backend.

        Args:
            access_key: Picovoice access key. ``None`` selects the Silero
                fallback.
            keywords: Porcupine keywords; defaults to ``["picovoice"]``.
        """
        if access_key is None:
            # Use open-source alternative
            self.model = self._load_silero_wake_word()
        else:
            self.porcupine = pvporcupine.create(
                access_key=access_key,
                keywords=keywords or ["picovoice"]
            )

    def _load_silero_wake_word(self):
        """Load the Silero VAD model, store it on ``self.model`` and return it."""
        import silero_vad

        # load_silero_vad() returns the model only, so it must not be unpacked
        # into a (model, utils) pair.
        self.model = silero_vad.load_silero_vad()
        return self.model

    @torch.no_grad()
    def detect(self, audio: torch.Tensor) -> bool:
        """Return True when the frame triggers the active backend.

        Args:
            audio: One frame of mono samples. Integer tensors are treated as
                16-bit PCM; floating-point tensors are passed to the Silero
                model unchanged.
        """
        if hasattr(self, 'porcupine'):
            # A dtype string avoids depending on a numpy import in this module.
            pcm = audio.numpy().astype("int16")
            return self.porcupine.process(pcm) >= 0

        samples = audio
        if not samples.is_floating_point():
            # Scale raw 16-bit PCM to [-1, 1) before handing it to the model.
            samples = samples.float() / 32768.0
        speech_prob = self.model(samples, 16000).item()
        return speech_prob > 0.5

    async def stream_detect(self, audio_stream) -> "AsyncIterator[bool]":
        """Yield one detection result per 512-sample frame of the stream.

        Frames advance by 256 samples (50% overlap), so every sample of every
        chunk is analysed regardless of the chunk length.

        Args:
            audio_stream: Async or plain iterable of 1-D sample chunks
                (numpy arrays or tensors).
        """
        frame_len = 512
        hop = frame_len // 2  # 50% overlap

        async def chunks():
            # Accept both async streams (what the orchestrator passes) and
            # plain iterables.
            if hasattr(audio_stream, "__aiter__"):
                async for item in audio_stream:
                    yield item
            else:
                for item in audio_stream:
                    yield item

        buffer = None
        async for chunk in chunks():
            samples = torch.as_tensor(chunk).flatten()
            if buffer is None:
                buffer = samples
            else:
                # Keep one dtype so integer PCM is not silently promoted.
                buffer = torch.cat([buffer, samples.to(buffer.dtype)])
            while len(buffer) >= frame_len:
                yield self.detect(buffer[:frame_len])
                buffer = buffer[hop:]

ASR Module

# app/pipeline/asr.py
import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration

class ASRModule:
    """Speech-to-text wrapper around a Whisper checkpoint."""

    def __init__(self, model_name: str = "openai/whisper-small"):
        """Load the processor and model, move the model to CUDA if available.

        Args:
            model_name: Hugging Face identifier of the Whisper checkpoint.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.processor = WhisperProcessor.from_pretrained(model_name)
        self.model = WhisperForConditionalGeneration.from_pretrained(model_name)
        self.model.to(self.device)
        self.model.eval()

    # The annotation is quoted because this module does not import numpy.
    async def transcribe(self, audio: "np.ndarray") -> str:
        """Transcribe one utterance.

        Args:
            audio: Mono samples; the processor is told they are sampled at
                16 kHz.

        Returns:
            The decoded text of the first (only) sequence, or ``""`` when
            ``audio`` is ``None`` or empty.
        """
        if audio is None or len(audio) == 0:
            return ""

        # torch.no_grad() as a decorator on an ``async def`` ends as soon as the
        # coroutine object is created, before this body runs. Using it as a
        # context manager keeps gradients off during inference. The block
        # contains no ``await``, so the thread-local grad mode cannot leak
        # into other tasks.
        with torch.no_grad():
            inputs = self.processor(
                audio,
                sampling_rate=16000,
                return_tensors="pt"
            ).to(self.device)

            generated_ids = self.model.generate(
                inputs["input_features"],
                max_new_tokens=256
            )

        decoded = self.processor.batch_decode(
            generated_ids,
            skip_special_tokens=True
        )
        return decoded[0]

LLM Module

# app/pipeline/llm.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

class LLMModule:
    """Causal language model wrapper used to produce assistant replies."""

    def __init__(self, model_name: str, quantization: str = "int8", *,
                 trust_remote_code: bool = False):
        """Load the tokenizer and model.

        Args:
            model_name: Hugging Face model identifier.
            quantization: ``"int8"`` loads the weights in 8-bit through
                bitsandbytes; any other value loads without a quantization
                config.
            trust_remote_code: Forwarded to ``from_pretrained``. It is off by
                default because it lets the model repository run arbitrary
                Python, and the model name may come from an environment
                variable. Enable it only for repositories you trust.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Reuse EOS as the padding token.
        self.tokenizer.pad_token = self.tokenizer.eos_token

        if quantization == "int8":
            config = BitsAndBytesConfig(load_in_8bit=True)
        else:
            config = None

        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=config,
            device_map="auto",
            trust_remote_code=trust_remote_code
        )
        self.model.eval()

    def generate(self, prompt: str, max_new_tokens: int = 200) -> str:
        """Sample a completion for ``prompt`` and return only the new text.

        Args:
            prompt: Full prompt text.
            max_new_tokens: Upper bound on generated tokens.

        Returns:
            The generated continuation, stripped of surrounding whitespace.
        """
        # With device_map="auto" the model decides where its weights live, so
        # follow the model's device instead of guessing one.
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        prompt_tokens = inputs["input_ids"].shape[1]

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=0.7,
                top_p=0.9,
                do_sample=True
            )

        # Slice by token count: the decoded prompt is not guaranteed to match
        # the original prompt character for character, so cutting the decoded
        # string by len(prompt) can clip or leak text.
        new_tokens = outputs[0][prompt_tokens:]
        return self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

    def _extract_response(self, prompt: str, full_response: str) -> str:
        """Return ``full_response`` with a leading ``prompt``-length prefix removed.

        ``generate`` no longer calls this helper; it is kept for existing
        callers.
        """
        return full_response[len(prompt):].strip()

TTS Module

# app/pipeline/tts.py
import numpy as np
from TTS.api import TTS

class TTSModule:
    """Text-to-speech wrapper around a Coqui TTS model."""

    # Coqui model ids have four segments: type/language/dataset/model.
    def __init__(self, model_name: str = "tts_models/en/ljspeech/tacotron2-DDC"):
        """Load the TTS model and record its output sample rate.

        Args:
            model_name: Coqui TTS model identifier.
        """
        self.tts = TTS(model_name)
        # Prefer the rate reported by the loaded synthesizer; fall back to
        # 22050 Hz when it is not exposed.
        synthesizer = getattr(self.tts, "synthesizer", None)
        self.sample_rate = getattr(synthesizer, "output_sample_rate", None) or 22050

    def synthesize(self, text: str) -> np.ndarray:
        """Synthesize ``text`` and return the waveform as a numpy array."""
        wav = self.tts.tts(text)
        return np.array(wav)

    def synthesize_streaming(self, text: str, chunk_duration_ms: int = 100):
        """Yield the synthesized waveform in fixed-duration chunks.

        The whole utterance is synthesized first and then sliced, so this
        spreads out delivery but does not reduce time-to-first-audio.

        Args:
            text: Text to speak.
            chunk_duration_ms: Target duration of each chunk in milliseconds.

        Raises:
            ValueError: If ``chunk_duration_ms`` is not positive.
        """
        if chunk_duration_ms <= 0:
            raise ValueError("chunk_duration_ms must be positive")

        # For true streaming, would use a streaming TTS model
        full_audio = self.synthesize(text)

        # Never let the step reach 0, which range() rejects.
        chunk_size = max(1, int(self.sample_rate * chunk_duration_ms / 1000))
        for start in range(0, len(full_audio), chunk_size):
            yield full_audio[start:start + chunk_size]

Pipeline Orchestrator

# app/pipeline/orchestrator.py
import asyncio
from dataclasses import dataclass, field

@dataclass
class ConversationContext:
    """Rolling record of recent user/assistant exchanges."""

    # Each entry is a dict with "user" and "assistant" keys, oldest first.
    history: list[dict] = field(default_factory=list)
    # Maximum number of turns add_turn() keeps.
    max_history: int = 10

    def add_turn(self, user: str, assistant: str) -> None:
        """Append one exchange and drop the oldest turns beyond ``max_history``.

        Unlike a single ``pop(0)``, this also brings the list back within the
        limit when it was already too long before the call.
        """
        self.history.append({"user": user, "assistant": assistant})
        overflow = len(self.history) - self.max_history
        if overflow > 0:
            del self.history[:overflow]

class VoicePipelineOrchestrator:
    """Coordinate wake-word detection, ASR, the LLM and TTS.

    ``run`` drives the microphone loop. ``process`` and ``process_text`` expose
    the same pipeline to callers that already hold audio or text, such as an
    HTTP handler or a unit test.
    """

    # The config annotation is quoted because this module does not import it.
    def __init__(self, config: "VoiceAssistantConfig", *, wake_word=None,
                 asr=None, llm=None, tts=None):
        """Create the pipeline stages.

        Args:
            config: Model names and settings for the stages.
            wake_word, asr, llm, tts: Optional pre-built stages. Any stage left
                as ``None`` is constructed from ``config``. Passing all four
                avoids loading any model, which is what tests want.
        """
        self.config = config
        self.wake_word = WakeWordDetector() if wake_word is None else wake_word
        self.asr = ASRModule(config.asr_model) if asr is None else asr
        self.llm = (
            LLMModule(config.llm_model, config.llm_quantization)
            if llm is None else llm
        )
        self.tts = TTSModule(config.tts_model) if tts is None else tts
        self.context = ConversationContext()
        self._running = False

    async def run(self):
        """Listen on the microphone until ``stop`` is called.

        Every wake-word hit triggers one interaction. The same audio stream is
        reused for the utterance so the microphone is opened only once.
        """
        self._running = True
        audio_stream = self._get_audio_stream()
        try:
            # Wait for wake word
            async for is_wake in self.wake_word.stream_detect(audio_stream):
                if not self._running:
                    break
                if is_wake:
                    await self._handle_interaction(audio_stream)
        finally:
            self._running = False
            # Runs the generator's cleanup so the audio device is released.
            await audio_stream.aclose()

    async def process(self, audio, language=None) -> dict:
        """Run ASR, the LLM and TTS on one utterance.

        Args:
            audio: Sample array, or raw bytes. Bytes are interpreted as 16-bit
                PCM, the format ``_get_audio_stream`` captures.
                NOTE(review): confirm API clients send that format.
            language: Accepted for API compatibility; not forwarded, because
                the ASR stage is called with the audio only.

        Returns:
            Dict with ``"transcription"``, ``"response"`` and ``"audio"``. When
            nothing was transcribed, ``"response"`` is ``""`` and ``"audio"``
            is an empty array.
        """
        samples = self._normalize_audio(audio)
        transcription = await self.asr.transcribe(samples)

        if not transcription.strip():
            return {"transcription": transcription, "response": "", "audio": samples[:0]}

        response = await self._respond(transcription)
        # Synthesis is blocking; keep it off the event loop.
        speech = await asyncio.to_thread(self.tts.synthesize, response)
        return {"transcription": transcription, "response": response, "audio": speech}

    async def process_text(self, text: str) -> dict:
        """Answer typed input, skipping ASR and TTS.

        Returns:
            Dict with a single ``"response"`` key; ``""`` for blank input.
        """
        if not text.strip():
            return {"response": ""}
        return {"response": await self._respond(text)}

    async def _handle_interaction(self, audio_stream=None):
        """Capture one utterance, answer it and play the answer."""
        # Capture audio until silence
        audio_data = await self._capture_utterance(audio_stream)
        result = await self.process(audio_data)
        if result["response"]:
            await self._play_audio(result["audio"])

    async def _respond(self, user_text: str) -> str:
        """Query the LLM with the current history and record the exchange."""
        prompt = self._build_prompt(user_text)
        # Generation is blocking; keep it off the event loop.
        response = await asyncio.to_thread(self.llm.generate, prompt)
        self._remember(user_text, response)
        return response

    def _remember(self, user_text: str, assistant_text: str) -> None:
        """Append a turn and trim the history to ``max_history`` entries."""
        history = self.context.history
        history.append({"user": user_text, "assistant": assistant_text})
        overflow = len(history) - self.context.max_history
        if overflow > 0:
            del history[:overflow]

    def _build_prompt(self, transcription: str) -> str:
        """Build the LLM prompt from the system text and the last three turns."""
        system_prompt = "You are a helpful voice assistant. Keep responses concise and natural."

        history_text = "".join(
            f"User: {turn['user']}\nAssistant: {turn['assistant']}\n"
            for turn in self.context.history[-3:]
        )

        return f"{system_prompt}\n\n{history_text}User: {transcription}\nAssistant:"

    @staticmethod
    def _normalize_audio(audio) -> "np.ndarray":
        """Convert bytes or a sample array to float32, scaling integer PCM."""
        import numpy as np

        if isinstance(audio, (bytes, bytearray, memoryview)):
            audio = np.frombuffer(audio, dtype=np.int16)
        samples = np.asarray(audio)
        if samples.dtype.kind == "i":
            # Map signed PCM onto [-1, 1).
            full_scale = float(1 << (8 * samples.dtype.itemsize - 1))
            return samples.astype(np.float32) / full_scale
        return samples.astype(np.float32)

    async def _capture_utterance(self, audio_stream=None) -> "np.ndarray":
        """Collect chunks until the first chunk that counts as silence.

        Args:
            audio_stream: Stream to read from. When ``None`` a new capture
                stream is opened and closed again before returning.

        Returns:
            The captured chunks concatenated, or an empty array if the stream
            produced nothing.
        """
        import numpy as np

        owns_stream = audio_stream is None
        stream = self._get_audio_stream() if owns_stream else audio_stream

        chunks = []
        try:
            async for chunk in stream:
                chunks.append(chunk)
                if self._is_speech_end(chunk):
                    break
        finally:
            if owns_stream:
                await stream.aclose()

        return np.concatenate(chunks) if chunks else np.array([])

    def _is_speech_end(self, chunk: "np.ndarray") -> bool:
        """Return True when the chunk's mean absolute level is below 0.01.

        Integer PCM is scaled to [-1, 1) first. Without that, the threshold is
        compared against raw sample values and any non-zero noise defeats it.
        """
        if chunk.size == 0:
            return True
        # Simple energy-based silence detection
        samples = chunk.astype("float32")
        if chunk.dtype.kind == "i":
            samples /= float(1 << (8 * chunk.dtype.itemsize - 1))
        return bool(abs(samples).mean() < 0.01)

    async def _play_audio(self, audio: "np.ndarray"):
        """Placeholder for platform-specific playback; currently does nothing."""
        # Play audio using platform-specific implementation
        # Simplified for this example
        pass

    async def _get_audio_stream(self):
        """Yield int16 microphone chunks while the orchestrator is running.

        The audio device is released when the generator finishes or is closed.
        """
        # Platform-specific audio capture
        import numpy as np
        import pyaudio

        p = pyaudio.PyAudio()
        try:
            stream = p.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=16000,
                input=True,
                frames_per_buffer=1024
            )
            try:
                while self._running:
                    # stream.read blocks until a buffer is full; run it in a
                    # worker thread so other tasks keep running.
                    data = await asyncio.to_thread(
                        stream.read, 1024, exception_on_overflow=False
                    )
                    yield np.frombuffer(data, dtype=np.int16)
            finally:
                stream.stop_stream()
                stream.close()
        finally:
            p.terminate()

    def stop(self):
        """Ask the capture loop to finish after the current chunk."""
        self._running = False

API Routes

# app/api/routes.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import base64

# FastAPI application that the route handlers below attach to.
app = FastAPI()
# Shared pipeline instance; stays None until the startup hook assigns it.
orchestrator = None

class AudioRequest(BaseModel):
    """Request body for /process_audio."""
    audio: str  # Base64 encoded
    # Passed to the orchestrator as the ``language`` keyword; defaults to "en".
    language: Optional[str] = "en"

class TextRequest(BaseModel):
    """Request body for /process_text."""
    text: str

@app.on_event("startup")
async def startup():
    """Create the shared orchestrator from environment-based configuration."""
    # Imported lazily, inside the hook, rather than at module import time.
    from app.config import VoiceAssistantConfig
    from app.pipeline.orchestrator import VoicePipelineOrchestrator

    global orchestrator
    orchestrator = VoicePipelineOrchestrator(VoiceAssistantConfig.from_env())

@app.post("/process_audio")
async def process_audio(request: AudioRequest):
    """Transcribe base64-encoded audio, answer it and return the spoken reply.

    Returns:
        JSON with ``transcription``, ``response`` and base64-encoded ``audio``.

    Raises:
        HTTPException: 503 while the orchestrator has not been created yet,
            400 when ``audio`` is not decodable base64.
    """
    if orchestrator is None:
        raise HTTPException(status_code=503, detail="Voice pipeline is not initialised yet")

    try:
        audio_data = base64.b64decode(request.audio)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        raise HTTPException(status_code=400, detail="audio must be valid base64") from exc

    result = await orchestrator.process(audio_data, language=request.language)

    return {
        "transcription": result["transcription"],
        "response": result["response"],
        "audio": base64.b64encode(result["audio"]).decode()
    }

@app.post("/process_text")
async def process_text(request: TextRequest):
    """Answer a text message and return the assistant's reply.

    Raises:
        HTTPException: 503 while the orchestrator has not been created yet.
    """
    if orchestrator is None:
        raise HTTPException(status_code=503, detail="Voice pipeline is not initialised yet")

    result = await orchestrator.process_text(request.text)
    return {"response": result["response"]}

@app.get("/health")
async def health():
    """Report liveness and whether the pipeline has been created.

    ``model_loaded`` reflects the module-level ``orchestrator`` instead of
    always claiming the models are ready.
    """
    return {"status": "healthy", "model_loaded": orchestrator is not None}

Main Entry Point

# app/main.py
import uvicorn
from fastapi import FastAPI
from app.config import VoiceAssistantConfig

# Serve the application defined in app/api/routes.py. Creating a second
# FastAPI() here would give uvicorn an app without the /process_audio,
# /process_text and /health routes and without the startup hook that builds
# the orchestrator.
from app.api.routes import app

app.title = "Voice Assistant API"

@app.get("/")
async def root():
    """Return a small banner identifying the service."""
    return {"message": "Voice Assistant API", "version": "1.0.0"}

if __name__ == "__main__":
    # Single worker: one process holds the models.
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
        workers=1
    )

Unit Tests

# tests/test_orchestrator.py
import pytest
from unittest.mock import MagicMock, AsyncMock
import numpy as np

class TestVoicePipeline:
    """Orchestrator tests that replace every model-backed stage with a mock."""

    @pytest.fixture
    def mock_config(self):
        config = MagicMock()
        config.wake_word_model = "mock"
        config.asr_model = "mock"
        config.llm_model = "mock"
        config.tts_model = "mock"
        return config

    @pytest.fixture
    def orchestrator(self, mock_config):
        """Build an orchestrator without running its model-loading constructor."""
        from app.pipeline.orchestrator import (
            ConversationContext,
            VoicePipelineOrchestrator,
        )

        # __new__ skips __init__, which would otherwise try to load the
        # "mock" checkpoints.
        orch = VoicePipelineOrchestrator.__new__(VoicePipelineOrchestrator)
        orch.config = mock_config
        orch.wake_word = MagicMock()
        orch.asr = MagicMock()
        orch.asr.transcribe = AsyncMock(return_value="hello world")
        orch.llm = MagicMock()
        orch.llm.generate = MagicMock(return_value="Hi there!")
        orch.tts = MagicMock()
        orch.tts.synthesize = MagicMock(return_value=np.zeros(1000))
        orch.context = ConversationContext()
        orch._running = False
        # No microphone or speaker in tests.
        orch._capture_utterance = AsyncMock(return_value=np.zeros(16000))
        orch._play_audio = AsyncMock()
        return orch

    @pytest.mark.asyncio
    async def test_end_to_end_transcription(self, orchestrator):
        # Process audio
        result = await orchestrator.process(np.zeros(16000))

        assert result["transcription"] == "hello world"
        assert result["response"] == "Hi there!"

    @pytest.mark.asyncio
    async def test_context_truncation(self, orchestrator):
        limit = orchestrator.context.max_history
        orchestrator.context.history = [
            {"user": f"msg{i}", "assistant": f"resp{i}"} for i in range(limit)
        ]

        # Go through the orchestrator so its own trimming is exercised;
        # appending to the list directly bypasses it.
        await orchestrator._handle_interaction()

        history = orchestrator.context.history
        assert len(history) == limit
        assert history[0]["user"] == "msg1"  # the oldest turn was dropped
        assert history[-1] == {"user": "hello world", "assistant": "Hi there!"}

Running the Assistant

# Install dependencies
pip install -r requirements.txt

# Set environment variables
# The "-hf" repository is the Transformers-format checkpoint
export LLM_MODEL="meta-llama/Llama-2-7b-chat-hf"
export MODEL_CACHE_DIR="./models"

# Run with Docker
docker-compose up --build

# Or run directly, as a module from the project root so the `app` package is importable
python -m app.main
EXERCISE

Assemble the complete voice assistant from this chapter. Add a test that mocks the ASR and LLM modules and verifies the orchestrator correctly maintains conversation history and produces appropriate responses. Time: 15 minutes.