Skip to main content

入库流水线

Written: 2026.06
第 16 章跟敲代码:codealong/chapters/ch16_ingestion_pipeline。 这部分代码是本章跟敲版,用来先跑通核心闭环;完整项目源码仍以本讲后文标注的 qa_core/scripts/ 等路径为准。
上一讲数据隔离与多租户设计
下一讲RAG 回归验收与入库质量

1. 本讲目标

  • 理解离线入库链路和在线问答链路的边界
  • 掌握文档加载、标准化、切分的完整流程
  • 理解 IndexManifest 的增量入库机制
  • 掌握 FAQ 从 CSV 到 Milvus 的完整流程
📖 前置阅读:如果你想深入理解 Parent-Child Chunking 的设计原理和 chunk size 的选择依据,请先阅读 附录G:文档切分策略

2. 前置知识 — 离线 vs 在线链路

2.1 清晰的工程边界

离线链路(入库)                      在线链路(问答)
─────────────────                    ─────────────────
定时/手动执行                         每次用户提问时执行
修改 Milvus 数据                      只读 Milvus 数据
可以慢(几分钟到几十分钟)            必须快(秒级响应)
可以重试、可以回滚                    必须一次成功
解析文件、切分、向量化                只做检索、不解析文件
关键原则:在线问答不解析文件、不执行 OCR、不写入知识库。

2.2 为什么分开

如果把文档解析放在在线链路:
  • 用户提问时临时解析 PDF → 首 token 延迟增加 5-10 秒
  • 文件解析失败时用户看到的是”PDF 损坏”而非答案
  • 无法做质量报告(因为解析是即时的,没有机会检查)
如果把向量化放在在线链路:
  • 用户问题需要等 Embedding 模型加载(冷启动 10+ 秒)
  • 无法预热 Embedding 模型

2.3 知识库构建总链路

本项目的离线知识库构建不是单独的“文档切分”或“FAQ 入库”,而是一条完整的版本化构建链路。可以用一句话概括:
离线链路负责把原始资料变成“带版本、带权限、可检索、可回滚”的知识资产;在线链路只读取当前 active 版本来回答问题。
生产或演示时有两个常用入口:
入口用途适合场景
scripts/rebuild_scenarios.py一次初始化/重建全部 8 个冻结场景新环境初始化、统一准备、Milvus schema 变更后全量修复
scripts/rebuild_kb_version.py只重建单个业务场景只修改了某个场景资料、演示单场景入库、定位某个场景问题
如果在 Docker Compose 里执行入库命令,先确认项目根目录存在 .env.compose。仓库只提交 .env.compose.example,首次部署需要生成本地配置文件:
if (!(Test-Path .env.compose)) { Copy-Item .env.compose.example .env.compose }
notepad .env.compose
如果是新环境,或者 Milvus collection schema 变更后需要全量修复,优先使用批量脚本:
python scripts/rebuild_scenarios.py --reset-collections
它会对 8 个冻结场景逐个执行“新建版本 → 强制入库 → 质量门禁 → 激活”,并在 --reset-collections 开启时删除旧 FAQ/Doc collection,确保 Milvus schema 按当前代码重新创建。 在 Docker Compose 模式下,对应命令是:
docker compose --env-file .env.compose up -d mysql etcd minio milvus
docker compose --env-file .env.compose build api
docker compose --env-file .env.compose run --rm api python scripts/rebuild_scenarios.py --reset-collections
如果之前已经存在知识库,只是资料内容变化,批量重建全部 8 个场景时不加 --reset-collections
docker compose --env-file .env.compose run --rm api python scripts/rebuild_scenarios.py
如果只重建一个场景,使用 scripts/rebuild_kb_version.py
python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --quality-gate --activate
Docker Compose 模式下,对应命令是:
docker compose --env-file .env.compose run --rm api python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --quality-gate --activate
如果 Milvus Collection 的 schema 发生过变化,例如 sparse 字段从普通 SparseVector 改成 BM25 Function 输出字段,需要加上 --reset-collections 删除旧集合并重新建表:
python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --reset-collections --quality-gate --activate
这里要区分两个参数:--force 只是忽略文件指纹、强制把资料重新写入新版本;--reset-collections 会删除 Milvus 里的 FAQ/Doc collection,让当前代码重新创建 schema。已有知识库只更新资料时,用 --force,不要默认加 --reset-collections 完整链路如下: 这条链路里有几个容易混淆的点:
环节作用解释
scenario.toml定义业务场景、collection、数据目录、默认权限先确定“这次给哪个业务场景建知识库”
--new-version创建一个新的 STAGED 版本新资料先进入候选版本,不直接覆盖线上
--force强制重新处理数据用于演示或需要重新入库时跳过增量判断
--reset-collections删除并重建 Milvus collection只在 schema 变化或旧集合不兼容时使用
FAQ 入库把 CSV 问答对写入 FAQ collection适合标准问答、政策口径、固定流程
文档入库把 PDF/Word/Markdown/表格等切成 chunk 写入 Doc collection适合长文档、制度、合同、手册
IndexManifest记录文件指纹和 chunk ID下次入库时判断文件是否变化,避免重复处理
质量报告统计入库质量问题把“知识库是否可靠”变成可检查的数据
--quality-gate质量门禁报告不达标就不激活,保护线上问答
--activate激活版本只有通过门禁的新版本才进入在线链路
所以第 16 讲后面的 FAQ 入库、文档加载、表格行入库、父子块切分、IndexManifest,都是这条总链路中的局部实现;第 14 讲负责解释版本状态机,第 17 讲负责解释质量门禁和评测。

3. 文档加载器注册表

3.1 注册表设计

# qa_core/indexing/document_loaders.py

# Loader 注册表:后缀 → 加载器工厂
DOCUMENT_LOADER_SPECS: tuple[DocumentLoaderSpec, ...] = (
    DocumentLoaderSpec(
        suffixes=(".txt", ".md"),
        factory=_utf8_text_loader,
        description="UTF-8 文本/Markdown"
    ),
    DocumentLoaderSpec(
        suffixes=(".pdf",),
        factory=_pdf_loader,
        description="PDF 文档"
    ),
    DocumentLoaderSpec(
        suffixes=(".docx",),
        factory=_docx_loader,
        description="Word 文档"
    ),
    DocumentLoaderSpec(
        suffixes=(".pptx", ".ppt"),
        factory=lambda p: UnstructuredPowerPointLoader(str(p)),
        description="PowerPoint"
    ),
    DocumentLoaderSpec(
        suffixes=(".csv", ".xlsx", ".xls"),
        factory=load_table_file,
        description="表格文件 — 按行解析"
    ),
)

DOCUMENT_LOADER_REGISTRY: dict[str, DocumentLoaderSpec] = {
    suffix: spec
    for spec in DOCUMENT_LOADER_SPECS
    for suffix in spec.suffixes
}

