实现一个RAG系统,构建专属客服

导入

RAG(Retrieval-Augmented Generation,检索增强生成) 是一种将信息检索(Information Retrieval)与文本生成(Text Generation)相结合的架构,通过动态检索外部知识库,将相关文档作为上下文输入给 LLM,从而生成更准确、可靠、可追溯的回答。

简单来说,就是将自己专属的技术文档,比如公司产品介绍,产品技术文档等汇集在一起,然后当用户对公司以及产品进行提问时,大语言模型可以从这些技术文档中找到最准确的答案来回答你,这相当于一个精通公司所有产品技术细节的专属客服在不停歇的回答客户的问题,且答案要比人类更精准,并且整个过程无需对大模型本身做任何修改,因为本质上是一种提示词工程。当然公司专属客服只是一个例子,任何的个人领域中,都可以利用RAG制作属于你的专属客服。

流程:

  • 分片 → 将知识库文档切分为语义完整的片段,大模型之后给出的答案就是从这些片段中获取

  • 索引 → 将文本片段向量化并构建高效检索结构,利用向量数据库,将片段向量化之后存储在向量数据库中

  • 召回 → 根据用户问题检索 Top-K 相关片段,通过计算向量相似度的方式

  • 重排 → 对召回结果按相关性精细排序,进一步选出比如top-3之类的更少的片段,然后把这个片段传入大模型

  • 生成 → 将重排后的上下文 + 问题输入 LLM 生成答案

具体解析

下面对RAG的流程进行更详细的解析:

分片

因为个人或者公司的技术文档可能数量和大小很庞大,因此必须切割成一个个小片段,这些小片段也是之后输入大模型的内容。主要有几种分片方式,比如按句子分段,按段落分段,滑动窗口分段,或者按照语义分段,不过这个就复杂了一点,实际按段落或者句子效果都是不错的,下面自己实现的RAG中就要求按句子生成,为什么是要求,因为代码全部由AI生成,几乎没什么修改,你只需要宏观提要求即可。

索引

将文本片段转换为向量表示,并构建高效检索结构。核心的组件由两个:

  • 嵌入模型(Embedding Model):将文本映射为稠密向量
  • 向量索引(Vector Index):支持近似最近邻搜索(ANN)

文本的向量化表示,传统方式词袋模型,word2vec之类,现代直接使用大预言嵌入模型集了,因为是中文,所以使用了bge-small-zh-v1.5模型。至于向量的索引,我要求使用了Facebook以前发布的一个库,叫FAISS,是一个高效的相似度搜索库。

召回

召回需要根据向量相似度搜索向量数据库,当然简单的例子中不需要使用数据库,直接保存到内存即可。这里相似度计算的要求是快速,对精确度要求低一些,主要进行第一轮的筛选,选出top-k,比如说top-10,也就是10个最相似的片段,并且按相似度排序,此时相似度排序高的不一定就是更相似,还是那句话,因为这里相似度计算比较简单。具体怎么计算呢?一般要么L2近似,要么余弦近似就可以,这里代码中要求使用余弦近似,也就是向量归一化之后计算内积,就是余弦近似度,内积使用FAISS库的IndexFlatIP即可。

重排

重排阶段需要在召回的top-k中进一步筛选,比如筛选出top-3,那么这三个片段就会和问题一起送到大模型,让大模型参考这三个片段进行问题的回复。这里也要计算向量相似度,但是就不是召回中那么简单了,而是使用Cross-Encoder计算,而召回中其实可以算是Bi-EncoderCross-Encoder一般要借助模型,而不是简单计算了,中文模型使用BGE-reranker-base效果就还可以了

生成

将重排后的上下文 + 用户问题输入 LLM,生成准确、流畅、符合指令的回答,前面说了RAG也属于提示词工程,主要就在这里体现,我们需要使用类似这样的提示词:

你是一个专业助手,请严格根据以下上下文回答问题。
如果上下文没有相关信息,请回答“根据提供的资料无法回答”。

上下文:
{context}

问题:
{query}

回答:

其中问题就是问题,上下文就是RAG重排后选出的片段,可以发现,在LLM的视角中,它是不知道RAG是什么,也不关心它是怎么实现的,在它眼中就是这一段提示词,只不过只有我们才知道,这个提示词包含了RAG整套流程走下来的结果。

完整代码

下面是AI生成的完整代码:

