Freigeben über


Schritt 6 (Pipelines). Implementieren von Datenpipelinekorrekturen

Datenpipeline

Führen Sie die folgenden Schritte aus, um Ihre Datenpipeline für folgende Zwecke zu ändern und auszuführen:

  1. Erstellen eines Vektorindex
  2. Erstellen einer MLflow-Ausführung mit den Metadaten der Datenpipeline

Auf die resultierende MLflow-Ausführung wird durch das Notebook B_quality_iteration/02_evaluate_fixes verwiesen.

Für die Änderung der Datenpipeline gibt es zwei Ansätze:

  • Implementierung jeweils einer Korrektur Bei diesem Ansatz wird eine einzelne Datenpipeline konfiguriert und anschließend ausgeführt. Dieser Modus ist am besten geeignet, wenn Sie ein einzelnes Einbettungsmodell ausprobieren und einen einzelnen neuen Parser testen möchten. Es empfiehlt sich, hier zu beginnen, um sich mit diesen Notebooks vertraut zu machen.
  • Gleichzeitige Implementierung mehrerer Korrekturen Dieser Ansatz wird auch als Sweep bezeichnet. Hierbei führen Sie parallel mehrere Datenpipelines aus, die jeweils über eine andere Konfiguration verfügen. Dieser Modus eignet sich am besten, wenn Sie viele verschiedene Strategien durchlaufen möchten, um beispielsweise drei PDF-Parser oder viele verschiedene Blockgrößen auszuwerten.

Den Beispielcode aus diesem Abschnitt finden Sie in diesem GitHub-Repository.

Ansatz 1: Implementierung jeweils einer Korrektur

  1. Öffnen Sie das Notebook B_quality_iteration/data_pipeline_fixes/single_fix/00_config.
  2. Führen Sie eines der folgenden Verfahren aus:
  3. Führen Sie die Pipeline auf eine der folgenden Arten aus:
  4. Fügen Sie den Namen der resultierenden MLflow-Ausführung hinzu, der in der DATA_PIPELINE_FIXES_RUN_NAMES-Variablen im Notebook B_quality_iteration/02_evaluate_fixes ausgegeben wird.

Hinweis

Die Datenvorbereitungspipeline verwendet strukturiertes Streaming in Spark, um Dateien inkrementell zu laden und zu verarbeiten. Das bedeutet, dass bereits geladene und vorbereitete Dateien in Prüfpunkten nachverfolgt und nicht erneut verarbeitet werden. Nur neu hinzugefügte Dateien werden geladen, vorbereitet und an die entsprechenden Tabellen angefügt.

Wenn Sie also die gesamte Pipeline von Grund auf neu erneut ausführen und alle Dokumente erneut verarbeiten möchten, müssen Sie die Prüfpunkte und Tabellen löschen. Hierfür können Sie das Notebook reset_tables_and_checkpoints verwenden.

Ansatz 2: Gleichzeitige Implementierung mehrerer Korrekturen

  1. Öffnen Sie das Notebook B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines.
  2. Folgen Sie den Anweisungen im Notebook, um mehrere Konfigurationen der auszuführenden Datenpipeline hinzuzufügen.
  3. Führen Sie das Notebook aus, um diese Pipelines auszuführen.
  4. Fügen Sie den Namen der resultierenden MLflow-Ausführungen hinzu, die in der DATA_PIPELINE_FIXES_RUN_NAMES-Variablen im Notebook B_quality_iteration/02_evaluate_fixes ausgegeben werden.

Anhang

Hinweis

Die unten angegebenen Notebooks befinden sich im Verzeichnis single_fix bzw. multiple_fixes – je nachdem, ob Sie eine einzelne Korrektur oder mehrere Korrekturen implementieren.

Ausführliche Betrachtung der Konfigurationseinstellungen

Hier finden Sie die verschiedenen vordefinierten Konfigurationsoptionen für die Datenpipeline. Alternativ können Sie einen benutzerdefinierten Parser bzw. eine benutzerdefinierte Segmentierungskomponente implementieren.

  • vectorsearch_config: Geben Sie den Endpunkt Vektorsuche (muss bereits ausgeführt werden) und den Namen des zu erstellenden Index an. Definieren Sie außerdem die Art der Synchronisierung zwischen Quelltabelle und Index (Standardeinstellung: TRIGGERED).
  • embedding_config: Geben Sie das zu verwendende Einbettungsmodell und den Tokenizer an. Eine vollständige Liste der Optionen finden Sie im Notebook supporting_configs/embedding_models. Das Einbettungsmodell muss für einen ausgeführten Modellbereitstellungsendpunkt bereitgestellt werden. Je nach Segmentierungsstrategie wird der Tokenizer auch während der Aufteilung ausgeführt, um sicherzustellen, dass die Blöcke die Tokenobergrenze des Einbettungsmodells nicht überschreiten. Tokenizer werden hier verwendet, um die Anzahl von Token in den Textblöcken zu zählen und so sicherzustellen, dass sie die maximale Kontextlänge des ausgewählten Einbettungsmodells nicht übersteigen.

