다음을 통해 공유


6단계(파이프라인). 데이터 파이프라인 수정 사항 구현

데이터 파이프라인

다음 단계에 따라 데이터 파이프라인을 수정하고 실행합니다.

  1. 새 벡터 인덱스 만들기
  2. 데이터 파이프라인의 메타데이터를 사용하여 MLflow 실행을 만듭니다.

결과 MLflow 실행은 Notebook에서 참조됩니다 B_quality_iteration/02_evaluate_fixes .

데이터 파이프라인을 수정하는 방법에는 두 가지가 있습니다.

  • 한 번에 단일 수정을 구현하는 이 방법에서는 단일 데이터 파이프라인을 한 번에 구성하고 실행합니다. 이 모드는 단일 포함 모델을 시도하고 단일 새 파서 테스트하려는 경우에 가장 적합합니다. Databricks는 이러한 Notebook에 익숙해지기 위해 여기에서 시작하는 것이 좋습니다.
  • 한 번에 여러 수정을 구현합니다. 스윕이라고도 하는 이 접근 방식에서는 각각 다른 구성이 있는 여러 데이터 파이프라인을 병렬로 실행합니다. 예를 들어 세 개의 PDF 파서를 평가하거나 다양한 청크 크기를 평가하는 등 다양한 전략에서 "스윕"하려는 경우 이 모드가 가장 좋습니다.

이 섹션의 샘플 코드는 GitHub 리포지토리에서 확인하세요.

방법 1: 한 번에 단일 수정 구현

  1. B_quality_iteration/data_pipeline_fixes/single_fix/00_config Notebook 열기
  2. 아래 중 하나의 지침을 따릅니다.
  3. 다음 중 하나를 사용하여 파이프라인을 실행합니다.
  4. B_quality_iteration/02_evaluate_fixes Notebook의 변수에 출력되는 결과 MLflow Run의 DATA_PIPELINE_FIXES_RUN_NAMES 이름을 추가합니다.

참고 항목

데이터 준비 파이프라인은 Spark 구조적 스트리밍을 사용하여 파일을 증분 방식으로 로드하고 처리합니다. 이렇게 하면 이미 로드되고 준비된 파일이 검사점에서 추적되고 다시 처리되지 않습니다. 새로 추가된 파일만 해당 테이블에 로드, 준비 및 추가됩니다.

따라서 전체 파이프라인을 처음부터 다시 실행하고 모든 문서를 다시 처리하려면 검사점과 테이블을 삭제해야 합니다. reset_tables_and_checkpoints Notebook을 사용하여 이 작업을 수행할 수 있습니다.

방법 2: 한 번에 여러 수정 구현

  1. B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines Notebook을 엽니다.
  2. Notebook의 지침에 따라 실행할 데이터 파이프라인의 구성을 두 개 이상 추가합니다.
  3. Notebook을 실행하여 이러한 파이프라인을 실행합니다.
  4. B_quality_iteration/02_evaluate_fixes Notebook의 변수에 출력되는 결과 MLflow 실행의 DATA_PIPELINE_FIXES_RUN_NAMES 이름을 추가합니다.

부록

참고 항목

한 번에 단일 수정 또는 여러 수정을 구현하는지에 따라 single_fixmultiple_fixes 디렉터리에서 아래에서 참조되는 Notebook을 찾을 수 있습니다.

구성 설정 심층 분석

데이터 파이프라인에 대해 미리 구현된 다양한 구성 옵션이 아래에 나열되어 있습니다. 또는 사용자 지정 파서/청커구현할 수 있습니다.

  • vectorsearch_config: 벡터 검색 엔드포인트(실행 중이어야 합니다)와 만들 인덱스의 이름을 지정합니다. 또한 원본 테이블과 인덱스 간의 동기화 형식을 정의합니다(기본값은 TRIGGERED).
  • embedding_config: tokenizer와 함께 사용할 포함 모델을 지정합니다. 전체 옵션 목록은 Notebook을 참조하세요 supporting_configs/embedding_models . 포함 모델은 엔드포인트를 제공하는 실행 중인 모델에 배포되어야 합니다. 청크 분할 전략에 따라 토케나이저는 분할 중에 청크가 포함 모델의 토큰 제한을 초과하지 않도록 합니다. Tokenizer는 선택한 포함 모델의 최대 컨텍스트 길이를 초과하지 않도록 텍스트 청크의 토큰 수를 계산하는 데 사용됩니다.

