Sdílet prostřednictvím


Krok 6 (kanály). Implementace oprav datového kanálu

Datový kanál

Pokud chcete upravit datový kanál a spustit ho, postupujte takto:

  1. Vytvořte nový vektorový index.
  2. Vytvořte spuštění MLflow s metadaty datového kanálu.

Na výsledné spuštění MLflow odkazuje B_quality_iteration/02_evaluate_fixes poznámkový blok.

Existují dva přístupy k úpravě datového kanálu:

  • V tomto přístupu implementujete jednu opravu najednou , nakonfigurujete a spustíte jeden datový kanál najednou. Tento režim je nejlepší, pokud chcete vyzkoušet jeden model vkládání a otestovat jeden nový analyzátor. Databricks navrhuje začít odsud, abyste se seznámili s těmito poznámkovými bloky.
  • V tomto přístupu implementujte více oprav najednou , označované také jako uklidit, paralelně spustíte několik datových kanálů, které mají jinou konfiguraci. Tento režim je nejlepší, pokud chcete "uklidit" napříč mnoha různými strategiemi, například vyhodnotit tři analyzátory PDF nebo vyhodnotit mnoho různých velikostí bloků dat.

Podívejte se na úložiště GitHub pro vzorový kód v této části.

Přístup 1: Implementace jediné opravy najednou

  1. Otevření poznámkového bloku B_quality_iteration/data_pipeline_fixes/single_fix/00_config
  2. Postupujte podle pokynů v některém z následujících kroků:
  3. Kanál spusťte buď:
  4. Přidejte název výsledného běhu MLflow, který se vypíše do DATA_PIPELINE_FIXES_RUN_NAMES proměnné v poznámkovém bloku B_quality_iteration/02_evaluate_fixes

Poznámka:

Kanál pro přípravu dat využívá strukturované streamování Sparku k přírůstkovém načítání a zpracování souborů. To znamená, že soubory, které jsou už načtené a připravené, se sledují v kontrolních bodech a nebudou znovu zpracovány. Do odpovídajících tabulek se načtou, připraví a připojí pouze nově přidané soubory.

Proto pokud chcete znovu spustit celý kanál úplně od začátku a znovu zpracovat všechny dokumenty, musíte odstranit kontrolní body a tabulky. Toho můžete dosáhnout pomocí poznámkového bloku reset_tables_and_checkpoints .

Přístup 2: Implementace více oprav najednou

  1. Otevřete poznámkový blok B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines .
  2. Postupujte podle pokynů v poznámkovém bloku a přidejte dvě nebo více konfigurací datového kanálu, které se mají spustit.
  3. Spuštěním poznámkového bloku spusťte tyto kanály.
  4. Přidejte názvy výsledných spuštění MLflow, která jsou výstupem do DATA_PIPELINE_FIXES_RUN_NAMES proměnné v poznámkovém bloku B_quality_iteration/02_evaluate_fixes .

Dodatek

Poznámka:

Poznámkové bloky, na které se odkazuje níže, najdete v adresářích single_fix a multiple_fixes v závislosti na tom, jestli implementujete jednu opravu nebo více oprav najednou.

Podrobné informace o nastavení konfigurace

Níže jsou uvedeny různé předimplementované možnosti konfigurace datového kanálu. Alternativně můžete implementovat vlastní analyzátor nebo chunker.

  • vectorsearch_config: Zadejte koncový bod vektorového vyhledávání (musí být spuštěný) a název indexu, který se má vytvořit. Kromě toho definujte typ synchronizace mezi zdrojovými tabulkami a indexem (výchozí hodnota je TRIGGERED).
  • embedding_config: Zadejte model vkládání, který se má použít, spolu s tokenizátorem. Úplný seznam možností najdete v poznámkovém supporting_configs/embedding_models bloku. Model vkládání se musí nasadit do spuštěného koncového bodu obsluhy modelu. V závislosti na strategii vytváření bloků dat je tokenizátor také během rozdělování, aby se zajistilo, že bloky dat nepřekročí limit tokenu modelu vkládání. Tokenizátory se zde používají k počítání počtu tokenů v textových blocích, aby se zajistilo, že nepřekročí maximální kontextové délky vybraného modelu vkládání.