def get_document_loader_spec(path: Path) -> DocumentLoaderSpec | None:
    """根据文件后缀获取加载器注册项。"""
    return DOCUMENT_LOADER_REGISTRY.get(path.suffix.lower())

3.2 扩展性

新增文件格式只需添加一个注册项:
# 新增:支持 .html 文件
DOCUMENT_LOADER_REGISTRY[".html"] = DOCUMENT_LOADER_REGISTRY[".htm"] = DocumentLoaderSpec(
    suffixes=(".html", ".htm"),
    factory=lambda p: UnstructuredHTMLLoader(str(p)),
    description="HTML 网页"
)
注册表模式比 if/elif 分支更可维护。当文件类型增长时,if/elif 会变成几百行难以维护的代码。

4. 文档入库主流程

4.1 ingest_directory() 完整流程

# qa_core/indexing/service.py
def ingest_directory(
    directory_path: str,
    source: str | None = None,
    *,
    scenario_id: str | None = None,
    tenant_id: str | None = None,
    dataset_id: str | None = None,
    visibility: str | None = None,
    allowed_roles: list[str] | None = None,
    force: bool = False,
    kb_version: str | None = None,
    create_new_version: bool = False,
    activate: bool = False,
    description: str = "",
) -> int:
    """把目录中的业务文档增量写入 Milvus,逐文件委托 _ingest_single_file 处理。"""

    # Step 1:解析场景、构建数据域、确定业务分类
    scenario = resolve_scenario(scenario_id)
    data_scope = resolve_data_scope(
        tenant_id=tenant_id, dataset_id=dataset_id,
        visibility=visibility, user_roles=allowed_roles,
    )
    root = Path(directory_path)
    resolved_source = source or normalize_source_from_path(root)
    if resolved_source not in scenario.valid_sources:
        raise ValueError(f"无效的业务分类:{resolved_source}")

    # Step 2:创建/确认知识库版本
    version_store = get_kb_version_store(scenario.scenario_id)
    version = version_store.ensure_version(
        kb_version, create_new=create_new_version,
        description=description, created_by="ingest_directory",
    )
    active_kb_version = version.kb_version

    # Step 3:打开增量清单 + 文档存储
    manifest = IndexManifest(path=scenario.index_manifest_path)
    doc_store = get_doc_store(scenario.doc_collection)

    # Step 4:遍历目录,逐个文件委托给 _ingest_single_file
    total_chunks = 0
    skipped_files = 0
    for current_root, _, files in os.walk(root):
        for file_name in files:
            path = Path(current_root) / file_name
            chunks, skipped = _ingest_single_file(
                path, resolved_source, active_kb_version, scenario,
                data_scope, allowed_roles, doc_store, manifest, force,
            )
            if skipped:
                skipped_files += 1
            else:
                total_chunks += chunks

    # Step 5:持久化清单 + 记录入库统计
    manifest.save()
    version_store.record_ingest_result(
        active_kb_version, content_type="doc",
        count=total_chunks, source=resolved_source,
    )

    # Step 6:可选激活版本(需要 --activate 参数)
    if activate:
        version_store.activate_version(active_kb_version)

    return total_chunks
_ingest_single_file() 负责单个文件的增量入库逻辑,被 ingest_directory 的循环调用:
def _ingest_single_file(
    path, resolved_source, active_kb_version, scenario,
    data_scope, allowed_roles, doc_store, manifest, force,
) -> tuple[int, bool]:
    """处理单个文件:未变化则跳过,否则删除旧 chunk 后重新入库。"""
    if get_document_loader_spec(path) is None:
        raise ValueError(f"不支持的文档类型:{path}")
    fingerprint = file_fingerprint(path)
    existing = manifest.get(
        resolved_source, path, active_kb_version, scenario.scenario_id
    )
    # 指纹、Embedding 模型版本、chunk schema 均未变化时跳过
    if (
        not force
        and existing
        and existing.fingerprint == fingerprint
        and existing.embedding_model_version == get_settings().embedding_model_version
        and existing.chunk_schema_version == get_settings().chunk_schema_version
    ):
        return 0, True

    # 存在旧 chunk 时先删除,再重新加载、标准化、切分
    if existing and existing.chunk_ids:
        doc_store.delete_ids(existing.chunk_ids)
    docs = normalize_documents(
        load_file(path), path, resolved_source,
        active_kb_version, scenario.scenario_id,
        data_scope, allowed_roles,
    )
    chunks, ids = split_documents(docs)
    if chunks:
        doc_store.add_documents(chunks, ids=ids)
        manifest.update(
            resolved_source, path, fingerprint, ids,
            scenario_id=scenario.scenario_id,
            kb_version=active_kb_version,
            embedding_model_version=get_settings().embedding_model_version,
            chunk_schema_version=get_settings().chunk_schema_version,
        )
        return len(chunks), False
    return 0, False

4.2 normalize_documents 的作用

def normalize_documents(
    documents: list[Document],
    file_path: Path,
    source: str,
    kb_version: str | None = None,
    scenario_id: str | None = None,
    data_scope: DataScope | None = None,
    allowed_roles: list[str] | None = None,
) -> list[Document]:
    """为文档补充项目标准元数据,供过滤和引用使用。"""
    doc_id = file_fingerprint(file_path)
    scenario = resolve_scenario(scenario_id)
    scope = data_scope or resolve_data_scope()
    version_meta = version_metadata(kb_version, scenario.scenario_id)
    normalized: list[Document] = []
    for index, doc in enumerate(documents):
        metadata = dict(doc.metadata or {})
        metadata.update(
            {
                "source": source,
                "scenario_id": scenario.scenario_id,
                **scope.metadata(allowed_roles=allowed_roles),
                "file_path": str(file_path),
                "file_name": file_path.name,
                "file_type": file_path.suffix.lower(),
                "doc_id": doc_id,
                "page_index": metadata.get("page", index),
                "content_type": metadata.get("content_type") or "text",
                **version_meta,
            }
        )
        normalized.append(Document(page_content=doc.page_content, metadata=metadata))
    return normalized

5. 表格 CSV / Excel 专用入库设计

5.1 为什么表格不能按普通文本切分

普通制度、流程、手册是一段段自然语言,适合用 Parent-Child Chunking 按章节和字符长度切分。 但 CSV / Excel 表格不是自然段,而是一条条行记录。一行里多个单元格共同表达一个完整业务事实:
材料名称=施工照片
状态=待补交
责任人=项目经理
截止日期=2026-05-30
如果把表格当普通文本递归切分,可能出现:
  • 检索命中了“施工照片”,但状态被切到另一个 chunk;
  • 检索命中了“金额”,但付款节点、责任人丢失;
  • 两行不同记录被拼到同一个 chunk,答案把 A 行状态说成 B 行状态;
  • 答案引用只能定位到文件,不能定位到工作表和行号。
所以本项目对表格资料的原则是:
一行表格 = 一个完整业务语义单元。

5.2 文件读取策略