다음은 HuggingFace의 토케나이저를 보여줍니다.

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

다음은 TikToken의 토큰 변환기를 보여줍니다.

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: 소스 필드의 파일 파서, 청커 및 경로를 정의합니다. 파서와 청크는 각각 Notebook 및 chunker_library Notebook에 parser_library 정의됩니다. single_fix 및 multiple_fixes 디렉터리에서 찾을 수 있습니다. 전체 옵션 목록은 단일 및 여러 수정 디렉터리 모두에서 다시 사용할 수 있는 Notebook을 참조 supporting_configs/parser_chunker_strategies 하세요. 파서나 청커는 특정 청커에 필요한 잠재적인 매개 변수 <param x> 를 나타내는 서로 다른 구성 매개 변수가 필요할 수 있습니다. 파서는 동일한 형식을 사용하여 구성 값을 전달할 수도 있습니다.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

사용자 지정 파서/청커 구현

이 프로젝트는 데이터 준비 파이프라인에 사용자 지정 파서 또는 청커를 쉽게 추가할 수 있도록 구조화되었습니다.

새 파서 추가

PyMuPDF 라이브러리를 사용하여 구문 분석된 텍스트를 Markdown 형식으로 변환하는 새 파서가 통합되었다고 가정합니다. 다음 단계를 수행합니다.

  1. 또는 multiple_fix 디렉터리의 Notebook single_fix 에 다음 코드를 parser_library 추가하여 필요한 종속성을 설치합니다.

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. parser_library 또는 multiple_fix 디렉터리의 Notebook에서 single_fix 파서에 대한 새 섹션을 PyMuPdfMarkdown 추가하고 구문 분석 함수를 구현합니다. 함수의 출력이 Notebook의 시작 부분에 정의된 클래스를 ParserReturnValue 준수하는지 확인합니다. 이렇게 하면 Spark UDF와의 호환성이 보장됩니다. 또는 블록은 try 파서를 전자 필 single_fix 기장 또는 multiple_fix 디렉터리의 UDF 02_parse_docs 로 적용할 때 개별 문서의 오류로 인해 Spark가 전체 구문 분석 작업에 실패하지 않도록 방지 except 합니다. 이 Notebook은 문서에 대해 구문 분석이 실패했는지 확인하고, 해당 행을 격리하고, 경고를 발생합니다.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Notebook에서 parser_library 구성할 수 00_config pipeline_config 있도록 새 구문 분석 함수 parser_factory 를 전자 필 single_fix 기장의 전자 필기장 또는 multiple_fix 디렉터리에 추가합니다.

  4. 02_parse_docs Notebook에서 파서 함수는 Spark Python UDF(Databricks Runtime 14.0 이상에 대해 화살표 최적화됨)로 바뀌고 새 이진 PDF 파일이 포함된 데이터 프레임에 적용됩니다. 테스트 및 개발을 위해 test-document.pdf 파일을 로드하고 성공적인 구문 분석을 어설션하는 간단한 테스트 함수를 parser_library Notebook에 추가합니다.

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

새 청커 추가

새 청커를 추가하는 프로세스는 새 파서에 대해 위에서 설명한 것과 유사한 단계를 따릅니다.

  1. chunker_library Notebook에 필요한 종속성을 추가합니다.
  2. 청커에 대한 새 섹션을 추가하고 함수를 구현합니다(예: ) chunk_parsed_content_newchunkername. 새 청커 함수의 출력은 chunker_library Notebook의 시작 부분에 정의된 클래스를 ChunkerReturnValue 준수하는 Python 사전이어야 합니다. 함수는 구문 분석된 텍스트의 문자열을 청크로 묶을 수 있도록 허용해야 합니다. 청커에 추가 매개 변수가 필요한 경우 함수 매개 변수로 추가할 수 있습니다.
  3. chunker_library Notebook에 정의된 함수에 새 청커 chunker_factory추가합니다. 함수가 추가 매개 변수를 허용하는 경우 펑툴의 일부를 사용하여 미리 구성합니다. UDF는 하나의 입력 매개 변수만 허용하므로 이 매개 변수는 구문 분석된 텍스트입니다. 이를 chunker_factory 통해 pipeline_config 다양한 청커 메서드를 구성하고 Spark Python UDF(Databricks Runtime 14.0 이상에 최적화됨)를 반환할 수 있습니다.
  4. 새 청크 함수에 대한 간단한 테스트 섹션을 추가합니다. 이 섹션에서는 문자열로 제공된 미리 정의된 텍스트를 청크해야 합니다.

