Compartir a través de


Paso 6 (canalizaciones). Implementación de correcciones de canalización de datos

Canalización de datos

Siga estos pasos para modificar la canalización de datos y ejecutarla en:

  1. Cree un nuevo índice vectorial.
  2. Cree una ejecución de MLflow con los metadatos de la canalización de datos.

El cuaderno B_quality_iteration/02_evaluate_fixes hace referencia a la ejecución de MLflow resultante.

Hay dos enfoques para modificar la canalización de datos:

  • Implementar una única corrección a la vez En este enfoque, configurará y ejecutará una única canalización de datos a la vez. Este modo es mejor si desea probar un único modelo de inserción y probar un único analizador nuevo. Databricks sugiere empezar aquí para familiarizarse con estos cuadernos.
  • Implementar varias correcciones a la vez En este enfoque, también denominado barrido, ejecutará varias canalizaciones de datos en paralelo, y cada una de estas tiene una configuración diferente. Este modo es el mejor si desea "barrer" en muchas estrategias diferentes, por ejemplo, evaluar tres analizadores PDF o evaluar muchos tamaños de fragmentos diferentes.

Consulte el repositorio de GitHub para ver el código de ejemplo de esta sección.

Enfoque 1: Implementar una única corrección a la vez

  1. Abra el cuaderno B_quality_iteration/data_pipeline_fixes/single_fix/00_config
  2. Siga las instrucciones de una de las siguientes opciones:
  3. Ejecute la canalización, ya sea al:
  4. Agregue el nombre de la ejecución de MLflow resultante que se genera en la variable DATA_PIPELINE_FIXES_RUN_NAMES en el cuaderno B_quality_iteration/02_evaluate_fixes

Nota:

La canalización de preparación de datos emplea Spark Structured Streaming para cargar y procesar archivos de forma incremental. Esto implica que se realice un seguimiento de los archivos ya cargados y preparados en los puntos de control, y que no se vuelvan a procesar. Solo se cargarán, prepararán y anexarán los archivos recién agregados a las tablas correspondientes.

Por lo tanto, si desea volver a ejecutar toda la canalización desde cero y volver a procesar todos los documentos, debe eliminar los puntos de control y las tablas. Para ello, use el cuaderno reset_tables_and_checkpoints.

Enfoque 2: Implementar varias correcciones a la vez

  1. Abra el cuaderno B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines.
  2. Siga las instrucciones del cuaderno para agregar dos o más configuraciones de la canalización de datos que se va a ejecutar.
  3. Ejecute el cuaderno para ejecutar estas canalizaciones.
  4. Agregue los nombres de las ejecuciones de MLflow resultantes que se generan en la variable DATA_PIPELINE_FIXES_RUN_NAMES en el cuaderno B_quality_iteration/02_evaluate_fixes.

Apéndice

Nota:

Puede encontrar los cuadernos a los que se hace referencia a continuación en los directorios single_fix y multiple_fixes en función de si va a implementar una corrección única o varias correcciones a la vez.

Más detalles sobre las opciones de configuración

A continuación, se enumeran las distintas opciones de configuración implementadas de manera previa para la canalización de datos. Como alternativa, puede implementar un analizador o fragmentador personalizado.

  • vectorsearch_config: especifique el punto de conexión del vector de búsqueda (debe estar activo y en ejecución) y el nombre del índice que se va a crear. Además, defina el tipo de sincronización entre la tabla de origen y el índice (el valor predeterminado es TRIGGERED).
  • embedding_config: especifique el modelo de inserción que se va a usar, junto con el tokenizador. Para obtener una lista completa de las opciones, consulte el cuaderno supporting_configs/embedding_models. El modelo de inserción debe desplegarse en un punto de conexión del servicio de modelos en funcionamiento. En función de la estrategia de fragmentación, el tokenizador también se divide para asegurarse de que los fragmentos no superan el límite de tokens del modelo de inserción. Los tokenizadores se usan aquí para contar el número de tokens en los fragmentos de texto para asegurarse de que no superan la longitud máxima del contexto del modelo de inserción seleccionado.

A continuación se muestra un tokenizador de HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

A continuación se muestra un tokenizador de TikToken:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: define el analizador de archivos, el fragmentador y la ruta de acceso al campo de orígenes. Los analizadores y los fragmentadores se definen en los cuadernos parser_library y chunker_library, respectivamente. Estos se pueden encontrar en los directorios single_fix y multiple_fixes. Para obtener una lista completa de las opciones, consulte el cuaderno supporting_configs/parser_chunker_strategies, que está disponible de nuevo en los directorios de corrección única y múltiples. Los diferentes analizadores o fragmentadores pueden requerir parámetros de configuración diferentes, donde <param x> representan los parámetros potenciales necesarios para un fragmentador específico. También se pueden pasar valores de configuración con el mismo formato a los analizadores.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Implementación de un analizador o un fragmentador personalizado

Este proyecto está estructurado para facilitar la adición de analizadores o fragmentadores personalizados a la canalización de preparación de datos.

Agregar un nuevo analizador

