Skip to content

练手项目 · AI 聊天助手

这是你的第一个 AI 应用闭环项目。目标是做一个具备基本产品体验和工程意识的聊天系统——能跑通完整链路,而不只是页面上能看到模型回复。

这个项目和前面几章的关系

读完前面的基础章节之后,你已经有了这些认知储备:

  • LLM 基础概念 讲了 Token、Context Window 和为什么模型不是数据库。这个项目里,上下文拼接和成本控制都会变得很具体。
  • Prompt 工程 讲了 System Prompt 和上下文工程。这个项目里你要亲手写第一个 System Prompt,并决定历史消息怎么传。
  • Structured Output 暂时不是第一版重点,但它会成为后面接工具和做稳定业务输出的基础。
  • Tool Calling 会在第二版项目里用到,到时你会把“模型提出工具调用,程序执行工具”这条链路跑通。

这个项目的核心价值不是"让模型回答问题",而是让你把前后端、Prompt 组织、多轮对话、流式输出这几个链路串成一个可以实际使用的系统。

项目目标

  • 跑通 前端输入 -> 后端调用模型 -> 返回结果 -> 页面展示
  • 支持多轮对话、流式输出、System Prompt 和基础错误处理
  • 为后续升级到工具调用版打基础

这个项目真正要验证的,不只是“能聊天”

很多人做第一个聊天助手项目时,会把目标理解成"页面上能看到模型回复"。这当然是起点,但远远不是这个项目最重要的收获。

这个项目真正要验证的是:当模型开始进入一个真实产品链路时,你能不能把最小但关键的系统能力组织起来:

  • 上下文怎么传
  • Prompt 谁来拼
  • 错误怎么暴露给用户
  • 流式输出怎么落到交互上
  • 对话变长之后,成本和状态怎么管理

换句话说,这个项目不是在验证模型会不会说话,而是在验证你能不能把"一次模型调用"包装成一个像样的产品闭环。

建议前置章节

推荐技术栈

  • 前端:React + Next.js 或 Vue + Vite
  • 后端:Python + FastAPI
  • 模型调用:任选一个可用的大模型 API

版本边界:先命令行,再网页闭环

这个项目分三层做,不要一上来把所有体验都塞进第一版。

命令行最小版先验证 API Key、messages 多轮上下文、System Prompt、错误处理和历史裁剪。它不解决产品体验,只回答一个问题:模型调用链路和对话状态是否真的跑通。

网页第一版再加入输入框、消息列表、Loading 状态、错误提示、清空上下文和可配置 System Prompt。这里可以先用普通请求返回完整答案,也可以直接使用 SSE。关键不是炫技,而是把“前端输入 -> 后端调用模型 -> 页面展示”这条链路做稳。

第二版增强再补流式输出体验、Markdown 渲染、token 级裁剪、温度或模型切换、工具调用过程展示。它们会让产品更像真实助手,但不应该掩盖第一版最重要的闭环验证。

这个边界很重要。问题一出现,你要能判断它出在前端状态、后端接口、Prompt、模型调用,还是上下文拼接,而不是被一堆增强功能搅在一起。

你会在这个项目里真正学到什么

  • 后端如何组织 prompt
  • 多轮上下文为什么会越来越贵
  • 流式输出为何要同时考虑前端体验和后端协议
  • 为什么错误处理和重试不能等项目后期再补

先跑起来:命令行最小版

在做前端、SSE 和完整后端之前,先用一个命令行脚本验证最小闭环。它没有页面,也没有数据库,但已经包含聊天助手最重要的东西:System Prompt、多轮 messages、退出指令、错误处理和历史裁剪。

环境要求

  • Python 3.9+
  • 一个可用的 OpenAI 或 OpenAI-compatible API Key
  • openai Python SDK

安装依赖:

bash
pip install openai

设置环境变量:

bash
export OPENAI_API_KEY="你的 API Key"
export OPENAI_MODEL="gpt-4o-mini"

# 如果使用 OpenAI 兼容服务,再设置:
# export OPENAI_BASE_URL="https://your-provider.example.com/v1"

把下面代码保存为 chat.py,然后运行 python chat.py

python
import os
from openai import OpenAI

SYSTEM_PROMPT = (
    "你是一个面向开发者的 AI 聊天助手。"
    "回答要直接、准确;如果信息不足,先说明不确定,再给出可验证的下一步。"
)
MAX_HISTORY_MESSAGES = 12