성능 튜닝

Spark는 파티션을 활용하여 처리를 병렬 처리합니다. 데이터는 행 청크로 나뉘며 각 파티션은 기본적으로 단일 코어로 처리됩니다. 그러나 Apache Spark에서 데이터를 처음 읽는 경우, 특히 구문 분석 및 청크 작업을 수행하는 UDF에 대해 원하는 계산에 최적화된 파티션을 만들지 못할 수 있습니다. 효율적인 병렬 처리를 위해 충분히 작고 너무 작지 않은 파티션을 만드는 것 사이의 균형을 맞추는 것이 중요하므로 관리 오버헤드가 이점보다 큽니다.

를 사용하여 df.repartitions(<number of partitions>)파티션 수를 조정할 수 있습니다. UDF를 적용할 때 작업자 노드에서 사용할 수 있는 코어 수의 배수를 목표로 합니다. 예를 들어 02_parse_docs Notebook에서 사용 가능한 작업자 코어 수보다 두 배 많은 파티션을 만들도록 포함 df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) 할 수 있습니다. 일반적으로 1에서 3 사이의 배수는 만족스러운 성능을 생성해야 합니다.

파이프라인을 수동으로 실행

또는 각 개별 Notebook을 단계별로 실행할 수 있습니다.

  1. Notebook을 사용하여 원시 파일을 로드합니다 01_load_files . 이렇게 하면 각 문서 이진 파일이 에 정의된 브론즈 테이블(raw_files_table_name)에 하나의 레코드로 저장됩니다 destination_tables_config. 파일은 증분 방식으로 로드되며 마지막 실행 이후 새 문서만 처리합니다.
  2. 전자 필기장을 사용하여 문서를 구문 분석합니다 02_parse_docs . 이 Notebook은 Notebook을 parser_library 실행하여(Python을 다시 시작하는 첫 번째 셀로 실행) 다른 파서 및 관련 유틸리티를 사용할 수 있도록 합니다. 그런 다음, 지정된 파서를 pipeline_config 사용하여 각 문서를 일반 텍스트로 구문 분석합니다. 예를 들어 구문 분석된 텍스트와 함께 원래 PDF의 페이지 수와 같은 관련 메타데이터가 캡처됩니다. 구문 분석된 문서는 실버 테이블()parsed_docs_table_name에 저장되고, 구문 분석되지 않은 모든 문서는 해당 테이블로 격리됩니다.
  3. Notebook을 사용하여 구문 분석된 문서를 청크합니다 03_chunk_docs . 구문 분석과 마찬가지로 이 Notebook은 Notebook을 chunker_library 실행합니다(다시 첫 번째 셀로 실행). 에서 지정된 청커를 사용하여 구문 분석된 각 문서를 더 작은 청크 pipeline_config로 분할합니다. 각 청크에는 벡터 검색 인덱스와 동기화하는 데 필요한 MD5 해시를 사용하여 고유 ID가 할당됩니다. 최종 청크는 골드 테이블(chunked_docs_table_name)에 로드됩니다.
  4. 벡터 검색 인덱스 만들기/동기화 04_vector_index 이 Notebook은 .에서 vectorsearch_config지정된 벡터 검색 엔드포인트의 준비 상태를 확인합니다. 구성된 인덱스가 이미 있는 경우 골드 테이블과의 동기화를 시작합니다. 그렇지 않으면 인덱스를 만들고 동기화를 트리거합니다. Vector Search 엔드포인트 및 인덱스가 아직 만들어지지 않은 경우 시간이 걸릴 것으로 예상됩니다.

다음 단계

7단계를 계속 진행합니다. 배포 및 모니터.