# -*- coding: utf-8 -*-
"""
交互式中文 RAG 系统(支持多次问答 + 流式输出)
作者:AI助手
功能:
  - 加载本地文档(.txt / .json)
  - 智能中文分片(句子边界 + 滑动窗口)
  - BGE 嵌入 + FAISS 索引
  - BGE 重排
  - Gemini 流式生成(支持 1.5-flash 回退)
  - 交互式 CLI:连续问答,输入 quit/exit 退出
"""
import re
import json
import sys
from typing import List, Tuple, Generator

import faiss
import numpy as np

from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import google.generativeai as genai

# ======================
# 1. 配置 Google API Key
# ======================
GOOGLE_API_KEY = "your key"  # ←←← 请在此处填写你的 API Key
genai.configure(api_key=GOOGLE_API_KEY)

if GOOGLE_API_KEY == "YOUR_GOOGLE_API_KEY":
    print("❌ 请先在代码中设置你的 Google API Key!")
    sys.exit(1)

# ======================
# 2. 文档加载与中文智能分片
# ======================


def load_documents(file_path: str) -> str:
    """加载 .txt 或 .json 文档"""
    with open(file_path, 'r', encoding='utf-8') as f:
        if file_path.endswith('.json'):
            data = json.load(f)
            if isinstance(data, list):
                text = "\n".join([item.get("text", "") for item in data])
            else:
                text = data.get("text", "")
        else:
            text = f.read()
    return text


def chinese_sentence_split(text: str) -> List[str]:
    """使用标点符号分割中文句子"""
    sentence_endings = re.compile(r'[。!?.!?]+')
    sentences = [s.strip() for s in sentence_endings.split(text) if s.strip()]
    return sentences


def smart_chunking(text: str, chunk_size: int = 512, overlap: int = 64) -> List[str]:
    """基于句子的智能分片,避免词语断裂"""
    sentences = chinese_sentence_split(text)
    chunks = []
    current = ""
    for sent in sentences:
        if len(current) + len(sent) <= chunk_size:
            current += sent + "。"
        else:
            if current:
                chunks.append(current)
            current = sent + "。"
            if len(current) > chunk_size:
                chunks.append(current[:chunk_size])
                current = current[chunk_size:]
    if current:
        chunks.append(current)
    
    # 添加滑动窗口重叠
    if overlap > 0 and len(chunks) > 1:
        overlapped = []
        for i in range(len(chunks)):
            if i == 0:
                overlapped.append(chunks[i])
            else:
                prev_tail = chunks[i-1][-overlap:] if len(chunks[i-1]) > overlap else chunks[i-1]
                overlapped.append(prev_tail + chunks[i])
        return overlapped
    return chunks

# ======================
# 3. 向量存储(BGE 中文嵌入)
# ======================


class VectorStore:
    def __init__(self):
        print("📥 正在加载 BGE 嵌入模型 (BAAI/bge-small-zh-v1.5)...")
        self.embedding_model = SentenceTransformer('BAAI/bge-small-zh-v1.5')
        self.index = None
        self.chunks = []

    def add_documents(self, documents: List[str]):
        self.chunks = documents
        print(f"🧠 正在为 {len(documents)} 个文本块生成嵌入向量...")
        embeddings = self.embedding_model.encode(
            documents,
            batch_size=32,
            show_progress_bar=True,
            normalize_embeddings=True
        )
        embeddings = np.array(embeddings).astype('float32')
        faiss.normalize_L2(embeddings)
        dim = embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dim)
        self.index.add(embeddings)
        print("✅ 向量索引构建完成!")

    def search(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]:
        query = f"为这个句子生成表示以用于检索相关文章: {query}"
        query_vec = self.embedding_model.encode([query], normalize_embeddings=True).astype('float32')
        faiss.normalize_L2(query_vec)
        scores, indices = self.index.search(query_vec, top_k)
        return [(self.chunks[i], float(scores[0][j])) for j, i in enumerate(indices[0])]

# ======================
# 4. 重排器(BGE Reranker)
# ======================


class ReRanker:
    def __init__(self):
        print("📥 正在加载 BGE 重排模型 (BAAI/bge-reranker-base)...")
        self.tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-reranker-base')
        self.model = AutoModelForSequenceClassification.from_pretrained('BAAI/bge-reranker-base')
        self.model.eval()
        print("✅ 重排模型加载完成!")

    def rerank(self, query: str, candidates: List[str], top_k: int = 3) -> List[str]:
        pairs = [[query, doc] for doc in candidates]
        with torch.no_grad():
            inputs = self.tokenizer(pairs, padding=True, truncation=True, return_tensors='pt', max_length=512)
            scores = self.model(**inputs).logits.view(-1).sigmoid()
        idx = torch.argsort(scores, descending=True)[:top_k]
        return [candidates[i] for i in idx]

