12. WebSocket Client
Real-time voice AI demands sub-100ms round-trip latency. WebSocket connections provide the bidirectional, low-latency transport required for streaming audio to and from LLM inference endpoints.
Connection Architecture
A voice WebSocket client manages three concurrent streams: audio captured from the microphone, audio frames sent out to the server, and incoming server responses (LLM response tokens and transcription metadata as JSON text, synthesized audio as binary frames). The client must handle connection drops, reconnection with exponential backoff, and protocol keepalive.
import asyncio
import websockets
import pyaudio
import json
from typing import Optional
class VoiceWebSocketClient:
    """Send microphone audio over a WebSocket and read back server messages.

    ``stream_audio()`` captures PCM frames and sends them as binary
    messages; ``receive_responses()`` yields whatever the server sends back.
    Both are meant to run concurrently on the same event loop.
    """

    def __init__(self, uri: str, sample_rate: int = 16000):
        """Store the endpoint and capture settings; no I/O happens here.

        Args:
            uri: WebSocket endpoint passed to ``websockets.connect``.
            sample_rate: Microphone sample rate in Hz.
        """
        self.uri = uri
        self.sample_rate = sample_rate
        self.audio_format = pyaudio.paInt16
        self._running = False
        self._connection: Optional[websockets.WebSocketClientProtocol] = None

    async def connect(self) -> None:
        """Open the WebSocket and remember it as the active connection."""
        self._connection = await websockets.connect(
            self.uri,
            ping_interval=20,
            ping_timeout=10,
            close_timeout=5,
        )

    def stop(self) -> None:
        """Ask ``stream_audio()`` to exit after the buffer it is reading."""
        self._running = False

    async def close(self) -> None:
        """Stop capturing and close the WebSocket, if one is open."""
        self.stop()
        connection, self._connection = self._connection, None
        if connection is not None:
            await connection.close()

    async def stream_audio(self, buffer_size: int = 1024):
        """Capture microphone audio and send each buffer to the server.

        Runs until ``stop()`` is called, the task is cancelled, or an error
        is raised. Buffers read while no open connection is available are
        dropped rather than queued.

        Args:
            buffer_size: Frames per read, and per binary message sent.
        """
        loop = asyncio.get_running_loop()
        audio = pyaudio.PyAudio()
        try:
            stream = audio.open(
                format=self.audio_format,
                channels=1,
                rate=self.sample_rate,
                input=True,
                frames_per_buffer=buffer_size,
            )

            def read_chunk() -> bytes:
                return stream.read(buffer_size, exception_on_overflow=False)

            pending_read = None
            self._running = True
            try:
                while self._running:
                    # stream.read() blocks until a full buffer is captured.
                    # Run it on a worker thread so the event loop stays free
                    # for receive_responses() and keepalive traffic.
                    pending_read = loop.run_in_executor(None, read_chunk)
                    # shield: a cancellation must not abandon the worker
                    # thread mid-read; the cleanup below waits for it.
                    audio_data = await asyncio.shield(pending_read)
                    # NOTE(review): ``.open`` is an attribute of the legacy
                    # WebSocketClientProtocol named in __init__; confirm it
                    # exists on the installed websockets version.
                    if self._connection and self._connection.open:
                        await self._connection.send(audio_data)
            finally:
                try:
                    if pending_read is not None and not pending_read.done():
                        # Let the in-flight read finish before the stream
                        # it is reading from gets torn down.
                        await asyncio.gather(
                            pending_read, return_exceptions=True
                        )
                finally:
                    stream.stop_stream()
                    stream.close()
        finally:
            # Runs even when audio.open() fails, so PyAudio is not leaked.
            audio.terminate()

    async def receive_responses(self):
        """Yield each server message until the connection stops iterating.

        Binary messages are yielded unchanged as ``bytes``; text messages
        are parsed with ``json.loads`` and the resulting object is yielded.

        Raises:
            RuntimeError: If called before ``connect()``.
        """
        if self._connection is None:
            raise RuntimeError("receive_responses() called before connect()")
        async for message in self._connection:
            if isinstance(message, bytes):
                yield message  # Audio response
            else:
                yield json.loads(message)  # Metadata/transcription
