Stap 6 (pijplijnen). Oplossingen voor gegevenspijplijnen implementeren
Volg deze stappen om uw gegevenspijplijn te wijzigen en uit te voeren in:
- Maak een nieuwe vectorindex.
- Maak een MLflow-uitvoering met de metagegevens van de gegevenspijplijn.
Er wordt naar de resulterende MLflow-uitvoering verwezen door het B_quality_iteration/02_evaluate_fixes
notebook.
Er zijn twee benaderingen voor het wijzigen van de gegevenspijplijn:
- Implementeer één oplossing tegelijk In deze benadering configureert en voert u één gegevenspijplijn tegelijk uit. Deze modus is het beste als u één insluitmodel wilt uitproberen en één nieuwe parser wilt testen. Databricks stelt voor om hier te beginnen om vertrouwd te raken met deze notebooks.
- Implementeer meerdere oplossingen tegelijk In deze benadering, ook wel een opruimen genoemd, voert u, parallel, meerdere gegevenspijplijnen uit die elk een andere configuratie hebben. Deze modus is het beste als u wilt 'opruimen' in veel verschillende strategieën, bijvoorbeeld drie PDF-parsers evalueren of veel verschillende segmentgrootten evalueren.
Zie de GitHub-opslagplaats voor de voorbeeldcode in deze sectie.
Benadering 1: Één oplossing tegelijk implementeren
- Het notitieblok B_quality_iteration/data_pipeline_fixes/single_fix/00_config openen
- Volg de instructies in een van de onderstaande:
- Volg de instructies voor het implementeren van een nieuwe configuratie in deze zelfstudie.
- Volg de stappen voor het implementeren van aangepaste code voor een parsering of segmentering.
- Voer de pijplijn uit door:
- Het 00_Run_Entire_Pipeline notebook openen en uitvoeren.
- Volg de stappen om elke stap van de pijplijn handmatig uit te voeren.
- Voeg de naam toe van de resulterende MLflow-uitvoering die wordt uitgevoerd naar de
DATA_PIPELINE_FIXES_RUN_NAMES
variabele in B_quality_iteration/02_evaluate_fixes notebook
Notitie
De pijplijn voor gegevensvoorbereiding maakt gebruik van Spark Structured Streaming om incrementeel bestanden te laden en te verwerken. Dit houdt in dat bestanden die al zijn geladen en voorbereid, worden bijgehouden in controlepunten en niet opnieuw worden verwerkt. Alleen nieuw toegevoegde bestanden worden geladen, voorbereid en toegevoegd aan de bijbehorende tabellen.
Als u de hele pijplijn opnieuw wilt uitvoeren en alle documenten opnieuw wilt verwerken, moet u de controlepunten en tabellen verwijderen. U kunt dit doen met behulp van het reset_tables_and_checkpoints notebook.
Benadering 2: Meerdere oplossingen tegelijk implementeren
- Open het notitieblok B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines .
- Volg de instructies in het notebook om twee of meer configuraties van de gegevenspijplijn toe te voegen die moeten worden uitgevoerd.
- Voer het notebook uit om deze pijplijnen uit te voeren.
- Voeg de namen toe van de resulterende MLflow-uitvoeringen die worden uitgevoerd naar de
DATA_PIPELINE_FIXES_RUN_NAMES
variabele in B_quality_iteration/02_evaluate_fixes notebook.
Bijlage
Notitie
U vindt de notebooks waarnaar hieronder wordt verwezen in de single_fix en multiple_fixes mappen, afhankelijk van of u één oplossing of meerdere fixes tegelijk implementeert.
Uitgebreide informatie over configuratie-instellingen
Hieronder vindt u de verschillende vooraf geïmplementeerde configuratieopties voor de gegevenspijplijn. U kunt ook een aangepaste parser/chunker implementeren.
-
vectorsearch_config
: Geef het eindpunt voor vectorzoekopdrachten op (moet actief zijn) en de naam van de index die moet worden gemaakt. Definieer bovendien het synchronisatietype tussen de brontabel en de index (standaard isTRIGGERED
). -
embedding_config
: Geef het insluitmodel op dat moet worden gebruikt, samen met de tokenizer. Zie hetsupporting_configs/embedding_models
notebook voor een volledige lijst met opties. Het insluitmodel moet worden geïmplementeerd op een actief model voor eindpunt. Afhankelijk van de segmenteringsstrategie wordt de tokenizer ook gebruikt tijdens het splitsen om ervoor te zorgen dat de segmenten de tokenlimiet van het embeddingmodel niet overschrijden. Tokenizers worden hier gebruikt om het aantal tokens in de tekstsegmenten te tellen om ervoor te zorgen dat ze niet de maximale contextlengte van het geselecteerde insluitingsmodel overschrijden.
Hieronder ziet u een tokenizer van HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
Hieronder ziet u een tokenizer van TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
-
pipeline_config
: Definieert de bestandsparser, chunker en het pad naar het bronveld. Parsers en chunkers worden respectievelijk gedefinieerd in deparser_library
notitieblokken enchunker_library
notitieblokken. Deze zijn te vinden in de single_fix en multiple_fixes mappen. Zie voor een volledige lijst met opties hetsupporting_configs/parser_chunker_strategies
notebook, dat opnieuw beschikbaar is in zowel de mappen met één als meerdere oplossingen. Verschillende parsers of chunkers kunnen verschillende configuratieparameters vereisen, waarbij<param x>
de mogelijke parameters vertegenwoordigen die vereist zijn voor een specifieke chunker. Parsers kunnen ook configuratiewaarden worden doorgegeven met dezelfde indeling.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Een aangepaste parser/chunker implementeren
Dit project is gestructureerd om het toevoegen van aangepaste parsers of chunkers aan de pijplijn voor gegevensvoorbereiding te vergemakkelijken.
Een nieuwe parser toevoegen
Stel dat u een nieuwe parser wilt opnemen met behulp van de PyMuPDF-bibliotheek om geparseerde tekst te transformeren in Markdown-indeling. Volg vervolgens deze stappen:
Installeer de vereiste afhankelijkheden door de volgende code toe te voegen aan het
parser_library
notebook in desingle_fix
ofmultiple_fix
map:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
Voeg in het
parser_library
notitieblok in desingle_fix
ofmultiple_fix
map een nieuwe sectie toe voor dePyMuPdfMarkdown
parser en implementeer de parseringsfunctie. Zorg ervoor dat de uitvoer van de functie voldoet aan deParserReturnValue
klasse die aan het begin van het notebook is gedefinieerd. Dit zorgt voor compatibiliteit met Spark UDF's.try
Het ofexcept
blok voorkomt dat Spark de hele parseringstaak mislukt vanwege fouten in afzonderlijke documenten bij het toepassen van de parser als een UDF in02_parse_docs
notebook in desingle_fix
ofmultiple_fix
map. Met dit notitieblok wordt gecontroleerd of het parseren van documenten is mislukt, de bijbehorende rijen in quarantaine plaatst en een waarschuwing genereert.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Voeg de nieuwe parseringsfunctie toe aan het
parser_factory
parser_library
notitieblok in desingle_fix
ofmultiple_fix
map om deze te configureren in hetpipeline_config
00_config
notitieblok.In het
02_parse_docs
notebook worden parserfuncties omgezet in Spark Python UDF's (pijl geoptimaliseerd voor Databricks Runtime 14.0 of hoger) en toegepast op het dataframe met de nieuwe binaire PDF-bestanden. Voor testen en ontwikkelen voegt u een eenvoudige testfunctie toe aan het parser_library notebook waarmee het test-document.pdf-bestand wordt geladen en een geslaagde parsering wordt toegepast:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Een nieuwe chunker toevoegen
Het proces voor het toevoegen van een nieuwe chunker volgt vergelijkbare stappen als hierboven beschreven voor een nieuwe parser.
- Voeg de vereiste afhankelijkheden toe in het chunker_library notebook.
- Voeg een nieuwe sectie toe voor uw chunker en implementeer een functie, bijvoorbeeld
chunk_parsed_content_newchunkername
. De uitvoer van de nieuwe chunker-functie moet een Python-woordenlijst zijn die voldoet aan deChunkerReturnValue
klasse die is gedefinieerd aan het begin van het chunker_library notebook. De functie moet ten minste een tekenreeks van de geparseerde tekst accepteren die moet worden gesegmenteerd. Als uw chunker aanvullende parameters vereist, kunt u ze toevoegen als functieparameters. - Voeg uw nieuwe chunker toe aan de
chunker_factory
functie die is gedefinieerd in het chunker_library notebook. Als uw functie aanvullende parameters accepteert, gebruikt u gedeeltelijke van functools om deze vooraf te configureren. Dit is nodig omdat UDF's slechts één invoerparameter accepteren. Dit is de geparseerde tekst in ons geval. Hiermeechunker_factory
kunt u verschillende chunker-methoden in de pipeline_config configureren en een Spark Python UDF retourneren (geoptimaliseerd voor Databricks Runtime 14.0 en hoger). - Voeg een eenvoudige testsectie toe voor de nieuwe segmenteringsfunctie. In deze sectie moet een vooraf gedefinieerde tekst worden gesegmenteerd die is opgegeven als een tekenreeks.
Prestaties afstemmen
Spark maakt gebruik van partities om de verwerking te parallelliseren. Gegevens worden onderverdeeld in segmenten rijen en elke partitie wordt standaard verwerkt door één kern. Wanneer gegevens echter in eerste instantie worden gelezen door Apache Spark, worden er mogelijk geen partities gemaakt die zijn geoptimaliseerd voor de gewenste berekening, met name voor onze UDF's die parserings- en segmenteringstaken uitvoeren. Het is van cruciaal belang om een evenwicht te vinden tussen het maken van partities die klein genoeg zijn voor efficiënte parallelle uitvoering en niet zo klein dat de overhead van het beheren ervan opweegt tegen de voordelen.
U kunt het aantal partities aanpassen met behulp van df.repartitions(<number of partitions>)
. Bij het toepassen van UDF's moet u streven naar een veelvoud van het aantal kernen dat beschikbaar is op de werkknooppunten. In het 02_parse_docs notebook kunt df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
u bijvoorbeeld twee keer zoveel partities maken als het aantal beschikbare werkkernen. Normaal gesproken moet een veelvoud tussen 1 en 3 bevredigende prestaties opleveren.
De pijplijn handmatig uitvoeren
U kunt ook elke afzonderlijke notebook stapsgewijs uitvoeren:
-
Laad de onbewerkte bestanden met behulp van het
01_load_files
notebook. Hiermee wordt elk binair document opgeslagen als één record in een bronstabel (raw_files_table_name
) die is gedefinieerd in dedestination_tables_config
. Bestanden worden incrementeel geladen en verwerken alleen nieuwe documenten sinds de laatste uitvoering. -
De documenten parseren met het
02_parse_docs
notitieblok. Dit notebook voert hetparser_library
notebook uit (zorg ervoor dat u dit uitvoert als de eerste cel om Python opnieuw op te starten), waardoor verschillende parsers en gerelateerde hulpprogramma's beschikbaar zijn. Vervolgens wordt de opgegeven parser in hetpipeline_config
document gebruikt om elk document te parseren in tekst zonder opmaak. Als voorbeeld worden relevante metagegevens, zoals het aantal pagina's van het oorspronkelijke PDF-bestand, naast de geparseerde tekst vastgelegd. Geparseerde documenten worden opgeslagen in een zilveren tabel (parsed_docs_table_name
), terwijl niet-geparseerde documenten in quarantaine worden geplaatst in een bijbehorende tabel. -
Segmenter de geparseerde documenten met behulp van het
03_chunk_docs
notitieblok. Net als bij parseren wordt hetchunker_library
notebook uitgevoerd (opnieuw uitgevoerd als de eerste cel). Het splitst elk geparseerd document in kleinere segmenten met behulp van de opgegeven chunker van depipeline_config
. Aan elk segment wordt een unieke id toegewezen met behulp van een MD5-hash, die nodig is voor synchronisatie met de vectorzoekindex. De laatste delen worden geladen in een gouden tabel (chunked_docs_table_name
). -
de vectorzoekindex maken/synchroniseren met de
04_vector_index
. Dit notebook controleert de gereedheid van het opgegeven vectorzoekeindpunt in devectorsearch_config
. Als de geconfigureerde index al bestaat, wordt synchronisatie gestart met de gouden tabel; anders wordt de index gemaakt en wordt synchronisatie geactiveerd. Dit duurt naar verwachting enige tijd als het Vector Search-eindpunt en de index nog niet zijn gemaakt.
Volgende stap
Ga verder met stap 7. Implementeren en bewaken.
< vorige: stap 6. Kwaliteitsproblemen iteratief oplossen
Volgende: stap 7. De POC-> & bewaken