# ======================
# 5. 流式生成答案(Gemini)
# ======================


def generate_answer_stream(query: str, retrieved_docs: List[str]) -> Generator[str, None, None]:
    context = "\n\n".join(retrieved_docs)
    prompt = f"""你是一个专业的中文助手,请严格根据以下上下文回答问题。如果上下文没有相关信息,请回答“根据提供的资料无法回答”。

上下文:
{context}

问题:
{query}

回答:"""

    model_name = 'gemini-1.5-flash'
    try:
        model = genai.GenerativeModel(model_name)
        response_stream = model.generate_content(prompt, stream=True)
        for chunk in response_stream:
            if chunk.text:
                yield chunk.text
        return  # 成功则退出
    except Exception as e:
        print(f"⚠️ 模型 {model_name} 调用失败,尝试下一个: {e}")


# ======================
# 6. RAG 系统主类
# ======================


class RAGSystem:
    def __init__(self, doc_path: str):
        print(f"📂 正在加载文档: {doc_path}")
        raw_text = load_documents(doc_path)
        self.chunks = smart_chunking(raw_text, chunk_size=512, overlap=64)
        print(f"✂️  文档已分片为 {len(self.chunks)} 个块")

        self.vector_store = VectorStore()
        self.vector_store.add_documents(self.chunks)

        self.reranker = ReRanker()

    def get_answer_stream(self, question: str, top_k_retrieve: int = 10, top_k_rerank: int = 3) -> Generator[str, None, None]:
        # 1. 召回
        retrieved = self.vector_store.search(question, top_k=top_k_retrieve)
        retrieved_texts = [text for text, _ in retrieved]
        # 2. 重排
        reranked_docs = self.reranker.rerank(question, retrieved_texts, top_k=top_k_rerank)
        # 3. 流式生成
        yield from generate_answer_stream(question, reranked_docs)

# ======================
# 7. 交互式主循环
# ======================


def main():
    # 创建示例文档(若不存在)
    DOC_PATH = "knowledge_base.txt"
    # 初始化 RAG 系统
    print("\n🚀 正在初始化 RAG 系统...\n")
    rag = RAGSystem(DOC_PATH)

    # 交互循环
    print("\n💬 RAG 问答系统已启动!")
    print("👉 输入你的问题(输入 'quit' 或 'exit' 退出)\n")

    while True:
        try:
            question = input("❓ 你的问题: ").strip()
            if not question:
                continue
            if question.lower() in ['quit', 'exit', '退出']:
                print("👋 再见!")
                break

            print("🤖 回答: ", end="", flush=True)
            for token in rag.get_answer_stream(question):
                print(token, end="", flush=True)
            print("\n")  # 换行

        except KeyboardInterrupt:
            print("\n\n👋 用户中断,再见!")
            break
        except Exception as e:
            print(f"\n❌ 系统错误: {e}\n")


if __name__ == "__main__":
    main()


大模型的选择是自由的,想用什么都可以,如果是其他模型,可能需要使用openai库,这里我选择的是google的gemini-1.5-flash模型,所以使用google库。除了模型外,只需要修改DOC_PATH指向你的技术文档即可,这里只针对单个文档,如果想要支持多个文档,让AI帮你改一下就好,同样,如果是生产中,还需要使用向量数据库。

我这里的文档,同样使用的是AI帮我生成的一段武侠小说剧情描写,简单几百个字:

朔风如刀,卷着鹅毛大雪,将整座“落雁镇”裹进一片苍茫。镇东头的“醉仙楼”里,炭火将熄,酒气混着寒气,凝成一层薄雾。

角落一张旧木桌旁,坐着个青衫男子。他面容清癯,眼神却沉静如深潭,指节分明的手正轻轻摩挲着一只粗陶酒碗。他叫萧寒,江湖人称“断水剑”——只因他出剑时,连奔流的江水也能一斩而断。

这时,门被猛地推开,寒风裹着一个黑衣人闯了进来。那人脸上戴着半张铁面具,只露出紧抿的薄唇和一双锐利如鹰的眼睛。他径直走到萧寒对面坐下,声音低沉:“萧寒,交出《寒江剑谱》,饶你不死。”

