Krok 6 (potoki). Implementowanie poprawek potoku danych
Wykonaj następujące kroki, aby zmodyfikować potok danych i uruchomić go w celu:
- Utwórz nowy indeks wektorowy.
- Utwórz przebieg platformy MLflow z metadanymi potoku danych.
Wynikowy przebieg MLflow jest przywoływany B_quality_iteration/02_evaluate_fixes
przez notes.
Istnieją dwa podejścia do modyfikowania potoku danych:
- Zaimplementuj pojedynczą poprawkę w czasie W tym podejściu skonfigurujesz i uruchomisz jeden potok danych jednocześnie. Ten tryb jest najlepszy, jeśli chcesz wypróbować pojedynczy model osadzania i przetestować pojedynczy nowy analizator. Usługa Databricks sugeruje rozpoczęcie pracy z tymi notesami.
- Zaimplementuj wiele poprawek jednocześnie W tym podejściu, nazywanym również zamiataniem, uruchamiasz równolegle wiele potoków danych, z których każda ma inną konfigurację. Ten tryb jest najlepszy, jeśli chcesz "zamiatać" w wielu różnych strategiach, na przykład ocenić trzy analizatory PLIKÓW PDF lub ocenić wiele różnych rozmiarów fragmentów.
Zobacz repozytorium GitHub, aby zapoznać się z przykładowym kodem w tej sekcji.
Podejście 1. Implementowanie pojedynczej poprawki naraz
- Otwórz notes B_quality_iteration/data_pipeline_fixes/single_fix/00_config
- Postępuj zgodnie z instrukcjami w jednym z poniższych elementów:
- Postępuj zgodnie z instrukcjami, aby zaimplementować nową konfigurację udostępnioną przez ten podręcznik.
- Wykonaj kroki, aby zaimplementować kod niestandardowy na potrzeby analizowania lub fragmentowania.
- Uruchom potok, wykonując jedną z następujących czynności:
- Otwieranie i uruchamianie notesu 00_Run_Entire_Pipeline .
- Wykonaj kroki ręcznego uruchamiania każdego kroku potoku.
- Dodaj nazwę wynikowego przebiegu MLflow, który jest zwracany do zmiennej w notesie
DATA_PIPELINE_FIXES_RUN_NAMES
B_quality_iteration/02_evaluate_fixes
Uwaga
Potok przygotowywania danych wykorzystuje przesyłanie strumieniowe ze strukturą platformy Spark w celu przyrostowego ładowania i przetwarzania plików. Oznacza to, że pliki już załadowane i przygotowane są śledzone w punktach kontrolnych i nie będą ponownie przetwarzane. Tylko nowo dodane pliki zostaną załadowane, przygotowane i dołączone do odpowiednich tabel.
W związku z tym, jeśli chcesz ponownie uruchomić cały potok od podstaw i ponownie przetworzyć wszystkie dokumenty, musisz usunąć punkty kontrolne i tabele. Można to zrobić za pomocą notesu reset_tables_and_checkpoints .
Podejście 2. Implementowanie wielu poprawek jednocześnie
- Otwórz notes B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines.
- Postępuj zgodnie z instrukcjami w notesie, aby dodać co najmniej dwie konfiguracje potoku danych do uruchomienia.
- Uruchom notes, aby wykonać te potoki.
- Dodaj nazwy wynikowych przebiegów MLflow, które są zwracane do zmiennej w notesie
DATA_PIPELINE_FIXES_RUN_NAMES
B_quality_iteration/02_evaluate_fixes.
Dodatek
Uwaga
Notesy, do których odwołujesz się poniżej, można znaleźć w katalogach single_fix i multiple_fixes w zależności od tego, czy wdrażasz pojedynczą poprawkę, czy wiele poprawek naraz.
Szczegółowe omówienie ustawień konfiguracji
Poniżej wymieniono różne wstępnie zaimplementowane opcje konfiguracji dla potoku danych. Alternatywnie można zaimplementować niestandardowy analizator/fragmenter.
vectorsearch_config
: Określ punkt końcowy wyszukiwania wektorowego (musi być uruchomiony i uruchomiony) oraz nazwę indeksu, który ma zostać utworzony. Ponadto zdefiniuj typ synchronizacji między tabelą źródłową a indeksem (wartość domyślna toTRIGGERED
).embedding_config
: Określ model osadzania do użycia wraz z tokenizatorem. Aby uzyskać pełną listę opcji, zobaczsupporting_configs/embedding_models
notes. Model osadzania musi zostać wdrożony w uruchomionym punkcie końcowym obsługującym model. W zależności od strategii fragmentowania tokenizer jest również podczas dzielenia, aby upewnić się, że fragmenty nie przekraczają limitu tokenu modelu osadzania. Tokenizatory są używane w tym miejscu do zliczania tokenów we fragmentach tekstu w celu zapewnienia, że nie przekraczają maksymalnej długości kontekstu wybranego modelu osadzania.
Poniżej przedstawiono tokenizator z aplikacji HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
Poniżej przedstawiono tokenizer z TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: definiuje analizator plików, fragmenter i ścieżkę do pola źródła. Analizatory i fragmenty są definiowane odpowiednio wparser_library
notesach ichunker_library
. Można je znaleźć w katalogach single_fix i multiple_fixes . Aby uzyskać pełną listę opcji, zobaczsupporting_configs/parser_chunker_strategies
notes, który jest ponownie dostępny w jednym i wielu katalogach poprawek. Różne analizatory lub fragmenty mogą wymagać różnych parametrów konfiguracji, w których<param x>
reprezentują potencjalne parametry wymagane dla określonego fragmentatora. Analizatory można również przekazywać wartości konfiguracji przy użyciu tego samego formatu.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementowanie niestandardowego analizatora/fragmentatora
Ten projekt ma strukturę ułatwiania dodawania niestandardowych analizatorów lub fragmentatorów do potoku przygotowywania danych.
Dodawanie nowego analizatora
Załóżmy, że chcesz dołączyć nowy analizator przy użyciu biblioteki PyMuPDF, aby przekształcić przeanalizowany tekst w format markdown. Wykonaj te kroki:
Zainstaluj wymagane zależności, dodając następujący kod do notesu
parser_library
single_fix
w katalogu ormultiple_fix
:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
W notesie
parser_library
single_fix
w katalogu ormultiple_fix
dodaj nową sekcję analizatoraPyMuPdfMarkdown
i zaimplementuj funkcję analizowania. Upewnij się, że dane wyjściowe funkcji są zgodne z klasą zdefiniowanąParserReturnValue
na początku notesu. Zapewnia to zgodność z funkcjami zdefiniowanymi przez użytkownika platformy Spark. Bloktry
orexcept
uniemożliwia platformie Spark niepowodzenie całego zadania analizowania z powodu błędów w poszczególnych dokumentach podczas stosowania analizatora jako funkcji zdefiniowanej przez użytkownika w notesiesingle_fix
w02_parse_docs
katalogu lubmultiple_fix
. Ten notes sprawdzi, czy analizowanie nie powiodło się dla dowolnego dokumentu, poddać kwarantannie odpowiednie wiersze i zgłosić ostrzeżenie.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Dodaj nową funkcję analizowania do
parser_factory
elementu w notesieparser_library
single_fix
w katalogu lubmultiple_fix
, aby umożliwić jego skonfigurowanie wpipeline_config
notesie00_config
.W notesie
02_parse_docs
funkcje analizatora są przekształcane w funkcje zdefiniowane przez użytkownika platformy Spark w języku Python (zoptymalizowane pod kątem strzałki dla środowiska Databricks Runtime 14.0 lub nowszego) i stosowane do ramki danych zawierającej nowe binarne pliki PDF. Na potrzeby testowania i programowania dodaj prostą funkcję testowania do notesu parser_library, który ładuje plik test-document.pdf i potwierdza pomyślne analizowanie:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Dodawanie nowego fragmentatora
Proces dodawania nowego fragmentatora jest zgodny z opisanymi powyżej krokami dotyczącymi nowego analizatora.
- Dodaj wymagane zależności w notesie chunker_library .
- Dodaj nową sekcję dla fragmentatora i zaimplementuj funkcję, np.
chunk_parsed_content_newchunkername
. Dane wyjściowe nowej funkcji fragmentatora muszą być słownikiem języka Python zgodnym zChunkerReturnValue
klasą zdefiniowaną na początku notesu chunker_library . Funkcja powinna akceptować co najmniej ciąg analizowanego tekstu, który ma być fragmentowany. Jeśli fragmentator wymaga dodatkowych parametrów, możesz dodać je jako parametry funkcji. - Dodaj nowy fragment do
chunker_factory
funkcji zdefiniowanej w notesie chunker_library . Jeśli funkcja akceptuje dodatkowe parametry, użyj częściowych narzędzi functools, aby je wstępnie skonfigurować. Jest to konieczne, ponieważ funkcje zdefiniowane przez użytkownika akceptują tylko jeden parametr wejściowy, który będzie analizowany tekst w naszym przypadku. Funkcjachunker_factory
umożliwia konfigurowanie różnych metod fragmenterów w pipeline_config i zwraca funkcję UDF platformy Spark w języku Python (zoptymalizowaną pod kątem środowiska Databricks Runtime 14.0 lub nowszego). - Dodaj prostą sekcję testowania dla nowej funkcji fragmentowania. Ta sekcja powinna zawierać wstępnie zdefiniowany tekst podany jako ciąg.
Dostrajanie wydajności
Platforma Spark wykorzystuje partycje do równoległego przetwarzania. Dane są podzielone na fragmenty wierszy, a każda partycja jest domyślnie przetwarzana przez jeden rdzeń. Jednak gdy dane są początkowo odczytywane przez platformę Apache Spark, mogą nie tworzyć partycji zoptymalizowanych pod kątem żądanych obliczeń, szczególnie w przypadku funkcji zdefiniowanych przez użytkownika wykonujących zadania analizowania i fragmentowania. Ważne jest, aby zachować równowagę między tworzeniem partycji, które są wystarczająco małe do wydajnej równoległości, a nie tak małe, że obciążenie związane z zarządzaniem nimi przewyższa korzyści.
Liczbę partycji można dostosować przy użyciu polecenia df.repartitions(<number of partitions>)
. Podczas stosowania funkcji zdefiniowanych przez użytkownika należy dążyć do wielokrotności liczby rdzeni dostępnych w węzłach procesu roboczego. Na przykład w notesie 02_parse_docs można uwzględnić df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
utworzenie dwukrotnie większej liczby partycji jako liczby dostępnych rdzeni procesu roboczego. Zazwyczaj wielokrotności z zakresu od 1 do 3 powinny przynieść zadowalającą wydajność.
Ręczne uruchamianie potoku
Alternatywnie można uruchomić poszczególne notesy krok po kroku:
- Załaduj nieprzetworzone pliki przy użyciu notesu
01_load_files
. Spowoduje to zapisanie każdego pliku binarnego dokumentu jako jednego rekordu w tabeli z brązu (raw_files_table_name
) zdefiniowanej w plikudestination_tables_config
. Pliki są ładowane przyrostowo, przetwarzają tylko nowe dokumenty od ostatniego uruchomienia. - Przeanalizuj dokumenty za pomocą notesu
02_parse_docs
. Ten notes wykonujeparser_library
notes (upewnij się, że jest to pierwsza komórka, aby ponownie uruchomić język Python), dzięki czemu różne analizatory i powiązane narzędzia są dostępne. Następnie używa określonego analizatora w obiekciepipeline_config
, aby przeanalizować każdy dokument w postaci zwykłego tekstu. Na przykład przechwytywane są odpowiednie metadane, takie jak liczba stron oryginalnego pliku PDF obok analizowanego tekstu. Pomyślnie przeanalizowane dokumenty są przechowywane w srebrnej tabeli (parsed_docs_table_name
), podczas gdy wszystkie dokumenty nieparzyste są poddane kwarantannie w odpowiedniej tabeli. - Podziel przeanalizowane dokumenty przy użyciu notesu
03_chunk_docs
. Podobnie jak w przypadku analizowania, ten notes wykonujechunker_library
notes (ponownie uruchom go jako pierwszą komórkę). Dzieli on każdy przeanalizowany dokument na mniejsze fragmenty przy użyciu określonego fragmentatora z .pipeline_config
Każdy fragment ma przypisany unikatowy identyfikator przy użyciu skrótu MD5, który jest niezbędny do synchronizacji z indeksem wyszukiwania wektorowego. Ostatnie fragmenty są ładowane do złotej tabeli (chunked_docs_table_name
). - Utwórz/zsynchronizuj indeks wyszukiwania wektorów za pomocą polecenia
04_vector_index
. Ten notes weryfikuje gotowość określonego punktu końcowego wyszukiwania wektorów w plikuvectorsearch_config
. Jeśli skonfigurowany indeks już istnieje, inicjuje synchronizację z tabelą gold; w przeciwnym razie tworzy indeks i wyzwala synchronizację. Oczekuje się, że utworzenie punktu końcowego i indeksu wyszukiwania wektorów może zająć trochę czasu.
Następny krok
Przejdź do kroku 7. Wdrażanie i monitorowanie.