了解不同模式的技术架构,选择适合的方案,或快速接入你的客户端
本系统支持两种交互方式和三种技术链路的灵活组合:
| 交互方式 | 说明 | 适用场景 |
|---|---|---|
| 半双工(按住说话) | 按住麦克风录音,松开触发 AI | 嘈杂环境、精确控制 |
| 全双工(自由对话) | 持续监听,自动检测说话 | 自然对话、免提 |
| 技术链路 | 说明 | 特点 |
|---|---|---|
| Realtime 端到端 | Qwen Realtime API | 延迟最低,架构最简 |
| ASR + Qwen-Omni | 语音识别 + Omni 模型 | 可显示转写文字 |
| ASR + LLM + TTS | 三段式处理 | 支持自定义/克隆音色 |
按住说话,松开触发 AI。一个模型处理 ASR + 对话 + TTS,延迟最低。
const ws = new WebSocket(
`ws://localhost:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=half_duplex&tech=realtime`
);
持续发送音频,服务端 VAD 自动检测说话,支持自然打断。
const ws = new WebSocket(
`ws://localhost:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=full_duplex&tech=realtime`
);
独立 ASR 进行语音识别,可实时显示用户说话内容。支持跨会话记忆。
const ws = new WebSocket(
`ws://localhost:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=half_duplex&tech=omni`
);
三段式处理,每个环节独立可替换。支持自定义音色和声音克隆。
const ws = new WebSocket(
`ws://localhost:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=half_duplex&tech=pipeline&voice_id=${voiceId}`
);
# 上传音频克隆声音 (10-20秒)
curl -X POST http://localhost:8089/api/voice-clone \
-F "api_key=sk-xxx" \
-F "audio=@voice.wav" \
-F "name=my-voice"
# 返回
{"success": true, "voice_id": "cosyvoice-clone-xxx"}
const ws = new WebSocket(
`ws://your-server:8089/ws?client_id=${uuid}&api_key=${apiKey}&interaction=half_duplex&tech=realtime`
);
ws.onopen = () => console.log('连接成功');
ws.onmessage = (e) => handleMessage(JSON.parse(e.data));
// 开始录音
ws.send(JSON.stringify({ type: 'start', turn_id: generateUUID() }));
// 发送音频 (Opus, 16kHz, 32kbps, Base64)
ws.send(JSON.stringify({ type: 'audio', turn_id, data: base64Opus }));
// 停止录音
ws.send(JSON.stringify({ type: 'stop', turn_id }));
function handleMessage(msg) {
switch (msg.type) {
case 'audio':
// Opus 24kHz 32kbps,需解码后播放
playOpusAudio(base64Decode(msg.data));
break;
case 'transcript':
displayText(msg.text);
break;
case 'response_done':
onComplete(msg.interrupted);
break;
case 'error':
console.error(msg.message);
break;
}
}
| 类型 | 半双工 | 全双工 | 说明 |
|---|---|---|---|
start | ✓ | - | 开始录音,需带 turn_id |
audio | ✓ | ✓ | 音频数据 (Base64) |
stop | ✓ | - | 停止录音,触发 AI |
text | ✓ | ✓ | 文字消息 |
interrupt | - | ✓ | 手动打断 |
| 类型 | 说明 |
|---|---|
connected | 连接成功 |
interrupted | 打断确认(半双工) |
vad_start / vad_end | VAD 状态(全双工) |
user_transcript_delta | 用户语音转写(流式) |
user_transcript_done | 用户语音转写完成 |
ai_thinking | AI 开始处理 |
transcript | AI 文本响应(流式) |
audio | AI 音频响应(流式) |
response_done | AI 响应完成 |
error | 错误信息 |
基于滑动窗口 + LLM 压缩总结的记忆方案,兼顾短期高保真和长期记忆,核心原则是异步不阻塞。
传统 RAG 记忆需要先知道用户问什么才能检索相关记忆,但端到端语音场景下,建立连接时还不知道用户会说什么。Digest 方案通过时序保留 + 压缩总结解决这个问题:
所有记忆操作都不会阻塞对话流程:
add_turn() 同步立即返回,对话不等待asyncio.create_task() 后台执行// MySQL 存储结构
{
client_id: "uuid", // 用户标识
summary: "用户喜欢...", // LLM 压缩后的长期记忆
pending_turns: [...], // 待总结的对话列表
recent_turns: [...] // 最近高保真对话
}
基于向量检索的记忆方案,适合需要语义搜索的场景。
// 使用 Digest(默认)
ws://localhost:8089/ws?...&memory=digest
// 使用 mem0
ws://localhost:8089/ws?...&memory=mem0
在语音对话场景中,存在两个核心挑战:
传统方案依赖前端通过 turn_id 过滤,但这造成了不必要的网络传输和前端处理负担。
通过优先级消息队列实现 SDK 回调线程与 asyncio 主循环的解耦,并在服务端完成过期消息过滤。
| 组件 | 职责 | 线程模型 |
|---|---|---|
| Producer | SDK 回调事件入队 | 回调线程(同步) |
| MessageQueue | 优先级排序、线程安全 | 共享队列 |
| Consumer | 出队、过滤、发送 | asyncio 主循环 |
消息按优先级分为三级,确保控制指令优先处理:
| 优先级 | 类型 | 消息示例 |
|---|---|---|
| HIGH (0) | 控制指令 | start, stop |
| MEDIUM (1) | 状态变更 | asr_done, response_done |
| LOW (2) | 数据流 | audio, text_delta |
SDK 回调在非 asyncio 线程中执行,使用 run_coroutine_threadsafe 安全入队:
# Producer (回调线程)
def on_audio_delta(self, event):
asyncio.run_coroutine_threadsafe(
self.message_queue.put(MessageType.AUDIO_DELTA, {"data": event.data}),
self._loop
)
# Consumer (asyncio 主循环)
async def _consumer_loop(self):
while self._running:
msg = await self.message_queue.get()
await self._handle_message(msg)
Consumer 通过 response_id 过滤过期消息,避免发送给客户端:
async def _handle_audio_delta(self, data: dict):
response_id = data.get("response_id")
# 过滤过期响应
if response_id != self._current_response_id:
self._debug(f"[Filter] 丢弃过期音频: {response_id}")
return
# 有效消息,发送给客户端
await self.send_audio(data["audio"])
每个 Session 模块定义自己的消息类型和优先级映射,初始化时绑定:
# realtime_ptt/message_types.py
class MessageType(StrEnum):
START = "start"
STOP = "stop"
AUDIO = "audio"
QWEN_AUDIO_DELTA = "qwen_audio_delta"
QWEN_RESPONSE_DONE = "qwen_response_done"
PRIORITY_MAP = {
MessageType.START: MessagePriority.HIGH,
MessageType.STOP: MessagePriority.HIGH,
MessageType.QWEN_AUDIO_DELTA: MessagePriority.LOW,
}
# session.py
self.message_queue = MessageQueue(priority_map=PRIORITY_MAP)
await self.message_queue.put(MessageType.AUDIO, {"data": audio_data})
| 特性 | 半双工+Realtime | 全双工+Realtime | 半双工+Omni | 半双工+Pipeline |
|---|---|---|---|---|
| 延迟 | 最低 | 最低 | 中等 | 较高 |
| 用户转写显示 | - | ✓ | ✓ | ✓ |
| 自定义音色 | - | - | - | ✓ |
| 声音克隆 | - | - | - | ✓ |
| 跨会话记忆 | ✓ | ✓ | ✓ | ✓ |
| 打断方式 | 手动 | 自动 | 手动 | 手动 |
| 推荐场景 | 通用首选 | 自然对话 | 需显示转写 | 个性化音色 |