步骤 6(管道)。 实施数据管道修复

数据管道

请按照以下步骤修改数据管道并运行它,从而实现以下目的:

  1. 创建新的矢量索引。
  2. 使用数据管道的元数据创建 MLflow 运行。

生成的 MLflow 运行由 B_quality_iteration/02_evaluate_fixes 笔记本引用。

可通过两种方法修改数据管道:

  • 一次实施一项修复:在此方法中,可以一次配置并运行一个数据管道。 如果想要试用单个嵌入模型并测试单个新的分析程序,则最好使用此方式。 Databricks 建议以此方式开始熟悉这些笔记本。
  • 一次实施多项修复:也称为整理,在此方法中,会并行运行多个具有不同配置的数据管道。 如果要跨许多不同的策略进行“整理”,例如,评估三个 PDF 分析程序或评估许多不同的区块大小,则最好使用此方式。

有关本部分中的示例代码,请参阅 GitHub 存储库

方法 1:一次实施一项修复

  1. 打开 B_quality_iteration/data_pipeline_fixes/single_fix/00_config 笔记本
  2. 按照以下其中一项中的说明操作:
  3. 通过以下任一方式运行管道:
  4. 添加生成的 MLflow 运行的名称,它们已输出到 B_quality_iteration/02_evaluate_fixes 笔记本内的 DATA_PIPELINE_FIXES_RUN_NAMES 变量中

注意

数据准备管道使用 Spark 结构化流以增量方式加载和处理文件。 这意味着会在检查点跟踪已加载和准备好的文件,并且不会重新对其进行处理。 只有新添加的文件才会被加载、准备并追加到相应的表中。

因此,如果要从头开始重新运行整个管道并重新处理所有文档,就需要删除检查点和表。 可以使用 reset_tables_and_checkpoints 笔记本来完成此操作。

方法 2:一次实施多项修复

  1. 打开 B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines 笔记本。
  2. 按照笔记本中的说明添加两个或多个要运行的数据管道配置。
  3. 运行笔记本,执行这些管道。
  4. 添加生成的 MLflow 运行的名称,它们已输出到 B_quality_iteration/02_evaluate_fixes 笔记本内的 DATA_PIPELINE_FIXES_RUN_NAMES 变量中。

附录

注意

可以在 single_fixmultiple_fixes 目录中找到下面提到的笔记本,具体目录取决于你是一次实施一项修复还是多项修复。

深入了解配置设置

下面列出了数据管道的各种预实现的配置选项。 或者,也可实现自定义分析程序/分块程序

  • vectorsearch_config:指定矢量搜索终结点(必须正常运行)和要创建的索引的名称。 另外,定义源表和索引之间的同步类型(默认为 TRIGGERED)。
  • embedding_config:指定要使用的嵌入模型和 tokenizer。 有关完整的选项列表,请参阅 supporting_configs/embedding_models 笔记本。 嵌入模型必须部署到正常运行的模型服务终结点。 根据分块策略,tokenizer 也会在拆分过程中确保区块不超过嵌入模型的词元限制。 此处使用 tokenizer 对文本块中的词元数进行计数,确保它们不超过所选嵌入模型的最大上下文长度。

下面显示了 HuggingFace 的 tokenizer:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

下面显示了 TikToken 的 tokenizer:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config:定义源字段的文件分析程序、分块程序和路径。 分析程序和分块程序分别在 parser_librarychunker_library 笔记本中定义。 可以在 single_fixmultiple_fixes 目录中找到这些笔记本。 有关完整的选项列表,请参阅 supporting_configs/parser_chunker_strategies 笔记本。single_fix 和 multiple_fixes 目录中也提供该笔记本。 不同的分析程序或分块程序可能需要不同的配置参数,其中 <param x> 表示特定分块程序所需的潜在参数。 还可以使用相同的格式向分析程序传递配置值。
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

实现自定义分析程序/分块程序

该项目的结构有助于向数据准备管道添加自定义分析程序或分块程序。

添加新的分析程序

