Dela via


Steg 6 (pipelines). Implementera datapipelinekorrigeringar

Datapipeline

Följ de här stegen för att ändra din datapipeline och kör den till:

  1. Skapa ett nytt vektorindex.
  2. Skapa en MLflow-körning med datapipelinens metadata.

Den resulterande MLflow-körningen refereras av notebook-filen B_quality_iteration/02_evaluate_fixes .

Det finns två metoder för att ändra datapipelinen:

  • Implementera en enskild korrigering i taget I den här metoden konfigurerar och kör du en enda datapipeline samtidigt. Det här läget är bäst om du vill prova en enda inbäddningsmodell och testa en enda ny parser. Databricks föreslår att du börjar här för att bekanta dig med dessa notebook-filer.
  • Implementera flera korrigeringar samtidigt I den här metoden, som även kallas för en svepning, kör du parallellt flera datapipelines som var och en har en annan konfiguration. Det här läget är bäst om du vill "sopa" över många olika strategier, till exempel utvärdera tre PDF-parsare eller utvärdera många olika segmentstorlekar.

Se GitHub-lagringsplatsen för exempelkoden i det här avsnittet.

Metod 1: Implementera en enskild korrigering i taget

  1. Öppna anteckningsboken B_quality_iteration/data_pipeline_fixes/single_fix/00_config
  2. Följ anvisningarna i något av följande:
  3. Kör pipelinen genom att antingen:
    • Öppnar och kör 00_Run_Entire_Pipeline notebook-filen.
    • Följ stegen för att köra varje steg i pipelinen manuellt.
  4. Lägg till namnet på den resulterande MLflow-körningen som matas ut till variabeln DATA_PIPELINE_FIXES_RUN_NAMES i B_quality_iteration/02_evaluate_fixes notebook-fil

Kommentar

Pipelinen för dataförberedelse använder Spark Structured Streaming för inkrementell inläsning och bearbetning av filer. Detta innebär att filer som redan har lästs in och förberetts spåras i kontrollpunkter och inte bearbetas på nytt. Endast nyligen tillagda filer läses in, förbereds och läggs till i motsvarande tabeller.

Om du vill köra hela pipelinen från grunden och bearbeta alla dokument igen måste du därför ta bort kontrollpunkterna och tabellerna. Du kan göra detta med hjälp av den reset_tables_and_checkpoints notebook-filen.

Metod 2: Implementera flera korrigeringar samtidigt

  1. Öppna anteckningsboken B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines .
  2. Följ anvisningarna i notebook-filen för att lägga till två eller flera konfigurationer av datapipelinen som ska köras.
  3. Kör notebook-filen för att köra dessa pipelines.
  4. Lägg till namnen på de resulterande MLflow-körningarna som matas ut till variabeln DATA_PIPELINE_FIXES_RUN_NAMES i B_quality_iteration/02_evaluate_fixes notebook-fil.

Bilaga

Kommentar

Du hittar anteckningsböckerna som refereras nedan i single_fix och multiple_fixes kataloger beroende på om du implementerar en enda korrigering eller flera korrigeringar i taget.

Djupdykning i konfigurationsinställningar

Nedan visas de olika förimpleierade konfigurationsalternativen för datapipelinen. Du kan också implementera en anpassad parser/chunker.

  • vectorsearch_config: Ange slutpunkten för vektorsökning (måste vara igång) och namnet på indexet som ska skapas. Definiera dessutom synkroniseringstypen mellan källtabellen och indexet (standardvärdet är TRIGGERED).
  • embedding_config: Ange den inbäddningsmodell som ska användas tillsammans med tokenizern. En fullständig lista över alternativ finns i notebook-filen supporting_configs/embedding_models . Inbäddningsmodellen måste distribueras till en modell som körs som betjänar slutpunkten. Beroende på segmenteringsstrategin är tokenizern också under delning för att se till att segmenten inte överskrider tokengränsen för inbäddningsmodellen. Tokeniserare används här för att räkna antalet token i textsegmenten för att säkerställa att de inte överskrider den maximala kontextlängden för den valda inbäddningsmodellen.

Följande visar en tokenizer från HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

Följande visar en tokenizer från TikToken:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: Definierar filparsern, segmenteraren och sökvägen till källfältet. Parsare och segmentare definieras i parser_library notebook-filerna respektive chunker_library notebook-filerna. Dessa finns i single_fix och multiple_fixes kataloger. En fullständig lista över alternativ finns i supporting_configs/parser_chunker_strategies notebook-filen, som återigen är tillgänglig i både katalogerna för enkla och flera korrigeringar. Olika parser eller segment kan kräva olika konfigurationsparametrar där <param x> representerar de potentiella parametrar som krävs för en specifik segmenterare. Parsare kan också skickas konfigurationsvärden med samma format.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Implementera en anpassad parser/chunker

Det här projektet är strukturerat för att underlätta tillägg av anpassade parsers eller segment i pipelinen för dataförberedelse.

Lägg till en ny parser

