Steg 6 (pipelines). Implementera datapipelinekorrigeringar
Följ de här stegen för att ändra din datapipeline och kör den till:
- Skapa ett nytt vektorindex.
- Skapa en MLflow-körning med datapipelinens metadata.
Den resulterande MLflow-körningen refereras av notebook-filen B_quality_iteration/02_evaluate_fixes
.
Det finns två metoder för att ändra datapipelinen:
- Implementera en enskild korrigering i taget I den här metoden konfigurerar och kör du en enda datapipeline samtidigt. Det här läget är bäst om du vill prova en enda inbäddningsmodell och testa en enda ny parser. Databricks föreslår att du börjar här för att bekanta dig med dessa notebook-filer.
- Implementera flera korrigeringar samtidigt I den här metoden, som även kallas för en svepning, kör du parallellt flera datapipelines som var och en har en annan konfiguration. Det här läget är bäst om du vill "sopa" över många olika strategier, till exempel utvärdera tre PDF-parsare eller utvärdera många olika segmentstorlekar.
Se GitHub-lagringsplatsen för exempelkoden i det här avsnittet.
Metod 1: Implementera en enskild korrigering i taget
- Öppna anteckningsboken B_quality_iteration/data_pipeline_fixes/single_fix/00_config
- Följ anvisningarna i något av följande:
- Följ anvisningarna för att implementera en ny konfiguration som tillhandahålls av den här kokboken.
- Följ stegen för att implementera anpassad kod för parsning eller segmentering.
- Kör pipelinen genom att antingen:
- Öppnar och kör 00_Run_Entire_Pipeline notebook-filen.
- Följ stegen för att köra varje steg i pipelinen manuellt.
- Lägg till namnet på den resulterande MLflow-körningen som matas ut till variabeln
DATA_PIPELINE_FIXES_RUN_NAMES
i B_quality_iteration/02_evaluate_fixes notebook-fil
Kommentar
Pipelinen för dataförberedelse använder Spark Structured Streaming för inkrementell inläsning och bearbetning av filer. Detta innebär att filer som redan har lästs in och förberetts spåras i kontrollpunkter och inte bearbetas på nytt. Endast nyligen tillagda filer läses in, förbereds och läggs till i motsvarande tabeller.
Om du vill köra hela pipelinen från grunden och bearbeta alla dokument igen måste du därför ta bort kontrollpunkterna och tabellerna. Du kan göra detta med hjälp av den reset_tables_and_checkpoints notebook-filen.
Metod 2: Implementera flera korrigeringar samtidigt
- Öppna anteckningsboken B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines .
- Följ anvisningarna i notebook-filen för att lägga till två eller flera konfigurationer av datapipelinen som ska köras.
- Kör notebook-filen för att köra dessa pipelines.
- Lägg till namnen på de resulterande MLflow-körningarna som matas ut till variabeln
DATA_PIPELINE_FIXES_RUN_NAMES
i B_quality_iteration/02_evaluate_fixes notebook-fil.
Bilaga
Kommentar
Du hittar anteckningsböckerna som refereras nedan i single_fix och multiple_fixes kataloger beroende på om du implementerar en enda korrigering eller flera korrigeringar i taget.
Djupdykning i konfigurationsinställningar
Nedan visas de olika förimpleierade konfigurationsalternativen för datapipelinen. Du kan också implementera en anpassad parser/chunker.
vectorsearch_config
: Ange slutpunkten för vektorsökning (måste vara igång) och namnet på indexet som ska skapas. Definiera dessutom synkroniseringstypen mellan källtabellen och indexet (standardvärdet ärTRIGGERED
).embedding_config
: Ange den inbäddningsmodell som ska användas tillsammans med tokenizern. En fullständig lista över alternativ finns i notebook-filensupporting_configs/embedding_models
. Inbäddningsmodellen måste distribueras till en modell som körs som betjänar slutpunkten. Beroende på segmenteringsstrategin är tokenizern också under delning för att se till att segmenten inte överskrider tokengränsen för inbäddningsmodellen. Tokeniserare används här för att räkna antalet token i textsegmenten för att säkerställa att de inte överskrider den maximala kontextlängden för den valda inbäddningsmodellen.
Följande visar en tokenizer från HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
Följande visar en tokenizer från TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: Definierar filparsern, segmenteraren och sökvägen till källfältet. Parsare och segmentare definieras iparser_library
notebook-filerna respektivechunker_library
notebook-filerna. Dessa finns i single_fix och multiple_fixes kataloger. En fullständig lista över alternativ finns isupporting_configs/parser_chunker_strategies
notebook-filen, som återigen är tillgänglig i både katalogerna för enkla och flera korrigeringar. Olika parser eller segment kan kräva olika konfigurationsparametrar där<param x>
representerar de potentiella parametrar som krävs för en specifik segmenterare. Parsare kan också skickas konfigurationsvärden med samma format.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementera en anpassad parser/chunker
Det här projektet är strukturerat för att underlätta tillägg av anpassade parsers eller segment i pipelinen för dataförberedelse.
Lägg till en ny parser
Anta att du vill införliva en ny parser med hjälp av PyMuPDF-biblioteket för att omvandla tolkad text till Markdown-format. Följ de här stegen:
Installera nödvändiga beroenden genom att lägga till
parser_library
följande kod i notebook-filen isingle_fix
katalogen ellermultiple_fix
:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
I notebook-filen
parser_library
isingle_fix
katalogen ellermultiple_fix
lägger du till ett nytt avsnitt förPyMuPdfMarkdown
parsern och implementerar parsningsfunktionen. Se till att funktionens utdata överensstämmer med klassenParserReturnValue
som definierades i början av notebook-filen. Detta säkerställer kompatibilitet med Spark-UDF:er. Eller-blockettry
except
hindrar Spark från att misslyckas med hela parsningsjobbet på grund av fel i enskilda dokument när parsern används som en UDF i02_parse_docs
notebook-filen isingle_fix
katalogen ellermultiple_fix
. Den här notebook-filen kontrollerar om parsningen misslyckades för något dokument, placerar motsvarande rader i karantän och skapar en varning.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Lägg till den nya parsningsfunktionen
parser_factory
i anteckningsbokenparser_library
single_fix
i katalogen ellermultiple_fix
för att göra denpipeline_config
konfigurerbar i notebook-filen00_config
.02_parse_docs
I notebook-filen omvandlas parserfunktioner till Spark Python UDF:er (piloptimerade för Databricks Runtime 14.0 eller senare) och tillämpas på dataramen som innehåller de nya binära PDF-filerna. För testning och utveckling lägger du till en enkel testfunktion i den parser_library notebook-filen som läser in test-document.pdf-filen och hävdar lyckad parsning:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Lägga till en ny segmentare
Processen för att lägga till en ny segmentare följer liknande steg som beskrivs ovan för en ny parser.
- Lägg till nödvändiga beroenden i den chunker_library notebook-filen.
- Lägg till ett nytt avsnitt för segmentören och implementera en funktion, t.ex.
chunk_parsed_content_newchunkername
. Utdata från den nya segmentfunktionen måste vara en Python-ordlista som överensstämmer med denChunkerReturnValue
klass som definierades i början av chunker_library notebook-filen. Funktionen bör acceptera minst en sträng av den tolkade texten som ska segmenteras. Om segmentatorn kräver ytterligare parametrar kan du lägga till dem som funktionsparametrar. - Lägg till din nya segmenterare i funktionen
chunker_factory
som definierats i chunker_library notebook-filen. Om funktionen accepterar ytterligare parametrar använder du functools partiella för att förkonfigurera dem. Detta är nödvändigt eftersom UDF:er endast accepterar en indataparameter, vilket är den tolkade texten i vårt fall. Görchunker_factory
att du kan konfigurera olika segmentmetoder i pipeline_config och returnerar en Spark Python UDF (optimerad för Databricks Runtime 14.0 och senare). - Lägg till ett enkelt testavsnitt för din nya segmenteringsfunktion. Det här avsnittet ska dela upp en fördefinierad text som tillhandahålls som en sträng.
Prestandajustering
Spark använder partitioner för att parallellisera bearbetningen. Data är indelade i rader och varje partition bearbetas som standard av en enda kärna. Men när data först läss av Apache Spark kanske de inte skapar partitioner som är optimerade för önskad beräkning, särskilt för våra UDF:er som utför parsnings- och segmenteringsuppgifter. Det är viktigt att hitta en balans mellan att skapa partitioner som är tillräckligt små för effektiv parallellisering och inte så små att kostnaderna för att hantera dem överväger fördelarna.
Du kan justera antalet partitioner med hjälp av df.repartitions(<number of partitions>)
. När du tillämpar UDF:er ska du sikta på en multipel av antalet tillgängliga kärnor på arbetsnoderna. I 02_parse_docs notebook-filen kan du till exempel inkludera df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
att skapa dubbelt så många partitioner som antalet tillgängliga arbetskärnor. Normalt bör en multipel mellan 1 och 3 ge tillfredsställande prestanda.
Köra pipelinen manuellt
Du kan också köra varje enskild notebook-fil steg för steg:
- Läs in rådatafilerna med hjälp av notebook-filen
01_load_files
. Detta sparar varje binärt dokument som en post i en bronstabell (raw_files_table_name
) som definieras idestination_tables_config
. Filer läses in stegvis och bearbetar endast nya dokument sedan den senaste körningen. - Parsa dokumenten med notebook-filen
02_parse_docs
. Den här notebook-filen körparser_library
notebook-filen (se till att köra den här som den första cellen för att starta om Python), vilket gör olika parsers och relaterade verktyg tillgängliga. Den använder sedan den angivna parsern ipipeline_config
för att parsa varje dokument i oformaterad text. Till exempel registreras relevanta metadata som antalet sidor i den ursprungliga PDF-filen tillsammans med den tolkade texten. Parsade dokument lagras i en silvertabell (parsed_docs_table_name
), medan alla dokument som inte har parsats placeras i karantän i en motsvarande tabell. - Dela upp de tolkade dokumenten med hjälp av notebook-filen
03_chunk_docs
. Precis som vid parsning kör den här notebook-filen anteckningsbokenchunker_library
(kör igen som den första cellen). Varje parsat dokument delas upp i mindre segment med den angivna segmenten frånpipeline_config
. Varje segment tilldelas ett unikt ID med hjälp av en MD5-hash som krävs för synkronisering med vektorsökningsindexet. De sista segmenten läses in i en guldtabell (chunked_docs_table_name
). - Skapa/synkronisera vektorsökningsindexet
04_vector_index
med . Den här notebook-filen verifierar beredskapen för den angivna slutpunkten för vektorsökning ivectorsearch_config
. Om det konfigurerade indexet redan finns initieras synkroniseringen med guldtabellen. annars skapas indexet och synkroniseringen utlöses. Detta förväntas ta lite tid om slutpunkten och indexet för vektorsökning ännu inte har skapats.
Gå vidare
Fortsätt med steg 7. Distribuera och övervaka.