Pipeline Architecture
The pipeline is the core processing engine of your voice agent. It orchestrates the flow of audio and data between components in real-time.
Pipeline Overview
┌─────────────────────────────────────────────────┐
│ Pipeline │
│ │
Telephony ──► Audio In ────►│ VAD ──► STT ──► LLM ──► TTS ──► Audio Out │────► Telephony
(Twilio) (μ-law 8kHz) │ │ │ │ │ │ │ (μ-law 8kHz)
│ ▼ ▼ ▼ ▼ ▼ │
│ Events Transcript Response Audio Playback │
│ │
└─────────────────────────────────────────────────┘
│
▼
Session State
(Context, Variables, Tools)
Frame-Based Processing
Everything in the pipeline is a frame - a typed message that flows through processors:
// Frame is the unit of data flowing through the pipeline. Every message —
// audio, transcripts, LLM tokens, control signals — implements this interface.
type Frame interface {
// Type identifies which kind of frame this is (see the FrameType constants).
Type() FrameType
// Timestamp is when the frame was created, used for latency measurement.
Timestamp() time.Time
}
// Frame types
const (
FrameTypeAudio FrameType = "audio" // raw input audio from the caller
FrameTypeTranscript FrameType = "transcript" // STT output (interim or final)
FrameTypeLLMResponse FrameType = "llm_response" // streamed LLM tokens
FrameTypeTTSAudio FrameType = "tts_audio" // synthesized speech audio
FrameTypeControl FrameType = "control" // pipeline control signals (e.g. interruption)
FrameTypeEvent FrameType = "event" // lifecycle events (e.g. end of speech)
)
Common Frame Types
| Frame Type | Direction | Description |
|---|---|---|
| `AudioInputFrame` | Input | Raw audio from caller |
| `AudioOutputFrame` | Output | Audio to send to caller |
| `TranscriptFrame` | Internal | STT transcript (interim/final) |
| `LLMResponseFrame` | Internal | LLM text response |
| `TTSAudioFrame` | Internal | Generated speech audio |
| `InterruptionFrame` | Control | User interrupted bot |
| `FunctionCallFrame` | Internal | LLM requested function call |
| `EndOfSpeechFrame` | Event | User finished speaking |
Pipeline Stages
1. Audio Input Stage
// processAudioInput handles one raw audio frame from the telephony provider:
// it decodes μ-law to Linear16, always forwards the PCM to the VAD, and
// forwards it to STT only when the frame is loud enough and not muted.
func (p *Pipeline) processAudioInput(frame *AudioInputFrame) {
	// Convert μ-law (telephony) to Linear16 PCM for downstream processors.
	pcm := mulawToLinear16(frame.Audio)

	// The VAD must see every frame, including quiet ones: its end-of-speech
	// detection counts consecutive silent frames. Returning early on low
	// volume (as before) starved the VAD of silence, so EndOfSpeechFrame
	// could never fire once the caller stopped talking.
	p.vadProcessor.Process(pcm)

	// Frames below the volume threshold carry no usable speech; skip STT.
	if calculateVolume(pcm) < p.config.VolumeThreshold {
		return
	}

	// Forward to STT unless the pipeline is muted.
	if !p.isMuted() {
		p.sttProcessor.Process(pcm)
	}
}
2. VAD Stage
// Process runs voice activity detection on one audio chunk. It emits a
// SpeechStartFrame on the silence-to-speech transition and an
// EndOfSpeechFrame after enough consecutive silent chunks.
func (v *VADProcessor) Process(audio []int16) {
	speechDetected := v.model.Detect(audio) >= v.threshold

	if speechDetected {
		// Any speech resets the silence counter.
		v.silenceFrames = 0
		if v.isSpeaking {
			return
		}
		v.isSpeaking = true
		v.emit(SpeechStartFrame{})
		return
	}

	// Silent chunk: count it, and close the utterance once the run of
	// silence is long enough.
	v.silenceFrames++
	if v.isSpeaking && v.silenceFrames >= v.silenceThreshold {
		v.isSpeaking = false
		v.emit(EndOfSpeechFrame{})
	}
}
3. STT Stage
// Process forwards one PCM chunk to the streaming STT provider and relays
// transcript events back into the pipeline as TranscriptFrames.
//
// NOTE(review): ranging over s.stt.Events() blocks until the events channel
// is closed — confirm the provider closes it per utterance, or move event
// draining into a dedicated goroutine.
func (s *STTProcessor) Process(audio []int16) {
	s.stt.SendAudio(audio)

	// Interim and final events produce the same frame shape; the only
	// difference is the IsFinal flag, so one emit covers both (the original
	// if/else branches were identical apart from that flag).
	for event := range s.stt.Events() {
		s.emit(TranscriptFrame{
			Text:    event.Text,
			IsFinal: event.IsFinal,
		})
	}
}
4. LLM Stage
// Process consumes a transcript frame. Interim transcripts only pre-warm
// the LLM context; a final transcript triggers streaming generation, one
// LLMResponseFrame per token, followed by a terminating final frame.
func (l *LLMProcessor) Process(transcript *TranscriptFrame) {
	// Interim result: warm the context and wait for the final text.
	if !transcript.IsFinal {
		l.prepareContext(transcript.Text)
		return
	}

	// Stream the response token by token.
	history := l.buildMessages(transcript.Text)
	for token := range l.llm.StreamGenerate(history) {
		frame := LLMResponseFrame{Token: token, IsFinal: false}
		l.emit(frame)
	}

	// Signal the end of the response stream.
	l.emit(LLMResponseFrame{IsFinal: true})
}
5. TTS Stage
// Process accumulates streamed LLM tokens and synthesizes speech at
// sentence boundaries. On the final response frame it flushes whatever
// remains buffered so trailing text without punctuation is not dropped.
func (t *TTSProcessor) Process(response *LLMResponseFrame) {
	// The final frame carries no token but must flush the residual buffer:
	// the original early return on an empty token silently discarded any
	// trailing text that never reached a punctuation boundary.
	if response.IsFinal {
		t.flush()
		return
	}
	if response.Token == "" {
		return
	}

	t.buffer.WriteString(response.Token)

	// Synthesize eagerly at sentence boundaries to keep latency low.
	if containsPunctuation(response.Token) {
		t.flush()
	}
}

