Udostępnij za pośrednictwem


Krok 6 (potoki). Implementowanie poprawek potoku danych

Potok danych

Wykonaj następujące kroki, aby zmodyfikować potok danych i uruchomić go w celu:

  1. Utwórz nowy indeks wektorowy.
  2. Utwórz przebieg platformy MLflow z metadanymi potoku danych.

Wynikowy przebieg MLflow jest przywoływany B_quality_iteration/02_evaluate_fixes przez notes.

Istnieją dwa podejścia do modyfikowania potoku danych:

  • Zaimplementuj pojedynczą poprawkę w czasie W tym podejściu skonfigurujesz i uruchomisz jeden potok danych jednocześnie. Ten tryb jest najlepszy, jeśli chcesz wypróbować pojedynczy model osadzania i przetestować pojedynczy nowy analizator. Usługa Databricks sugeruje rozpoczęcie pracy z tymi notesami.
  • Zaimplementuj wiele poprawek jednocześnie W tym podejściu, nazywanym również zamiataniem, uruchamiasz równolegle wiele potoków danych, z których każda ma inną konfigurację. Ten tryb jest najlepszy, jeśli chcesz "zamiatać" w wielu różnych strategiach, na przykład ocenić trzy analizatory PLIKÓW PDF lub ocenić wiele różnych rozmiarów fragmentów.

Zobacz repozytorium GitHub, aby zapoznać się z przykładowym kodem w tej sekcji.

Podejście 1. Implementowanie pojedynczej poprawki naraz

  1. Otwórz notes B_quality_iteration/data_pipeline_fixes/single_fix/00_config
  2. Postępuj zgodnie z instrukcjami w jednym z poniższych elementów:
  3. Uruchom potok, wykonując jedną z następujących czynności:
    • Otwieranie i uruchamianie notesu 00_Run_Entire_Pipeline .
    • Wykonaj kroki ręcznego uruchamiania każdego kroku potoku.
  4. Dodaj nazwę wynikowego przebiegu MLflow, który jest zwracany do zmiennej w notesie DATA_PIPELINE_FIXES_RUN_NAMES B_quality_iteration/02_evaluate_fixes

Uwaga

Potok przygotowywania danych wykorzystuje przesyłanie strumieniowe ze strukturą platformy Spark w celu przyrostowego ładowania i przetwarzania plików. Oznacza to, że pliki już załadowane i przygotowane są śledzone w punktach kontrolnych i nie będą ponownie przetwarzane. Tylko nowo dodane pliki zostaną załadowane, przygotowane i dołączone do odpowiednich tabel.

W związku z tym, jeśli chcesz ponownie uruchomić cały potok od podstaw i ponownie przetworzyć wszystkie dokumenty, musisz usunąć punkty kontrolne i tabele. Można to zrobić za pomocą notesu reset_tables_and_checkpoints .

Podejście 2. Implementowanie wielu poprawek jednocześnie

  1. Otwórz notes B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines.
  2. Postępuj zgodnie z instrukcjami w notesie, aby dodać co najmniej dwie konfiguracje potoku danych do uruchomienia.
  3. Uruchom notes, aby wykonać te potoki.
  4. Dodaj nazwy wynikowych przebiegów MLflow, które są zwracane do zmiennej w notesie DATA_PIPELINE_FIXES_RUN_NAMES B_quality_iteration/02_evaluate_fixes.

Dodatek

Uwaga

Notesy, do których odwołujesz się poniżej, można znaleźć w katalogach single_fix i multiple_fixes w zależności od tego, czy wdrażasz pojedynczą poprawkę, czy wiele poprawek naraz.

Szczegółowe omówienie ustawień konfiguracji

Poniżej wymieniono różne wstępnie zaimplementowane opcje konfiguracji dla potoku danych. Alternatywnie można zaimplementować niestandardowy analizator/fragmenter.

  • vectorsearch_config: Określ punkt końcowy wyszukiwania wektorowego (musi być uruchomiony i uruchomiony) oraz nazwę indeksu, który ma zostać utworzony. Ponadto zdefiniuj typ synchronizacji między tabelą źródłową a indeksem (wartość domyślna to TRIGGERED).
  • embedding_config: Określ model osadzania do użycia wraz z tokenizatorem. Aby uzyskać pełną listę opcji, zobacz supporting_configs/embedding_models notes. Model osadzania musi zostać wdrożony w uruchomionym punkcie końcowym obsługującym model. W zależności od strategii fragmentowania tokenizer jest również podczas dzielenia, aby upewnić się, że fragmenty nie przekraczają limitu tokenu modelu osadzania. Tokenizatory są używane w tym miejscu do zliczania tokenów we fragmentach tekstu w celu zapewnienia, że nie przekraczają maksymalnej długości kontekstu wybranego modelu osadzania.