表格文件在 Loader 注册表中作为独立类型接入:
# qa_core/indexing/document_loaders.py
DocumentLoaderSpec(
    suffixes=(".csv", ".xlsx", ".xls"),
    factory=_table_loader,
    description="CSV/Excel 表格解析;按行保留表头、sheet 和单元格键值。",
)
读取规则:
文件类型读取方式说明
.csvpandas.read_csv(..., encoding="utf-8-sig")兼容带 BOM 的中文 CSV
.xlsxpandas.read_excel(..., sheet_name=None, engine="openpyxl")一次读取全部工作表
.xlspandas.read_excel(..., sheet_name=None, engine="xlrd")兼容旧版 Excel
Excel 会逐个 sheet 处理,避免把多个业务表混成一张表。

5.3 表格清洗

表格入库前先做轻量清洗:
def _normalize_frame(frame: pd.DataFrame) -> pd.DataFrame:
    """清理表格空行空列,并把缺失表头补成稳定列名。"""
    data = frame.dropna(how="all").dropna(axis=1, how="all").fillna("")
    columns = []
    for index, column in enumerate(data.columns, start=1):
        name = str(column).strip()
        if not name or name.lower().startswith("unnamed:"):
            name = f"列{index}"
        columns.append(name)
    data.columns = columns
    return data
清洗目标不是复杂 ETL,而是保证表格行进入 RAG 时不会因为空行、空列表头、Unnamed 列名造成检索噪声。 单元格值也会转成适合检索的短文本:
def _cell_text(value: object) -> str:
    text = str(value).strip()
    if text.endswith(".0") and text[:-2].isdigit():
        return text[:-2]
    return text
这样 1000.0 会变成 1000,金额、编号、数量类问题更容易命中。

5.4 每行转换为 Document

表格 loader 会把每一行转换成一个 LangChain Document
content = "\n".join(
    [
        f"表格文件:{path.name}",
        f"工作表:{sheet_name}",
        f"表头:{' / '.join(headers)}",
        f"行号:{row_number}",
        "单元格:",
        *cell_lines,
    ]
)
生成后的正文类似:
表格文件:验收清单.xlsx
工作表:材料验收
表头:材料名称 / 状态 / 责任人 / 截止日期
行号:3
单元格:
- 材料名称:施工照片
- 状态:待补交
- 责任人:项目经理
- 截止日期:2026-05-30
这样做有两个好处:
  1. 语义完整:同一行的字段和值不会被拆散;
  2. 适合向量检索和 BM25:既有自然语言标签,也有明确的列名和值。

5.5 metadata 设计

表格行必须携带可追溯 metadata:
metadata={
    "content_type": "table_row",
    "table_id": table_id,
    "sheet_name": str(sheet_name),
    "row_number": row_number,
    "row_count": len(normalized),
    "column_count": len(headers),
    "table_headers": " | ".join(headers),
}
字段含义:
字段作用
content_type=table_row告诉切分、质量检测、检索上下文:这是表格行
table_id标识同一个文件下的同一个工作表
sheet_name支持答案引用到具体工作表
row_number支持答案引用到具体行
row_count / column_count质量报告和容量评估使用
table_headers帮助回看表结构,也便于后续扩展表头召回

5.6 表格行不再递归切分

split_documents() 会识别 content_type=table_row
# qa_core/indexing/chunking.py
if is_table_metadata(doc.metadata):
    parent_docs = [doc]
else:
    parent_docs = parent_splitter.split_documents([doc])
也就是说,表格行不会再进入普通字符切分器。 原因是:表格行已经是完整业务单元,再切一次反而会破坏“列名 -> 单元格值”的关系。

5.7 检索策略中的 prefer_table

表格入库只是第一步。检索时还要识别用户是否在问表格问题。 本项目通过 is_table_query() 判断问题是否包含表格、清单、台账、字段、行号、工作表、状态、金额、责任人等表达:
prefer_table = is_table_query(compact_query)
params = _apply_table_preference(prefer_table, params["run_doc"], params, settings)
prefer_table=True 时:
  • 扩大 doc_top_k,多召回一些候选表格行;
  • 扩大 final_context_top_n,给表格证据更多上下文空间;
  • 设置 faq_direct_exact_only=True,禁止相似 FAQ 直接回答;
  • 上下文构建时把表格行排在普通正文前。
为什么要禁用相似 FAQ 直出?
用户问:验收材料清单里测试报告那一行是什么状态?
相似 FAQ:验收需要提交哪些材料?

这两个问题都包含“验收”“材料”“测试报告”,相似度可能不低。
但 FAQ 回答的是材料范围,用户问的是某一行字段值。
所以表格类问题只允许精确 FAQ 直出,相似 FAQ 必须让位给文档 RAG。

5.8 答案引用和兜底

表格资料的答案必须能回到原始证据。当前项目在来源标签中追加工作表和行号:
[1] 验收清单.xlsx / 工作表:材料验收 / 第 3 行
另外,表格类问题经常涉及状态、金额、责任人、日期等精确值。LLM 有时会概括回答而漏掉某个关键单元格,所以项目里增加了表格行兜底:
def enforce_table_row_details(answer: str, context_docs: list[Document]) -> str:
    """确保表格类答案在模型遗漏关键单元格时,确定性追加表格行要点。"""
如果模型回答没有覆盖表格行里的核心字段,系统会追加:
表格行要点:状态:待补交;责任人:项目经理 [1]
这不是替代 LLM,而是对表格精确字段的一层确定性保护。

5.9 面试话术

如果面试官问“Excel 和 CSV 怎么入库”,可以这样回答:
Excel 和 CSV 不能按普通文本切分。我们把每一行转成一个带表头、工作表、行号和单元格键值的 LangChain Document,并写入 content_type=table_row。切分阶段识别到表格行后不会再递归切分;检索阶段如果问题命中表格、清单、台账、金额、状态等关键词,会启用 prefer_table,扩大文档召回并优先保留表格行。答案引用会展示文件、工作表和行号,如果模型漏掉关键单元格,系统会追加表格行要点,保证表格类问题能追溯、能复核、字段不丢。

5.10 表格入库练习

这组练习用于确认“表格读取 → 行级 Document → 检索偏好 → 答案引用”已经闭环。 准备一个最小 CSV:
材料名称,状态,责任人,截止日期,备注
施工图纸,已提交,设计负责人,2026-05-10,版本为 V3
隐蔽工程照片,待补交,项目经理,2026-05-18,缺少二层西侧照片
验收测试报告,已通过,质量负责人,2026-05-20,检测编号 QA-2026-021
建议把它放到工程项目资料问答场景的数据目录中,并按常规知识库重建流程入库。学习时重点观察四件事:
检查点期望结果为什么检查
入库后的 metadata包含 content_type=table_rowsheet_namerow_number证明表格行没有被当成普通正文。
chunk 数量每个有效数据行生成一个可检索 Document证明行级证据粒度正确。
检索计划表格类问题命中 prefer_table=True证明检索策略知道当前问题更适合查表格。
答案来源来源中能看到文件、工作表、行号证明答案可以回到原始证据复核。
可以在页面或接口中提问:
验收清单里隐蔽工程照片是什么状态,责任人是谁?
理想回答应该包含:
  • 状态是“待补交”;
  • 责任人是“项目经理”;
  • 引用来源能定位到 CSV/Excel 的对应行;
  • 如果模型遗漏状态或责任人,系统会追加“表格行要点”。
