Skip to content

流式输出与 SSE

大模型 API 默认是流式返回的——token 一个接一个地输出,而不是等全部生成完再返回。对用户来说这意味着更快的第一字节响应;对工程师来说,前后端都需要专门处理流式数据。这篇讲清楚 SSE 的工作原理、前端如何消费流、后端如何透传流,以及常见的工程陷阱。

为什么要流式输出

非流式请求的体验是:发出请求,等待,等待,等待,然后页面上突然出现完整回答。对短回复来说还能接受,但当模型需要几秒甚至十几秒生成一段回答时,这段空白等待时间会让用户觉得系统卡死了。

流式输出把问题反过来解决:第一个 token 生成出来,就立刻推给用户。用户会看到文字在持续出现,像有人在打字。哪怕总时间没变,这种"系统在工作"的视觉反馈会让等待感下降很多。

更实际的好处是延迟感知。非流式场景里,用户的体验延迟约等于整个生成过程的耗时。流式场景里,体验延迟约等于第一个 token 到达的时间(也就是 TTFT,Time to First Token),后续的内容可以边看边等。两者的实际耗时可能相同,但用户感知完全不同。

AI 聊天助手项目里要求实现流式输出的原因就在这里。一旦你的对话不流式,和竞品放在一起比,体验差距会非常明显。

SSE 是什么

SSE(Server-Sent Events)是浏览器原生支持的服务端推送协议,基于普通 HTTP 连接,服务端可以持续往客户端推送事件,直到连接关闭。

和 WebSocket 相比,SSE 是单向的(只能服务端推客户端),不需要额外握手协议,复用普通 HTTP,对服务端的改造成本低。大多数 AI 应用只需要"服务端向前端推送生成内容",不需要双向通信,SSE 是更轻量的选择。

SSE 的数据格式很简单:

data: 一段内容\n\n
data: 另一段内容\n\n
data: [DONE]\n\n

每个事件以 data: 开头,以两个换行结尾。服务端一直推,客户端一直读,直到连接关闭或收到约定的结束标记。

大模型 API(包括 OpenAI、Anthropic 等)的流式输出就遵循这个格式,每次推送一个 token 或若干个 token,最后发一个 [DONE] 表示生成结束。

后端:如何透传模型流

以 Python + FastAPI 为例。核心思路是:模型 API 开启 stream=True,得到一个异步生成器,后端用 StreamingResponse 把它转发给前端。

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/api/chat")
async def chat(request: dict):
    messages = request.get("messages", [])

    async def generate():
        async for chunk in await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            stream=True
        ):
            delta = chunk.choices[0].delta.content
            if delta is not None:
                # 按 SSE 格式推送每个 token
                yield f"data: {delta}\n\n"

        # 发送结束标记
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"  # 关闭 Nginx 缓冲,否则可能积压内容
        }
    )

几个细节值得注意:

delta 可能是 None(比如第一个 chunk 只包含 role 信息,没有内容),不过滤会把 None 字符串发给前端。

响应头里的 X-Accel-Buffering: no 是针对 Nginx 的。Nginx 默认会缓冲响应,流式输出的场景下这个缓冲会让内容积压,用户看到的效果变成"一段一段蹦出来"而不是逐字出现。本地开发没有 Nginx 看不出来,部署后才发现。

Cache-Control: no-cache 防止代理层缓存 SSE 连接,是常规的流式响应配置。

前端:如何消费 SSE

浏览器有原生的 EventSource API,但它只支持 GET 请求且不能带请求体,不适合发送聊天历史。实际项目通常用 fetch + ReadableStream 来消费 SSE。

javascript
async function streamChat(messages, onToken, onDone) {
    const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages })
    });

    if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });

        // 按换行拆分,处理每个事件
        const lines = buffer.split('\n');
        buffer = lines.pop(); // 最后一行可能不完整,留到下次

        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.slice(6).trim();
                if (data === '[DONE]') {
                    onDone();
                    return;
                }
                if (data) {
                    onToken(data);
                }
            }
        }
    }
}

// 使用示例
let assistantMessage = '';

await streamChat(
    messages,
    (token) => {
        assistantMessage += token;
        setDisplayText(assistantMessage); // React state 更新
    },
    () => {
        console.log('流式输出完成');
        // 把完整回复追加进 messages 历史
        setMessages(prev => [...prev, {
            role: 'assistant',
            content: assistantMessage
        }]);
    }
);

