다음을 통해 공유


병렬 실행 단계를 SDK v2로 업그레이드

SDK v2에서 "병렬 실행 단계"는 parallel job으로 작업 개념에 통합됩니다. 병렬 작업은 사용자가 강력한 다중 노드 컴퓨팅 클러스터에 반복 작업을 배포하여 작업 실행을 가속화할 수 있도록 동일한 대상을 유지합니다. 병렬 실행 단계 외에 v2 병렬 작업은 다음과 같은 추가 이점을 제공합니다.

  • 유연한 인터페이스 - 사용자가 병렬 작업에 대한 여러 사용자 지정 입력 및 출력을 정의할 수 있습니다. 다른 단계에 연결하여 항목 스크립트에서 콘텐츠를 사용하거나 관리할 수 있습니다.
  • v2 data asset 개념을 사용하여 Dataset을 입력으로 대체하는 입력 스키마를 간소화합니다. 로컬 파일 또는 Blob 디렉터리 URI를 병렬 작업에 대한 입력으로 쉽게 사용할 수 있습니다.
  • 더 강력한 기능은 v2 병렬 작업에서만 개발 중입니다. 예를 들어 중복된 노력을 방지하기 위해 성공적인 결과를 다시 사용하여, 실패한/취소된 병렬 작업을 다시 시작하여 실패하거나 처리되지 않은 미니 일괄 작업을 계속 처리합니다.

현재 sdk v1 병렬 실행 단계를 v2로 업그레이드하려면

  • v1에서 ParallelRunConfigParallelRunStep을 대체하여 병렬 작업을 만드는 데 parallel_run_function을 사용합니다.
  • v1 파이프라인을 v2로 업그레이드합니다. 그런 다음 v2 병렬 작업을 v2 파이프라인의 단계로 호출합니다. 파이프라인 업그레이드에 대한 자세한 내용은 v1에서 v2로 파이프라인을 업그레이드하는 방법을 참조하세요.

참고: 사용자 항목 스크립트는 v1 병렬 실행 단계와 v2 병렬 작업 간에 호환됩니다. 따라서 병렬 실행 작업을 업그레이드할 때 동일한 entry_script.py 계속 사용할 수 있습니다.

이 문서에서는 SDK v1과 SDK v2의 시나리오를 비교합니다. 다음 예제에서는 파이프라인 작업의 입력 데이터를 예측하는 병렬 작업을 빌드합니다. 병렬 작업을 빌드하는 방법과 SDK v1 및 SDK v2 모두에 대한 파이프라인 작업에서 이를 사용하는 방법을 확인할 수 있습니다.

필수 조건

병렬 단계 만들기

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

파이프라인에서 병렬 단계 사용

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

SDK v1 및 SDK v2의 주요 기능 매핑

SDK v1의 기능 SDK v2의 대략적인 매핑
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig 출력
dataset as_mount 입력

병렬 작업 구성 및 설정 매핑

SDK v1 SDK v2 설명
ParallelRunConfig.environment parallel_run_function.task.environment 학습 작업이 실행되는 환경입니다.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script 여러 노드에서 병렬로 실행될 사용자 스크립트입니다.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold 이 병렬 작업에서 무시할 수 있는 실패한 미니 일괄 처리의 수입니다. 실패한 미니 일괄 처리 수가 이 임계값보다 높으면 병렬 작업이 실패한 것으로 표시됩니다.

“-1”은 기본 수로, 병렬 작업 중에 실패한 모든 미니 일괄 처리를 무시한다는 의미입니다.
ParallelRunConfig.output_action parallel_run_function.append_row_to 각 미니 일괄 처리 실행의 모든 반환을 집계하고 이 파일에 출력합니다. ${{outputs.<output_name>}} 식을 사용하여 병렬 작업의 출력 중 하나를 참조할 수 있습니다.
ParallelRunConfig.node_count parallel_run_function.instance_count 컴퓨팅 대상에서 사용하는 인스턴스 또는 노드의 선택적 수입니다. 기본값은 1입니다.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance 각 컴퓨팅 인스턴스에 있는 최대 병렬 처리입니다.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size 입력을 분할할 각 미니 일괄 처리의 크기를 정의합니다.

input_data가 폴더 또는 파일 집합인 경우 이 숫자는 각 미니 일괄 처리에 대한 파일 개수를 정의합니다. 예를 들어 10, 100입니다.

input_data가 mltable의 테이블 형식 데이터인 경우 이 숫자는 각 미니 일괄 처리에 대한 대략적인 물리적 크기를 정의합니다. 기본 단위는 바이트이고 값은 100kb, 100mb와 같은 문자열을 허용할 수 있습니다.
ParallelRunConfig.source_directory parallel_run_function.task.code 소스 코드를 가리키는 로컬 또는 원격 경로입니다.
ParallelRunConfig.description parallel_run_function.description 병렬에 대한 알기 쉬운 설명입니다.
ParallelRunConfig.logging_level parallel_run_function.logging_level ‘logging’에 정의된 로깅 수준 이름의 문자열입니다. 가능한 값은 ‘WARNING’, ‘INFO’, ‘DEBUG’입니다. (선택 사항, 기본값은 'INFO'입니다.) 이 값은 PipelineParameter를 통해 설정할 수 있습니다.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout 사용자 지정 run() 함수를 실행하기 위한 시간 제한(초)입니다. 실행 시간이 이 임계값보다 길면 미니 일괄 처리가 중단되고, 다시 시도를 트리거하는 데 실패한 미니 일괄 처리로 표시됩니다.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries 미니 일괄 처리가 실패하거나 시간 초과가 발생한 경우의 재시도 횟수입니다. 다시 시도에 모두 실패하면 미니 일괄 처리가 mini_batch_error_threshold 계산에 따라 계산하는 데 실패한 것으로 표시됩니다.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to 설정과 append_row_to 결합합니다.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold 이 병렬 작업에서 무시할 수 있는 실패한 미니 일괄 처리의 수입니다. 실패한 미니 일괄 처리 수가 이 임계값보다 높으면 병렬 작업이 실패한 것으로 표시됩니다.

“-1”은 기본 수로, 병렬 작업 중에 실패한 모든 미니 일괄 처리를 무시한다는 의미입니다.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
"allowed_failed_count"와 비슷하지만 이 설정은 미니 일괄 처리 실패 횟수 대신 실패한 미니 일괄 처리의 백분율을 사용합니다.

이 설정의 범위는 [0, 100]입니다. "100"은 기본 숫자입니다. 즉, 병렬 작업 중에 실패한 모든 미니 일괄 처리를 무시합니다.
ParallelRunConfig.partition_keys 개발 중입니다.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables 환경 변수 이름 및 값의 사전입니다. 이러한 환경 변수는 사용자 스크립트가 실행되는 프로세스에서 설정됩니다.
ParallelRunStep.name parallel_run_function.name 생성된 병렬 작업 또는 구성 요소의 이름입니다.
ParallelRunStep.inputs parallel_run_function.inputs 이 병렬에서 사용하는 입력의 받아쓰기입니다.
-- parallel_run_function.input_data 병렬로 분할 및 처리할 데이터를 선언합니다.
ParallelRunStep.output parallel_run_function.outputs 이 병렬 작업의 출력입니다.
ParallelRunStep.side_inputs parallel_run_function.inputs inputs와 함께 정의됩니다.
ParallelRunStep.arguments parallel_run_function.task.program_arguments 병렬 작업의 인수입니다.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic 동일한 입력이 지정된 경우 병렬에서 동일한 출력을 반환할지 여부를 지정합니다.

다음 단계

자세한 내용은 다음 설명서를 참조하세요.