Následující příklad ukazuje tokenizátor z HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

Následující příklad ukazuje tokenizátor z TikToken:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: Definuje analyzátor souborů, chunker a cestu k poli zdroje. Analyzátory a bloky dat jsou definovány v parser_library chunker_library poznámkových blocích a poznámkových blocích. Najdete je v adresářích single_fix a multiple_fixes . Úplný seznam možností najdete v poznámkovém supporting_configs/parser_chunker_strategies bloku, který je opět dostupný v adresářích s jednou i několika opravami. Různé analyzátory nebo bloky dat mohou vyžadovat různé parametry konfigurace, kde <param x> představují potenciální parametry požadované pro konkrétní chunker. Analyzátory lze předat také konfigurační hodnoty ve stejném formátu.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Implementace vlastního analyzátoru nebo chunkeru

Tento projekt je strukturovaný tak, aby usnadnil přidání vlastních analyzátorů nebo bloků dat do kanálu pro přípravu dat.

Přidání nového analyzátoru

Předpokládejme, že chcete začlenit nový analyzátor pomocí knihovny PyMuPDF k transformaci parsovaného textu do formátu Markdownu. Postupujte následovně:

  1. Nainstalujte požadované závislosti přidáním následujícího kódu do poznámkového parser_library bloku v single_fix adresáři:multiple_fix

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. V poznámkovém parser_library bloku v adresáři single_fix přidejte multiple_fix nový oddíl analyzátoru PyMuPdfMarkdown a implementujte funkci analýzy. Ujistěte se, že výstup funkce odpovídá ParserReturnValue třídě definované na začátku poznámkového bloku. Tím se zajistí kompatibilita s funkcemi definovanými uživatelem Sparku. Nebo try except blokování brání Sparku v selhání celé úlohy analýzy kvůli chybám v jednotlivých dokumentech při použití analyzátoru jako funkce definované uživatelem v poznámkovém bloku v 02_parse_docs adresáři nebo multiple_fix v poznámkovém single_fix bloku. Tento poznámkový blok zkontroluje, jestli se analýza jakéhokoli dokumentu nezdařila, umístí odpovídající řádky do karantény a vyvolá upozornění.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Přidejte novou funkci analýzy do parser_factory poznámkového parser_library bloku v poznámkovém bloku nebo single_fix multiple_fix v adresáři, abyste ji mohli konfigurovat v pipeline_config poznámkovém 00_config bloku.

  4. V poznámkovém 02_parse_docs bloku jsou funkce analyzátoru převedeny na uživatelem definované uživatelem Sparku v Pythonu (optimalizované pro Databricks Runtime 14.0 nebo novější) a použijí se u datového rámce obsahujícího nové binární soubory PDF. Pro účely testování a vývoje přidejte do poznámkového bloku parser_library jednoduchou testovací funkci, která načte soubor test-document.pdf a ověří úspěšné parsování:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Přidání nového chunkeru

Proces přidání nového chunkeru se řídí podobnými kroky, které jsou vysvětleny výše pro nový analyzátor.

  1. Do poznámkového bloku chunker_library přidejte požadované závislosti.
  2. Přidejte nový oddíl pro váš chunker a implementujte funkci, chunk_parsed_content_newchunkernamenapříklad . Výstupem nové funkce chunker musí být slovník Pythonu, který odpovídá ChunkerReturnValue třídě definované na začátku poznámkového bloku chunker_library . Funkce by měla přijmout alespoň řetězec analyzovaného textu, který má být blokován. Pokud váš chunker vyžaduje další parametry, můžete je přidat jako parametry funkce.
  3. Přidejte nový chunker do chunker_factory funkce definované v poznámkovém bloku chunker_library . Pokud vaše funkce přijímá další parametry, použijte k jejich předběžné konfiguraci část functools. To je nezbytné, protože funkce definované uživatelem přijímají pouze jeden vstupní parametr, což bude parsovaný text v našem případě. Umožňuje chunker_factory nakonfigurovat různé metody změn v pipeline_config a vrátit uživatelem definované uživatelem Spark Pythonu (optimalizované pro Databricks Runtime 14.0 a vyšší).
  4. Přidejte jednoduchý testovací oddíl pro novou funkci bloků dat. Tento oddíl by měl obsahovat blok předdefinovaného textu zadaného jako řetězec.