Poniżej przedstawiono tokenizator z aplikacji HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

Poniżej przedstawiono tokenizer z TikToken:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: definiuje analizator plików, fragmenter i ścieżkę do pola źródła. Analizatory i fragmenty są definiowane odpowiednio w parser_library notesach i chunker_library . Można je znaleźć w katalogach single_fix i multiple_fixes . Aby uzyskać pełną listę opcji, zobacz supporting_configs/parser_chunker_strategies notes, który jest ponownie dostępny w jednym i wielu katalogach poprawek. Różne analizatory lub fragmenty mogą wymagać różnych parametrów konfiguracji, w których <param x> reprezentują potencjalne parametry wymagane dla określonego fragmentatora. Analizatory można również przekazywać wartości konfiguracji przy użyciu tego samego formatu.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Implementowanie niestandardowego analizatora/fragmentatora

Ten projekt ma strukturę ułatwiania dodawania niestandardowych analizatorów lub fragmentatorów do potoku przygotowywania danych.

Dodawanie nowego analizatora

Załóżmy, że chcesz dołączyć nowy analizator przy użyciu biblioteki PyMuPDF, aby przekształcić przeanalizowany tekst w format markdown. Wykonaj te kroki:

  1. Zainstaluj wymagane zależności, dodając następujący kod do notesu parser_library single_fix w katalogu or multiple_fix :

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. W notesie parser_library single_fix w katalogu or multiple_fix dodaj nową sekcję analizatora PyMuPdfMarkdown i zaimplementuj funkcję analizowania. Upewnij się, że dane wyjściowe funkcji są zgodne z klasą zdefiniowaną ParserReturnValue na początku notesu. Zapewnia to zgodność z funkcjami zdefiniowanymi przez użytkownika platformy Spark. Blok try or except uniemożliwia platformie Spark niepowodzenie całego zadania analizowania z powodu błędów w poszczególnych dokumentach podczas stosowania analizatora jako funkcji zdefiniowanej przez użytkownika w notesie single_fix w 02_parse_docs katalogu lubmultiple_fix. Ten notes sprawdzi, czy analizowanie nie powiodło się dla dowolnego dokumentu, poddać kwarantannie odpowiednie wiersze i zgłosić ostrzeżenie.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Dodaj nową funkcję analizowania do parser_factory elementu w notesie parser_library single_fix w katalogu lub multiple_fix , aby umożliwić jego skonfigurowanie w pipeline_config notesie 00_config .

  4. W notesie 02_parse_docs funkcje analizatora są przekształcane w funkcje zdefiniowane przez użytkownika platformy Spark w języku Python (zoptymalizowane pod kątem strzałki dla środowiska Databricks Runtime 14.0 lub nowszego) i stosowane do ramki danych zawierającej nowe binarne pliki PDF. Na potrzeby testowania i programowania dodaj prostą funkcję testowania do notesu parser_library, który ładuje plik test-document.pdf i potwierdza pomyślne analizowanie:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Dodawanie nowego fragmentatora

Proces dodawania nowego fragmentatora jest zgodny z opisanymi powyżej krokami dotyczącymi nowego analizatora.

  1. Dodaj wymagane zależności w notesie chunker_library .
  2. Dodaj nową sekcję dla fragmentatora i zaimplementuj funkcję, np. chunk_parsed_content_newchunkername. Dane wyjściowe nowej funkcji fragmentatora muszą być słownikiem języka Python zgodnym z ChunkerReturnValue klasą zdefiniowaną na początku notesu chunker_library . Funkcja powinna akceptować co najmniej ciąg analizowanego tekstu, który ma być fragmentowany. Jeśli fragmentator wymaga dodatkowych parametrów, możesz dodać je jako parametry funkcji.
  3. Dodaj nowy fragment do chunker_factory funkcji zdefiniowanej w notesie chunker_library . Jeśli funkcja akceptuje dodatkowe parametry, użyj częściowych narzędzi functools, aby je wstępnie skonfigurować. Jest to konieczne, ponieważ funkcje zdefiniowane przez użytkownika akceptują tylko jeden parametr wejściowy, który będzie analizowany tekst w naszym przypadku. Funkcja chunker_factory umożliwia konfigurowanie różnych metod fragmenterów w pipeline_config i zwraca funkcję UDF platformy Spark w języku Python (zoptymalizowaną pod kątem środowiska Databricks Runtime 14.0 lub nowszego).
  4. Dodaj prostą sekcję testowania dla nowej funkcji fragmentowania. Ta sekcja powinna zawierać wstępnie zdefiniowany tekst podany jako ciąg.