def build_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("请先设置 OPENAI_API_KEY 环境变量")

    return OpenAI(
        api_key=api_key,
        base_url=os.getenv("OPENAI_BASE_URL") or None,
    )


def trim_history(messages: list[dict]) -> list[dict]:
    """保留 system prompt 和最近几轮对话,避免上下文无限增长。"""
    system_messages = [m for m in messages if m["role"] == "system"]
    chat_messages = [m for m in messages if m["role"] != "system"]
    return system_messages + chat_messages[-MAX_HISTORY_MESSAGES:]


def ask_model(client: OpenAI, messages: list[dict]) -> str:
    model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0.7,
    )
    return response.choices[0].message.content or ""


def main() -> None:
    client = build_client()
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]

    print("AI 聊天助手已启动。输入 exit 或 quit 退出。")
    while True:
        user_input = input("\n你:").strip()
        if not user_input:
            continue
        if user_input.lower() in {"exit", "quit"}:
            print("已退出。")
            break

        messages.append({"role": "user", "content": user_input})
        messages = trim_history(messages)

        try:
            answer = ask_model(client, messages)
        except Exception as exc:
            messages.pop()
            print(f"助手:模型调用失败:{exc}")
            continue

        messages.append({"role": "assistant", "content": answer})
        print(f"助手:{answer}")


if __name__ == "__main__":
    main()

预期运行效果大概是这样:

text
AI 聊天助手已启动。输入 exit 或 quit 退出。

你:用一句话解释什么是 context window
助手:context window 是模型在一次请求中能看到的最大上下文范围,包括 system prompt、历史消息、用户输入和模型要生成的输出。

你:那为什么多轮对话会越来越贵?
助手:因为每一轮都要把保留下来的历史消息重新发给模型,历史越长,输入 token 越多,成本和延迟都会增加。

这个脚本能帮你确认三件事:API Key 能用,messages 多轮上下文能延续,历史裁剪没有把 system prompt 弄丢。确认这些没问题后,再加前端和流式输出才有意义。

流式输出扩展

如果想先在命令行里看到“逐字输出”的效果,可以把 ask_model 换成下面这个函数,并在主循环里调用它:

python
def stream_model(client: OpenAI, messages: list[dict]) -> str:
    model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    stream = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0.7,
        stream=True,
    )

    parts: list[str] = []
    print("助手:", end="", flush=True)
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if not delta:
            continue
        parts.append(delta)
        print(delta, end="", flush=True)

    print()
    return "".join(parts)

对应地,把主循环里的这一行:

python
answer = ask_model(client, messages)

改成:

python
answer = stream_model(client, messages)

命令行版跑通后,再看后面的 FastAPI 和前端代码会轻松很多。它们解决的是“怎么把同一个消息流放到网页里”,不是另一套完全不同的逻辑。

为什么聊天助手看起来最简单,实际上却是所有后续项目的底座

后面的 RAG、Agent、Research Agent,看上去复杂得多,但它们几乎都建立在这个项目的几个基本能力上:

  • 前后端消息如何流动
  • messages 历史如何组织
  • System Prompt 放在哪里
  • 流式输出如何落 UI
  • 错误和中断如何反馈给用户

如果这些基础没做扎实,后面接再多检索、工具和状态管理,系统也只会是在不稳的底座上继续加层。

后端 Prompt 组织:messages 数组是核心

多轮对话的关键不是"前端记录历史",而是每次请求都把完整的对话历史打包传给模型。模型本身不记忆任何状态,它只看当前这次请求里的 messages

一个典型的后端请求体:

python
messages = [
    {
        "role": "system",
        "content": "你是一个帮助用户解答编程问题的助手,回答要简洁明确。"
    },
    {
        "role": "user",
        "content": "Python 里怎么读取 JSON 文件?"
    },
    {
        "role": "assistant",
        "content": "使用 `json.load()` 读取文件对象,或 `json.loads()` 解析字符串。"
    },
    {
        "role": "user",
        "content": "如果文件不存在怎么处理?"
    }
]

模型读到这个 messages 数组,会理解这是一段连续对话,并基于上下文回答最后一条 user 消息。

注意:每轮对话后,你需要把模型返回的 assistant 消息追加进数组,下一轮再连同历史一起发出去。对话越长,这个数组越大,Token 成本和延迟也随之上升。这就是"历史消息裁剪"后续要解决的问题。

这里最值得建立的认知是:聊天系统的状态,本质上并不在模型里,而在你每次重新构造的 messages 数组里。