萧寒抬眼,目光平静:“剑谱不在我身上。”

“少废话!”黑衣人一掌拍在桌上,酒碗震得嗡嗡作响,“三日前,你从‘寒江派’总坛带出此物,我亲眼所见。你若不交,今日便葬身此地。”

萧寒沉默片刻,忽然问:“你是‘铁面门’的人?还是‘血刀堂’的?”

黑衣人冷笑:“你不必知道。只需明白,你逃不掉。”

话音未落,萧寒袖中寒光一闪!一柄薄如蝉翼的软剑已抵在黑衣人咽喉。这正是他的成名绝技——“袖里青蛇”,快得连影子都追不上。

黑衣人竟不闪避,反而低笑一声:“果然名不虚传。”他右手猛地探出,五指如钩,竟精准扣住剑脊!指上戴着精钢指套,与剑刃摩擦出刺耳声响。

两人僵持片刻,萧寒忽然手腕一抖,剑尖如灵蛇吐信,绕开指套,直刺对方心口。黑衣人急退,衣襟却被剑气划开一道口子,露出内里绣着暗红火焰的黑袍。

“血刀堂?”萧寒眼神一凝。

黑衣人不再多言,抽出腰间一柄弯刀,刀身泛着血光。两人在狭小的酒楼内交起手来,刀光剑影搅得酒气翻腾,桌椅碎裂声不绝于耳。

激战正酣,萧寒忽然跃向窗边,反手将半截断剑掷向身后酒坛。酒液泼洒,他借势冲入风雪,只留下一句:“剑谱早已焚毁,你们争的,不过是一场空!”

黑衣人立在窗前,望着雪中远去的背影,铁面具下传来一声轻叹。他弯腰拾起地上一片被剑气削落的衣角,上面隐约可见“寒江”二字。

风雪更急了,醉仙楼内,只剩一地狼藉,和半坛未饮尽的浊酒。

运行效果

🚀 正在初始化 RAG 系统...

📂 正在加载文档: knowledge_base.txt
✂️  文档已分片为 2 个块
📥 正在加载 BGE 嵌入模型 (BAAI/bge-small-zh-v1.5)...
🧠 正在为 2 个文本块生成嵌入向量...
Batches: 100%|███████████████████████████████████████████████████████| 1/1 [00:00<00:00,  4.10it/s]
✅ 向量索引构建完成!
📥 正在加载 BGE 重排模型 (BAAI/bge-reranker-base)...
✅ 重排模型加载完成!

💬 RAG 问答系统已启动!
👉 输入你的问题(输入 'quit' 或 'exit' 退出)

❓ 你的问题: 萧寒的外号“断水剑”因何得名?
🤖 回答: 萧寒的外号“断水剑”得名,是因为他出剑时,连奔流的江水也能一斩而断。

❓ 你的问题: 黑衣人属于哪个门派?
🤖 回答: 黑衣人属于“血刀堂”。

❓ 你的问题: 《寒江剑谱》真的被萧寒毁掉了吗?
🤖 回答: 根据提供的资料,萧寒声称“剑谱早已焚毁”。资料中没有其他信息可以证实或证伪这一说法,也没有明
确提及《寒江剑谱》是否真的被毁掉。

❓ 你的问题: 衣角上的“寒江”二字说明了什么?
🤖 回答: 根据提供的资料,衣角上的“寒江”二字说明了这片衣角上绣着或印着“寒江”二字,但资料没有说明这两
个字具体代表或说明了什么。

❓ 你的问题: 为什么黑衣人说“亲眼看见”萧寒带走剑谱
🤖 回答: 根据提供的资料,黑衣人说“亲眼看见”萧寒带走剑谱,是因为他声称在三日前从“寒江派”总坛亲眼见到
萧寒带出了《寒江剑谱》。

我随便问了几个问题,有的是文档中能推理出来的,有的是文档几乎直接给出的,有的是文档不太能推理出来的,你可以看到AI都可以比较准确的回答,推理不出来的问题,也会跟你说无法推断,不会自由发挥,这就是因为提示词中明确了如果上下文中找不到答案,就回复无法从资源推断之类的话。

其实就上面给出的简单代码,扩充完技术文档后,再加个向量数据库,用flask给出api接口,就是一个完成度不错的个人专属客服了。