技术链路介绍

了解不同模式的技术架构,选择适合的方案,或快速接入你的客户端

概览

本系统支持两种交互方式三种技术链路的灵活组合:

交互方式 说明 适用场景
半双工(按住说话) 按住麦克风录音,松开触发 AI 嘈杂环境、精确控制
全双工(自由对话) 持续监听,自动检测说话 自然对话、免提
技术链路 说明 特点
Realtime 端到端 Qwen Realtime API 延迟最低,架构最简
ASR + Qwen-Omni 语音识别 + Omni 模型 可显示转写文字
ASR + LLM + TTS 三段式处理 支持自定义/克隆音色

1. Realtime 端到端

半双工 + Realtime 推荐

按住说话,松开触发 AI。一个模型处理 ASR + 对话 + TTS,延迟最低。

sequenceDiagram participant U as 用户 participant B as 浏览器 participant S as 服务器 participant Q as Qwen Realtime U->>B: 按下麦克风 B->>S: start (turn_id) loop 录音中 U->>B: 说话 B->>S: audio (opus) S->>Q: append_audio end U->>B: 松开麦克风 B->>S: stop S->>Q: commit + create_response S-->>B: ai_thinking Q-->>S: audio_transcript (流式) S-->>B: transcript Q-->>S: audio.delta (流式) S-->>B: audio (opus) B-->>U: 播放 AI 语音 Q-->>S: response.done S-->>B: response_done

WebSocket 连接

const ws = new WebSocket(
  `ws://localhost:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=half_duplex&tech=realtime`
);
输入音频 Opus, 16kHz, 32kbps
输出音频 Opus, 24kHz, 32kbps
延迟 最低 (~200ms)
打断 开始新录音自动打断
全双工 + Realtime

持续发送音频,服务端 VAD 自动检测说话,支持自然打断。

sequenceDiagram participant U as 用户 participant B as 浏览器 participant S as 服务器 participant Q as Qwen Realtime B->>S: audio (持续发送) S->>Q: audio Note over Q: VAD 检测到说话 Q-->>S: speech_started S-->>B: vad_start U->>B: 说完了 Q-->>S: speech_stopped S-->>B: vad_end Q-->>S: committed S-->>B: ai_thinking Q-->>S: 响应 (流式) S-->>B: transcript + audio B-->>U: 播放 AI 语音 Note over U: 打断 AI (直接开口) B->>S: audio Q-->>S: speech_started S->>Q: cancel_response S-->>B: response_done (interrupted)

WebSocket 连接

const ws = new WebSocket(
  `ws://localhost:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=full_duplex&tech=realtime`
);

2. ASR + Qwen-Omni

半双工 + Omni

独立 ASR 进行语音识别,可实时显示用户说话内容。支持跨会话记忆。

sequenceDiagram participant U as 用户 participant B as 浏览器 participant S as 服务器 participant ASR as Qwen ASR participant LLM as Qwen-Omni participant M as mem0 U->>B: 按下麦克风 B->>S: start S->>ASR: 连接 loop 录音中 B->>S: audio S->>ASR: audio ASR-->>S: 转写 (流式) S-->>B: user_transcript_delta B-->>U: 显示说话内容 end U->>B: 松开麦克风 B->>S: stop S-->>B: user_transcript_done S->>M: 查询相关记忆 M-->>S: 历史上下文 S-->>B: ai_thinking S->>LLM: 请求 (含记忆) LLM-->>S: 响应 (流式) S-->>B: transcript + audio B-->>U: 播放 AI 语音 S->>M: 存储新记忆

WebSocket 连接

const ws = new WebSocket(
  `ws://localhost:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=half_duplex&tech=omni`
);
用户转写 实时显示说话内容
跨会话记忆 通过 mem0 实现

3. ASR + LLM + TTS (Pipeline)

半双工 + Pipeline 自定义音色

三段式处理,每个环节独立可替换。支持自定义音色和声音克隆

