手順 6 (パイプライン)。 データ パイプライン修正を実装する
次の手順に従って、データ パイプラインを変更し、次の目的で実行します。
- 新しいベクトル インデックスを作成します。
- データ パイプラインのメタデータを使用して MLflow 実行を作成します。
結果の MLflow 実行は、B_quality_iteration/02_evaluate_fixes
ノートブックによって参照されます。
データ パイプラインを変更するには、次の 2 つの方法があります。
- 一度に 1 つの修正を実装する: この方法では、一度に 1 つのデータ パイプラインを構成して実行します。 このモードは、1 つの埋め込みモデルを試し、1 つの新しいパーサーをテストする場合に最適です。 Databricks では、これらのノートブックを理解するために、この方法から始めることが提案されています。
- 一度に複数の修正を実装する: この方法は一括処理とも呼ばれ、それぞれ異なる構成を持つ複数のデータ パイプラインを並列で実行します。 このモードは、3 つの PDF パーサーを評価する、各種チャンク サイズを評価するなど、多くの異なる戦略を一括処理で試す場合に最適です。
このセクションのサンプル コードは GitHub リポジトリを参照してください。
方法 1: 一度に 1 つの修正を実装する
- B_quality_iteration/data_pipeline_fixes/single_fix/00_config ノートブックを開きます
- 次のいずれかの手順に従います。
- このクックブックで提供される新しい構成を実装する手順に従います。
- 解析またはチャンクのカスタム コードを実装する手順に従います。
- 次のいずれかの方法で、パイプラインを実行します。
- 00_Run_Entire_Pipeline ノートブックを開き、実行します。
- パイプラインの各手順を手動で実行する手順に従います。
- B_quality_iteration/02_evaluate_fixes ノートブックの
DATA_PIPELINE_FIXES_RUN_NAMES
変数に出力される結果の MLflow 実行の名前を追加します
Note
データ準備パイプラインは、Spark Structured Streaming を使用して、ファイルを増分読み込みして処理します。 これには、ファイルは既に読み込み済みで準備されており、チェックポイントで追跡されている必要があり、これから再処理されない必要があります。 新しく追加されたファイルのみが読み込まれ、準備され、対応するテーブルに追加されます。
そのため、パイプライン全体をゼロから再実行し、すべてのドキュメントを再処理する場合は、チェックポイントとテーブルを削除する必要があります。 これを行うには、reset_tables_and_checkpoints ノートブックを使用します。
方法 2: 一度に複数の修正を実装する
- B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines ノートブックを開きます。
- ノートブックの指示に従って、実行するデータ パイプラインの構成を 2 つ以上追加します。
- ノートブックを実行し、これらのパイプラインを実行します。
- B_quality_iteration/02_evaluate_fixes ノートブックの
DATA_PIPELINE_FIXES_RUN_NAMES
変数に出力される結果の MLflow 実行の名前を追加します。
付録
Note
一度に 1 つの修正を実装するか複数の修正を実装するかに応じて、single_fix ディレクトリか multiple_fixes ディレクトリかで、次で参照されているノートブックが見つかります。
構成設定の詳細情報
データ パイプラインに対して事前に実装されているさまざまな構成オプションを次に示します。 または、カスタム パーサー/チャンカーを実装することもできます。
vectorsearch_config
: ベクトル検索エンドポイント (稼働している必要がある) と、作成するインデックスの名前を指定します。 さらに、ソース テーブルとインデックスの間に synchronization 型を定義します (既定値はTRIGGERED
)。embedding_config
: 使用する埋め込みモデルをトークナイザーと共に指定します。 オプションの完全な一覧については、supporting_configs/embedding_models
ノートブックを参照してください。 埋め込みモデルは、実行中のモデル サービス エンドポイントにデプロイする必要があります。 チャンク戦略に応じて、トークナイザーも、分割中にチャンクが埋め込みモデルのトークン制限を超えないようにします。 ここではトークナイザーを使用して、テキスト チャンク内のトークンの数をカウントして、選択した埋め込みモデルの最大コンテキスト長を超えないようにします。
HuggingFace のトークナイザーを次に示します。
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
TikToken のトークナイザーを次に示します。
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: ファイル パーサー、チャンカー、ソース フィールドへのパスを定義します。 パーサーとチャンカーは、それぞれparser_library
ノートブックとchunker_library
ノートブックで定義されます。 これらは、single_fix ディレクトリと multiple_fixes ディレクトリにあります。 オプションの完全な一覧については、supporting_configs/parser_chunker_strategies
ノートブックを参照してください。このノートブックも、1 つの修正または複数の修正用の両方のディレクトリで使用できます。 パーサーまたはチャンカーが異なると、特定のチャンカーに必要な可能性があるパラメーターを<param x>
が表す必要な構成パラメーターが異なる場合があります。 パーサーは、同じ形式を使用して構成値を渡すこともできます。
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
カスタム パーサー/チャンカーの実装
このプロジェクトは、カスタム パーサーまたはチャンカーをデータ準備パイプラインに追加しやすくするために構成されています。
新しいパーサーを追加する
たとえば、PyMuPDF ライブラリを使用して新しいパーサーを組み込み、解析されたテキストを Markdown 形式に変換するとします。 次のステップを実行します。
single_fix
またはmultiple_fix
ディレクトリのparser_library
ノートブックに次のコードを追加して、必要な依存関係をインストールします。# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
single_fix
またはmultiple_fix
ディレクトリのparser_library
ノートブックで、PyMuPdfMarkdown
パーサーの新しいセクションを追加し、解析関数を実装します。 この関数の出力が、ノートブックの先頭で定義されているParserReturnValue
クラスに準拠していることを確認します。 これにより、Spark UDF との互換性が確保されます。try
またはexcept
ブロックを使用すると、single_fix
またはmultiple_fix
ディレクトリの02_parse_docs
ノートブックにパーサーを UDF として適用するときに、個々のドキュメントのエラーが原因で Spark が解析ジョブ全体に失敗することが防止できます。 このノートブックは、解析に失敗したドキュメントがあるかどうかを確認し、対応する行を隔離し、警告を発生させます。import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
新しい解析関数を
single_fix
またはmultiple_fix
ディレクトリのparser_library
ノートブックのparser_factory
に追加して、00_config
ノートブックのpipeline_config
で構成できるようにします。02_parse_docs
ノートブックで、パーサー関数は Spark Python UDF (Databricks Runtime 14.0 以降の場合は Arrow 最適化) に変換され、新しいバイナリ PDF ファイルを含むデータフレームに適用されます。 テストと開発のために、test-document.pdf ファイルを読み込み、解析が成功したことをアサートする parser_library ノートブックに単純なテスト関数を追加します。します。with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
新しいチャンカーを追加する
新しいチャンカーを追加するプロセスは、新しいパーサーについて上記で説明したものと同様の手順に従います。
- chunker_library ノートブックに、必要な依存関係を追加します。
- チャンカー用の新しいセクションを追加し、
chunk_parsed_content_newchunkername
などの関数を実装します。 新しいチャンカー関数の出力は、chunker_library ノートブックの先頭で定義されたChunkerReturnValue
クラスに準拠する Python ディクショナリである必要があります。 この関数は、少なくともチャンク対象の解析されたテキストの文字列を受け入れる必要があります。 チャンカーに追加のパラメーターが必要な場合は、それらを関数パラメーターとして追加できます。 - chunker_library ノートブックで定義されている
chunker_factory
関数に、新しいチャンカーを追加します。 関数が追加のパラメーターを受け入れる場合は、functools の partial を使用してそれらを事前に構成します。 これは、UDF が受け入れる入力パラメーターが 1 つだけであるためです。この場合、入力パラメーターは解析されたテキストです。chunker_factory
を使用すると、pipeline_config でさまざまなチャンカー メソッドを構成し、Spark Python UDF (Databricks Runtime 14.0 以降用に最適化) を返すことができます。 - 新しいチャンク関数の単純なテスト セクションを追加します。 このセクションでは、文字列として指定された定義済みのテキストをチャンクする必要があります。
パフォーマンスのチューニング
Spark は、並列処理のためにパーティションを使用します。 データは行のチャンクに分割され、各パーティションは既定で 1 つのコアによって処理されます。 ただし、データが Apache Spark によって最初に読み取られた場合、特に解析タスクとチャンク タスクを実行する UDF 用では、目的の計算用に最適化されたパーティションが作成されない場合があります。 効率的な並列化のためにパーティションは十分に小さく作成しつつ、パーティションを管理するオーバーヘッドが、小さくすることの利点を上回るほど小さくならないように、バランスを取る必要があります。
df.repartitions(<number of partitions>)
を使用してパーティションの数を調整できます。 UDF を適用するときは、ワーカー ノードで使用可能なコア数の倍数を目指します。 たとえば、02_parse_docs ノートブックに、使用可能なワーカー コアの数の 2 倍の数のパーティションを作成する df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
を含められます。 通常、1 倍と 3 倍の間で、妥当なパフォーマンスが得られます。
パイプラインを手動で実行する
または、個々のノートブックをステップ バイ ステップで実行することもできます。
01_load_files
ノートブックを使用して、RAW ファイルを読み込みます。 これにより、各ドキュメント バイナリが、destination_tables_config
で定義された bronze テーブル (raw_files_table_name
) に 1 つのレコードとして保存されます。 ファイルは増分的に読み込まれ、前回の実行以降の新しいドキュメントのみが処理されます。02_parse_docs
ノートブックを使用してドキュメントを解析します。 このノートブックは、parser_library
ノートブックを実行し (これは必ず Python を再起動するための最初のセルとして実行してください)、さまざまなパーサーと関連ユーティリティを使用できるようにします。 次に、pipeline_config
で指定されたパーサーを使用して、各ドキュメントをプレーン テキストに解析します。 たとえば、元の PDF のページ数などの関連メタデータが、解析されたテキストと一緒にキャプチャされます。 正常に解析されたドキュメントは、silver テーブル (parsed_docs_table_name
) に格納され、解析されていないドキュメントは、対応するテーブルに隔離されます。03_chunk_docs
ノートブックを使用して、解析されたドキュメントをチャンクします。 解析と同様に、このノートブックは、chunker_library
ノートブック (ここでも最初のセルとして実行) を実行します。pipeline_config
から指定されたチャンカーを使用して、解析された各ドキュメントをより小さなチャンクに分割します。 各チャンクには、ベクトル検索インデックスとの同期に必要な MD5 ハッシュを使用して一意の ID が割り当てられます。 最終的なチャンクは、gold テーブル (chunked_docs_table_name
) に読み込まれます。- ベクトル検索インデックスを作成し、
04_vector_index
と同期します。 このノートブックは、vectorsearch_config
内の指定されたベクトル検索エンドポイントの準備状況を確認します。 構成済みのインデックスが既に存在する場合は、gold テーブルとの同期が開始されます。それ以外の場合は、インデックスが作成され、同期がトリガーされます。 ベクトル検索エンドポイントとインデックスがまだ作成されていない場合は、しばらく時間がかかると予想されます。