这个练习的目的不是测试模型文采,而是验证表格证据没有在切分和生成阶段丢失。

5.11 当前边界

一期表格入库只覆盖“规范二维表”。复杂 Excel 能力不能无边界扩散,否则会把 RAG 项目变成 Office 解析项目。
边界场景一期处理策略推荐做法
合并单元格不默认还原层级语义入库前整理成普通二维表。
多级表头不自动推断复杂表头关系人工扁平化字段名,比如“合同-金额”“合同-付款节点”。
公式单元格读取解析后的单元格值,不重新计算业务公式关键计算逻辑应在业务系统或数据准备阶段完成。
图表不把柱状图、折线图直接转成结构化证据导出图表背后的原始数据表再入库。
截图表格不走 CSV/Excel 表格 loader进入 OCR/VLM 图文资料治理链路。
超大 Excel不在一期做复杂分布式解析拆分工作表、拆分文件,或按业务周期归档。
隐藏行列和批注不作为可信主证据重要内容必须整理成显式列。
透视表不直接作为原始证据导出明细表或汇总表后再入库。
面试时可以这样说:
我们一期支持的是规范 CSV/Excel 的行级语义入库,不追求解析所有复杂 Office 特性。这样做是为了保证 RAG 主链路清晰可控:表格行能召回、字段能引用、来源能复核。合并单元格、截图表格、图表解释这类复杂资料会进入后续 OCR/VLM 和资料治理链路,而不是塞进普通表格 loader 里。

6. IndexManifest 增量机制

6.1 为什么需要增量入库

假设知识库有 500 个 PDF 文件,每次修改一个文件就要全部重新入库:
  • 耗时:500 个 PDF 全部解析、切分、向量化 → 可能 20-30 分钟
  • 浪费:499 个未变化的文件被重复处理
  • 风险:重新入库过程中如果出错,旧数据也会被删除
增量入库:只处理变化的文件,未变化的跳过。

6.2 Manifest 文件结构

// .index_manifest/enterprise_knowledge/documents.json
{
    "scenario_id": "enterprise_knowledge",
    "last_full_ingest": "2026-05-07T15:00:00Z",
    "files": {
        "data/hr_data/入职流程.pdf": {
            "fingerprint": "a1b2c3d4e5f6...",
            "chunk_ids": ["chunk_001", "chunk_002", ...],
            "indexed_at": "2026-05-07T15:01:23Z",
            "doc_count": 12
        },
        "data/it_data/系统账号管理.md": {
            "fingerprint": "f6e5d4c3b2a1...",
            "chunk_ids": ["chunk_045", "chunk_046", ...],
            "indexed_at": "2026-05-07T15:02:45Z",
            "doc_count": 8
        }
    }
}

6.3 核心方法

class IndexManifest(JsonFileStore):
    @staticmethod
    def key(source, file_path, kb_version=None, scenario_id=None):
        """根据来源、路径、版本和场景生成稳定清单键。"""
        return stable_hash(scenario_id or "", source, kb_version or "", str(Path(file_path).resolve()))

    def get(self, source, file_path, kb_version=None, scenario_id=None):
        """如果文件曾经入库,返回对应清单记录。"""
        key = self.key(source, file_path, kb_version, scenario_id)
        raw = self.data.get("files", {}).get(key)
        if not raw:
            return None
        return ManifestRecord(key=key, **raw)

    def is_unchanged(self, source, file_path, fingerprint, kb_version=None, scenario_id=None):
        """检查当前文件指纹是否与清单一致。"""
        record = self.get(source, file_path, kb_version, scenario_id)
        return bool(record and record.fingerprint == fingerprint)

    def update(self, source, file_path, fingerprint, chunk_ids, *, scenario_id="", kb_version="", embedding_model_version="", chunk_schema_version=""):
        """记录一次成功入库及其生成的 chunk id。"""
        key = self.key(source, file_path, kb_version, scenario_id)
        self.data.setdefault("files", {})[key] = {
            "scenario_id": scenario_id,
            "source": source,
            "path": str(Path(file_path).resolve()),
            "fingerprint": fingerprint,
            "chunk_ids": chunk_ids,
            "updated_at": utc_now(),
            "kb_version": kb_version,
            "embedding_model_version": embedding_model_version,
            "chunk_schema_version": chunk_schema_version,
        }

    def iter_records(self, *, scenario_id=None, source=None, kb_version=None):
        """按条件列出清单记录。"""
        records = []
        for key, raw in self.data.get("files", {}).items():
            record = ManifestRecord(key=key, **raw)
            if scenario_id and record.scenario_id != scenario_id:
                continue
            if source and record.source != source:
                continue
            if kb_version and record.kb_version != kb_version:
                continue
            records.append(record)
        return records

6.4 文件指纹计算

前置知识:如果你不熟悉 SHA256 哈希和增量检测原理,请先阅读 附录B:SHA256 内容指纹与增量检测
def file_fingerprint(path: str | Path) -> str:
    """根据路径、修改时间和大小生成本地文件指纹,供增量入库使用。

    这里不读取文件全文,是为了让大文件入库前的变化判断更快。
    代价是极少数情况下如果内容变化但 mtime/size 不变,可能不会被识别;
    需要强制重建时使用 --force。
    """
    p = Path(path)
    stat = p.stat()
    return stable_hash(str(p.resolve()), stat.st_mtime_ns, stat.st_size)
基于路径、修改时间和大小的指纹确保文件元数据变化时被检测到,比全文 SHA256 更快且适合大文件场景(极端情况下内容变化但 mtime/size 不变时使用 —force 重建)。

7. FAQ 入库流程

7.1 CSV 格式

FAQ 使用 CSV 文件管理,每行一个问答对:
source,question,answer
hr,入职需要准备哪些材料,入职当天需要携带:身份证原件及复印件、学历证书复印件、离职证明、体检报告、银行卡信息...
hr,试用期转正流程是什么,试用期转正流程:1. 员工提交转正申请 2. 直属领导评估 3. HR 审核 4. 部门负责人审批...
it,VPN 连接失败怎么办,请按以下步骤排查:1. 确认账号密码正确 2. 检查网络连接 3. 尝试切换 VPN 节点...
billing,如何申请发票,在订单页面点击"申请发票",选择发票类型(电子/纸质),填写发票抬头...

7.2 入库实现

# qa_core/indexing/faq_ingestion.py

