Paso 6 (canalizaciones). Implementación de correcciones de canalización de datos
Siga estos pasos para modificar la canalización de datos y ejecutarla en:
- Cree un nuevo índice vectorial.
- Cree una ejecución de MLflow con los metadatos de la canalización de datos.
El cuaderno B_quality_iteration/02_evaluate_fixes
hace referencia a la ejecución de MLflow resultante.
Hay dos enfoques para modificar la canalización de datos:
- Implementar una única corrección a la vez En este enfoque, configurará y ejecutará una única canalización de datos a la vez. Este modo es mejor si desea probar un único modelo de inserción y probar un único analizador nuevo. Databricks sugiere empezar aquí para familiarizarse con estos cuadernos.
- Implementar varias correcciones a la vez En este enfoque, también denominado barrido, ejecutará varias canalizaciones de datos en paralelo, y cada una de estas tiene una configuración diferente. Este modo es el mejor si desea "barrer" en muchas estrategias diferentes, por ejemplo, evaluar tres analizadores PDF o evaluar muchos tamaños de fragmentos diferentes.
Consulte el repositorio de GitHub para ver el código de ejemplo de esta sección.
Enfoque 1: Implementar una única corrección a la vez
- Abra el cuaderno B_quality_iteration/data_pipeline_fixes/single_fix/00_config
- Siga las instrucciones de una de las siguientes opciones:
- Siga las instrucciones para implementar una nueva configuración proporcionada por esta guía paso a paso.
- Siga los pasos para implementar código personalizado para un análisis o fragmentación.
- Ejecute la canalización, ya sea al:
- Abrir y ejecutar el cuaderno 00_Run_Entire_Pipeline.
- Seguir los pasos para ejecutar cada paso de la canalización manualmente.
- Agregue el nombre de la ejecución de MLflow resultante que se genera en la variable
DATA_PIPELINE_FIXES_RUN_NAMES
en el cuaderno B_quality_iteration/02_evaluate_fixes
Nota:
La canalización de preparación de datos emplea Spark Structured Streaming para cargar y procesar archivos de forma incremental. Esto implica que se realice un seguimiento de los archivos ya cargados y preparados en los puntos de control, y que no se vuelvan a procesar. Solo se cargarán, prepararán y anexarán los archivos recién agregados a las tablas correspondientes.
Por lo tanto, si desea volver a ejecutar toda la canalización desde cero y volver a procesar todos los documentos, debe eliminar los puntos de control y las tablas. Para ello, use el cuaderno reset_tables_and_checkpoints.
Enfoque 2: Implementar varias correcciones a la vez
- Abra el cuaderno B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines.
- Siga las instrucciones del cuaderno para agregar dos o más configuraciones de la canalización de datos que se va a ejecutar.
- Ejecute el cuaderno para ejecutar estas canalizaciones.
- Agregue los nombres de las ejecuciones de MLflow resultantes que se generan en la variable
DATA_PIPELINE_FIXES_RUN_NAMES
en el cuaderno B_quality_iteration/02_evaluate_fixes.
Apéndice
Nota:
Puede encontrar los cuadernos a los que se hace referencia a continuación en los directorios single_fix y multiple_fixes en función de si va a implementar una corrección única o varias correcciones a la vez.
Más detalles sobre las opciones de configuración
A continuación, se enumeran las distintas opciones de configuración implementadas de manera previa para la canalización de datos. Como alternativa, puede implementar un analizador o fragmentador personalizado.
vectorsearch_config
: especifique el punto de conexión del vector de búsqueda (debe estar activo y en ejecución) y el nombre del índice que se va a crear. Además, defina el tipo de sincronización entre la tabla de origen y el índice (el valor predeterminado esTRIGGERED
).embedding_config
: especifique el modelo de inserción que se va a usar, junto con el tokenizador. Para obtener una lista completa de las opciones, consulte el cuadernosupporting_configs/embedding_models
. El modelo de inserción debe desplegarse en un punto de conexión del servicio de modelos en funcionamiento. En función de la estrategia de fragmentación, el tokenizador también se divide para asegurarse de que los fragmentos no superan el límite de tokens del modelo de inserción. Los tokenizadores se usan aquí para contar el número de tokens en los fragmentos de texto para asegurarse de que no superan la longitud máxima del contexto del modelo de inserción seleccionado.
A continuación se muestra un tokenizador de HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
A continuación se muestra un tokenizador de TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: define el analizador de archivos, el fragmentador y la ruta de acceso al campo de orígenes. Los analizadores y los fragmentadores se definen en los cuadernosparser_library
ychunker_library
, respectivamente. Estos se pueden encontrar en los directorios single_fix y multiple_fixes. Para obtener una lista completa de las opciones, consulte el cuadernosupporting_configs/parser_chunker_strategies
, que está disponible de nuevo en los directorios de corrección única y múltiples. Los diferentes analizadores o fragmentadores pueden requerir parámetros de configuración diferentes, donde<param x>
representan los parámetros potenciales necesarios para un fragmentador específico. También se pueden pasar valores de configuración con el mismo formato a los analizadores.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementación de un analizador o un fragmentador personalizado
Este proyecto está estructurado para facilitar la adición de analizadores o fragmentadores personalizados a la canalización de preparación de datos.
Agregar un nuevo analizador
Supongamos que desea incorporar un nuevo analizador mediante la biblioteca PyMuPDF para transformar el texto analizado en formato Markdown. Siga estos pasos:
Instale las dependencias necesarias agregando el código siguiente al cuaderno
parser_library
en el directoriosingle_fix
omultiple_fix
:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
En el cuaderno
parser_library
del directoriosingle_fix
omultiple_fix
, agregue una nueva sección para el analizadorPyMuPdfMarkdown
e implemente la función de análisis. Asegúrese de que la salida de la función cumple con la claseParserReturnValue
definida al principio del cuaderno. Esto garantiza la compatibilidad con las UDF de Spark. El bloquetry
oexcept
impide que Spark produzca un error en todo el trabajo de análisis debido a errores en documentos individuales al aplicar el analizador como UDF en el cuaderno02_parse_docs
en el directoriosingle_fix
omultiple_fix
. Este cuaderno comprobará si se produjo un error en el análisis de cualquier documento, pondrá en cuarentena las filas correspondientes y generará una advertencia.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Agregue la nueva función de análisis a
parser_factory
en el cuadernoparser_library
en el directoriosingle_fix
omultiple_fix
para que sea configurable enpipeline_config
del cuaderno00_config
.En el cuaderno
02_parse_docs
, las funciones del analizador se convierten en UDF de Python de Spark (optimizadas para Arrow para Databricks Runtime 14.0 o superior) y se aplican al DataFrame que contiene los nuevos archivos PDF binarios. Para pruebas y desarrollo, agregue una función de prueba sencilla al cuaderno parser_library que cargue el archivo test-document.pdf y confirme que el análisis se ha realizado correctamente:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Agregar un nuevo fragmentador
El proceso para agregar un nuevo fragmentador sigue pasos similares a los descritos anteriormente para un nuevo analizador.
- Agregue las dependencias necesarias en el cuaderno chunker_library.
- Agregue una nueva sección para el fragmentador e implemente una función, por ejemplo,
chunk_parsed_content_newchunkername
. La salida de la nueva función de fragmentador debe ser un diccionario de Python que cumpla con la claseChunkerReturnValue
definida al principio del cuaderno chunker_library. La función debe aceptar al menos una cadena del texto analizado que se va a fragmentar. Si el fragmentador requiere parámetros adicionales, puede agregarlos como parámetros de función. - Agregue el nuevo fragmentador a la función
chunker_factory
definida en el cuaderno chunker_library. Si la función acepta parámetros adicionales, use functools’ partial para configurarlos previamente. Esto es necesario porque las UDF solo aceptan un parámetro de entrada, que será el texto analizado en nuestro caso.chunker_factory
permite configurar diferentes métodos de fragmentador en pipeline_config y devuelve una UDF de Python de Spark (optimizada para Databricks Runtime 14.0 y versiones posteriores). - Agregue una sección de prueba sencilla para la nueva función de fragmentación. Esta sección debe fragmentar un texto predefinido proporcionado como una cadena.
Optimización del rendimiento
Spark utiliza particiones para paralelizar el procesamiento. Los datos se dividen en fragmentos de filas y cada partición se procesa mediante un único núcleo de forma predeterminada. Sin embargo, cuando Apache Spark lee inicialmente los datos, es posible que no cree particiones optimizadas para el cálculo deseado, especialmente para las UDF que realizan tareas de análisis y fragmentación. Es fundamental lograr un equilibrio entre la creación de particiones lo suficientemente pequeñas como para una paralelización eficaz y no tan pequeñas que la sobrecarga de administrarlas supere sus ventajas.
Puede ajustar el número de particiones mediante df.repartitions(<number of partitions>)
. Al aplicar UDF, apunte a un múltiplo del número de núcleos disponibles en los nodos de trabajo. Por ejemplo, en el cuaderno 02_parse_docs, puede incluir df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
para crear dos veces más particiones del número de núcleos de trabajo disponibles. Por lo general, un múltiplo entre 1 y 3 debería producir un rendimiento satisfactorio.
Ejecución manual de la canalización
Como alternativa, puede ejecutar cada cuaderno individual paso a paso:
- Cargue los archivos sin procesar mediante el cuaderno
01_load_files
. Esto guarda cada archivo binario de documento como un registro en una tabla bronze (raw_files_table_name
) definida endestination_tables_config
. Los archivos se cargan de forma incremental, procesando solo los documentos nuevos desde la última ejecución. - Analice los documentos con el cuaderno
02_parse_docs
. Este cuaderno ejecuta el cuadernoparser_library
(asegúrese de ejecutarlo como la primera celda para reiniciar Python), lo que hace que diferentes analizadores y utilidades relacionadas estén disponibles. A continuación, usa el analizador especificado enpipeline_config
para analizar cada documento en texto sin formato. Por ejemplo, se capturan metadatos relevantes como el número de páginas del PDF original junto con el texto analizado. Los documentos analizados correctamente se almacenan en una tabla silver (parsed_docs_table_name
), mientras que los documentos no analizados se ponen en cuarentena en una tabla correspondiente. - Fragmenta los documentos analizados mediante el cuaderno
03_chunk_docs
. De forma similar al análisis, este cuaderno ejecuta el cuadernochunker_library
(de nuevo, ejecútelo como la primera celda). Divide cada documento analizado en fragmentos más pequeños mediante el fragmentador especificado depipeline_config
. A cada fragmento se le asigna un identificador único mediante un hash MD5, necesario para la sincronización con el índice de vector de búsqueda. Los fragmentos finales se cargan en una tabla gold (chunked_docs_table_name
). - Cree o sincronice el índice de vector de búsqueda con
04_vector_index
. Este cuaderno comprueba la preparación del punto de conexión del vector de búsqueda especificado envectorsearch_config
. Si el índice configurado ya existe, inicia la sincronización con la tabla gold; de lo contrario, crea el índice y desencadena la sincronización. Se espera que esto tarde un momento si aún no se han creado el punto de conexión ni el índice de vector de búsqueda.
Paso siguiente
Continúe con el Paso 7. Implementar y supervisar.