Supongamos que desea incorporar un nuevo analizador mediante la biblioteca PyMuPDF para transformar el texto analizado en formato Markdown. Siga estos pasos:

  1. Instale las dependencias necesarias agregando el código siguiente al cuaderno parser_library en el directorio single_fix o multiple_fix:

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. En el cuaderno parser_library del directorio single_fix o multiple_fix, agregue una nueva sección para el analizador PyMuPdfMarkdown e implemente la función de análisis. Asegúrese de que la salida de la función cumple con la clase ParserReturnValue definida al principio del cuaderno. Esto garantiza la compatibilidad con las UDF de Spark. El bloque try o except impide que Spark produzca un error en todo el trabajo de análisis debido a errores en documentos individuales al aplicar el analizador como UDF en el cuaderno 02_parse_docs en el directorio single_fix o multiple_fix. Este cuaderno comprobará si se produjo un error en el análisis de cualquier documento, pondrá en cuarentena las filas correspondientes y generará una advertencia.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Agregue la nueva función de análisis a parser_factory en el cuaderno parser_library en el directorio single_fix o multiple_fix para que sea configurable en pipeline_config del cuaderno 00_config.

  4. En el cuaderno 02_parse_docs, las funciones del analizador se convierten en UDF de Python de Spark (optimizadas para Arrow para Databricks Runtime 14.0 o superior) y se aplican al DataFrame que contiene los nuevos archivos PDF binarios. Para pruebas y desarrollo, agregue una función de prueba sencilla al cuaderno parser_library que cargue el archivo test-document.pdf y confirme que el análisis se ha realizado correctamente:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Agregar un nuevo fragmentador

El proceso para agregar un nuevo fragmentador sigue pasos similares a los descritos anteriormente para un nuevo analizador.

  1. Agregue las dependencias necesarias en el cuaderno chunker_library.
  2. Agregue una nueva sección para el fragmentador e implemente una función, por ejemplo, chunk_parsed_content_newchunkername. La salida de la nueva función de fragmentador debe ser un diccionario de Python que cumpla con la clase ChunkerReturnValue definida al principio del cuaderno chunker_library. La función debe aceptar al menos una cadena del texto analizado que se va a fragmentar. Si el fragmentador requiere parámetros adicionales, puede agregarlos como parámetros de función.
  3. Agregue el nuevo fragmentador a la función chunker_factory definida en el cuaderno chunker_library. Si la función acepta parámetros adicionales, use functools’ partial para configurarlos previamente. Esto es necesario porque las UDF solo aceptan un parámetro de entrada, que será el texto analizado en nuestro caso. chunker_factory permite configurar diferentes métodos de fragmentador en pipeline_config y devuelve una UDF de Python de Spark (optimizada para Databricks Runtime 14.0 y versiones posteriores).
  4. Agregue una sección de prueba sencilla para la nueva función de fragmentación. Esta sección debe fragmentar un texto predefinido proporcionado como una cadena.

Optimización del rendimiento

Spark utiliza particiones para paralelizar el procesamiento. Los datos se dividen en fragmentos de filas y cada partición se procesa mediante un único núcleo de forma predeterminada. Sin embargo, cuando Apache Spark lee inicialmente los datos, es posible que no cree particiones optimizadas para el cálculo deseado, especialmente para las UDF que realizan tareas de análisis y fragmentación. Es fundamental lograr un equilibrio entre la creación de particiones lo suficientemente pequeñas como para una paralelización eficaz y no tan pequeñas que la sobrecarga de administrarlas supere sus ventajas.

Puede ajustar el número de particiones mediante df.repartitions(<number of partitions>). Al aplicar UDF, apunte a un múltiplo del número de núcleos disponibles en los nodos de trabajo. Por ejemplo, en el cuaderno 02_parse_docs, puede incluir df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) para crear dos veces más particiones del número de núcleos de trabajo disponibles. Por lo general, un múltiplo entre 1 y 3 debería producir un rendimiento satisfactorio.

Ejecución manual de la canalización

Como alternativa, puede ejecutar cada cuaderno individual paso a paso:

  1. Cargue los archivos sin procesar mediante el cuaderno 01_load_files. Esto guarda cada archivo binario de documento como un registro en una tabla bronze (raw_files_table_name) definida en destination_tables_config. Los archivos se cargan de forma incremental, procesando solo los documentos nuevos desde la última ejecución.
  2. Analice los documentos con el cuaderno 02_parse_docs. Este cuaderno ejecuta el cuaderno parser_library (asegúrese de ejecutarlo como la primera celda para reiniciar Python), lo que hace que diferentes analizadores y utilidades relacionadas estén disponibles. A continuación, usa el analizador especificado en pipeline_config para analizar cada documento en texto sin formato. Por ejemplo, se capturan metadatos relevantes como el número de páginas del PDF original junto con el texto analizado. Los documentos analizados correctamente se almacenan en una tabla silver (parsed_docs_table_name), mientras que los documentos no analizados se ponen en cuarentena en una tabla correspondiente.
  3. Fragmenta los documentos analizados mediante el cuaderno 03_chunk_docs. De forma similar al análisis, este cuaderno ejecuta el cuaderno chunker_library (de nuevo, ejecútelo como la primera celda). Divide cada documento analizado en fragmentos más pequeños mediante el fragmentador especificado de pipeline_config. A cada fragmento se le asigna un identificador único mediante un hash MD5, necesario para la sincronización con el índice de vector de búsqueda. Los fragmentos finales se cargan en una tabla gold (chunked_docs_table_name).
  4. Cree o sincronice el índice de vector de búsqueda con 04_vector_index. Este cuaderno comprueba la preparación del punto de conexión del vector de búsqueda especificado en vectorsearch_config. Si el índice configurado ya existe, inicia la sincronización con la tabla gold; de lo contrario, crea el índice y desencadena la sincronización. Se espera que esto tarde un momento si aún no se han creado el punto de conexión ni el índice de vector de búsqueda.

Paso siguiente

Continúe con el Paso 7. Implementar y supervisar.