def faq_documents_from_csv(
    csv_path: str,
    kb_version: str | None = None,
    scenario_id: str | None = None,
    tenant_id: str | None = None,
    dataset_id: str | None = None,
    visibility: str | None = None,
    allowed_roles: list[str] | None = None,
) -> tuple[list[Document], list[str]]:
    """把 FAQ CSV 转换为可写入 Milvus 的问题文档。

    FAQ 的 page_content 只放"标准问题",答案放在 metadata.answer。这样检索时匹配的是
    用户问题和标准问题的相似度;一旦高置信命中,就可以直接返回 metadata.answer。
    """
    scenario = resolve_scenario(scenario_id)
    data_scope = resolve_data_scope(tenant_id=tenant_id, dataset_id=dataset_id, visibility=visibility, user_roles=allowed_roles)
    version_meta = version_metadata(kb_version, scenario.scenario_id)
    data = pd.read_csv(csv_path, encoding="utf-8")
    docs: list[Document] = []
    ids: list[str] = []
    seen_ids: set[str] = set()
    for _, row in data.iterrows():
        question = str(row.get("问题") or row.get("question") or "").strip()
        answer = str(row.get("答案") or row.get("answer") or "").strip()
        subject = str(
            row.get("source")
            or row.get("source_filter")
            or row.get("业务分类")
            or row.get("subject_name")
            or ""
        ).strip()
        if not question or not answer:
            continue

        source = normalize_faq_source(subject, scenario=scenario, question=question)
        faq_id = stable_hash(scenario.scenario_id, kb_version or "", source, question)
        if faq_id in seen_ids:
            faq_id = stable_hash(scenario.scenario_id, kb_version or "", source, question, answer)
        if faq_id in seen_ids:
            continue
        seen_ids.add(faq_id)
        docs.append(
            Document(
                page_content=question,
                metadata={
                    "faq_id": faq_id,
                    "scenario_id": scenario.scenario_id,
                    **data_scope.metadata(allowed_roles=allowed_roles),
                    "standard_question": question,
                    "answer": answer,
                    "source": source,
                    "subject_name": subject,
                    "status": "published",
                    **version_meta,
                },
            )
        )
        ids.append(faq_id)
    return docs, ids
存储策略
  • page_content = FAQ 标准问题 → 用于向量检索
  • metadata.answer = 标准答案 → 检索命中后直接取 metadata 返回
  • metadata.source = 当前场景 valid_sources 中的标准分类 → 用于 Milvus 过滤和数据隔离
这样 FAQ 直出时不需要再调用 LLM,直接从 metadata 读取答案即可。 normalize_faq_source() 只依赖当前场景包的 valid_sourcessource_patterns。如果 CSV 中的分类无法映射到当前场景,系统会直接报错,而不是偷偷写入 Milvus。这样可以保证 FAQ 入库的业务边界和场景配置一致。

8. 清理与维护

8.1 清理已删除的本地文件

当本地文档被删除时,Milvus 中的旧 chunk 不会自动消失。需要运行清理脚本:
# 预览将要清理的内容(默认 dry-run)
python scripts/cleanup_missing_docs.py --scenario enterprise_knowledge

# 实际执行清理
python scripts/cleanup_missing_docs.py --scenario enterprise_knowledge --no-dry-run

8.2 cleanup_missing_document_chunks 原理

def cleanup_missing_document_chunks(
    *,
    scenario_id: str | None = None,
    source: str | None = None,
    kb_version: str | None = None,
    dry_run: bool = True,
) -> dict[str, Any]:
    """清理 manifest 中已不存在本地文件的文档 chunk。

    该操作会删除 Milvus 数据,默认 dry-run 先预览再执行。
    """
    scenario = resolve_scenario(scenario_id)
    manifest = IndexManifest(path=scenario.index_manifest_path)
    records = manifest.iter_records(
        scenario_id=scenario.scenario_id,
        source=source,
        kb_version=kb_version,
    )
    missing = [r for r in records if r.path and not Path(r.path).exists()]

    if dry_run:
        return {
            "dry_run": True,
            "missing_file_count": len(missing),
            "affected_chunk_count": sum(len(r.chunk_ids) for r in missing),
            "missing_files": [
                {"path": r.path, "chunk_count": len(r.chunk_ids)}
                for r in missing
            ],
        }

    # 实际删除
    doc_store = get_doc_store(scenario.doc_collection)
    for record in missing:
        doc_store.delete_ids(record.chunk_ids)
        manifest.remove_by_key(record.key)

    manifest.save()
    return {
        "dry_run": False,
        "deleted_chunk_count": sum(len(r.chunk_ids) for r in missing),
        "deleted_file_count": len(missing),
    }
默认 dry-run:先预览再执行,防止误删。

9. 复杂图文资料入库治理

9.1 这属于多模态吗

导入文档中同时存在文字、图片、截图、扫描页、流程图、设备照片时,本质上已经进入了多模态资料处理范围。 但在当前一期项目里,它应该被定位为:
多模态入库治理,不是多模态在线问答。
两者区别如下:
类型做什么当前一期定位
多模态入库治理离线解析图片、扫描件、图文 PDF,把结果转成可复核文本或图文块可以讲清楚设计边界,谨慎接入
多模态在线问答用户实时上传图片,模型现场看图回答不放一期主链路
多模态检索同时存文本向量和图片向量,用 CLIP/VLM 做跨模态召回更适合二期或三期
这样设计的原因是:在线问答必须稳定、低延迟、可追踪;图片解析、OCR、VLM 描述成本高且失败率高,如果直接塞进在线链路,会让 RAG 主流程变慢、变重、变不可控。

9.2 为什么不能“图片 OCR 一下就入库”

真实企业资料中的图片经常包含:
  • 合同扫描件;
  • 审批截图;
  • 设备告警截图;
  • 流程图;
  • 验收照片;
  • 表格截图;
  • 盖章文件;
  • 票据和单证照片。
这些内容的风险不只是“能不能识别出文字”,而是:
风险示例
OCR 识别错误金额 8000 被识别成 B000
上下文断裂图片中的“处理步骤”脱离前后正文后无法理解
来源不可追溯回答引用了图片内容,但不知道来自第几页第几张图
证据未确认扫描件内容未经人工复核,不能作为正式制度口径
图中信息不全流程图箭头、颜色、图例无法仅靠 OCR 还原
所以复杂图文资料不能简单走“OCR -> 普通文本切分 -> 入库”。正确流程应该是:
图文资料
  -> 抽取文本层
  -> 识别图片/扫描页
  -> OCR 或 VLM 生成候选说明
  -> 绑定附近正文、页码、图片编号
  -> 人工复核
  -> 生成 image_text_block
  -> 入库质量检查
  -> 新知识库版本激活

9.3 三类资料的处理策略

