Поделиться через


Шаг 6 (конвейеры). Реализация исправлений конвейера данных

Конвейер данных

Выполните следующие действия, чтобы изменить конвейер данных и запустить его в:

  1. Создайте новый векторный индекс.
  2. Создайте запуск MLflow с метаданными конвейера данных.

Результирующий запуск MLflow ссылается на записную книжку B_quality_iteration/02_evaluate_fixes .

Существует два подхода к изменению конвейера данных:

  • Реализуйте одно исправление одновременно в этом подходе, вы настраиваете и запускаете один конвейер данных одновременно. Этот режим лучше всего подходит, если вы хотите попробовать одну модель внедрения и протестировать один новый средство синтаксического анализа. Databricks предлагает ознакомиться с этими записными книжками.
  • Реализуйте несколько исправлений одновременно в этом подходе, который также называется очисткой, вы параллельно запускаете несколько конвейеров данных, которые имеют другую конфигурацию. Этот режим лучше всего подходит, если вы хотите "проверить" множество различных стратегий, например оценить три средства синтаксического анализа PDF или оценить множество различных размеров блоков.

См. репозиторий GitHub для примера кода в этом разделе.

Подход 1. Реализация одного исправления за раз

  1. Откройте записную книжку B_quality_iteration/data_pipeline_fixes/single_fix/00_config
  2. Следуйте инструкциям, приведенным ниже.
  3. Запустите конвейер, выполнив следующие действия:
    • Открытие и запуск записной книжки 00_Run_Entire_Pipeline.
    • Выполните шаги, чтобы выполнить каждый шаг конвейера вручную.
  4. Добавьте имя результирующего запуска MLflow, выходного в DATA_PIPELINE_FIXES_RUN_NAMES переменную в записной книжке B_quality_iteration/02_evaluate_fixes

Примечание.

Конвейер подготовки данных использует структурированную потоковую передачу Spark для добавочной загрузки и обработки файлов. Это означает, что файлы, уже загруженные и подготовленные, отслеживаются в контрольных точках и не будут повторно обработаны. Только новые добавленные файлы будут загружены, подготовлены и добавлены в соответствующие таблицы.

Таким образом, если вы хотите повторно запустить весь конвейер с нуля и повторно обработать все документы, необходимо удалить контрольные точки и таблицы. Это можно сделать с помощью записной книжки reset_tables_and_checkpoints .

Подход 2. Реализация нескольких исправлений одновременно

  1. Откройте записную книжку B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines .
  2. Следуйте инструкциям в записной книжке, чтобы добавить две или более конфигурации конвейера данных для запуска.
  3. Запустите записную книжку для выполнения этих конвейеров.
  4. Добавьте имена полученных запусков MLflow, выходных DATA_PIPELINE_FIXES_RUN_NAMES в переменную в записной книжке B_quality_iteration/02_evaluate_fixes .

Приложение

Примечание.

Записные книжки, указанные ниже, можно найти в каталогах single_fix и multiple_fixes в зависимости от того, реализуете ли вы одно исправление или несколько исправлений одновременно.

Подробные сведения о параметрах конфигурации

Ниже перечислены различные предварительно реализованные параметры конфигурации для конвейера данных. Кроме того, можно реализовать пользовательский синтаксический анализатор или блок-модуль.

  • vectorsearch_config: укажите конечную точку векторного поиска (должна быть запущена и запущена) и имя создаваемого индекса. Кроме того, определите тип синхронизации между исходной таблицей и индексом (по TRIGGEREDумолчанию).
  • embedding_config: укажите используемую модель внедрения вместе с токенизатором. Полный список параметров см. в записной книжке supporting_configs/embedding_models . Модель внедрения должна быть развернута в запущенной конечной точке обслуживания модели. В зависимости от стратегии фрагментирования маркеризатор также во время разделения, чтобы убедиться, что блоки не превышают предел маркера модели внедрения. Маркеризаторы используются здесь для подсчета количества маркеров в текстовых блоках, чтобы убедиться, что они не превышают максимальную длину контекста выбранной модели внедрения.

Ниже показан маркеризатор из HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

Ниже показан токенизатор из TikToken:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: определяет средство синтаксического анализа файлов, блокировщик и путь к полю источников. Синтаксический анализ и блоки определяются в parser_library записных chunker_library книжках соответственно. Их можно найти в каталогах single_fix и multiple_fixes . Полный список параметров см supporting_configs/parser_chunker_strategies . в записной книжке, которая снова доступна как в одном, так и в нескольких каталогах исправлений. Для разных синтаксического анализа или блокировщиков могут потребоваться различные параметры конфигурации, в которых <param x> представлены потенциальные параметры, необходимые для конкретного блока. Синтаксический анализ также можно передать значения конфигурации с помощью одного формата.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Реализация пользовательского средства синтаксического анализа или блока

Этот проект структурирован для упрощения добавления пользовательских анализаторов или блоков в конвейер подготовки данных.

Добавление нового средства синтаксического анализа

