Delen via


Stap 6 (pijplijnen). Oplossingen voor gegevenspijplijnen implementeren

Gegevenspijplijn

Volg deze stappen om uw gegevenspijplijn te wijzigen en uit te voeren in:

  1. Maak een nieuwe vectorindex.
  2. Maak een MLflow-uitvoering met de metagegevens van de gegevenspijplijn.

Er wordt naar de resulterende MLflow-uitvoering verwezen door het B_quality_iteration/02_evaluate_fixes notebook.

Er zijn twee benaderingen voor het wijzigen van de gegevenspijplijn:

  • Implementeer één oplossing tegelijk In deze benadering configureert en voert u één gegevenspijplijn tegelijk uit. Deze modus is het beste als u één insluitmodel wilt uitproberen en één nieuwe parser wilt testen. Databricks stelt voor om hier te beginnen om vertrouwd te raken met deze notebooks.
  • Implementeer meerdere oplossingen tegelijk In deze benadering, ook wel een opruimen genoemd, voert u, parallel, meerdere gegevenspijplijnen uit die elk een andere configuratie hebben. Deze modus is het beste als u wilt 'opruimen' in veel verschillende strategieën, bijvoorbeeld drie PDF-parsers evalueren of veel verschillende segmentgrootten evalueren.

Zie de GitHub-opslagplaats voor de voorbeeldcode in deze sectie.

Benadering 1: Één oplossing tegelijk implementeren

  1. Het notitieblok B_quality_iteration/data_pipeline_fixes/single_fix/00_config openen
  2. Volg de instructies in een van de onderstaande:
  3. Voer de pijplijn uit door:
  4. Voeg de naam toe van de resulterende MLflow-uitvoering die wordt uitgevoerd naar de DATA_PIPELINE_FIXES_RUN_NAMES variabele in B_quality_iteration/02_evaluate_fixes notebook

Notitie

De pijplijn voor gegevensvoorbereiding maakt gebruik van Spark Structured Streaming om incrementeel bestanden te laden en te verwerken. Dit houdt in dat bestanden die al zijn geladen en voorbereid, worden bijgehouden in controlepunten en niet opnieuw worden verwerkt. Alleen nieuw toegevoegde bestanden worden geladen, voorbereid en toegevoegd aan de bijbehorende tabellen.

Als u de hele pijplijn opnieuw wilt uitvoeren en alle documenten opnieuw wilt verwerken, moet u de controlepunten en tabellen verwijderen. U kunt dit doen met behulp van het reset_tables_and_checkpoints notebook.

Benadering 2: Meerdere oplossingen tegelijk implementeren

  1. Open het notitieblok B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines .
  2. Volg de instructies in het notebook om twee of meer configuraties van de gegevenspijplijn toe te voegen die moeten worden uitgevoerd.
  3. Voer het notebook uit om deze pijplijnen uit te voeren.
  4. Voeg de namen toe van de resulterende MLflow-uitvoeringen die worden uitgevoerd naar de DATA_PIPELINE_FIXES_RUN_NAMES variabele in B_quality_iteration/02_evaluate_fixes notebook.

Bijlage

Notitie

U vindt de notebooks waarnaar hieronder wordt verwezen in de single_fix en multiple_fixes mappen, afhankelijk van of u één oplossing of meerdere fixes tegelijk implementeert.

Uitgebreide informatie over configuratie-instellingen

Hieronder vindt u de verschillende vooraf geïmplementeerde configuratieopties voor de gegevenspijplijn. U kunt ook een aangepaste parser/chunker implementeren.

  • vectorsearch_config: Geef het eindpunt voor vectorzoekopdrachten op (moet actief zijn) en de naam van de index die moet worden gemaakt. Definieer bovendien het synchronisatietype tussen de brontabel en de index (standaard is TRIGGERED).
  • embedding_config: Geef het insluitmodel op dat moet worden gebruikt, samen met de tokenizer. Zie het supporting_configs/embedding_models notebook voor een volledige lijst met opties. Het insluitmodel moet worden geïmplementeerd op een actief model voor eindpunt. Afhankelijk van de segmenteringsstrategie wordt de tokenizer ook gebruikt tijdens het splitsen om ervoor te zorgen dat de segmenten de tokenlimiet van het embeddingmodel niet overschrijden. Tokenizers worden hier gebruikt om het aantal tokens in de tekstsegmenten te tellen om ervoor te zorgen dat ze niet de maximale contextlengte van het geselecteerde insluitingsmodel overschrijden.