Dostrajanie wydajności

Platforma Spark wykorzystuje partycje do równoległego przetwarzania. Dane są podzielone na fragmenty wierszy, a każda partycja jest domyślnie przetwarzana przez jeden rdzeń. Jednak gdy dane są początkowo odczytywane przez platformę Apache Spark, mogą nie tworzyć partycji zoptymalizowanych pod kątem żądanych obliczeń, szczególnie w przypadku funkcji zdefiniowanych przez użytkownika wykonujących zadania analizowania i fragmentowania. Ważne jest, aby zachować równowagę między tworzeniem partycji, które są wystarczająco małe do wydajnej równoległości, a nie tak małe, że obciążenie związane z zarządzaniem nimi przewyższa korzyści.

Liczbę partycji można dostosować przy użyciu polecenia df.repartitions(<number of partitions>). Podczas stosowania funkcji zdefiniowanych przez użytkownika należy dążyć do wielokrotności liczby rdzeni dostępnych w węzłach procesu roboczego. Na przykład w notesie 02_parse_docs można uwzględnić df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) utworzenie dwukrotnie większej liczby partycji jako liczby dostępnych rdzeni procesu roboczego. Zazwyczaj wielokrotności z zakresu od 1 do 3 powinny przynieść zadowalającą wydajność.

Ręczne uruchamianie potoku

Alternatywnie można uruchomić poszczególne notesy krok po kroku:

  1. Załaduj nieprzetworzone pliki przy użyciu notesu 01_load_files . Spowoduje to zapisanie każdego pliku binarnego dokumentu jako jednego rekordu w tabeli z brązu (raw_files_table_name) zdefiniowanej w pliku destination_tables_config. Pliki są ładowane przyrostowo, przetwarzają tylko nowe dokumenty od ostatniego uruchomienia.
  2. Przeanalizuj dokumenty za pomocą notesu 02_parse_docs . Ten notes wykonuje parser_library notes (upewnij się, że jest to pierwsza komórka, aby ponownie uruchomić język Python), dzięki czemu różne analizatory i powiązane narzędzia są dostępne. Następnie używa określonego analizatora w obiekcie pipeline_config , aby przeanalizować każdy dokument w postaci zwykłego tekstu. Na przykład przechwytywane są odpowiednie metadane, takie jak liczba stron oryginalnego pliku PDF obok analizowanego tekstu. Pomyślnie przeanalizowane dokumenty są przechowywane w srebrnej tabeli (parsed_docs_table_name), podczas gdy wszystkie dokumenty nieparzyste są poddane kwarantannie w odpowiedniej tabeli.
  3. Podziel przeanalizowane dokumenty przy użyciu notesu 03_chunk_docs . Podobnie jak w przypadku analizowania, ten notes wykonuje chunker_library notes (ponownie uruchom go jako pierwszą komórkę). Dzieli on każdy przeanalizowany dokument na mniejsze fragmenty przy użyciu określonego fragmentatora z .pipeline_config Każdy fragment ma przypisany unikatowy identyfikator przy użyciu skrótu MD5, który jest niezbędny do synchronizacji z indeksem wyszukiwania wektorowego. Ostatnie fragmenty są ładowane do złotej tabeli (chunked_docs_table_name).
  4. Utwórz/zsynchronizuj indeks wyszukiwania wektorów za pomocą polecenia 04_vector_index. Ten notes weryfikuje gotowość określonego punktu końcowego wyszukiwania wektorów w pliku vectorsearch_config. Jeśli skonfigurowany indeks już istnieje, inicjuje synchronizację z tabelą gold; w przeciwnym razie tworzy indeks i wyzwala synchronizację. Oczekuje się, że utworzenie punktu końcowego i indeksu wyszukiwania wektorów może zająć trochę czasu.

Następny krok

Przejdź do kroku 7. Wdrażanie i monitorowanie.