Предположим, вы хотите включить новый средство синтаксического анализа с помощью библиотеки PyMuPDF для преобразования синтаксического текста в формат Markdown. Выполните следующие действия:

  1. Установите необходимые зависимости, добавив следующий код в записную книжку parser_library или single_fixmultiple_fix каталог:

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. В записной книжке parser_library в single_fixmultiple_fix каталоге добавьте новый раздел для PyMuPdfMarkdown средства синтаксического анализа и реализуйте функцию синтаксического анализа. Убедитесь, что выходные данные функции соответствуют классу ParserReturnValue , определенному в начале записной книжки. Это гарантирует совместимость с пользовательскими файлами Spark. try При except применении средства синтаксического анализа в записной книжке в записной книжке в 02_parse_docs записной книжке или single_fixmultiple_fix каталоге не удается выполнить синтаксический анализ или заблокировать блокировку. Эта записная книжка проверяет, не удалось ли синтаксический анализ для любого документа, заместите соответствующие строки в карантин и вызовет предупреждение.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Добавьте новую функцию parser_factory синтаксического анализа в parser_library записную книжку или single_fixmultiple_fix каталог, чтобы сделать ее настраиваемой в pipeline_config записной книжке 00_config .

  4. В записной книжке 02_parse_docs функции синтаксического анализа превратятся в определяемые пользователем Spark Python (оптимизированные со стрелками для Databricks Runtime 14.0 или более поздней версии) и применяются к кадру данных с новыми двоичными PDF-файлами. Для тестирования и разработки добавьте простую функцию тестирования в записную книжку parser_library, которая загружает файл test-document.pdf и утверждает успешный анализ:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Добавление нового блока

Процесс добавления нового блокировщика выполняет аналогичные действия, описанные выше для нового средства синтаксического анализа.

  1. Добавьте необходимые зависимости в записную книжку chunker_library .
  2. Добавьте новый раздел для блокировщика и реализуйте функцию, например chunk_parsed_content_newchunkername. Выходные данные новой функции блокировщика должны быть словарем Python, который соответствует ChunkerReturnValue классу, определенному в начале записной книжки chunker_library . Функция должна принимать по крайней мере строку синтаксического текста для фрагментации. Если блокировщик требует дополнительных параметров, их можно добавить в качестве параметров функции.
  3. Добавьте новый блокировщик в функцию, chunker_factory определенную в chunker_library записной книжке. Если функция принимает дополнительные параметры, используйте частичные функции functools для предварительной настройки. Это необходимо, так как определяемые пользователем функции принимают только один входной параметр, который будет проанализированным текстом в нашем случае. Позволяет chunker_factory настроить различные методы блокировщика в pipeline_config и возвратить UDF Python Spark (оптимизирован для Databricks Runtime 14.0 и более поздних версий).
  4. Добавьте простой раздел тестирования для новой функции блокирования. В этом разделе должен быть фрагмент предопределенного текста, предоставленного как строка.

Настройка производительности

Spark использует секции для параллелизации обработки. Данные делятся на блоки строк, и каждая секция обрабатывается одним ядром по умолчанию. Однако при первоначальном чтении данных Apache Spark он может не создавать секции, оптимизированные для требуемых вычислений, особенно для наших определяемых пользователем функций, выполняющих синтаксический анализ и задачи блокирования. Важно обеспечить баланс между созданием секций, которые достаточно малы для эффективной параллелизации и не так мало, что затраты на управление ими перевешивают преимущества.

Вы можете настроить количество секций с помощью df.repartitions(<number of partitions>). При применении определяемых пользователем объектов стремитесь к кратности числа ядер, доступных на рабочих узлах. Например, в записной книжке 02_parse_docs можно включить df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) создание в два раза больше секций, чем количество доступных рабочих ядер. Как правило, несколько от 1 до 3 должны обеспечить удовлетворительную производительность.

Запуск конвейера вручную

Кроме того, можно выполнить каждую отдельную записную книжку пошаговые инструкции.

  1. Загрузите необработанные файлы с помощью записной книжки 01_load_files . При этом каждый двоичный файл документа сохраняется как одна запись в бронзовой таблице (raw_files_table_name), определенной в элементе destination_tables_config. Файлы загружаются постепенно, обрабатывая только новые документы с момента последнего запуска.
  2. Анализ документов с помощью записной книжки 02_parse_docs . Эта записная книжка parser_library выполняет записную книжку (убедитесь, что она выполняется в качестве первой ячейки для перезапуска Python), что делает различные средства синтаксического анализа и связанные служебные программы доступными. Затем он использует указанный средство синтаксического анализа в pipeline_config синтаксическом анализе каждого документа в виде обычного текста. Например, фиксируются соответствующие метаданные, такие как количество страниц исходного PDF-файла вместе с проанализированным текстом. Успешно проанализированные документы хранятся в серебряной таблице (parsed_docs_table_name), а все неподпарированные документы помещаются в соответствующую таблицу.
  3. Блокируют проанализированные документы с помощью записной книжки 03_chunk_docs . Аналогично синтаксическому анализу, эта записная книжка выполняет записную книжку chunker_library (снова запустите как первую ячейку). Он разделяет каждый проанализированный документ на небольшие блоки с помощью указанного pipeline_configфрагмента из . Каждому блоку присваивается уникальный идентификатор с помощью хэша MD5, необходимого для синхронизации с индексом векторного поиска. Последние блоки загружаются в золотую таблицу (chunked_docs_table_name).
  4. Создание и синхронизация индекса векторного поиска с помощью 04_vector_index. Эта записная книжка проверяет готовность указанной конечной точки поиска векторов в этой записной книжке vectorsearch_config. Если настроенный индекс уже существует, он инициирует синхронизацию с золотой таблицей; в противном случае он создает индекс и активирует синхронизацию. Ожидается, что это займет некоторое время, если конечная точка и индекс векторного поиска еще не созданы.

Следующий шаг

Перейдите к шагу 7. Развертывание и мониторинг.

< назад: шаг 6. Итеративно устранять проблемы с качеством

Далее: шаг 7. Разверните & и мониторьте POC >