Anta att du vill införliva en ny parser med hjälp av PyMuPDF-biblioteket för att omvandla tolkad text till Markdown-format. Följ de här stegen:

  1. Installera nödvändiga beroenden genom att lägga till parser_library följande kod i notebook-filen i single_fix katalogen eller multiple_fix :

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. I notebook-filen parser_library i single_fix katalogen eller multiple_fix lägger du till ett nytt avsnitt för PyMuPdfMarkdown parsern och implementerar parsningsfunktionen. Se till att funktionens utdata överensstämmer med klassen ParserReturnValue som definierades i början av notebook-filen. Detta säkerställer kompatibilitet med Spark-UDF:er. Eller-blocket try except hindrar Spark från att misslyckas med hela parsningsjobbet på grund av fel i enskilda dokument när parsern används som en UDF i 02_parse_docs notebook-filen i single_fix katalogen eller multiple_fix . Den här notebook-filen kontrollerar om parsningen misslyckades för något dokument, placerar motsvarande rader i karantän och skapar en varning.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Lägg till den nya parsningsfunktionen parser_factory i anteckningsboken parser_library single_fix i katalogen eller multiple_fix för att göra den pipeline_config konfigurerbar i notebook-filen 00_config .

  4. 02_parse_docs I notebook-filen omvandlas parserfunktioner till Spark Python UDF:er (piloptimerade för Databricks Runtime 14.0 eller senare) och tillämpas på dataramen som innehåller de nya binära PDF-filerna. För testning och utveckling lägger du till en enkel testfunktion i den parser_library notebook-filen som läser in test-document.pdf-filen och hävdar lyckad parsning:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Lägga till en ny segmentare

Processen för att lägga till en ny segmentare följer liknande steg som beskrivs ovan för en ny parser.

  1. Lägg till nödvändiga beroenden i den chunker_library notebook-filen.
  2. Lägg till ett nytt avsnitt för segmentören och implementera en funktion, t.ex. chunk_parsed_content_newchunkername. Utdata från den nya segmentfunktionen måste vara en Python-ordlista som överensstämmer med den ChunkerReturnValue klass som definierades i början av chunker_library notebook-filen. Funktionen bör acceptera minst en sträng av den tolkade texten som ska segmenteras. Om segmentatorn kräver ytterligare parametrar kan du lägga till dem som funktionsparametrar.
  3. Lägg till din nya segmenterare i funktionen chunker_factory som definierats i chunker_library notebook-filen. Om funktionen accepterar ytterligare parametrar använder du functools partiella för att förkonfigurera dem. Detta är nödvändigt eftersom UDF:er endast accepterar en indataparameter, vilket är den tolkade texten i vårt fall. Gör chunker_factory att du kan konfigurera olika segmentmetoder i pipeline_config och returnerar en Spark Python UDF (optimerad för Databricks Runtime 14.0 och senare).
  4. Lägg till ett enkelt testavsnitt för din nya segmenteringsfunktion. Det här avsnittet ska dela upp en fördefinierad text som tillhandahålls som en sträng.

Prestandajustering

Spark använder partitioner för att parallellisera bearbetningen. Data är indelade i rader och varje partition bearbetas som standard av en enda kärna. Men när data först läss av Apache Spark kanske de inte skapar partitioner som är optimerade för önskad beräkning, särskilt för våra UDF:er som utför parsnings- och segmenteringsuppgifter. Det är viktigt att hitta en balans mellan att skapa partitioner som är tillräckligt små för effektiv parallellisering och inte så små att kostnaderna för att hantera dem överväger fördelarna.

Du kan justera antalet partitioner med hjälp av df.repartitions(<number of partitions>). När du tillämpar UDF:er ska du sikta på en multipel av antalet tillgängliga kärnor på arbetsnoderna. I 02_parse_docs notebook-filen kan du till exempel inkludera df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) att skapa dubbelt så många partitioner som antalet tillgängliga arbetskärnor. Normalt bör en multipel mellan 1 och 3 ge tillfredsställande prestanda.

Köra pipelinen manuellt

Du kan också köra varje enskild notebook-fil steg för steg:

  1. Läs in rådatafilerna med hjälp av notebook-filen 01_load_files . Detta sparar varje binärt dokument som en post i en bronstabell (raw_files_table_name) som definieras i destination_tables_config. Filer läses in stegvis och bearbetar endast nya dokument sedan den senaste körningen.
  2. Parsa dokumenten med notebook-filen 02_parse_docs . Den här notebook-filen kör parser_library notebook-filen (se till att köra den här som den första cellen för att starta om Python), vilket gör olika parsers och relaterade verktyg tillgängliga. Den använder sedan den angivna parsern i pipeline_config för att parsa varje dokument i oformaterad text. Till exempel registreras relevanta metadata som antalet sidor i den ursprungliga PDF-filen tillsammans med den tolkade texten. Parsade dokument lagras i en silvertabell (parsed_docs_table_name), medan alla dokument som inte har parsats placeras i karantän i en motsvarande tabell.
  3. Dela upp de tolkade dokumenten med hjälp av notebook-filen 03_chunk_docs . Precis som vid parsning kör den här notebook-filen anteckningsboken chunker_library (kör igen som den första cellen). Varje parsat dokument delas upp i mindre segment med den angivna segmenten från pipeline_config. Varje segment tilldelas ett unikt ID med hjälp av en MD5-hash som krävs för synkronisering med vektorsökningsindexet. De sista segmenten läses in i en guldtabell (chunked_docs_table_name).
  4. Skapa/synkronisera vektorsökningsindexet 04_vector_indexmed . Den här notebook-filen verifierar beredskapen för den angivna slutpunkten för vektorsökning i vectorsearch_config. Om det konfigurerade indexet redan finns initieras synkroniseringen med guldtabellen. annars skapas indexet och synkroniseringen utlöses. Detta förväntas ta lite tid om slutpunkten och indexet för vektorsökning ännu inte har skapats.

Gå vidare

Fortsätt med steg 7. Distribuera och övervaka.