Hier sehen Sie einen Tokenizer von HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

Hier sehen Sie einen Tokenizer von TikToken:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: Definiert den Dateiparser, die Segmentierungskomponente und den Pfad zum Quellfeld. Parser und Segmentierungskomponenten werden in im Notebook parser_library bzw. im Notebook chunker_library definiert. Diese befinden sich im Verzeichnis single_fix bzw. multiple_fixes. Eine vollständige Liste der Optionen finden Sie im Notebook supporting_configs/parser_chunker_strategies, das auch wieder im Verzeichnis für eine einzelne Korrektur sowie im Verzeichnis für mehrere Korrekturen enthalten ist. Für verschiedene Parser oder Segmentierungskomponenten sind möglicherweise unterschiedliche Konfigurationsparameter erforderlich, wobei <param x> die potenziellen, für eine bestimmte Segmentierungskomponente erforderlichen Parameter darstellt. An Parser können auch Konfigurationswerte mit dem gleichen Format übergeben werden.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Implementieren eines benutzerdefinierten Parsers bzw. einer benutzerdefinierten Segmentierungskomponente

Dieses Projekt ist strukturiert, um das Hinzufügen von benutzerdefinierten Parsern oder Segmentierungskomponenten zur Datenvorbereitungspipeline zu erleichtern.

Hinzufügen eines neuen Parsers

Angenommen, Sie möchten einen neuen Parser mithilfe der PyMuPDF-Bibliothek integrieren, um analysierten Text in das Markdown-Format zu transformieren. Führen Sie folgende Schritte aus:

  1. Installieren Sie die erforderlichen Abhängigkeiten, indem Sie dem Notebook parser_library im Verzeichnis single_fix oder multiple_fix den folgenden Code hinzufügen:

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. Fügen Sie im Notebook parser_library im Verzeichnis single_fix oder multiple_fix einen neuen Abschnitt für den PyMuPdfMarkdown-Parser hinzu, und implementieren Sie die Analysefunktion. Stellen Sie sicher, dass die Ausgabe der Funktion mit der ParserReturnValue-Klasse konform ist, die am Anfang des Notebooks definiert ist. Dadurch wird die Kompatibilität mit Spark-UDFs sichergestellt. Der Block try oder except verhindert, dass Spark den gesamten Analyseauftrag im Falle von Fehlern in einzelnen Dokumenten als nicht erfolgreich einstuft, wenn der Parser im Notebook 02_parse_docs im Verzeichnis single_fix oder multiple_fix als UDF angewendet wird. Dieses Notebook überprüft, ob die Analyse für ein Dokument nicht erfolgreich war, stellt die entsprechenden Zeilen unter Quarantäne und gibt eine Warnung aus.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Fügen Sie Ihre neue Parserfunktion zu parser_factory im Notebook parser_library im Verzeichnis single_fix oder multiple_fix hinzu, um sie in pipeline_config des Notebooks 00_config konfigurierbar zu machen.

  4. Im Notebook 02_parse_docs werden Parserfunktionen in Spark Python-UDFs (Arrow-optimiert für Databricks Runtime 14.0 oder höhere Versionen) umgewandelt und auf den DataFrame angewendet, der die neuen binären PDF-Dateien enthält. Fügen Sie dem Notebook „parser_library“ zu Test- und Entwicklungszwecken eine einfache Testfunktion hinzu, die die Datei test-document.pdf lädt und eine erfolgreiche Analyse bestätigt:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Hinzufügen einer neuen Segmentierungskomponente

Die Schritte zum Hinzufügen einer neuen Segmentierungskomponente ähneln den Schritten, die weiter oben für das Hinzufügen eines neuen Parsers erläutert wurden.

  1. Fügen Sie die erforderlichen Abhängigkeiten im Notebook chunker_library hinzu.
  2. Fügen Sie einen neuen Abschnitt für Ihre Segmentierungskomponente hinzu, und implementieren Sie eine Funktion (z. B. chunk_parsed_content_newchunkername). Bei der Ausgabe der neuen Segmentierungsfunktion muss es sich um ein Python-Wörterbuch handeln, das mit der ChunkerReturnValue-Klasse konform ist, die am Anfang des Notebooks chunker_library definiert ist. Die Funktion muss mindestens eine Zeichenfolge des analysierten Texts akzeptieren, der segmentiert werden soll. Sollte Ihre Segmentierungskomponente weitere Parameter benötigen, können Sie diese als Funktionsparameter hinzufügen.
  3. Fügen Sie Ihre neue Segmentierungskomponente der im Notebook chunker_library definierten chunker_factory-Funktion hinzu. Wenn Ihre Funktion zusätzliche Parameter akzeptiert, verwenden Sie „partial“ von functools, um sie vorab zu konfigurieren. Dies ist erforderlich, da UDFs nur einen einzelnen Eingabeparameter akzeptieren. In unserem Fall handelt es sich dabei um den analysierten Text. chunker_factory ermöglicht das Konfigurieren verschiedener Segmentierungsmethoden in pipeline_config und gibt eine Spark Python-UDF (optimiert für Databricks Runtime 14.0 und höhere Versionen) zurück.
  4. Fügen Sie einen einfachen Testabschnitt für Ihre neue Segmentierungsfunktion hinzu. In diesem Abschnitt soll ein vordefinierter Text segmentiert werden, der als Zeichenfolge bereitgestellt wird.