也就是说,多轮对话的连续性不是模型天然记住了什么,而是你的系统每轮都把该保留的信息重新带了回去。这个理解一旦建立起来,后面读 RAG、记忆、上下文压缩时会轻松很多。

流式输出方案

流式输出让用户能看到模型"逐字打字"的效果,而不是等全部生成完才显示。

下面先用一段短代码说明 SSE 的形状。完整可运行骨架在后面的 server.pystatic/index.html,那里会统一使用 JSON delta/api/chat/stream 路径。

后端(Python + FastAPI)使用 SSE:

python
from fastapi.responses import StreamingResponse

async def stream_chat(messages):
    async def generate():
        stream = await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            stream=True,
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                yield f"data: {delta}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

前端接收 SSE:

javascript
const eventSource = new EventSource('/api/chat/stream?message=hello');
eventSource.onmessage = (e) => {
    if (e.data === '[DONE]') {
        eventSource.close();
        return;
    }
    setMessage(prev => prev + e.data);
};

这套方案的好处是:前端只需监听事件,不需要轮询;后端逐块推送,延迟低,用户体验好。

从产品视角看,流式输出的意义也不只是"像打字机效果更酷"。它在本质上是在降低等待的不确定性。用户哪怕还没拿到完整答案,只要看到系统正在持续产出内容,就更容易理解系统还活着、还在工作,而不是卡死了。

最小目录建议

text
chat-app/
├── frontend/
├── server/
├── README.md
└── docs/
    └── 聊天应用原理说明.md

历史消息裁剪

随着对话进行,messages 数组会越来越大,每次请求都携带所有历史,成本和延迟都会线性增长。实际部署时必须处理这个问题。

常见的三种策略:

策略一:固定条数截断

最简单的做法。只保留最近 N 轮对话(比如最近 10 条消息),丢弃更早的历史:

python
MAX_HISTORY = 10  # 保留最近 10 条

def trim_messages(messages: list[dict]) -> list[dict]:
    system_messages = [m for m in messages if m["role"] == "system"]
    non_system = [m for m in messages if m["role"] != "system"]
    # 只保留最近 MAX_HISTORY 条非系统消息
    trimmed = non_system[-MAX_HISTORY:]
    return system_messages + trimmed

优点:实现简单;缺点:早期重要信息会被丢掉,用户可能发现模型"忘事"了。

策略二:按 token 数截断

更精确的做法是控制总 token 数,保留尽量多的近期历史,但不超过阈值:

python
import tiktoken

def count_tokens(messages: list[dict], model: str = "gpt-4o") -> int:
    enc = tiktoken.encoding_for_model(model)
    total = 0
    for m in messages:
        total += len(enc.encode(m.get("content", ""))) + 4  # 每条消息 overhead
    return total

def trim_to_token_limit(messages: list[dict], max_tokens: int = 3000) -> list[dict]:
    system_msgs = [m for m in messages if m["role"] == "system"]
    non_system = [m for m in messages if m["role"] != "system"]

    result = []
    for msg in reversed(non_system):  # 从最新的开始保留
        result.insert(0, msg)
        if count_tokens(system_msgs + result) > max_tokens:
            result.pop(0)
            break
    return system_msgs + result

策略三:摘要压缩

当历史太长时,用模型把早期对话压缩成一段摘要,用摘要替换原始历史。成本更低,但摘要是有损的,细节会丢失。适合长对话场景的第二版功能,不建议第一版就做。

这三种策略其实对应三种不同阶段的系统成熟度:

  • 固定条数截断:先解决"别无限长"
  • token 截断:开始精确管理成本和窗口
  • 摘要压缩:开始主动管理长任务记忆

对第一个项目来说,能把第一种做稳,已经足够有价值。别一开始就把问题复杂化。

最低验收标准

命令行最小版至少要做到:

  • 能读取环境变量里的 API Key 和模型名
  • 能连续问 3 轮,并让后续问题看到前面的上下文
  • 接口失败时能给出明确错误,而不是吞掉异常
  • 对话变长时有固定条数裁剪,且不会裁掉 System Prompt

网页第一版至少要做到:

  • 用户能在页面发送消息并看到模型回复
  • 连续发送 3 轮问题时,上下文能正确延续
  • 接口失败时,页面能明确提示,而不是静默卡住
  • 可以手动清空对话并重新开始
  • 对话超过 10 轮时,有机制防止 token 无限增长(至少实现策略一)

常见踩坑点

坑一:前端存历史,忘了同步给后端