资料类型处理方式是否直接进入 active 知识库
有文本层的 PDF / Word / PPT正文先按普通文档入库,图片进入风险报告正文可以,图片不直接进
扫描件 / 图片 PDF进入离线 OCR,生成待复核 Markdown不直接进
图片和正文强相关资料生成图文语义块 image_text_block复核后才可以进
当前项目已有离线 OCR 脚本:
python scripts/ocr/run_offline_ocr.py --input-dir incoming_scans --output-dir reports/ocr/batch_001
python scripts/ocr/promote_ocr_candidates.py --input-dir reports/ocr/batch_001 --scenario engineering_project_qa --source quality --apply
第一条命令只生成待复核资料,第二条命令才把复核后的 Markdown 提升到场景资料目录。提升后仍然要执行知识库版本重建、入库质量检查和RAG 回归验收。

9.4 image_text_block 推荐结构

对于图片和正文强相关的资料,不应该把 OCR 文本当成普通段落直接切分,而应该生成专门的图文块:
page_content:
  第 3 页第 2 张图片说明:设备告警面板显示 E102,温度超过 85°C。
  图片附近正文:处理方式为先停机检查冷却风扇,再联系运维。

metadata:
  content_type: image_text_block
  file_name: equipment_alarm_manual.pdf
  page_index: 3
  image_index: 2
  ocr_confidence: 0.91
  review_status: reviewed
  parent_content: 第 3 页完整上下文
关键字段说明:
字段作用
content_type=image_text_block告诉检索和上下文构建:这是图文块,不是普通正文
page_index / image_index支持答案引用到具体页和具体图片
ocr_confidence用于入库质量检查,低置信度不能直接激活
review_status只有 reviewed 才允许进入 active 知识库
parent_content保留图片附近正文,避免图片文字脱离上下文

9.5 分块策略

图文混排资料的切分原则是:
  1. 正文按章节或父子块切分:继续复用当前 split_documents() 的 Parent-Child Chunking。
  2. 表格按行切分:CSV/Excel 仍然使用 content_type=table_row,不参与普通递归切分。
  3. 图片 OCR 文本不单独裸切:必须绑定页码、图片编号和附近正文。
  4. 未复核图文块不进 active:只能作为候选资料进入复核区或治理报告。
  5. 低置信度图文块阻断激活:避免把错误金额、日期、合同号写入正式知识库。
也就是说,图文资料的最小语义单元不是“识别出的一行字”,而是:
图片 OCR 文本 + 图片附近正文 + 页码 + 图片编号 + 置信度 + 复核状态

9.6 检索策略

图文块进入知识库后,也不应该和普通正文完全同权。 推荐策略:
  • 普通知识问题:优先使用文本 chunk 和表格 chunk。
  • 用户问题包含“图片、截图、扫描件、照片、图中、流程图、告警面板”等表达时,提高 image_text_block 权重。
  • 如果命中的图文块 review_status != reviewed,回答必须标记“未确认”,不能把它当成正式证据。
  • 来源展示必须包含文件名、页码和图片编号。
这样既能让图文资料参与 RAG,又不会让未确认图片内容污染正式答案。

9.7 面试话术

如果面试官问“你们项目支持多模态吗”,可以这样回答:
我们一期没有做实时多模态对话,而是把多模态能力收敛在知识库入库治理侧。图片、扫描件、图文 PDF 会先通过离线 OCR 或 VLM 生成可复核文本,再绑定页码、图片编号、附近正文和置信度。只有人工复核通过的图文块才会以 image_text_block 形式进入知识库,并继续经过入库质量检查、版本激活和回归评测。这样既能处理企业资料中的多模态信息,又不会让在线问答链路变重、变慢、变不稳定。
这段内容的重点不是展示 OCR 接入,而是理解企业 RAG 中多模态资料必须经过治理、复核、入库质量检查和版本化上线。

10. data_packs 与企业资料增强包

项目里除了 scenarios/,还有一个 data_packs/ 目录。它们不是同一种东西,学习时要先区分清楚。
scenarios/                         当前正式知识库数据源
  enterprise_knowledge/
  equipment_ops/
  ...

data_packs/enterprise_realistic_pack/   企业仿真增强资料包
  clean_overlay/                        可治理、可预检的增强候选资料
  dirty_samples/                        只用于资料治理演示的脏样本

10.1 scenarios 是主链路数据源

scenarios/ 是当前 8 个冻结业务场景的正式资料目录。执行下面命令时,默认读取的就是 scenarios/
python scripts/rebuild_scenarios.py --reset-collections
单场景重建也是一样:
python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --quality-gate --activate
所以第一遍跑通主链路时只需要关心 scenarios/,它保证主链路足够稳定、可控、可复现。

10.2 clean_overlay 是增强候选,不自动入库

data_packs/enterprise_realistic_pack/clean_overlay/ 用来模拟更真实的企业资料,例如:
  • 区域差异和例外规则;
  • 角色权限和金额阈值;
  • 审批链和补签流程;
  • 合同付款风险;
  • 跨境单证金额变更;
  • 理赔材料不一致;
  • SaaS 企业客户账单和集成问题。
它不是 active 知识库的一部分,也不会被 rebuild_scenarios.py 自动读取。这样设计是为了避免“增强资料还没治理完,就污染正式知识库”。 clean overlay 的正确流程是:
clean_overlay
  -> 构建预览数据集
  -> 入库质量预检
  -> overlay 就绪检查
  -> 生成上线计划
  -> 执行经过校验的 rebuild_kb_version.py
  -> 激活新版本
  -> 跑 overlay 回归评测
常用命令:
python scripts/enterprise_overlay/build_enterprise_overlay_dataset.py --all-scenarios --output reports/verification/enterprise_overlay_build_latest.json
python scripts/enterprise_overlay/check_enterprise_overlay_readiness.py --output reports/verification/enterprise_overlay_readiness_latest.json
python scripts/enterprise_overlay/plan_enterprise_overlay_activation.py --output reports/verification/enterprise_overlay_activation_plan_latest.json
python scripts/enterprise_overlay/run_enterprise_overlay_activation.py --plan reports/verification/enterprise_overlay_activation_plan_latest.json --output reports/verification/enterprise_overlay_activation_run_latest.json

10.3 dirty_samples 只用于治理演示

data_packs/enterprise_realistic_pack/dirty_samples/ 不能直接入库。它里面放的是用来讲资料治理风险的样本,例如:
脏样本类型风险
过期制度可能覆盖当前有效口径
OCR 噪声金额、日期、编号可能识别错误
表格导出混乱字段缺失、列名不规范、行语义不完整
命名混乱source 难以推断,影响检索过滤
FAQ/正文冲突标准答案和正文口径不一致
dirty samples 的正确流向是:
dirty_samples
  -> 风险识别
  -> 人工清洗/复核
  -> 变成 clean_overlay
  -> 再走 overlay 预检和版本激活
对应分析命令:
python scripts/enterprise_overlay/analyze_dirty_enterprise_samples.py --output reports/verification/dirty_enterprise_samples_latest.json

10.4 三者关系总结