Hieronder ziet u een tokenizer van HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

Hieronder ziet u een tokenizer van TikToken:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: Definieert de bestandsparser, chunker en het pad naar het bronveld. Parsers en chunkers worden respectievelijk gedefinieerd in de parser_library notitieblokken en chunker_library notitieblokken. Deze zijn te vinden in de single_fix en multiple_fixes mappen. Zie voor een volledige lijst met opties het supporting_configs/parser_chunker_strategies notebook, dat opnieuw beschikbaar is in zowel de mappen met één als meerdere oplossingen. Verschillende parsers of chunkers kunnen verschillende configuratieparameters vereisen, waarbij <param x> de mogelijke parameters vertegenwoordigen die vereist zijn voor een specifieke chunker. Parsers kunnen ook configuratiewaarden worden doorgegeven met dezelfde indeling.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Een aangepaste parser/chunker implementeren

Dit project is gestructureerd om het toevoegen van aangepaste parsers of chunkers aan de pijplijn voor gegevensvoorbereiding te vergemakkelijken.

Een nieuwe parser toevoegen

Stel dat u een nieuwe parser wilt opnemen met behulp van de PyMuPDF-bibliotheek om geparseerde tekst te transformeren in Markdown-indeling. Volg vervolgens deze stappen:

  1. Installeer de vereiste afhankelijkheden door de volgende code toe te voegen aan het parser_library notebook in de single_fix of multiple_fix map:

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. Voeg in het parser_library notitieblok in de single_fix of multiple_fix map een nieuwe sectie toe voor de PyMuPdfMarkdown parser en implementeer de parseringsfunctie. Zorg ervoor dat de uitvoer van de functie voldoet aan de ParserReturnValue klasse die aan het begin van het notebook is gedefinieerd. Dit zorgt voor compatibiliteit met Spark UDF's. try Het of except blok voorkomt dat Spark de hele parseringstaak mislukt vanwege fouten in afzonderlijke documenten bij het toepassen van de parser als een UDF in 02_parse_docs notebook in de single_fix of multiple_fix map. Met dit notitieblok wordt gecontroleerd of het parseren van documenten is mislukt, de bijbehorende rijen in quarantaine plaatst en een waarschuwing genereert.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Voeg de nieuwe parseringsfunctie toe aan het parser_factoryparser_library notitieblok in de single_fix of multiple_fix map om deze te configureren in het pipeline_config00_config notitieblok.

  4. In het 02_parse_docs notebook worden parserfuncties omgezet in Spark Python UDF's (pijl geoptimaliseerd voor Databricks Runtime 14.0 of hoger) en toegepast op het dataframe met de nieuwe binaire PDF-bestanden. Voor testen en ontwikkelen voegt u een eenvoudige testfunctie toe aan het parser_library notebook waarmee het test-document.pdf-bestand wordt geladen en een geslaagde parsering wordt toegepast:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Een nieuwe chunker toevoegen

Het proces voor het toevoegen van een nieuwe chunker volgt vergelijkbare stappen als hierboven beschreven voor een nieuwe parser.

  1. Voeg de vereiste afhankelijkheden toe in het chunker_library notebook.
  2. Voeg een nieuwe sectie toe voor uw chunker en implementeer een functie, bijvoorbeeld chunk_parsed_content_newchunkername. De uitvoer van de nieuwe chunker-functie moet een Python-woordenlijst zijn die voldoet aan de ChunkerReturnValue klasse die is gedefinieerd aan het begin van het chunker_library notebook. De functie moet ten minste een tekenreeks van de geparseerde tekst accepteren die moet worden gesegmenteerd. Als uw chunker aanvullende parameters vereist, kunt u ze toevoegen als functieparameters.
  3. Voeg uw nieuwe chunker toe aan de chunker_factory functie die is gedefinieerd in het chunker_library notebook. Als uw functie aanvullende parameters accepteert, gebruikt u gedeeltelijke van functools om deze vooraf te configureren. Dit is nodig omdat UDF's slechts één invoerparameter accepteren. Dit is de geparseerde tekst in ons geval. Hiermee chunker_factory kunt u verschillende chunker-methoden in de pipeline_config configureren en een Spark Python UDF retourneren (geoptimaliseerd voor Databricks Runtime 14.0 en hoger).
  4. Voeg een eenvoudige testsectie toe voor de nieuwe segmenteringsfunctie. In deze sectie moet een vooraf gedefinieerde tekst worden gesegmenteerd die is opgegeven als een tekenreeks.