多轮对话经常犯的错误:前端在界面上显示了对话历史,但后端每次调用时只发了最新一条消息。模型看不到上下文,会表现得"每次都不记得之前说了什么"。

坑二:流式输出没有结束标记

SSE 流式输出必须有明确的结束信号(如 [DONE]),否则前端不知道什么时候停止监听,容易出现 UI 卡住或重复追加内容的问题。

坑三:错误处理只在前端做

模型 API 会超时、会限流、会临时不可用。这些错误要在后端捕获并返回明确的错误信息,而不是让前端收到一个无法解读的空响应。

这个项目最容易做成“能演示,但不能用”

最常见的失败,不是页面做不出来,而是做出了一个只在自己电脑上、顺着理想路径才能跑通的 demo。

比如:

  • 连续问三轮就开始忘上下文
  • 接口一超时,页面就一直转圈
  • 历史消息越来越长,但系统没有任何裁剪
  • 前端能显示聊天记录,后端却没有真正使用它

这些问题看起来不如"模型回答错了"那么显眼,却更像真实产品和 demo 的分界线。

建议输出物

  • 一个可运行网页
  • 项目 README
  • 一份《聊天应用原理说明》(写清楚 messages 数组如何工作、你的裁剪策略是什么)

骨架代码参考

下面这份骨架不是生产级聊天系统,但它把最小闭环放在了正确的位置:后端负责组织 messages 和调用模型,前端只负责输入、展示和监听流式结果。

目录结构

text
chat-assistant/
├── server.py
├── llm_client.py
├── history.py
└── static/
    └── index.html

后端:server.py

python
import json
import os
from urllib.parse import unquote

from fastapi import FastAPI, Query
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles

from history import ChatHistoryStore
from llm_client import stream_chat_completion


app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")

histories = ChatHistoryStore(max_messages=10)


@app.get("/")
async def index():
    return FileResponse("static/index.html")


@app.get("/api/chat/stream")
async def chat_stream(
    message: str = Query(...),
    session_id: str = Query(default="default"),
):
    user_message = unquote(message).strip()
    histories.add_user_message(session_id, user_message)
    messages = histories.build_messages(session_id)

    async def generate():
        answer_parts: list[str] = []
        try:
            async for delta in stream_chat_completion(messages):
                answer_parts.append(delta)
                yield f"data: {json.dumps({'delta': delta})}\n\n"
            histories.add_assistant_message(session_id, "".join(answer_parts))
            yield "data: [DONE]\n\n"
        except Exception as exc:
            payload = {"error": f"模型调用失败:{exc}"}
            yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "8000"))
    uvicorn.run(app, host="0.0.0.0", port=port)

这里用 GET + EventSource 是为了让前端保持最小。真实产品里,如果消息很长或需要传更多参数,可以换成 fetch 读取 ReadableStream,或者先用 POST 创建会话,再用 SSE 订阅结果。

后端:llm_client.py

python
import os
from collections.abc import AsyncIterator

from openai import AsyncOpenAI


client = AsyncOpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_BASE_URL") or None,
)


async def stream_chat_completion(messages: list[dict]) -> AsyncIterator[str]:
    model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")

    stream = await client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True,
        temperature=0.7,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta

OPENAI_BASE_URL 是可选项。使用 OpenAI 官方接口时不用设置;如果你用的是 OpenAI 兼容服务,就把它设成对应服务的 base url。

后端:history.py

python
SYSTEM_PROMPT = (
    "你是一个面向开发者的 AI 助手。回答要直接、准确;"
    "如果信息不足,先说明不确定,再给出可验证的下一步。"
)


class ChatHistoryStore:
    def __init__(self, max_messages: int = 10):
        self.max_messages = max_messages
        self._store: dict[str, list[dict]] = {}

    def add_user_message(self, session_id: str, content: str) -> None:
        self._add(session_id, "user", content)

    def add_assistant_message(self, session_id: str, content: str) -> None:
        self._add(session_id, "assistant", content)

    def build_messages(self, session_id: str) -> list[dict]:
        history = self._store.get(session_id, [])
        trimmed = history[-self.max_messages :]
        return [{"role": "system", "content": SYSTEM_PROMPT}] + trimmed

    def _add(self, session_id: str, role: str, content: str) -> None:
        self._store.setdefault(session_id, []).append(
            {"role": role, "content": content}
        )

这版只做固定条数裁剪,和前面"策略一"保持一致。system 消息不放进历史列表,而是在每次请求时重新拼进去,这样它不会被裁剪掉。

前端:static/index.html

