Krok 6 (kanály). Implementace oprav datového kanálu
Pokud chcete upravit datový kanál a spustit ho, postupujte takto:
- Vytvořte nový vektorový index.
- Vytvořte spuštění MLflow s metadaty datového kanálu.
Na výsledné spuštění MLflow odkazuje B_quality_iteration/02_evaluate_fixes
poznámkový blok.
Existují dva přístupy k úpravě datového kanálu:
- V tomto přístupu implementujete jednu opravu najednou , nakonfigurujete a spustíte jeden datový kanál najednou. Tento režim je nejlepší, pokud chcete vyzkoušet jeden model vkládání a otestovat jeden nový analyzátor. Databricks navrhuje začít odsud, abyste se seznámili s těmito poznámkovými bloky.
- V tomto přístupu implementujte více oprav najednou , označované také jako uklidit, paralelně spustíte několik datových kanálů, které mají jinou konfiguraci. Tento režim je nejlepší, pokud chcete "uklidit" napříč mnoha různými strategiemi, například vyhodnotit tři analyzátory PDF nebo vyhodnotit mnoho různých velikostí bloků dat.
Podívejte se na úložiště GitHub pro vzorový kód v této části.
Přístup 1: Implementace jediné opravy najednou
- Otevření poznámkového bloku B_quality_iteration/data_pipeline_fixes/single_fix/00_config
- Postupujte podle pokynů v některém z následujících kroků:
- Postupujte podle pokynů k implementaci nové konfigurace poskytované touto kuchařkou.
- Postupujte podle pokynů k implementaci vlastního kódu pro analýzu nebo blokování dat.
- Kanál spusťte buď:
- Otevření a spuštění poznámkového bloku 00_Run_Entire_Pipeline
- Postupujte podle kroků pro ruční spuštění jednotlivých kroků kanálu.
- Přidejte název výsledného běhu MLflow, který se vypíše do
DATA_PIPELINE_FIXES_RUN_NAMES
proměnné v poznámkovém bloku B_quality_iteration/02_evaluate_fixes
Poznámka:
Kanál pro přípravu dat využívá strukturované streamování Sparku k přírůstkovém načítání a zpracování souborů. To znamená, že soubory, které jsou už načtené a připravené, se sledují v kontrolních bodech a nebudou znovu zpracovány. Do odpovídajících tabulek se načtou, připraví a připojí pouze nově přidané soubory.
Proto pokud chcete znovu spustit celý kanál úplně od začátku a znovu zpracovat všechny dokumenty, musíte odstranit kontrolní body a tabulky. Toho můžete dosáhnout pomocí poznámkového bloku reset_tables_and_checkpoints .
Přístup 2: Implementace více oprav najednou
- Otevřete poznámkový blok B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines .
- Postupujte podle pokynů v poznámkovém bloku a přidejte dvě nebo více konfigurací datového kanálu, které se mají spustit.
- Spuštěním poznámkového bloku spusťte tyto kanály.
- Přidejte názvy výsledných spuštění MLflow, která jsou výstupem do
DATA_PIPELINE_FIXES_RUN_NAMES
proměnné v poznámkovém bloku B_quality_iteration/02_evaluate_fixes .
Dodatek
Poznámka:
Poznámkové bloky, na které se odkazuje níže, najdete v adresářích single_fix a multiple_fixes v závislosti na tom, jestli implementujete jednu opravu nebo více oprav najednou.
Podrobné informace o nastavení konfigurace
Níže jsou uvedeny různé předimplementované možnosti konfigurace datového kanálu. Alternativně můžete implementovat vlastní analyzátor nebo chunker.
vectorsearch_config
: Zadejte koncový bod vektorového vyhledávání (musí být spuštěný) a název indexu, který se má vytvořit. Kromě toho definujte typ synchronizace mezi zdrojovými tabulkami a indexem (výchozí hodnota jeTRIGGERED
).embedding_config
: Zadejte model vkládání, který se má použít, spolu s tokenizátorem. Úplný seznam možností najdete v poznámkovémsupporting_configs/embedding_models
bloku. Model vkládání se musí nasadit do spuštěného koncového bodu obsluhy modelu. V závislosti na strategii vytváření bloků dat je tokenizátor také během rozdělování, aby se zajistilo, že bloky dat nepřekročí limit tokenu modelu vkládání. Tokenizátory se zde používají k počítání počtu tokenů v textových blocích, aby se zajistilo, že nepřekročí maximální kontextové délky vybraného modelu vkládání.
Následující příklad ukazuje tokenizátor z HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
Následující příklad ukazuje tokenizátor z TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: Definuje analyzátor souborů, chunker a cestu k poli zdroje. Analyzátory a bloky dat jsou definovány vparser_library
chunker_library
poznámkových blocích a poznámkových blocích. Najdete je v adresářích single_fix a multiple_fixes . Úplný seznam možností najdete v poznámkovémsupporting_configs/parser_chunker_strategies
bloku, který je opět dostupný v adresářích s jednou i několika opravami. Různé analyzátory nebo bloky dat mohou vyžadovat různé parametry konfigurace, kde<param x>
představují potenciální parametry požadované pro konkrétní chunker. Analyzátory lze předat také konfigurační hodnoty ve stejném formátu.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementace vlastního analyzátoru nebo chunkeru
Tento projekt je strukturovaný tak, aby usnadnil přidání vlastních analyzátorů nebo bloků dat do kanálu pro přípravu dat.
Přidání nového analyzátoru
Předpokládejme, že chcete začlenit nový analyzátor pomocí knihovny PyMuPDF k transformaci parsovaného textu do formátu Markdownu. Postupujte následovně:
Nainstalujte požadované závislosti přidáním následujícího kódu do poznámkového
parser_library
bloku vsingle_fix
adresáři:multiple_fix
# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
V poznámkovém
parser_library
bloku v adresářisingle_fix
přidejtemultiple_fix
nový oddíl analyzátoruPyMuPdfMarkdown
a implementujte funkci analýzy. Ujistěte se, že výstup funkce odpovídáParserReturnValue
třídě definované na začátku poznámkového bloku. Tím se zajistí kompatibilita s funkcemi definovanými uživatelem Sparku. Nebotry
except
blokování brání Sparku v selhání celé úlohy analýzy kvůli chybám v jednotlivých dokumentech při použití analyzátoru jako funkce definované uživatelem v poznámkovém bloku v02_parse_docs
adresáři nebomultiple_fix
v poznámkovémsingle_fix
bloku. Tento poznámkový blok zkontroluje, jestli se analýza jakéhokoli dokumentu nezdařila, umístí odpovídající řádky do karantény a vyvolá upozornění.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Přidejte novou funkci analýzy do
parser_factory
poznámkovéhoparser_library
bloku v poznámkovém bloku nebosingle_fix
multiple_fix
v adresáři, abyste ji mohli konfigurovat vpipeline_config
poznámkovém00_config
bloku.V poznámkovém
02_parse_docs
bloku jsou funkce analyzátoru převedeny na uživatelem definované uživatelem Sparku v Pythonu (optimalizované pro Databricks Runtime 14.0 nebo novější) a použijí se u datového rámce obsahujícího nové binární soubory PDF. Pro účely testování a vývoje přidejte do poznámkového bloku parser_library jednoduchou testovací funkci, která načte soubor test-document.pdf a ověří úspěšné parsování:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Přidání nového chunkeru
Proces přidání nového chunkeru se řídí podobnými kroky, které jsou vysvětleny výše pro nový analyzátor.
- Do poznámkového bloku chunker_library přidejte požadované závislosti.
- Přidejte nový oddíl pro váš chunker a implementujte funkci,
chunk_parsed_content_newchunkername
například . Výstupem nové funkce chunker musí být slovník Pythonu, který odpovídáChunkerReturnValue
třídě definované na začátku poznámkového bloku chunker_library . Funkce by měla přijmout alespoň řetězec analyzovaného textu, který má být blokován. Pokud váš chunker vyžaduje další parametry, můžete je přidat jako parametry funkce. - Přidejte nový chunker do
chunker_factory
funkce definované v poznámkovém bloku chunker_library . Pokud vaše funkce přijímá další parametry, použijte k jejich předběžné konfiguraci část functools. To je nezbytné, protože funkce definované uživatelem přijímají pouze jeden vstupní parametr, což bude parsovaný text v našem případě. Umožňujechunker_factory
nakonfigurovat různé metody změn v pipeline_config a vrátit uživatelem definované uživatelem Spark Pythonu (optimalizované pro Databricks Runtime 14.0 a vyšší). - Přidejte jednoduchý testovací oddíl pro novou funkci bloků dat. Tento oddíl by měl obsahovat blok předdefinovaného textu zadaného jako řetězec.
Ladění výkonu
Spark využívá oddíly k paralelizaci zpracování. Data se dělí na bloky řádků a každý oddíl se ve výchozím nastavení zpracovává jedním jádrem. Pokud ale apache Spark načte data, nemusí vytvářet oddíly optimalizované pro požadovaný výpočet, zejména pro naše funkce definované uživatelem provádějící úlohy analýzy a vytváření bloků dat. Je zásadní zajistit rovnováhu mezi vytvářením oddílů, které jsou dostatečně malé pro efektivní paralelizaci, a ne tak malé, že režie při jejich správě převáží nad výhodami.
Počet oddílů můžete upravit pomocí df.repartitions(<number of partitions>)
. Při použití funkcí definovaných uživatelem se snažte dosáhnout násobku počtu jader dostupných na pracovních uzlech. Například v poznámkovém bloku 02_parse_docs můžete zahrnout df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
, abyste vytvořili dvakrát tolik oddílů jako počet dostupných pracovních jader. Obvykle by měl násobek mezi 1 a 3 přinést uspokojivý výkon.
Ruční spuštění kanálu
Případně můžete jednotlivé poznámkové bloky spustit postupně:
- Načtěte nezpracované soubory pomocí poznámkového
01_load_files
bloku. Tím se každý binární soubor dokumentu uloží jako jeden záznam v bronzové tabulce (raw_files_table_name
) definované v souborudestination_tables_config
. Soubory se načítají přírůstkově a od posledního spuštění se zpracovávají jenom nové dokumenty. - Parsujte dokumenty s poznámkovým blokem
02_parse_docs
. Tento poznámkový blok spustíparser_library
poznámkový blok (nezapomeňte ho spustit jako první buňku pro restartování Pythonu), aby byly k dispozici různé analyzátory a související nástroje. Potom pomocí zadaného analyzátorupipeline_config
v dokumentu parsuje každý dokument do prostého textu. Jako příklad se zachytí relevantní metadata, jako je počet stránek původního SOUBORU PDF spolu s analyzovaným textem. Úspěšně analyzované dokumenty jsou uloženy ve stříbrné tabulce (parsed_docs_table_name
), zatímco všechny neoddělené dokumenty jsou v karanténě do odpovídající tabulky. - Blokujte analyzované dokumenty pomocí poznámkového
03_chunk_docs
bloku. Podobně jako při analýze tento poznámkový blok spustíchunker_library
poznámkový blok (znovu se spustí jako první buňka). Rozdělí každý analyzovaný dokument na menší bloky pomocí zadaného chunkeru z objektupipeline_config
. Každému bloku je přiřazeno jedinečné ID pomocí hodnoty hash MD5, které je nezbytné pro synchronizaci s indexem vektorového vyhledávání. Poslední bloky dat se načtou do zlaté tabulky (chunked_docs_table_name
). - Vytvořte nebo synchronizujte index vektorového vyhledávání pomocí funkce
04_vector_index
. Tento poznámkový blok ověřuje připravenost zadaného koncového bodu hledání vektoru v objektuvectorsearch_config
. Pokud nakonfigurovaný index již existuje, zahájí synchronizaci se zlatou tabulkou; jinak vytvoří synchronizaci indexu a aktivuje synchronizaci. To by mělo nějakou dobu trvat, pokud se koncový bod a index vektorového vyhledávání ještě nevytvořily.
Další krok
Pokračujte krokem 7. Nasazení a monitorování