sequenceDiagram participant U as 用户 participant B as 浏览器 participant S as 服务器 participant ASR as Qwen ASR participant LLM as Qwen LLM participant TTS as CosyVoice participant M as mem0 U->>B: 按下麦克风 B->>S: start loop 录音中 B->>S: audio S->>ASR: audio ASR-->>S: 转写 S-->>B: user_transcript_delta end B->>S: stop S-->>B: user_transcript_done S->>M: 查询记忆 S-->>B: ai_thinking S->>LLM: 文本请求 LLM-->>S: 文本响应 (流式) S-->>B: transcript S->>TTS: 文本 (voice_id) TTS-->>S: 音频 (流式) S-->>B: audio (opus) B-->>U: 播放自定义音色 S->>M: 存储记忆

WebSocket 连接

const ws = new WebSocket(
  `ws://localhost:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=half_duplex&tech=pipeline&voice_id=${voiceId}`
);

声音克隆 API

# 上传音频克隆声音 (10-20秒)
curl -X POST http://localhost:8089/api/voice-clone \
  -F "api_key=sk-xxx" \
  -F "audio=@voice.wav" \
  -F "name=my-voice"

# 返回
{"success": true, "voice_id": "cosyvoice-clone-xxx"}
输出音频 Opus, 24kHz, 32kbps
必需参数 voice_id
可选参数 llm_model (默认 qwen-plus)
声音克隆 上传 10-20s 音频

客户端接入指南

快速开始

1. 建立连接

const ws = new WebSocket(
  `ws://your-server:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=half_duplex&tech=realtime`
);

ws.onopen = () => console.log('连接成功');
ws.onmessage = (e) => handleMessage(JSON.parse(e.data));

2. 发送音频(半双工)

// 开始录音
ws.send(JSON.stringify({ type: 'start', turn_id: generateUUID() }));

// 发送音频 (Opus, 16kHz, 32kbps, Base64)
ws.send(JSON.stringify({ type: 'audio', turn_id, data: base64Opus }));

// 停止录音
ws.send(JSON.stringify({ type: 'stop', turn_id }));

3. 处理响应

function handleMessage(msg) {
  switch (msg.type) {
    case 'audio':
      // Opus 24kHz 32kbps,需解码后播放
      playOpusAudio(base64Decode(msg.data));
      break;
    case 'transcript':
      displayText(msg.text);
      break;
    case 'response_done':
      onComplete(msg.interrupted);
      break;
    case 'error':
      console.error(msg.message);
      break;
  }
}
消息类型速查

客户端 → 服务器

类型 半双工 全双工 说明
start-开始录音,需带 turn_id
audio音频数据 (Base64)
stop-停止录音,触发 AI
text文字消息
interrupt-手动打断

服务器 → 客户端

类型 说明
connected连接成功
interrupted打断确认(半双工)
vad_start / vad_endVAD 状态(全双工)
user_transcript_delta用户语音转写(流式)
user_transcript_done用户语音转写完成
ai_thinkingAI 开始处理
transcriptAI 文本响应(流式)
audioAI 音频响应(流式)
response_doneAI 响应完成
error错误信息

记忆系统

Digest 记忆方案 推荐

基于滑动窗口 + LLM 压缩总结的记忆方案,兼顾短期高保真和长期记忆,核心原则是异步不阻塞

设计思路

传统 RAG 记忆需要先知道用户问什么才能检索相关记忆,但端到端语音场景下,建立连接时还不知道用户会说什么。Digest 方案通过时序保留 + 压缩总结解决这个问题:

flowchart LR subgraph 对话流 A[用户说话] --> B[AI 回复] B --> C{加入 Recent} end subgraph 记忆滑动窗口 C --> D[Recent 5轮] D -->|超出| E[踢到 Pending] E --> F[Pending 10轮] F -->|超出| G[异步 LLM 总结] G --> H[Summary 更新] end subgraph 上下文构建 H --> I[Summary] F --> J[Pending] D --> K[Recent] I --> L[System Prompt] J --> L K --> L end

异步不阻塞

所有记忆操作都不会阻塞对话流程:

数据结构

// MySQL 存储结构
{
  client_id: "uuid",           // 用户标识
  summary: "用户喜欢...",       // LLM 压缩后的长期记忆
  pending_turns: [...],        // 待总结的对话列表
  recent_turns: [...]          // 最近高保真对话
}
Recent 最近 5 轮,高保真
Pending 10 轮触发总结
Summary 最大 500 字
存储 MySQL 持久化
mem0 记忆方案 可选