目录是否默认入库作用
scenarios/当前正式知识库资料,8 场景初始化读取这里
data_packs/.../clean_overlay/企业仿真增强候选资料,预检通过后才能按计划激活
data_packs/.../dirty_samples/资料治理教学样本,只用于风险识别和清洗演示
面试话术:
我们没有把所有资料都直接塞进 active 知识库。scenarios/ 是当前正式数据源,保证主链路稳定;clean_overlay 是企业仿真增强候选,需要先通过预检、就绪检查和回归评测;dirty_samples 只用于演示真实企业资料治理问题,不能直接入库。这样既能保持教学主链路可控,又能讲清企业资料从脏数据到可上线知识资产的治理过程。

11. 入库失败排查手册

入库链路牵涉 MySQL、Milvus、Embedding、Reranker、场景配置、质量门禁和版本激活。排查时不要直接猜原因,按下面顺序查,速度最快。

11.1 先确认当前 active 版本

页面提示“信息不足”、检索结果为空、或者刚重建后仍然回答旧内容时,先查 active 版本:
docker compose --env-file .env.compose run --rm api python -c "from qa_core.config.settings import get_settings; from qa_core.scenarios.registry import get_scenario; from qa_core.governance.kb_versions import KnowledgeBaseVersionStore; s=get_settings(); sc=get_scenario(s.active_scenario_id); store=KnowledgeBaseVersionStore(sc.kb_versions_path); print(sc.id); print(store.get_active_version())"
判断:
现象含义处理
active=None没有激活版本,在线问答不知道查哪批数据重新执行 rebuild_kb_version.py --quality-gate --activate
active 不是刚构建的版本新版本停留在 staged 或 gate 失败查看质量报告,修复后重新激活
active 是新版本但仍没答案继续查 collection 和过滤条件看 10.2/10.3

11.2 再确认 Milvus collection 是否存在且有数据

docker compose --env-file .env.compose run --rm api python -c "from pymilvus import MilvusClient; from qa_core.config.settings import get_settings; c=MilvusClient(uri=get_settings().milvus_uri); print(c.list_collections())"
如果 collection 不存在,说明入库没有真正写到 Milvus;如果 collection 存在但实体数量很少或为 0,需要回看入库日志。 常见原因:
现象原因处理
collection 不存在场景配置里的 collection 名和实际不一致,或入库任务失败检查 scenario.toml,重新构建
只有 FAQ 没有 Doc文档目录为空,或 --skip-docs 被使用检查 scenarios/<id>/docs
只有 Doc 没有 FAQFAQ CSV 不存在,或 --skip-faq 被使用检查 faq.csv

11.3 schema 不兼容时使用 reset-collections

如果日志出现:
sparse 字段不是 BM25 Function 输出字段
nq [0] is invalid
BM25 Function / sparse 字段不兼容
通常表示复用了旧 schema collection。处理方式是删除旧 collection 并重建:
docker compose --env-file .env.compose run --rm api python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --reset-collections --quality-gate --activate
排查口径:
只手动删除 collection 不会自动生成新知识库版本。必须重新跑入库脚本,让脚本重新创建 collection、写入 FAQ/Doc、生成质量报告并激活版本。

11.4 质量门禁失败先看报告,不要直接跳过

--quality-gate 失败时,说明资料里可能存在空文件、重复 FAQ、source 无效、FAQ/正文冲突或低质量 chunk。 排查顺序:
查看 reports/quality/
  -> 找到对应 scenario 和 kb_version 的报告
  -> 先修复 failed_files / unsupported_files / empty_files
  -> 再修复 duplicate_faq_questions / invalid_sources
  -> 最后再考虑调整阈值
不要为了让命令通过就直接去掉 --quality-gate。这会让低质量资料进入 active 知识库,后面在线问答会变成“能检索,但答得不可靠”。

11.5 重建后页面还是旧答案

按这个顺序检查:
  1. 页面右侧当前状态里的知识库版本是否变成新版本。
  2. .env.composeACTIVE_SCENARIO_ID 是否是你刚重建的场景。
  3. API 容器是否重新加载了 .env.compose
docker compose --env-file .env.compose up -d --force-recreate api
docker logs -f knowforge-api
  1. 是否有多个 Milvus 实例:宿主机脚本连的是 127.0.0.1:19530,容器内脚本连的是 http://milvus:19530。要确认两者指向同一个 Docker Compose 服务。

11.6 八场景全量初始化的推荐命令

如果你希望在新环境中一次性把全部 8 个场景初始化到可演示状态,使用:
if (!(Test-Path .env.compose)) { Copy-Item .env.compose.example .env.compose }
notepad .env.compose
docker compose --env-file .env.compose up -d mysql etcd minio milvus
docker compose --env-file .env.compose build api
docker compose --env-file .env.compose run --rm api python scripts/rebuild_scenarios.py --reset-collections
如果之前已经存在知识库,只是资料内容变化,重建全部 8 个场景时不要删除 collection:
docker compose --env-file .env.compose run --rm api python scripts/rebuild_scenarios.py
如果容器镜像里还没有最新脚本,执行 docker compose --env-file .env.compose build api 后再运行入库命令。

12. 本讲实践闭环

项目内容
本讲类型项目实现 + 工程治理
实践产物文档/FAQ/表格入库链路、质量预检、rebuild_kb_version.pyrebuild_scenarios.py
是否进入最终项目
验收方式单场景或 8 场景重建成功,Milvus collection 有数据,active 版本已激活
后续落点第 17 讲对入库结果做质量评估和门禁
通过标准:资料能从文件进入可检索知识库,metadata、版本、隔离字段完整,失败可按排查手册定位。

12.1 本讲从 0 到 1 实现闭环

这一讲是离线链路的完整交付:把文件、FAQ、表格资料变成线上可检索的数据。实现顺序如下:
  1. 先实现文件 Loader 注册表,让不同后缀走不同解析器。
  2. 再实现标准化,把 source、scenario、kb_version、DataScope 写入 metadata。
  3. 然后实现 chunking,把长文档切成适合检索的片段。
  4. FAQ 单独入库:问题作为检索文本,标准答案放 metadata。
  5. 最后由 ingest_directory() 串起加载、标准化、切分、写入 Milvus、更新 Manifest。
实现完成后,相关代码结构应该是下面这张图: 来源:真实代码调用点,见 qa_core/indexing/document_loaders.py
LOADER_REGISTRY = {
    ".md": load_markdown,
    ".txt": load_text,
    ".csv": load_csv_rows,
    ".xlsx": load_excel_rows,
}

def load_documents(path):
    loader = LOADER_REGISTRY[path.suffix.lower()]
    return loader(path)
标准化阶段是离线链路和在线链路的接口。在线检索依赖的过滤字段,必须在这里写全。 来源:真实代码调用点,见 qa_core/indexing/document_normalizer.py
def normalize_documents(docs, scenario, kb_version, scope):
    for doc in docs:
        doc.metadata["scenario_id"] = scenario.id
        doc.metadata["kb_version"] = kb_version
        doc.metadata["tenant_id"] = scope.tenant_id
        doc.metadata["source"] = infer_source(doc, scenario)
    return docs