假设你想要使用 PyMuPDF 库合并新的分析程序,从而将分析的文本转换为 Markdown 格式。 执行以下步骤:

  1. 可通过将以下代码添加到 single_fixmultiple_fix 目录下的 parser_library 笔记本中来安装所需依赖项:

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. single_fixmultiple_fix 目录下的 parser_library 笔记本中,为 PyMuPdfMarkdown 分析程序添加一个新的部分并实现分析函数。 确保该函数的输出符合笔记本开头定义的 ParserReturnValue 类。 这样可以确保与 Spark UDF 兼容。 tryexcept 块可防止在 single_fixmultiple_fix 目录下的 02_parse_docs 笔记本中将分析程序作为 UDF 应用时因各个文档中的错误导致 Spark 无法完成整个分析作业的问题。 此笔记本将检查任何文档的分析是否失败、隔离相应的行并发出警告。

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. 将新的分析函数添加到 single_fixmultiple_fix 目录下 parser_library 笔记本内的 parser_factory 中,使其可在 00_config 笔记本的 pipeline_config 中进行配置。

  4. 02_parse_docs 笔记本中,分析程序函数转换为 Spark Python UDF(针对 Databricks Runtime 14.0 或更高版本进行了 Arrow 优化)并应用于包含新的二进制 PDF 文件的 DataFrame。 若要进行测试和开发,请将一个简单的测试函数添加到 parser_library 笔记本中,该函数的作用是加载 test-document.pdf 文件并断言成功的分析:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

添加新的分块程序

添加新分块程序的过程与上面介绍的新分析程序的步骤类似。

  1. chunker_library 笔记本中添加所需的依赖项。
  2. 为分块程序添加一个新的部分并实现一个函数,例如 chunk_parsed_content_newchunkername。 新分块程序函数的输出必须是符合 chunker_library 笔记本开头定义的 ChunkerReturnValue 类的 Python 字典。 该函数应至少接受要分块的已分析文本的字符串。 如果分块程序需要其他参数,则可以将其添加为函数参数。
  3. 将新的分块程序添加到 chunker_library 笔记本内定义的 chunker_factory 函数中。 如果你的函数接受其他参数,请使用 functools 的 partial 来对其进行预配置。 此操作是必要的,因为 UDF 只接受一个输入参数,即本例中已分析的文本。 chunker_factory 让你可以在 pipeline_config 中配置不同的分块程序方法,并返回 Spark Python UDF(已针对 Databricks Runtime 14.0 及更高版本进行了优化)。
  4. 为新的分块函数添加简单的测试部分。 该部分应该对作为字符串提供的预定义文本进行分块。

性能调优

Spark 利用分区来进行并行处理。 数据被分为行块,每个分区默认由单个核心处理。 但是,当 Apache Spark 最初读取数据时,它可能不会创建针对所需计算进行优化的分区,尤其是对于执行分析和分块任务的 UDF 来说。 在创建分区时,找到平衡点非常重要。分区要足够小,以便能够高效地并行处理;但又不能太小,以免管理这些分区的开销超过它们所带来的好处。

可以使用 df.repartitions(<number of partitions>) 调整分区数。 应用 UDF 时,目标是工作器节点上可用核心数量的倍数。 例如,在 02_parse_docs 笔记本中,可以添加 df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) 来创建数量是可用工作器核心数两倍的分区。 通常,1 到 3 之间的倍数应该会产生令人满意的性能。

手动运行管道

或者,可以分步运行每个笔记本:

  1. 使用 01_load_files 笔记本加载原始文件。 这样会将每个文档二进制文件保存为 destination_tables_config 中定义的青铜表 (raw_files_table_name) 中的一条记录。 文件以增量方式加载,仅处理自上次运行以来的新文档。
  2. 使用 02_parse_docs 笔记本分析文档。 此笔记本执行 parser_library 笔记本(确保将其作为第一个单元运行以重启 Python),使不同的分析程序和相关实用工具可供使用。 然后,它使用 pipeline_config 中的指定分析程序将每个文档分析为纯文本。 例如,捕获原始 PDF 的页数和已分析的文本等相关元数据。 成功分析的文档存储在白银表 (parsed_docs_table_name) 中,而任何未分析的文档将被隔离到相应的表中。
  3. 使用 03_chunk_docs 笔记本对已分析的文档进行分块。 与分析类似,此笔记本执行 chunker_library 笔记本(同样作为第一个单元运行)。 它使用 pipeline_config 中指定的分块程序将每个分析的文档拆分为较小的区块。 使用 MD5 哈希为每个区块分配一个唯一 ID,与矢量搜索索引同步时必须要有此 ID。 最终的区块会被加载到黄金表中 (chunked_docs_table_name)。
  4. 使用 04_vector_index 创建/同步矢量搜索索引。 此笔记本验证 vectorsearch_config 中指定矢量搜索终结点的就绪情况。 如果配置的索引已存在,它将启动与黄金表的同步过程;否则,它将创建索引并触发同步。 如果尚未创建矢量搜索终结点和索引,这可能需要一些时间。

下一步

继续执行步骤 7.部署和监视