// flush synthesizes and emits whatever text is currently buffered.
func (t *TTSProcessor) flush() {
	text := t.buffer.String()
	if text == "" {
		return
	}
	t.buffer.Reset()
	for chunk := range t.tts.StreamSynthesize(text) {
		t.emit(TTSAudioFrame{Audio: chunk})
	}
}
6. Audio Output Stage
// processAudioOutput encodes synthesized PCM to μ-law and delivers it to
// the telephony provider, also queuing it for playback tracking.
func (p *Pipeline) processAudioOutput(frame *TTSAudioFrame) {
	// Telephony expects μ-law, so re-encode the PCM first.
	encoded := pcmToMulaw(frame.Audio)

	// Queue locally, then ship to the provider.
	p.audioQueue.Enqueue(encoded)
	p.telephony.SendAudio(encoded)
}
Frame Flow Example
Timeline (ms): 0 100 200 300 400 500 600 700 800
│ │ │ │ │ │ │ │ │
User speaks: [─────████████████████████████─────────────────────]
│ │
SpeechStart EndOfSpeech
│ │
STT: [interim] [interim] [final]
│ │ │
"What" "What is" "What is my order"
│
LLM: [───stream tokens───]
│
"Your order" → "is" → "shipped"
│ │ │
TTS: [audio] [audio] [audio]
│ │ │
Output: [play] [play] [play]
Processor Interface
All pipeline processors implement a common interface:
// Processor is the contract every pipeline stage (VAD, STT, LLM, TTS)
// implements so the pipeline can drive them uniformly.
type Processor interface {
// Process handles an incoming frame; a non-nil error surfaces the
// failure to the pipeline's error handling.
Process(frame Frame) error
// Start initializes the processor and begins any background work;
// implementations should stop when ctx is cancelled.
Start(ctx context.Context) error
// Stop gracefully shuts down the processor and releases its resources.
Stop() error
}
// BaseProcessor provides common functionality
type BaseProcessor struct {
outputChan chan Frame // frames emitted downstream by this processor
errorChan chan error // asynchronous errors surfaced to the pipeline
}
// emit publishes a frame to the processor's output channel.
// NOTE(review): this send blocks when outputChan is full or unread —
// confirm every processor's output is drained for the pipeline's lifetime.
func (p *BaseProcessor) emit(frame Frame) {
p.outputChan <- frame
}
Pipeline Configuration
// PipelineConfig collects every tunable for a pipeline instance: audio
// format, VAD sensitivity, provider selection, and feature flags.
type PipelineConfig struct {
// Audio settings
SampleRate int // 8000 for telephony
ChunkSize int // 20ms chunks (160 samples at 8kHz)
// VAD settings
VADThreshold float32 // speech probability cutoff for the VAD model
MinSilenceDuration time.Duration // silence required before end-of-speech
VolumeThreshold float32 // frames quieter than this skip STT
// STT settings
STTProvider string // which speech-to-text backend to use
STTEndpointing int // provider endpointing setting — units unclear from here, likely ms; confirm
// LLM settings
LLMProvider string // which LLM backend to use
LLMModel string // model identifier passed to the provider
LLMTemperature float32 // sampling temperature
// TTS settings
TTSProvider string // which text-to-speech backend to use
TTSVoice string // voice identifier passed to the provider
// Feature flags
AllowInterruptions bool // whether user barge-in cancels bot speech
EnableFunctionCalling bool // whether the LLM may request function calls
}
Error Handling
// Run starts every pipeline processor and blocks until either a processor
// reports a startup error or the context is cancelled. The first error
// observed is handled and returned.
func (p *Pipeline) Run(ctx context.Context) error {
	errChan := make(chan error, 10)

	// Launch each processor; any startup failure is funneled into errChan,
	// tagged with the processor's concrete type.
	for _, proc := range []Processor{
		p.vadProcessor,
		p.sttProcessor,
		p.llmProcessor,
		p.ttsProcessor,
	} {
		go func(processor Processor) {
			if err := processor.Start(ctx); err != nil {
				errChan <- fmt.Errorf("%T: %w", processor, err)
			}
		}(proc)
	}

	// Block on the first error or on cancellation, whichever comes first.
	select {
	case err := <-errChan:
		log.Printf("Pipeline error: %v", err)
		p.handleError(err)
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
// handleError applies a per-failure recovery strategy: reconnect a dropped
// STT stream, fail the LLM over, or switch TTS to its fallback engine.
// Errors matching none of the known sentinels are left to the caller.
// Checks run in the same order as the original switch, first match wins.
func (p *Pipeline) handleError(err error) {
	if errors.Is(err, ErrSTTDisconnected) {
		p.sttProcessor.Reconnect()
		return
	}
	if errors.Is(err, ErrLLMTimeout) {
		p.llmProcessor.SwitchToFallback()
		return
	}
	if errors.Is(err, ErrTTSFailed) {
		p.ttsProcessor.UseFallback()
	}
}
Metrics and Observability
// PipelineMetrics aggregates per-stage latency and counters for one
// pipeline instance.
type PipelineMetrics struct {
// Latency
VADLatency time.Duration // time spent in voice activity detection
STTLatency time.Duration // time spent in speech-to-text
LLMLatency time.Duration // time spent generating the LLM response
TTSLatency time.Duration // time spent synthesizing speech
E2ELatency time.Duration // end-to-end latency across the pipeline
// Counts
FramesProcessed int64 // total frames that passed through the pipeline
ErrorCount int64 // total errors observed
Interruptions int64 // total user barge-ins
}
// recordMetrics publishes a per-stage latency histogram sample, in
// milliseconds, under the metric name "pipeline.<stage>.latency_ms".
func (p *Pipeline) recordMetrics(stage string, duration time.Duration) {
	name := "pipeline." + stage + ".latency_ms"
	metrics.RecordHistogram(name, duration.Milliseconds())
}
Next Steps
- Frames Reference - All frame types
- VAD Configuration - Voice detection tuning
- Interruption Handling - User barge-in
- Turn Detection - Conversation flow