步驟 6 (管線)。 實作資料管線修正
請遵循下列步驟來修改您的資料管線,並將其執行至:
- 建立新的向量索引。
- 使用數據管線的元數據建立 MLflow 執行。
筆記本會 B_quality_iteration/02_evaluate_fixes
參考產生的 MLflow 執行。
有兩種方法可以修改數據管線:
- 一次 實作單一修正 在此方法中,您會一次設定並執行單一數據管線。 如果您想要嘗試單一內嵌模型並測試單一新的剖析器,則此模式是最佳模式。 Databricks 建議從這裡開始 get 熟悉這些筆記本。
- 一次 實作多個修正 在這種方法中,也稱為掃掠,您可以平行執行多個具有不同組態的數據管線。 如果您想要跨許多不同的策略進行「掃掠」,例如,評估三個 PDF 剖析器或評估許多不同的區塊大小,則此模式是最佳模式。
如需本節中的範例程式碼,請參閱 GitHub 存放庫。
方法 1:一次實作單一修正
- 開啟 B_quality_iteration/data_pipeline_fixes/single_fix/00_config 筆記本
- 請遵循下列其中一項中的指示:
- 執行管線,方法是:
- 開啟並執行 00_Run_Entire_Pipeline 筆記本。
- 請遵循手動執行管線每個步驟的步驟。
- 將產生的 MLflow Run 名稱新增至
DATA_PIPELINE_FIXES_RUN_NAMES
B_quality_iteration/02_evaluate_fixes 筆記本中的變數
注意
數據準備管線會採用 Spark 結構化串流,以累加方式載入和處理檔案。 這需要已在檢查點中追蹤已載入和備妥的檔案,且不會重新處理。 只有新加入的檔案會被載入、準備並附加至對應的 tables。
因此,如果您要 從頭開始重新執行整個管線, 並重新處理所有檔,您需要刪除檢查點並 tables。 您可以使用 reset_tables_and_checkpoints Notebook 來完成這項作業。
方法 2:一次實作多個修正
- 開啟 B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines 筆記本。
- 請遵循筆記本中的指示,新增兩個或多個要執行的數據管線組態。
- 執行筆記本以執行這些管線。
- 將產生的 MLflow 執行名稱新增至
DATA_PIPELINE_FIXES_RUN_NAMES
B_quality_iteration/02_evaluate_fixes 筆記本中的變數。
附錄
注意
您可以根據您一次實作單一修正或多個修正,在 single_fix 和 multiple_fixes 目錄中找到下列參考的筆記本。
組態設定深入探討
下列列出數據管線的各種預先實作組態選項。 或者,您可以實 作自定義剖析器/區塊器。
-
vectorsearch_config
:指定 向量搜尋 端點(必須啟動並執行),以及要建立的索引名稱。 此外,定義來源 table 與索引之間的 同步處理 類型(預設值為TRIGGERED
)。 -
embedding_config
:指定要搭配Tokenizer使用的內嵌模型。 如需查看完整的 list 選項,請參閱supporting_configs/embedding_models
筆記本。 內嵌模型必須部署到執行中的模型服務端點。 根據區塊化策略,分詞器還會在分割時進行操作,以確保區塊不會超過嵌入模型的令牌限制 limit。 這裡會使用Tokenizer來計算文字區塊中的令牌數目,以確保它們不會超過所選內嵌模型的內容長度上限。
下列顯示 HuggingFace 中的 Tokenizer:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
下列顯示來自 TikToken 的 Tokenizer:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
-
pipeline_config
:定義來源欄位的檔案剖析器、區塊器和路徑。 剖析器和區塊器分別定義於和parser_library
筆記本中chunker_library
。 這些可以在single_fix和multiple_fixes目錄中找到。 如需選項的完整 list,請參閱supporting_configs/parser_chunker_strategies
筆記本,這可在單一和多個修正目錄中再次使用。 不同的剖析器或區塊器可能需要不同的組態 parameterswhere<param x>
代表特定區塊器所需的潛在參數 parameters。 剖析器也可以透過相同格式來傳遞組態 values。
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
實作自定義剖析器/區塊器
此專案是結構化的,可協助將自定義剖析器或區塊器新增至數據準備管線。
新增剖析器
假設您想要使用 PyMuPDF 連結庫 來合併新的剖析器,將剖析的文字轉換成 Markdown 格式。 執行下列步驟:
將下列程式代碼新增至 或
parser_library
目錄中的single_fix
multiple_fix
筆記本,以安裝必要的相依性:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
在
parser_library
或 目錄中的single_fix
筆記本中,新增剖multiple_fix
析器的新區段,並實作剖PyMuPdfMarkdown
析函式。 請確定函式的輸出符合ParserReturnValue
筆記本開頭所定義的類別。 這可確保與Spark UDF的相容性。try
或except
區塊可防止 Spark 在 或02_parse_docs
目錄中將剖析器套用為筆記本single_fix
中的multiple_fix
UDF 時,因為個別文件中發生錯誤,導致 Spark 無法完成整個剖析作業。 此筆記本會檢查是否有任何檔剖析失敗、隔離對應的數據列並引發警告。import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
將新的剖析函式新增至
parser_factory
或parser_library
目錄中的single_fix
,multiple_fix
使其可在筆記本的pipeline_config
中00_config
設定。在筆記本中
02_parse_docs
,剖析器函式會轉換成 Spark Python UDF(針對 Databricks Runtime 14.0 或更新版本優化箭號優化 ),並套用至包含新二進位 PDF 檔案的數據框架。 若要進行測試和開發,請將簡單的測試函式新增至載入test-document.pdf檔案並判斷提示成功剖析parser_library筆記本:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
新增區塊器
新增區塊器的程式會遵循上述針對新剖析器所說明的步驟。
- 在chunker_library筆記本中新增必要的相依性。
- 為區塊器新增區段並實作函式,例如
chunk_parsed_content_newchunkername
。 新區塊器函式的輸出必須是符合chunker_libraryChunkerReturnValue
所定義的類別的 Python 字典。 函式應該至少接受要區塊化之剖析文字的字串。 如果您的區塊器需要額外的 parameters,您可以將它們新增為函式 parameters。 - 將新的區塊器新增至
chunker_factory
chunker_library筆記本中定義的函式。 如果您的函式接受其他 parameters,請使用 functools 的部分 預先設定它們。 這是必要的,因為 UDF 只接受一個輸入參數,這是我們案例中剖析的文字。chunker_factory
可讓您在pipeline_config中設定不同的區塊器方法,並傳回Spark Python UDF(已針對 Databricks Runtime 14.0 和更新版本優化)。 - 為新的區塊化函式新增簡單的測試區段。 本節應該將預先定義的文字區塊化為字串。
效能微調
Spark 會利用數據分割來平行處理。 數據會分割成數據列的區塊,而且每個 partition 預設會由單一核心處理。 不過,當 Apache Spark 最初讀取數據時,它可能不會建立針對所需計算優化的分割區,特別是針對執行剖析和區塊化工作的 UDF。 建立足夠小的分割區,以便進行有效率的平行處理,而不需要那麼小,管理分割區的額外負荷就超過了優點,這一點非常重要。
您可以使用 來調整分割 df.repartitions(<number of partitions>)
區數目。 套用 UDF 時,請針對背景工作節點上可用的多個核心。 例如,在02_parse_docs筆記本中,您可以包含df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
建立與可用背景工作核心數目一樣多的分割區。 一般而言,介於 1 到 3 之間的倍數應該會產生令人滿意的效能。
手動執行管線
或者,您可以逐步執行每個個別的 Notebook:
-
01_load_files
。 這會將每個檔案的二進位檔儲存為定義在destination_tables_config
的青銅 table (raw_files_table_name
) 中的一筆記錄。 檔案會以累加方式載入,只處理自上次執行以來的新檔。 -
02_parse_docs
。 此筆記本會parser_library
執行筆記本(確保執行此動作作為重新啟動 Python 的第一個數據格),讓不同的剖析器和相關的公用程式可供使用。 然後,它會使用 中的pipeline_config
指定剖析器,將每個檔剖析成純文本。 例如,擷取與剖析文字一起的原始 PDF 頁數等相關元數據。 成功解析的檔案會儲存在銀 table(parsed_docs_table_name
),如果無法解析的檔案則會被隔離到相應的 table。 -
03_chunk_docs
區塊化。 類似於剖析,此筆記本會chunker_library
執行筆記本(同樣地,執行為第一個數據格)。 它會使用 指定的區塊器pipeline_config
,將每個剖析的檔分割成較小的區塊。 每個區塊都會使用 MD5 哈希來指派唯一標識碼,這是與向量搜尋索引同步處理的必要專案。 最後的區塊會載入金色 table(chunked_docs_table_name
)。 -
使用
04_vector_index
建立/Sync 向量搜尋索引。 此筆記本會驗證 中vectorsearch_config
指定向量搜尋端點的整備程度。 如果已設定的索引已經存在,它會起始與金 table的同步處理;否則,它會建立索引並觸發同步處理。 如果尚未建立向量搜尋端點和索引,這可能需要一些時間。
後續步驟
繼續進行 步驟 7。部署和監視。