Backpressure and Flow Control
When the LLM generates faster than audio playback requires, buffer management prevents memory exhaustion. Use a bounded queue with configurable depth and drop oldest frames when necessary.
from collections import deque
class BoundedAudioBuffer:
    """FIFO of audio frames that keeps only the newest ``maxlen`` entries."""

    def __init__(self, maxlen: int = 100):
        """Create an empty buffer.

        Args:
            maxlen: Maximum number of frames retained. ``0`` discards every
                frame; ``None`` makes the underlying deque unbounded.
        """
        # deque(maxlen=...) enforces the bound itself: appending to a full
        # deque silently discards the item at the opposite (oldest) end.
        self._buffer = deque(maxlen=maxlen)
        self._lock = asyncio.Lock()

    async def put(self, frame: bytes):
        """Append ``frame``, dropping the oldest frame if the buffer is full."""
        async with self._lock:
            # No manual popleft(): it duplicated the deque's own eviction and
            # raised IndexError when maxlen was 0 (full and empty at once).
            self._buffer.append(frame)

    async def get(self) -> Optional[bytes]:
        """Remove and return the oldest frame, or ``None`` if empty."""
        async with self._lock:
            return self._buffer.popleft() if self._buffer else None
Reconnection Logic
Network failures require graceful reconnection without losing conversation context.
async def run_with_reconnect(self, max_retries: int = 5, on_response=None):
    """Run the send and receive loops, reconnecting after a dropped link.

    Each attempt connects, then runs ``stream_audio()`` alongside a consumer
    of ``receive_responses()``. The attempt ends as soon as either side
    finishes; the other side is cancelled and the socket is closed. A clean
    finish returns immediately. A closed connection or an ``OSError`` (for
    example a refused connection while the server is still down) triggers a
    retry after an exponential backoff capped at 30 seconds. Any other
    exception is reported and ends the run without retrying.

    Args:
        max_retries: Maximum number of connection attempts.
        on_response: Optional callable invoked with every message yielded by
            ``receive_responses()``. Messages are discarded when omitted.
    """

    async def drain_responses() -> None:
        # receive_responses() is an async generator; asyncio.gather() and
        # asyncio.wait() need an awaitable, so consume it in a coroutine.
        async for message in self.receive_responses():
            if on_response is not None:
                on_response(message)

    async def run_session() -> None:
        tasks = [
            asyncio.ensure_future(self.stream_audio()),
            asyncio.ensure_future(drain_responses()),
        ]
        try:
            done, _ = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                task.result()  # re-raise whatever ended the session
        finally:
            # Never leave the other half running (and the microphone open)
            # once one half has stopped.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            if self._connection is not None:
                await self._connection.close()

    for attempt in range(max_retries):
        try:
            await self.connect()
            await run_session()
            return  # finished cleanly; nothing to reconnect
        except (websockets.exceptions.ConnectionClosed, OSError):
            if attempt + 1 >= max_retries:
                break  # no attempt left, so do not sleep for nothing
            wait_time = min(2 ** attempt, 30)
            print(f"Reconnecting in {wait_time}s...")
            await asyncio.sleep(wait_time)
        except Exception as e:
            print(f"Unexpected error: {e}")
            return
    print(f"Giving up after {max_retries} connection attempts.")
WebSocket heartbeat intervals must be shorter than the idle timeout of any intermediate proxy or load balancer. The client above pings every 20 seconds; anything up to 30 seconds is a safe default for most deployments.
Implement a WebSocket client that streams audio to a server and plays received TTS responses using pygame.mixer. Handle connection drops gracefully with three retry attempts and notify the user when reconnection fails. Time: 15 minutes.