Schritt 6 (Pipelines). Implementieren von Datenpipelinekorrekturen
Führen Sie die folgenden Schritte aus, um Ihre Datenpipeline für folgende Zwecke zu ändern und auszuführen:
- Erstellen eines Vektorindex
- Erstellen einer MLflow-Ausführung mit den Metadaten der Datenpipeline
Auf die resultierende MLflow-Ausführung wird durch das Notebook B_quality_iteration/02_evaluate_fixes
verwiesen.
Für die Änderung der Datenpipeline gibt es zwei Ansätze:
- Implementierung jeweils einer Korrektur Bei diesem Ansatz wird eine einzelne Datenpipeline konfiguriert und anschließend ausgeführt. Dieser Modus ist am besten geeignet, wenn Sie ein einzelnes Einbettungsmodell ausprobieren und einen einzelnen neuen Parser testen möchten. Es empfiehlt sich, hier zu beginnen, um sich mit diesen Notebooks vertraut zu machen.
- Gleichzeitige Implementierung mehrerer Korrekturen Dieser Ansatz wird auch als Sweep bezeichnet. Hierbei führen Sie parallel mehrere Datenpipelines aus, die jeweils über eine andere Konfiguration verfügen. Dieser Modus eignet sich am besten, wenn Sie viele verschiedene Strategien durchlaufen möchten, um beispielsweise drei PDF-Parser oder viele verschiedene Blockgrößen auszuwerten.
Den Beispielcode aus diesem Abschnitt finden Sie in diesem GitHub-Repository.
Ansatz 1: Implementierung jeweils einer Korrektur
- Öffnen Sie das Notebook B_quality_iteration/data_pipeline_fixes/single_fix/00_config.
- Führen Sie eines der folgenden Verfahren aus:
- Folgen Sie der Anleitung für die Implementierung einer neuen Konfiguration, die durch dieses Cookbook bereitgestellt wird.
- Führen Sie die Schritte zum Implementieren von benutzerdefiniertem Code für eine Analyse oder Segmentierung aus.
- Führen Sie die Pipeline auf eine der folgenden Arten aus:
- Öffnen Sie das Notebook 00_Run_Entire_Pipeline, und führen Sie es aus.
- Folgen Sie den Schritten, um jeden Schritt der Pipeline manuell auszuführen.
- Fügen Sie den Namen der resultierenden MLflow-Ausführung hinzu, der in der
DATA_PIPELINE_FIXES_RUN_NAMES
-Variablen im Notebook B_quality_iteration/02_evaluate_fixes ausgegeben wird.
Hinweis
Die Datenvorbereitungspipeline verwendet strukturiertes Streaming in Spark, um Dateien inkrementell zu laden und zu verarbeiten. Das bedeutet, dass bereits geladene und vorbereitete Dateien in Prüfpunkten nachverfolgt und nicht erneut verarbeitet werden. Nur neu hinzugefügte Dateien werden geladen, vorbereitet und an die entsprechenden Tabellen angefügt.
Wenn Sie also die gesamte Pipeline von Grund auf neu erneut ausführen und alle Dokumente erneut verarbeiten möchten, müssen Sie die Prüfpunkte und Tabellen löschen. Hierfür können Sie das Notebook reset_tables_and_checkpoints verwenden.
Ansatz 2: Gleichzeitige Implementierung mehrerer Korrekturen
- Öffnen Sie das Notebook B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines.
- Folgen Sie den Anweisungen im Notebook, um mehrere Konfigurationen der auszuführenden Datenpipeline hinzuzufügen.
- Führen Sie das Notebook aus, um diese Pipelines auszuführen.
- Fügen Sie den Namen der resultierenden MLflow-Ausführungen hinzu, die in der
DATA_PIPELINE_FIXES_RUN_NAMES
-Variablen im Notebook B_quality_iteration/02_evaluate_fixes ausgegeben werden.
Anhang
Hinweis
Die unten angegebenen Notebooks befinden sich im Verzeichnis single_fix bzw. multiple_fixes – je nachdem, ob Sie eine einzelne Korrektur oder mehrere Korrekturen implementieren.
Ausführliche Betrachtung der Konfigurationseinstellungen
Hier finden Sie die verschiedenen vordefinierten Konfigurationsoptionen für die Datenpipeline. Alternativ können Sie einen benutzerdefinierten Parser bzw. eine benutzerdefinierte Segmentierungskomponente implementieren.
vectorsearch_config
: Geben Sie den Endpunkt Vektorsuche (muss bereits ausgeführt werden) und den Namen des zu erstellenden Index an. Definieren Sie außerdem die Art der Synchronisierung zwischen Quelltabelle und Index (Standardeinstellung:TRIGGERED
).embedding_config
: Geben Sie das zu verwendende Einbettungsmodell und den Tokenizer an. Eine vollständige Liste der Optionen finden Sie im Notebooksupporting_configs/embedding_models
. Das Einbettungsmodell muss für einen ausgeführten Modellbereitstellungsendpunkt bereitgestellt werden. Je nach Segmentierungsstrategie wird der Tokenizer auch während der Aufteilung ausgeführt, um sicherzustellen, dass die Blöcke die Tokenobergrenze des Einbettungsmodells nicht überschreiten. Tokenizer werden hier verwendet, um die Anzahl von Token in den Textblöcken zu zählen und so sicherzustellen, dass sie die maximale Kontextlänge des ausgewählten Einbettungsmodells nicht übersteigen.
Hier sehen Sie einen Tokenizer von HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
Hier sehen Sie einen Tokenizer von TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: Definiert den Dateiparser, die Segmentierungskomponente und den Pfad zum Quellfeld. Parser und Segmentierungskomponenten werden in im Notebookparser_library
bzw. im Notebookchunker_library
definiert. Diese befinden sich im Verzeichnis single_fix bzw. multiple_fixes. Eine vollständige Liste der Optionen finden Sie im Notebooksupporting_configs/parser_chunker_strategies
, das auch wieder im Verzeichnis für eine einzelne Korrektur sowie im Verzeichnis für mehrere Korrekturen enthalten ist. Für verschiedene Parser oder Segmentierungskomponenten sind möglicherweise unterschiedliche Konfigurationsparameter erforderlich, wobei<param x>
die potenziellen, für eine bestimmte Segmentierungskomponente erforderlichen Parameter darstellt. An Parser können auch Konfigurationswerte mit dem gleichen Format übergeben werden.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementieren eines benutzerdefinierten Parsers bzw. einer benutzerdefinierten Segmentierungskomponente
Dieses Projekt ist strukturiert, um das Hinzufügen von benutzerdefinierten Parsern oder Segmentierungskomponenten zur Datenvorbereitungspipeline zu erleichtern.
Hinzufügen eines neuen Parsers
Angenommen, Sie möchten einen neuen Parser mithilfe der PyMuPDF-Bibliothek integrieren, um analysierten Text in das Markdown-Format zu transformieren. Führen Sie folgende Schritte aus:
Installieren Sie die erforderlichen Abhängigkeiten, indem Sie dem Notebook
parser_library
im Verzeichnissingle_fix
odermultiple_fix
den folgenden Code hinzufügen:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
Fügen Sie im Notebook
parser_library
im Verzeichnissingle_fix
odermultiple_fix
einen neuen Abschnitt für denPyMuPdfMarkdown
-Parser hinzu, und implementieren Sie die Analysefunktion. Stellen Sie sicher, dass die Ausgabe der Funktion mit derParserReturnValue
-Klasse konform ist, die am Anfang des Notebooks definiert ist. Dadurch wird die Kompatibilität mit Spark-UDFs sichergestellt. Der Blocktry
oderexcept
verhindert, dass Spark den gesamten Analyseauftrag im Falle von Fehlern in einzelnen Dokumenten als nicht erfolgreich einstuft, wenn der Parser im Notebook02_parse_docs
im Verzeichnissingle_fix
odermultiple_fix
als UDF angewendet wird. Dieses Notebook überprüft, ob die Analyse für ein Dokument nicht erfolgreich war, stellt die entsprechenden Zeilen unter Quarantäne und gibt eine Warnung aus.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Fügen Sie Ihre neue Parserfunktion zu
parser_factory
im Notebookparser_library
im Verzeichnissingle_fix
odermultiple_fix
hinzu, um sie inpipeline_config
des Notebooks00_config
konfigurierbar zu machen.Im Notebook
02_parse_docs
werden Parserfunktionen in Spark Python-UDFs (Arrow-optimiert für Databricks Runtime 14.0 oder höhere Versionen) umgewandelt und auf den DataFrame angewendet, der die neuen binären PDF-Dateien enthält. Fügen Sie dem Notebook „parser_library“ zu Test- und Entwicklungszwecken eine einfache Testfunktion hinzu, die die Datei test-document.pdf lädt und eine erfolgreiche Analyse bestätigt:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Hinzufügen einer neuen Segmentierungskomponente
Die Schritte zum Hinzufügen einer neuen Segmentierungskomponente ähneln den Schritten, die weiter oben für das Hinzufügen eines neuen Parsers erläutert wurden.
- Fügen Sie die erforderlichen Abhängigkeiten im Notebook chunker_library hinzu.
- Fügen Sie einen neuen Abschnitt für Ihre Segmentierungskomponente hinzu, und implementieren Sie eine Funktion (z. B.
chunk_parsed_content_newchunkername
). Bei der Ausgabe der neuen Segmentierungsfunktion muss es sich um ein Python-Wörterbuch handeln, das mit derChunkerReturnValue
-Klasse konform ist, die am Anfang des Notebooks chunker_library definiert ist. Die Funktion muss mindestens eine Zeichenfolge des analysierten Texts akzeptieren, der segmentiert werden soll. Sollte Ihre Segmentierungskomponente weitere Parameter benötigen, können Sie diese als Funktionsparameter hinzufügen. - Fügen Sie Ihre neue Segmentierungskomponente der im Notebook chunker_library definierten
chunker_factory
-Funktion hinzu. Wenn Ihre Funktion zusätzliche Parameter akzeptiert, verwenden Sie „partial“ von functools, um sie vorab zu konfigurieren. Dies ist erforderlich, da UDFs nur einen einzelnen Eingabeparameter akzeptieren. In unserem Fall handelt es sich dabei um den analysierten Text.chunker_factory
ermöglicht das Konfigurieren verschiedener Segmentierungsmethoden in pipeline_config und gibt eine Spark Python-UDF (optimiert für Databricks Runtime 14.0 und höhere Versionen) zurück. - Fügen Sie einen einfachen Testabschnitt für Ihre neue Segmentierungsfunktion hinzu. In diesem Abschnitt soll ein vordefinierter Text segmentiert werden, der als Zeichenfolge bereitgestellt wird.
Leistungsoptimierung
Spark verwendet Partitionen, um die Verarbeitung zu parallelisieren. Daten werden in Zeilenblöcke aufgeteilt, und jede Partition wird standardmäßig von einem einzelnen Kern verarbeitet. Wenn Daten jedoch erstmals von Apache Spark gelesen werden, werden möglicherweise keine Partitionen erstellt, die für die gewünschte Berechnung optimiert sind. Das gilt insbesondere für unsere UDFs, die Analyse- und Segmentierungsaufgaben ausführen. Es ist wichtig, ein ausgewogenes Verhältnis zu erzielen, um Partitionen zu erstellen, die einerseits klein genug für eine effiziente Parallelisierung, andererseits aber nicht so klein sind, dass der Zusatzaufwand für ihre Verwaltung die Vorteile überwiegt.
Sie können die Anzahl von Partitionen mithilfe von df.repartitions(<number of partitions>)
anpassen. Peilen Sie bei der Anwendung von UDFs ein Vielfaches der Anzahl von Kernen an, die auf den Workerknoten verfügbar sind. Im Notebook 02_parse_docs können Sie beispielsweise df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
einschließen, um doppelt so viele Partitionen zu erstellen wie Workerkerne verfügbar sind. Ein Vielfaches zwischen 1 und 3 liefert in der Regel eine zufriedenstellende Leistung.
Manuelles Ausführen der Pipeline
Alternativ können Sie jedes einzelne Notebook Schritt für Schritt ausführen:
- Laden Sie die Rohdateien mithilfe des Notebooks
01_load_files
. Dadurch wird jedes Dokument binär als einzelner Datensatz in einer Bronzetabelle (raw_files_table_name
) gespeichert, die indestination_tables_config
definiert ist. Dateien werden inkrementell geladen, wodurch nur neue Dokumente seit der letzten Ausführung verarbeitet werden. - Analysieren Sie die Dokumente mithilfe des Notebooks
02_parse_docs
. Dieses Notebook führt das Notebookparser_library
aus. (Stellen Sie sicher, dass dies als erste Zelle ausgeführt wird, um Python neu zu starten.) Dadurch werden verschiedene Parser und zugehörige Hilfsprogramme verfügbar. Anschließend wird der angegebene Parser inpipeline_config
verwendet, um die einzelnen Dokumente zu analysieren und in Nur-Text umzuwandeln. So können beispielsweise relevante Metadaten wie die Anzahl von Seiten des ursprünglichen PDF-Dokuments zusammen mit dem analysierten Text erfasst werden. Erfolgreich analysierte Dokumente werden in einer Silbertabelle (parsed_docs_table_name
) gespeichert, während alle nicht analysierten Dokumente in einer entsprechenden Tabelle unter Quarantäne gestellt werden. - Segmentieren Sie die analysierten Dokumente mithilfe des Notebooks
03_chunk_docs
. Ähnlich wie beim Analysieren führt dieses Notebook das Notebookchunker_library
aus. (Auch hier muss es als erste Zelle ausgeführt werden.) Jedes analysierte Dokument wird mithilfe der angegebenen Segmentierungskomponente auspipeline_config
in kleinere Blöcke aufgeteilt. Jedem Block wird eine eindeutige ID mit einem MD5-Hash zugewiesen. Dies ist für die Synchronisierung mit dem Vektorsuchindex erforderlich. Die letzten Blöcke werden in eine Goldtabelle (chunked_docs_table_name
) geladen. - Erstellen/Synchronisieren Sie den Vektorsuchindex mit
04_vector_index
. Dieses Notebook überprüft die Bereitschaft des angegebenen Vektorsuchendpunkts invectorsearch_config
. Ist der konfigurierte Index bereits vorhanden, wird die Synchronisierung mit der Goldtabelle initiiert. Andernfalls wird der Index erstellt und die Synchronisierung ausgelöst. Dies wird voraussichtlich einige Zeit in Anspruch nehmen, wenn der Vektorsuchendpunkt und der Index noch nicht erstellt wurden.
Nächster Schritt
Fahren Sie mit Schritt 7: Bereitstellen und Überwachen fort.