文档入库主流程要能重复执行。没变化的文件通过 Manifest 跳过;文件变化、Embedding 版本变化或 chunk schema 变化时,要先删除旧 chunk,再重新解析和写入。 来源:真实代码逻辑压缩版,对应 qa_core/indexing/service.py::ingest_directory()_ingest_single_file()
def ingest_directory(directory_path, source=None, *, scenario_id=None,
                     tenant_id=None, dataset_id=None, visibility=None,
                     allowed_roles=None, force=False, kb_version=None,
                     create_new_version=False, activate=False):
    scenario = resolve_scenario(scenario_id)
    data_scope = resolve_data_scope(
        tenant_id=tenant_id,
        dataset_id=dataset_id,
        visibility=visibility,
        user_roles=allowed_roles,
    )
    root = Path(directory_path)
    resolved_source = source or normalize_source_from_path(root)
    if resolved_source not in scenario.valid_sources:
        raise ValueError(f"无效的业务分类:{resolved_source}")

    version = get_kb_version_store(scenario.scenario_id).ensure_version(
        kb_version,
        create_new=create_new_version,
        created_by="ingest_directory",
    )
    manifest = IndexManifest(path=scenario.index_manifest_path)
    doc_store = get_doc_store(scenario.doc_collection)

    for path in walk_supported_files(root):
        chunks, skipped = _ingest_single_file(
            path, resolved_source, version.kb_version, scenario,
            data_scope, allowed_roles, doc_store, manifest, force,
        )
        total_chunks += chunks

    manifest.save()
    version_store.record_ingest_result(version.kb_version, content_type="doc", count=total_chunks, source=resolved_source)
    if activate:
        version_store.activate_version(version.kb_version)

def _ingest_single_file(path, source, kb_version, scenario, data_scope, allowed_roles, doc_store, manifest, force):
    if get_document_loader_spec(path) is None:
        raise ValueError(f"不支持的文档类型:{path}")

    fingerprint = file_fingerprint(path)
    existing = manifest.get(source, path, kb_version, scenario.scenario_id)
    if not force and existing and existing.fingerprint == fingerprint \
       and existing.embedding_model_version == settings.embedding_model_version \
       and existing.chunk_schema_version == settings.chunk_schema_version:
        return 0, True

    if existing and existing.chunk_ids:
        doc_store.delete_ids(existing.chunk_ids)

    docs = normalize_documents(load_file(path), path, source, kb_version, scenario.scenario_id, data_scope, allowed_roles)
    chunks, ids = split_documents(docs)
    if chunks:
        doc_store.add_documents(chunks, ids=ids)
        manifest.update(source, path, fingerprint, ids, scenario_id=scenario.scenario_id, kb_version=kb_version)
        return len(chunks), False
    return 0, False
FAQ 入库不能把“答案”也作为主要检索文本,否则标准答案过长时会稀释问题语义。本项目让问题参与检索,答案作为 metadata 被命中后直出。 来源:真实代码调用点,见 qa_core/indexing/faq_ingestion.py
Document(
    page_content=row["question"],
    metadata={
        "answer": row["answer"],
        "hit_type": "faq",
        "source": row["source"],
    },
)
单场景验收用 rebuild_kb_version.py,全量初始化 8 个场景用 rebuild_scenarios.py 来源:命令行验收,对应 scripts/rebuild_kb_version.pyscripts/rebuild_scenarios.py
python scripts/rebuild_scenarios.py --reset-collections
已有知识库只刷新资料内容时,使用:
python scripts/rebuild_scenarios.py
闭环验证重点:
验证项验证方式期望结果
Loader 注册放入不同格式文件能按后缀选择加载器
metadata 标准化查看 chunk metadatasource、scenario、version、scope 完整
文档切分入库长文档chunk 粒度合理且可追溯
FAQ 入库检索标准问题命中后可直接取 metadata.answer
Manifest 增量重复执行入库未变化文件可跳过
schema/模型升级修改 embedding 或 chunk schema 版本旧 chunk 被删除后重新写入
source 校验传入非法 source直接报错,防止写错业务分类
版本激活activate=True新版本成为 active
8 场景初始化执行 rebuild_scenarios.py所有场景生成 active 版本
验收重点:文件能进入 Milvus,active 版本能更新,metadata、版本、隔离字段完整,失败时能用排查手册定位。

13. 重点掌握

优先级内容原因
★★★ 必会离线入库 vs 在线问答的清晰边界:入库修改数据(可慢可重试),问答只读(必须快)理解两条链路不能混淆的根本原因
★★★ 必会ingest_directory() 的完整流程:确认版本 → 遍历文件 → 指纹比对 → 加载/标准化 → 切分 → 写入 Milvus → 更新 Manifest文档入库的主流程
★★★ 必会IndexManifest 增量机制:通过文件指纹(路径+修改时间+大小)判断文件是否变化,未变化跳过避免每次全量重建的关键设计
★★ 理解注册表模式管理 Document Loaders:后缀→工厂函数的映射,扩展新格式只需添加注册项可扩展性设计模式
★★ 理解normalize_documents() 补充项目标准元数据(source、scenario_id、kb_version、data_scope 等)保证每个 chunk 有完整的过滤字段
★★ 理解FAQ 入库:page_content 存问题、metadata.answer 存答案,检索命中后直接返回FAQ 直出不走 LLM 的实现基础
★★ 理解表格 CSV/Excel 按行入库:每行为一个完整业务语义单元,不递归切分表格资料的特殊处理策略
★★ 理解scenarios/clean_overlay/dirty_samples/ 的边界防止把候选增强资料或脏样本直接污染 active 知识库
★ 了解复杂图文资料的多模态入库治理流程了解扩展方向
★ 了解cleanup_missing_document_chunks() 清理已删除文件对应的 Milvus chunk了解维护工具

14. 本讲小结

  • 离线入库 ≠ 在线问答:入库负责解析文件、切分、向量化、写入 Milvus;问答只做检索和生成
  • 注册表模式管理文件格式→Loader 的映射,扩展新格式只需添加注册项
  • 表格 CSV/Excel 按行入库:每行是一个 table_row,保留表头、工作表、行号和单元格键值,不再递归切分
  • IndexManifest 记录每个文件的指纹和 chunk ID,实现增量入库(只处理变化的文件)
  • FAQ 入库将标准问题作为检索内容、标准答案存储在 metadata 中,检索命中后直接返回
  • 复杂图文资料属于多模态入库治理:OCR/VLM 结果必须绑定上下文、人工复核、入库质量检查和版本激活后才能进入 active 知识库
  • data_packs 不是默认数据源clean_overlay 是增强候选,dirty_samples 是治理演示样本,默认 8 场景初始化只读取 scenarios/
  • 清理脚本默认 dry-run,先预览再执行,防止误删
下一讲RAG 回归验收与入库质量 — 入库质量报告、评测指标、回归验收体系、Bad Case 闭环