html
<!doctype html>
<html lang="zh-CN">
  <head>
    <meta charset="UTF-8" />
    <title>AI 聊天助手</title>
    <style>
      body { font-family: system-ui, sans-serif; max-width: 760px; margin: 40px auto; }
      #messages { border: 1px solid #ddd; padding: 16px; min-height: 360px; white-space: pre-wrap; }
      .user { color: #0f766e; margin-top: 12px; }
      .assistant { color: #1f2937; margin-top: 12px; }
      form { display: flex; gap: 8px; margin-top: 16px; }
      input { flex: 1; padding: 10px; }
      button { padding: 10px 16px; }
    </style>
  </head>
  <body>
    <h1>AI 聊天助手</h1>
    <div id="messages"></div>
    <form id="form">
      <input id="input" autocomplete="off" placeholder="输入你的问题" />
      <button type="submit">发送</button>
    </form>

    <script>
      const form = document.querySelector("#form");
      const input = document.querySelector("#input");
      const messages = document.querySelector("#messages");
      const sessionId = crypto.randomUUID();

      function appendMessage(role, text = "") {
        const div = document.createElement("div");
        div.className = role;
        div.textContent = `${role === "user" ? "你" : "助手"}:${text}`;
        messages.appendChild(div);
        return div;
      }

      form.addEventListener("submit", (event) => {
        event.preventDefault();
        const text = input.value.trim();
        if (!text) return;

        appendMessage("user", text);
        input.value = "";
        const assistant = appendMessage("assistant");

        const params = new URLSearchParams({ message: text, session_id: sessionId });
        const source = new EventSource(`/api/chat/stream?${params.toString()}`);

        source.onmessage = (event) => {
          if (event.data === "[DONE]") {
            source.close();
            return;
          }
          const payload = JSON.parse(event.data);
          if (payload.error) {
            assistant.textContent += payload.error;
            source.close();
            return;
          }
          assistant.textContent += payload.delta;
        };

        source.onerror = () => {
          assistant.textContent += "\n连接中断,请稍后重试。";
          source.close();
        };
      });
    </script>
  </body>
</html>

本地运行

先安装依赖:

bash
pip install fastapi uvicorn openai

再设置环境变量:

bash
export OPENAI_API_KEY="你的 API Key"
export OPENAI_MODEL="gpt-4o-mini"
# 如果使用 OpenAI 兼容服务,再设置:
# export OPENAI_BASE_URL="https://your-provider.example.com/v1"

启动服务:

bash
python server.py

打开 http://localhost:8000,发送一条消息,能看到助手逐步输出,就说明前后端链路已经跑通。

这份骨架做了哪些简化

它把历史存在内存里,服务重启后对话会丢;session_id 也只是浏览器临时生成的,没有登录态和鉴权。它用固定条数裁剪控制上下文,没有做 token 级裁剪和摘要压缩。

这些简化是刻意的。第一版先验证消息流、流式输出和历史拼接,等链路稳定后,再考虑数据库、用户系统、Markdown 渲染、取消生成、重试和更精细的上下文管理。

升级路线

  1. 先做纯聊天版(多轮 + 流式 + 裁剪)
  2. 再加入 Tool Calling(让模型能查天气、查时间等实时数据)
  3. 最后把它升级成支持 RAG 的问答助手(基于你上传的资料回答)

回头看:哪些事值得早点想清楚

第一版先追求闭环稳定,功能可以少,但链路必须完整。

messages 数组就是你的对话状态。状态设计得清不清楚,直接决定后面能不能进化成更复杂的系统。很多人一开始不在意这个,等到接工具、接 RAG 的时候才发现状态管理一团乱。

流式输出在真实产品里承担着降低等待焦虑的作用,上线之后你会发现用户对它的感知远比你预期的强。裁剪策略也一样——越早有,系统越不容易在后面被长对话拖垮。

真正好的第一个项目,做完之后你回头看,会发现它已经隐约具备了后续接工具、接 RAG、接 Agent 的骨架。这比堆功能重要得多。

复盘问题

  1. 你现在的上下文拼接是否清晰可控?把一次请求里的 messages 数组打印出来看看。
  2. 如果对话越来越长,你的裁剪策略是什么?有没有测试过裁剪后模型是否还能正常延续上下文?
  3. 如果模型输出不稳定,你打算靠 Prompt、结构化输出还是工具来改善?
  4. 接口限流或超时时,你的用户体验是什么?

面向开发者系统学习 AI 应用开发、RAG、Agent 与 Vibe Coding。