buffer 的处理是这段代码里最容易出错的地方。ReadableStream 读到的数据块不一定按事件边界对齐,一次 read() 可能拿到半个事件,也可能拿到多个事件。把最后一行留在 buffer 里留到下次处理,是正确处理流式数据的标准做法。

还有一个很实际的点:onDone 回调里才是把完整 assistant 消息追加进对话历史的时机。如果在流式过程中就追加,会导致历史消息不完整,下一轮对话时模型看到的是截断的回答。

错误处理

流式请求的错误处理比普通请求更麻烦,因为错误可能发生在连接建立之前,也可能发生在推送过程中。

连接建立之前的错误(HTTP 4xx/5xx),可以在 fetch 返回后立刻检查 response.ok,这部分和普通请求一样。

推送过程中的错误,这是流式特有的场景。模型 API 可能在推了几个 token 之后超时,或者中途报错。这时候 reader.read() 会抛出异常,要在 while 循环外加 try/catch 处理:

javascript
async function streamChat(messages, onToken, onDone, onError) {
    try {
        const response = await fetch('/api/chat', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ messages })
        });

        if (!response.ok) {
            onError(new Error(`服务器返回 ${response.status}`));
            return;
        }

        const reader = response.body.getReader();
        // ...(同上)
    } catch (err) {
        // 网络中断、读取失败等
        onError(err);
    }
}

后端也需要考虑错误中间层的问题。如果模型 API 调用失败,不能让后端直接崩掉,要在 generate() 里捕获异常,可以推送一个错误事件通知前端:

python
async def generate():
    try:
        async for chunk in await client.chat.completions.create(..., stream=True):
            # ...
    except Exception as e:
        yield f"data: [ERROR] {str(e)}\n\n"

前端收到 [ERROR] 前缀的数据时,终止流并展示错误提示。这比让用户看着一个没有任何反应的页面要好得多。

用户中断

用户经常会在模型还没生成完的时候就点停止。这时候需要主动关闭请求。

javascript
let abortController = null;

function startStream(messages) {
    abortController = new AbortController();

    fetch('/api/chat', {
        method: 'POST',
        signal: abortController.signal,
        // ...
    });
}

function stopStream() {
    if (abortController) {
        abortController.abort();
        abortController = null;
    }
}

后端收到中断信号时,generate() 里的异步循环会因为客户端断开而停止,不会继续消耗模型 API 的 token。正确处理中断,可以节省成本,也能提升体验。

和聊天项目的对接

AI 聊天助手项目第二版里要求实现流式输出,这一章的内容直接对应那里的工程实现。

聊天项目里有两个细节要和流式输出配合:

流式过程中不应该允许再次发送请求。 用户如果在流式输出还在进行时再次提交,要么禁用输入框,要么先中断当前流再开始新请求,不能同时跑两个并发流。

Markdown 渲染要在流式过程中实时更新。 一种常见做法是用 remark/marked 这类库,在每次 onToken 触发时重新渲染全量 Markdown,而不是把 token 直接拼接成纯文本。如果库很重,也可以流式阶段显示纯文本,完成后再切换成 Markdown 渲染结果。

几个工程陷阱

前端没有结束标记处理。 如果前端不识别 [DONE] 或者没有在 reader.read() 返回 done: true 时退出循环,会一直等待,连接不会正常关闭。表现是输出完成后页面 loading 状态一直在转。

Nginx 缓冲未关闭。 前面提到的 X-Accel-Buffering: no,如果部署环境有反向代理,这个配置不加就会有积压效果,用户看到的不是逐字输出,而是积累一段时间后突然刷出一大段文字。

SSE 在 HTTP/2 下的行为变化。 HTTP/2 的多路复用会影响 SSE 的推送节奏,某些场景下推送延迟比 HTTP/1.1 更高。这个通常不是你需要第一天解决的问题,但上线后如果发现流式体验反常,可以从这里排查。

边缘场景:连续快速发消息。 如果用户在上一条流还没结束时就发了新消息,没有中断上一个流的处理,会导致两个流的内容混合追加到同一个 assistantMessage 字符串里。状态管理要对这个情况做防护。


下一步可以用这一章的内容直接补充聊天助手第二版,或者继续进入 Embedding 与向量检索——后面做 RAG 问答时,生成阶段同样会用到流式输出。

面向开发者系统学习 AI 应用开发、RAG、Agent 与 Vibe Coding。