Etapa 6 (pipelines). Implementar correções de pipeline de dados
Siga estas etapas para modificar e executar o pipeline de dados para:
- Criar um novo índice de vetor.
- Criar uma execução do MLflow com os metadados do pipeline de dados.
A execução do MLflow resultante é referenciada pelo notebook B_quality_iteration/02_evaluate_fixes
.
Existem duas abordagens para modificar o pipeline de dados:
- Implementar uma única correção por vez Nessa abordagem, você configura e executa um único pipeline de dados de uma só vez. Esse modo é o melhor caso você queira experimentar um único modelo de inserção e testar um novo analisador único. O Databricks sugere começar por aqui para se familiarizar com esses notebooks.
- Implementar várias correções de uma só vez Nessa abordagem, também chamada de varredura, você executa vários pipelines de dados em paralelo e cada um tem uma configuração diferente. Esse modo é o melhor caso você queira "varrer" várias estratégias diferentes, por exemplo, avaliar três analisadores de PDF ou avaliar vários tamanhos de partes diferentes.
Consulte o repositório do GitHub para obter o código de exemplo nesta seção.
Abordagem 1: implementar uma única correção por vez
- Abra o notebook B_quality_iteration/data_pipeline_fixes/single_fix/00_config
- Siga as instruções em um dos seguintes:
- Siga as instruções para implementar uma nova configuração fornecida por este guia.
- Siga as etapas para implementar o código personalizado para uma análise ou agrupamento.
- Execute o pipeline assim:
- Abrindo e executando o notebook 00_Run_Entire_Pipeline.
- Seguindo as etapas para executar cada etapa do pipeline manualmente.
- Adicionar o nome da execução de MLflow resultante que é gerada para a variável
DATA_PIPELINE_FIXES_RUN_NAMES
no notebook B_quality_iteration/02_evaluate_fixes
Observação
O pipeline de preparação de dados utiliza o Streaming estruturado do Spark para carregar e processar arquivos de forma incremental. Isso significa que os arquivos já carregados e preparados são rastreados em pontos de verificação e não serão reprocessados. Somente os arquivos recentemente adicionados serão carregados, preparados e acrescentados às tabelas correspondentes.
Portanto, se desejar executar novamente todo o pipeline do zero e reprocessar todos os documentos, será necessário excluir os pontos de verificação e as tabelas. Você pode fazer isso usando o notebook reset_tables_and_checkpoints.
Abordagem 2: implementar várias correções ao mesmo tempo
- Abra o notebook B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines.
- Siga as instruções no notebook para adicionar duas ou mais configurações do pipeline de dados a serem executadas.
- Execute o notebook para executar esses pipelines.
- Adicione os nomes das execuções de MLflow resultantes que são geradas para a variável
DATA_PIPELINE_FIXES_RUN_NAMES
no notebook B_quality_iteration/02_evaluate_fixes.
Apêndice
Observação
Você pode encontrar os notebooks referenciados abaixo nos diretórios single_fix e multiple_fixes dependendo de onde você está implementando uma correção única ou várias correções por vez.
Aprofundamento das configurações de configuração
As diversas opções de configuração pré-implementadas para o pipeline de dados estão listadas abaixo. Como alternativa, é possível implementar um analisador/chunker personalizado.
vectorsearch_config
: especifique o ponto de extremidade de busca em vetores (deve estar em execução) e o nome do índice a ser criado. Além disso, defina o tipo de sincronização entre a tabela de origem e o índice (o padrão éTRIGGERED
).embedding_config
: especifique o modelo de inserção a ser usado, junto ao gerador de token. Para obter uma lista completa de opções, consulte o notebooksupporting_configs/embedding_models
. O modelo de inserção deve ser implantado em um ponto de extremidade de serviço de modelo em execução. Dependendo da estratégia de agrupamento, o gerador de token também está presente durante a divisão para garantir que as partes não excedam o limite de token do modelo de inserção. Os geradores de token são usados aqui para contar o número de tokens nas partes de texto e garantir que eles não excedam o comprimento máximo de contexto do modelo de inserção selecionado.
O seguinte mostra um gerador de token do HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
O seguinte mostra um gerador de token do TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: define o analisador de arquivos, o chunker e o caminho para o campo de origem. Analisadores e chunkers são definidos nos notebooksparser_library
echunker_library
, respectivamente. Eles podem ser encontrados nos diretórios single_fix e multiple_fixes. Para obter uma lista completa de opções, consulte o notebooksupporting_configs/parser_chunker_strategies
, que está novamente disponível nos diretórios de correção única e múltipla. Analisadores ou chunkers diferentes podem exigir parâmetros de configuração diferentes, em que<param x>
representam os parâmetros potenciais necessários para um chunker específico. Os analisadores também podem ter valores de configuração passados usando o mesmo formato.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementação de um analisador/chunker personalizado
Este projeto é estruturado para facilitar a adição de analisadores ou chunkers personalizados ao pipeline de preparação de dados.
Adicionar um novo analisador
Digamos que você queira incorporar um novo analisador usando a biblioteca PyMuPDF para transformar o texto analisado no formato Markdown. Siga estas etapas:
Instale as dependências necessárias adicionando o seguinte código ao notebook
parser_library
no diretóriosingle_fix
oumultiple_fix
:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
No notebook
parser_library
presente no diretóriosingle_fix
oumultiple_fix
, adicione uma nova seção para o analisadorPyMuPdfMarkdown
e implemente a função de análise. Verifique se a saída da função está em conformidade com a classeParserReturnValue
definida no início do notebook. Isso garante a compatibilidade com UDFs do Spark. O bloqueiotry
ouexcept
impede que o Spark falhe em todo o trabalho de análise devido a erros em documentos individuais ao aplicar o analisador como uma UDF no notebook02_parse_docs
no diretóriosingle_fix
oumultiple_fix
. Este notebook verificará se a análise falhou em algum documento, colocará as linhas correspondentes em quarentena e gerará um aviso.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Adicione sua nova função de análise ao
parser_factory
no notebookparser_library
no diretóriosingle_fix
oumultiple_fix
para tornar isso configurável nopipeline_config
do notebook00_config
.No notebook
02_parse_docs
, as funções do analisador são transformadas em UDFs de Python do Spark (arrow-optimized para Databricks Runtime 14.0 ou superior) e aplicadas ao dataframe que contém os novos arquivos PDF binários. Para teste e desenvolvimento, adicione uma função de teste simples ao notebook parser_library que carrega o arquivo test-document.pdf e declara a análise bem-sucedida:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Adicionar um novo chunker
O processo de adição de um novo chunker segue etapas semelhantes às explicadas acima para um novo analisador.
- Adicione as dependências necessárias no notebook chunker_library.
- Adicione uma nova seção para o chunker e implemente uma função, por exemplo,
chunk_parsed_content_newchunkername
. A saída da nova função chunker deve ser um dicionário Python que esteja em conformidade com a classeChunkerReturnValue
definida no início do notebook chunker_library. A função deve aceitar pelo menos uma cadeia de caracteres do texto analisado a ser agrupado. Se o chunker exigir parâmetros adicionais, você poderá adicionar isso como parâmetros de função. - Adicione o novo chunker à função
chunker_factory
definida no notebook chunker_library. Se a função aceitar parâmetros adicionais, use parcial de functools para pré-configurar isso. Isso é necessário porque as UDFs aceitam apenas um parâmetro de entrada, que será o texto analisado em nosso caso. Ochunker_factory
permite configurar diferentes métodos de chunker no pipeline_config e retorna um UDF de Python do Spark (otimizado para Databricks Runtime 14.0 e superior). - Adicione uma seção de teste simples para a nova função de agrupamento. Esta seção deve agrupar um texto predefinido fornecido como uma cadeia de caracteres.
Ajuste de desempenho
O Spark utiliza partições para paralelizar o processamento. Os dados são divididos em partes de linhas e cada partição é processada por um único núcleo por padrão. No entanto, quando os dados são lidos inicialmente pelo Apache Spark, isso pode não criar partições otimizadas para a computação desejada, especialmente para nossos UDFs que executam tarefas de análise e agrupamento. É crucial encontrar um equilíbrio entre a criação de partições que são pequenas o suficiente para paralelização eficiente e não tão pequenas que a sobrecarga de gerenciá-las supere os benefícios.
É possível ajustar o número de partições usando df.repartitions(<number of partitions>)
. Ao aplicar UDFs, busque obter um múltiplo do número de núcleos disponíveis nos nós de trabalho. Por exemplo, no notebook 02_parse_docs, você pode incluir df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
para criar o dobro de partições do que o número de núcleos de trabalho disponíveis. Normalmente, um múltiplo entre 1 e 3 deve gerar um desempenho satisfatório.
Execução do pipeline manualmente
Como alternativa, você pode executar cada notebook individual passo a passo:
- Carregue os arquivos brutos usando o notebook
01_load_files
. Isso salva cada binário de documento como um registro em uma tabela de bronze (raw_files_table_name
) definida nodestination_tables_config
. Os arquivos são carregados incrementalmente, processando apenas novos documentos desde a última execução. - Analise os documentos com o notebook
02_parse_docs
. Este notebook executa o notebookparser_library
(certifique-se de executar isso como a primeira célula a reiniciar o Python), disponibilizando analisadores diferentes e utilitários relacionados. Em seguida, ele usa o analisador especificado nopipeline_config
para analisar cada documento em texto sem formatação. Por exemplo, metadados relevantes como o número de páginas do PDF original ao lado do texto analisado são capturados. Documentos analisados com êxito são armazenados em uma tabela prata (parsed_docs_table_name
), enquanto todos os documentos não analisados são colocados em quarentena em uma tabela correspondente. - Dividir em partes os documentos analisados usando o notebook
03_chunk_docs
. Semelhante à análise, este notebook executa o notebookchunker_library
(novamente, execute como a primeira célula). Ele divide cada documento analisado em partes menores usando o chunker especificado dopipeline_config
. Cada parte recebe uma ID exclusiva usando um hash MD5, necessário para sincronização com o índice de busca em vetores. As partes finais são carregadas em uma tabela ouro (chunked_docs_table_name
). - Criar/sincronizar o índice de busca em vetores com o
04_vector_index
. Este notebook verifica a prontidão do ponto de extremidade de busca em vetores especificado novectorsearch_config
. Se o índice configurado já existir, ele iniciará a sincronização com a tabela ouro, caso contrário, ele criará o índice e disparará a sincronização. É esperado que isso leve algum tempo se o ponto de extremidade e o índice da busca em vetores ainda não tiverem sido criados.
Próxima etapa
Continue na Etapa 7. Implantar e monitorar.