Leistungsoptimierung

Spark verwendet Partitionen, um die Verarbeitung zu parallelisieren. Daten werden in Zeilenblöcke aufgeteilt, und jede Partition wird standardmäßig von einem einzelnen Kern verarbeitet. Wenn Daten jedoch erstmals von Apache Spark gelesen werden, werden möglicherweise keine Partitionen erstellt, die für die gewünschte Berechnung optimiert sind. Das gilt insbesondere für unsere UDFs, die Analyse- und Segmentierungsaufgaben ausführen. Es ist wichtig, ein ausgewogenes Verhältnis zu erzielen, um Partitionen zu erstellen, die einerseits klein genug für eine effiziente Parallelisierung, andererseits aber nicht so klein sind, dass der Zusatzaufwand für ihre Verwaltung die Vorteile überwiegt.

Sie können die Anzahl von Partitionen mithilfe von df.repartitions(<number of partitions>) anpassen. Peilen Sie bei der Anwendung von UDFs ein Vielfaches der Anzahl von Kernen an, die auf den Workerknoten verfügbar sind. Im Notebook 02_parse_docs können Sie beispielsweise df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) einschließen, um doppelt so viele Partitionen zu erstellen wie Workerkerne verfügbar sind. Ein Vielfaches zwischen 1 und 3 liefert in der Regel eine zufriedenstellende Leistung.

Manuelles Ausführen der Pipeline

Alternativ können Sie jedes einzelne Notebook Schritt für Schritt ausführen:

  1. Laden Sie die Rohdateien mithilfe des Notebooks 01_load_files. Dadurch wird jedes Dokument binär als einzelner Datensatz in einer Bronzetabelle (raw_files_table_name) gespeichert, die in destination_tables_config definiert ist. Dateien werden inkrementell geladen, wodurch nur neue Dokumente seit der letzten Ausführung verarbeitet werden.
  2. Analysieren Sie die Dokumente mithilfe des Notebooks 02_parse_docs. Dieses Notebook führt das Notebook parser_library aus. (Stellen Sie sicher, dass dies als erste Zelle ausgeführt wird, um Python neu zu starten.) Dadurch werden verschiedene Parser und zugehörige Hilfsprogramme verfügbar. Anschließend wird der angegebene Parser in pipeline_config verwendet, um die einzelnen Dokumente zu analysieren und in Nur-Text umzuwandeln. So können beispielsweise relevante Metadaten wie die Anzahl von Seiten des ursprünglichen PDF-Dokuments zusammen mit dem analysierten Text erfasst werden. Erfolgreich analysierte Dokumente werden in einer Silbertabelle (parsed_docs_table_name) gespeichert, während alle nicht analysierten Dokumente in einer entsprechenden Tabelle unter Quarantäne gestellt werden.
  3. Segmentieren Sie die analysierten Dokumente mithilfe des Notebooks 03_chunk_docs. Ähnlich wie beim Analysieren führt dieses Notebook das Notebook chunker_library aus. (Auch hier muss es als erste Zelle ausgeführt werden.) Jedes analysierte Dokument wird mithilfe der angegebenen Segmentierungskomponente aus pipeline_config in kleinere Blöcke aufgeteilt. Jedem Block wird eine eindeutige ID mit einem MD5-Hash zugewiesen. Dies ist für die Synchronisierung mit dem Vektorsuchindex erforderlich. Die letzten Blöcke werden in eine Goldtabelle (chunked_docs_table_name) geladen.
  4. Erstellen/Synchronisieren Sie den Vektorsuchindex mit 04_vector_index. Dieses Notebook überprüft die Bereitschaft des angegebenen Vektorsuchendpunkts in vectorsearch_config. Ist der konfigurierte Index bereits vorhanden, wird die Synchronisierung mit der Goldtabelle initiiert. Andernfalls wird der Index erstellt und die Synchronisierung ausgelöst. Dies wird voraussichtlich einige Zeit in Anspruch nehmen, wenn der Vektorsuchendpunkt und der Index noch nicht erstellt wurden.

Nächster Schritt

Fahren Sie mit Schritt 7: Bereitstellen und Überwachen fort.