基于向量检索的记忆方案,适合需要语义搜索的场景。

WebSocket 连接参数

// 使用 Digest(默认)
ws://localhost:8089/ws?...&memory=digest

// 使用 mem0
ws://localhost:8089/ws?...&memory=mem0

消息队列架构

设计背景

在语音对话场景中,存在两个核心挑战:

传统方案依赖前端通过 turn_id 过滤,但这造成了不必要的网络传输和前端处理负担。

Producer-Consumer 架构 核心设计

通过优先级消息队列实现 SDK 回调线程与 asyncio 主循环的解耦,并在服务端完成过期消息过滤。

flowchart LR subgraph 回调线程 A[Qwen SDK 回调] end subgraph 消息队列 B[MessageQueue
优先级排序] end subgraph asyncio 主循环 C[Consumer
过滤 + 发送] end subgraph 客户端 D[WebSocket] end A -->|put| B B -->|get| C C -->|有效消息| D C -.->|过期消息| E[丢弃]
组件 职责 线程模型
Producer SDK 回调事件入队 回调线程(同步)
MessageQueue 优先级排序、线程安全 共享队列
Consumer 出队、过滤、发送 asyncio 主循环
优先级机制

消息按优先级分为三级,确保控制指令优先处理:

优先级 类型 消息示例
HIGH (0) 控制指令 start, stop
MEDIUM (1) 状态变更 asr_done, response_done
LOW (2) 数据流 audio, text_delta

跨线程安全

SDK 回调在非 asyncio 线程中执行,使用 run_coroutine_threadsafe 安全入队:

# Producer (回调线程)
def on_audio_delta(self, event):
    asyncio.run_coroutine_threadsafe(
        self.message_queue.put(MessageType.AUDIO_DELTA, {"data": event.data}),
        self._loop
    )

# Consumer (asyncio 主循环)
async def _consumer_loop(self):
    while self._running:
        msg = await self.message_queue.get()
        await self._handle_message(msg)
服务端过滤

Consumer 通过 response_id 过滤过期消息,避免发送给客户端:

sequenceDiagram participant U as 用户 participant S as 服务器 participant Q as 消息队列 participant C as Consumer Note over S: response_id = "resp_1" Q->>C: audio (resp_1) C->>U: 发送音频 U->>S: start (turn_id=2) Note over S: response_id = "resp_2" S->>Q: 取消旧响应 Q->>C: audio (resp_1) Note over C: resp_1 ≠ resp_2 C--xC: 丢弃过期消息 Q->>C: audio (resp_2) C->>U: 发送新音频
async def _handle_audio_delta(self, data: dict):
    response_id = data.get("response_id")

    # 过滤过期响应
    if response_id != self._current_response_id:
        self._debug(f"[Filter] 丢弃过期音频: {response_id}")
        return

    # 有效消息,发送给客户端
    await self.send_audio(data["audio"])
高内聚模块设计

每个 Session 模块定义自己的消息类型和优先级映射,初始化时绑定:

# realtime_ptt/message_types.py
class MessageType(StrEnum):
    START = "start"
    STOP = "stop"
    AUDIO = "audio"
    QWEN_AUDIO_DELTA = "qwen_audio_delta"
    QWEN_RESPONSE_DONE = "qwen_response_done"

PRIORITY_MAP = {
    MessageType.START: MessagePriority.HIGH,
    MessageType.STOP: MessagePriority.HIGH,
    MessageType.QWEN_AUDIO_DELTA: MessagePriority.LOW,
}

# session.py
self.message_queue = MessageQueue(priority_map=PRIORITY_MAP)
await self.message_queue.put(MessageType.AUDIO, {"data": audio_data})
过滤位置 服务端(非前端)
网络开销 只发有效数据
线程安全 队列天然安全
模块独立 各模块自定义类型

模式对比

特性 半双工+Realtime 全双工+Realtime 半双工+Omni 半双工+Pipeline
延迟 最低 最低 中等 较高
用户转写显示 -
自定义音色 - - -
声音克隆 - - -
跨会话记忆
打断方式 手动 自动 手动 手动
推荐场景 通用首选 自然对话 需显示转写 个性化音色