Ladění výkonu

Spark využívá oddíly k paralelizaci zpracování. Data se dělí na bloky řádků a každý oddíl se ve výchozím nastavení zpracovává jedním jádrem. Pokud ale apache Spark načte data, nemusí vytvářet oddíly optimalizované pro požadovaný výpočet, zejména pro naše funkce definované uživatelem provádějící úlohy analýzy a vytváření bloků dat. Je zásadní zajistit rovnováhu mezi vytvářením oddílů, které jsou dostatečně malé pro efektivní paralelizaci, a ne tak malé, že režie při jejich správě převáží nad výhodami.

Počet oddílů můžete upravit pomocí df.repartitions(<number of partitions>). Při použití funkcí definovaných uživatelem se snažte dosáhnout násobku počtu jader dostupných na pracovních uzlech. Například v poznámkovém bloku 02_parse_docs můžete zahrnout df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) , abyste vytvořili dvakrát tolik oddílů jako počet dostupných pracovních jader. Obvykle by měl násobek mezi 1 a 3 přinést uspokojivý výkon.

Ruční spuštění kanálu

Případně můžete jednotlivé poznámkové bloky spustit postupně:

  1. Načtěte nezpracované soubory pomocí poznámkového 01_load_files bloku. Tím se každý binární soubor dokumentu uloží jako jeden záznam v bronzové tabulce (raw_files_table_name) definované v souboru destination_tables_config. Soubory se načítají přírůstkově a od posledního spuštění se zpracovávají jenom nové dokumenty.
  2. Parsujte dokumenty s poznámkovým blokem 02_parse_docs . Tento poznámkový blok spustí parser_library poznámkový blok (nezapomeňte ho spustit jako první buňku pro restartování Pythonu), aby byly k dispozici různé analyzátory a související nástroje. Potom pomocí zadaného analyzátoru pipeline_config v dokumentu parsuje každý dokument do prostého textu. Jako příklad se zachytí relevantní metadata, jako je počet stránek původního SOUBORU PDF spolu s analyzovaným textem. Úspěšně analyzované dokumenty jsou uloženy ve stříbrné tabulce (parsed_docs_table_name), zatímco všechny neoddělené dokumenty jsou v karanténě do odpovídající tabulky.
  3. Blokujte analyzované dokumenty pomocí poznámkového 03_chunk_docs bloku. Podobně jako při analýze tento poznámkový blok spustí chunker_library poznámkový blok (znovu se spustí jako první buňka). Rozdělí každý analyzovaný dokument na menší bloky pomocí zadaného chunkeru z objektu pipeline_config. Každému bloku je přiřazeno jedinečné ID pomocí hodnoty hash MD5, které je nezbytné pro synchronizaci s indexem vektorového vyhledávání. Poslední bloky dat se načtou do zlaté tabulky (chunked_docs_table_name).
  4. Vytvořte nebo synchronizujte index vektorového vyhledávání pomocí funkce 04_vector_index. Tento poznámkový blok ověřuje připravenost zadaného koncového bodu hledání vektoru v objektu vectorsearch_config. Pokud nakonfigurovaný index již existuje, zahájí synchronizaci se zlatou tabulkou; jinak vytvoří synchronizaci indexu a aktivuje synchronizaci. To by mělo nějakou dobu trvat, pokud se koncový bod a index vektorového vyhledávání ještě nevytvořily.

Další krok

Pokračujte krokem 7. Nasazení a monitorování