WebSocket streaming is the backbone of real-time transcription. Unlike REST APIs that require complete audio files, WebSocket connections allow bidirectional communication — sending audio chunks and receiving transcripts simultaneously. In this guide, we'll build a production-grade streaming transcription API.
## Why WebSockets for Transcription?
HTTP request-response cycles introduce latency that's unacceptable for real-time applications. With WebSockets:
- Audio streams continuously without waiting for responses
- Partial (interim) results arrive before the speaker finishes
- Connection overhead happens once, not per request
- Server-push enables features like endpointing notifications
## Architecture Overview
```
Client                     Server                      ASR Provider
  |                          |                              |
  |-- audio chunk ---------->|                              |
  |                          |-- forward audio ------------>|
  |                          |<-- interim transcript -------|
  |<-- interim result -------|                              |
  |-- audio chunk ---------->|                              |
  |                          |-- forward audio ------------>|
  |                          |<-- final transcript ---------|
  |<-- final result ---------|                              |
```
## Server Implementation with FastAPI
```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json
import os

import websockets

app = FastAPI()

DEEPGRAM_WS = "wss://api.deepgram.com/v1/listen"
DEEPGRAM_KEY = os.environ["DEEPGRAM_API_KEY"]  # never hard-code credentials


@app.websocket("/ws/transcribe")
async def transcribe(websocket: WebSocket):
    await websocket.accept()

    # Open an upstream connection to Deepgram for this client
    headers = {"Authorization": f"Token {DEEPGRAM_KEY}"}
    params = "?model=nova-2&smart_format=true&language=en"

    async with websockets.connect(
        f"{DEEPGRAM_WS}{params}",
        extra_headers=headers,  # renamed to additional_headers in newer websockets releases
    ) as dg_ws:

        async def forward_audio():
            # Pump raw audio bytes from the client to Deepgram
            try:
                while True:
                    data = await websocket.receive_bytes()
                    await dg_ws.send(data)
            except WebSocketDisconnect:
                # An empty binary frame signals end of audio to Deepgram
                await dg_ws.send(b"")

        async def forward_transcripts():
            # Relay Deepgram results back to the client as JSON
            try:
                async for msg in dg_ws:
                    result = json.loads(msg)
                    transcript = (
                        result.get("channel", {})
                        .get("alternatives", [{}])[0]
                        .get("transcript", "")
                    )
                    if transcript:
                        await websocket.send_json({
                            "transcript": transcript,
                            "is_final": result.get("is_final", False),
                            "speech_final": result.get("speech_final", False),
                        })
            except websockets.ConnectionClosed:
                pass  # upstream closed; nothing left to relay

        await asyncio.gather(forward_audio(), forward_transcripts())
```
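One subtlety with `asyncio.gather` in this shape: if one pump finishes (say, the client disconnects), the other keeps running until the provider closes its side. A common refinement is to cancel whichever task is still pending as soon as either side ends. A minimal sketch of that pattern (the helper name `run_duplex` is ours, not part of FastAPI or `websockets`):

```python
import asyncio


async def run_duplex(send_coro, recv_coro):
    # Run both pumps; as soon as either finishes, cancel the other
    # so the handler doesn't linger on a half-dead connection.
    tasks = [asyncio.create_task(send_coro), asyncio.create_task(recv_coro)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Collect results; return_exceptions=True swallows the CancelledError
    await asyncio.gather(*tasks, return_exceptions=True)
```

Inside the handler, `await asyncio.gather(...)` would become `await run_duplex(forward_audio(), forward_transcripts())`.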
## Connection Lifecycle Management
Production WebSocket connections need careful lifecycle management:
### Handling Disconnections
```python
async def connect_with_retry(url, headers, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await websockets.connect(url, extra_headers=headers)
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries; surface the error
            wait = 0.1 * (2 ** attempt)  # exponential backoff: 0.1s, 0.2s, 0.4s…
            await asyncio.sleep(wait)
```
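Reconnection handles connections that die loudly; connections that die silently are the other half of the problem. Proxies and load balancers often drop idle connections during long silences, so a periodic ping keeps the path warm. A minimal sketch, assuming `ws` exposes an async `ping()` coroutine as the `websockets` library does:

```python
import asyncio


async def keepalive(ws, interval=5.0):
    # Ping on a fixed interval so intermediaries don't drop an idle
    # connection during silence; cancel this task on shutdown.
    while True:
        await asyncio.sleep(interval)
        await ws.ping()
```

Note that the `websockets` library already sends protocol-level pings via its `ping_interval` option, so an explicit loop like this is mainly useful for application-level heartbeats or for client libraries without built-in keepalive.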
## Client-Side JavaScript
```javascript
class TranscriptionClient {
  constructor(url) {
    this.url = url;
    this.ws = null;
    this.onTranscript = null;
  }

  connect() {
    this.ws = new WebSocket(this.url);
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (this.onTranscript) {
        this.onTranscript(data.transcript, data.is_final);
      }
    };
    // Reconnect automatically after a dropped connection
    this.ws.onclose = () => setTimeout(() => this.connect(), 1000);
  }

  sendAudio(chunk) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(chunk);
    }
  }
}

// Usage
const client = new TranscriptionClient('ws://localhost:8000/ws/transcribe');
client.onTranscript = (text, isFinal) => {
  console.log(isFinal ? `FINAL: ${text}` : `interim: ${text}`);
};
client.connect();
```
## Error Handling Patterns
| Error Type | Handling Strategy |
|---|---|
| Network disconnect | Exponential backoff reconnection |
| ASR provider error | Failover to local ASR |
| Audio format mismatch | Validate on connect, reject with clear error |
| Rate limiting | Queue audio chunks, drain on reconnect |
| Memory overflow | Ring buffer with fixed size for audio chunks |
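The last two rows deserve a sketch: while the upstream is down, audio must be buffered somewhere, but an unbounded queue will eventually exhaust memory. A fixed-size ring buffer drops the oldest audio instead; a minimal version (the class name is ours):

```python
from collections import deque


class AudioRingBuffer:
    """Bounded chunk buffer: once full, the oldest chunk is evicted."""

    def __init__(self, max_chunks: int = 100):
        self._chunks = deque(maxlen=max_chunks)

    def push(self, chunk: bytes) -> None:
        self._chunks.append(chunk)  # deque silently evicts the oldest when full

    def drain(self):
        # Yield buffered chunks oldest-first, emptying the buffer
        while self._chunks:
            yield self._chunks.popleft()
```

On disconnect, `push` incoming chunks into the buffer; after `connect_with_retry` succeeds, `drain` and forward them before resuming live audio. Dropping the oldest audio trades a gap in the transcript for bounded memory, which is usually the right trade for live captions.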
> "Reliable WebSocket streaming is the hardest part of building a real-time transcription system. Get the connection lifecycle right, and everything else falls into place." — Audio Engineering Team at Voxclar
For more on the transcription pipeline, read our Python speech-to-text tutorial and our complete guide to real-time transcription for meetings.