Prestaties afstemmen

Spark maakt gebruik van partities om de verwerking te parallelliseren. Gegevens worden onderverdeeld in segmenten rijen en elke partitie wordt standaard verwerkt door één kern. Wanneer gegevens echter in eerste instantie worden gelezen door Apache Spark, worden er mogelijk geen partities gemaakt die zijn geoptimaliseerd voor de gewenste berekening, met name voor onze UDF's die parserings- en segmenteringstaken uitvoeren. Het is van cruciaal belang om een evenwicht te vinden tussen het maken van partities die klein genoeg zijn voor efficiënte parallelle uitvoering en niet zo klein dat de overhead van het beheren ervan opweegt tegen de voordelen.

U kunt het aantal partities aanpassen met behulp van df.repartitions(<number of partitions>). Bij het toepassen van UDF's moet u streven naar een veelvoud van het aantal kernen dat beschikbaar is op de werkknooppunten. In het 02_parse_docs notebook kunt df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) u bijvoorbeeld twee keer zoveel partities maken als het aantal beschikbare werkkernen. Normaal gesproken moet een veelvoud tussen 1 en 3 bevredigende prestaties opleveren.

De pijplijn handmatig uitvoeren

U kunt ook elke afzonderlijke notebook stapsgewijs uitvoeren:

  1. Laad de onbewerkte bestanden met behulp van het 01_load_files notebook. Hiermee wordt elk binair document opgeslagen als één record in een bronstabel (raw_files_table_name) die is gedefinieerd in de destination_tables_config. Bestanden worden incrementeel geladen en verwerken alleen nieuwe documenten sinds de laatste uitvoering.
  2. De documenten parseren met het 02_parse_docs notitieblok. Dit notebook voert het parser_library notebook uit (zorg ervoor dat u dit uitvoert als de eerste cel om Python opnieuw op te starten), waardoor verschillende parsers en gerelateerde hulpprogramma's beschikbaar zijn. Vervolgens wordt de opgegeven parser in het pipeline_config document gebruikt om elk document te parseren in tekst zonder opmaak. Als voorbeeld worden relevante metagegevens, zoals het aantal pagina's van het oorspronkelijke PDF-bestand, naast de geparseerde tekst vastgelegd. Geparseerde documenten worden opgeslagen in een zilveren tabel (parsed_docs_table_name), terwijl niet-geparseerde documenten in quarantaine worden geplaatst in een bijbehorende tabel.
  3. Segmenter de geparseerde documenten met behulp van het 03_chunk_docs notitieblok. Net als bij parseren wordt het chunker_library notebook uitgevoerd (opnieuw uitgevoerd als de eerste cel). Het splitst elk geparseerd document in kleinere segmenten met behulp van de opgegeven chunker van de pipeline_config. Aan elk segment wordt een unieke id toegewezen met behulp van een MD5-hash, die nodig is voor synchronisatie met de vectorzoekindex. De laatste delen worden geladen in een gouden tabel (chunked_docs_table_name).
  4. de vectorzoekindex maken/synchroniseren met de 04_vector_index. Dit notebook controleert de gereedheid van het opgegeven vectorzoekeindpunt in de vectorsearch_config. Als de geconfigureerde index al bestaat, wordt synchronisatie gestart met de gouden tabel; anders wordt de index gemaakt en wordt synchronisatie geactiveerd. Dit duurt naar verwachting enige tijd als het Vector Search-eindpunt en de index nog niet zijn gemaakt.

Volgende stap

Ga verder met stap 7. Implementeren en bewaken.

< vorige: stap 6. Kwaliteitsproblemen iteratief oplossen

Volgende: stap 7. De POC-> & bewaken