병렬 실행 단계를 SDK v2로 업그레이드
SDK v2에서 "병렬 실행 단계"는 parallel job
으로 작업 개념에 통합됩니다. 병렬 작업은 사용자가 강력한 다중 노드 컴퓨팅 클러스터에 반복 작업을 배포하여 작업 실행을 가속화할 수 있도록 동일한 대상을 유지합니다. 병렬 실행 단계 외에 v2 병렬 작업은 다음과 같은 추가 이점을 제공합니다.
- 유연한 인터페이스 - 사용자가 병렬 작업에 대한 여러 사용자 지정 입력 및 출력을 정의할 수 있습니다. 다른 단계에 연결하여 항목 스크립트에서 콘텐츠를 사용하거나 관리할 수 있습니다.
- v2
data asset
개념을 사용하여Dataset
을 입력으로 대체하는 입력 스키마를 간소화합니다. 로컬 파일 또는 Blob 디렉터리 URI를 병렬 작업에 대한 입력으로 쉽게 사용할 수 있습니다. - 더 강력한 기능은 v2 병렬 작업에서만 개발 중입니다. 예를 들어 중복된 노력을 방지하기 위해 성공적인 결과를 다시 사용하여, 실패한/취소된 병렬 작업을 다시 시작하여 실패하거나 처리되지 않은 미니 일괄 작업을 계속 처리합니다.
현재 sdk v1 병렬 실행 단계를 v2로 업그레이드하려면
- v1에서
ParallelRunConfig
및ParallelRunStep
을 대체하여 병렬 작업을 만드는 데parallel_run_function
을 사용합니다. - v1 파이프라인을 v2로 업그레이드합니다. 그런 다음 v2 병렬 작업을 v2 파이프라인의 단계로 호출합니다. 파이프라인 업그레이드에 대한 자세한 내용은 v1에서 v2로 파이프라인을 업그레이드하는 방법을 참조하세요.
참고: 사용자 항목 스크립트는 v1 병렬 실행 단계와 v2 병렬 작업 간에 호환됩니다. 따라서 병렬 실행 작업을 업그레이드할 때 동일한 entry_script.py 계속 사용할 수 있습니다.
이 문서에서는 SDK v1과 SDK v2의 시나리오를 비교합니다. 다음 예제에서는 파이프라인 작업의 입력 데이터를 예측하는 병렬 작업을 빌드합니다. 병렬 작업을 빌드하는 방법과 SDK v1 및 SDK v2 모두에 대한 파이프라인 작업에서 이를 사용하는 방법을 확인할 수 있습니다.
필수 조건
- SDK v2 환경 준비: Python용 Azure Machine Learning SDK v2 설치
- SDK v2 파이프라인의 기초 이해: Python SDK v2를 사용하여 Azure Machine Learning 파이프라인을 만드는 방법
병렬 단계 만들기
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
파이프라인에서 병렬 단계 사용
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
SDK v1 및 SDK v2의 주요 기능 매핑
SDK v1의 기능 | SDK v2의 대략적인 매핑 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | 출력 |
dataset as_mount | 입력 |
병렬 작업 구성 및 설정 매핑
SDK v1 | SDK v2 | 설명 |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | 학습 작업이 실행되는 환경입니다. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | 여러 노드에서 병렬로 실행될 사용자 스크립트입니다. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | 이 병렬 작업에서 무시할 수 있는 실패한 미니 일괄 처리의 수입니다. 실패한 미니 일괄 처리 수가 이 임계값보다 높으면 병렬 작업이 실패한 것으로 표시됩니다. “-1”은 기본 수로, 병렬 작업 중에 실패한 모든 미니 일괄 처리를 무시한다는 의미입니다. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | 각 미니 일괄 처리 실행의 모든 반환을 집계하고 이 파일에 출력합니다. ${{outputs.<output_name>}} 식을 사용하여 병렬 작업의 출력 중 하나를 참조할 수 있습니다. |
ParallelRunConfig.node_count | parallel_run_function.instance_count | 컴퓨팅 대상에서 사용하는 인스턴스 또는 노드의 선택적 수입니다. 기본값은 1입니다. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | 각 컴퓨팅 인스턴스에 있는 최대 병렬 처리입니다. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | 입력을 분할할 각 미니 일괄 처리의 크기를 정의합니다. input_data가 폴더 또는 파일 집합인 경우 이 숫자는 각 미니 일괄 처리에 대한 파일 개수를 정의합니다. 예를 들어 10, 100입니다. input_data가 mltable 의 테이블 형식 데이터인 경우 이 숫자는 각 미니 일괄 처리에 대한 대략적인 물리적 크기를 정의합니다. 기본 단위는 바이트이고 값은 100kb, 100mb와 같은 문자열을 허용할 수 있습니다. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | 소스 코드를 가리키는 로컬 또는 원격 경로입니다. |
ParallelRunConfig.description | parallel_run_function.description | 병렬에 대한 알기 쉬운 설명입니다. |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | ‘logging’에 정의된 로깅 수준 이름의 문자열입니다. 가능한 값은 ‘WARNING’, ‘INFO’, ‘DEBUG’입니다. (선택 사항, 기본값은 'INFO'입니다.) 이 값은 PipelineParameter를 통해 설정할 수 있습니다. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | 사용자 지정 run() 함수를 실행하기 위한 시간 제한(초)입니다. 실행 시간이 이 임계값보다 길면 미니 일괄 처리가 중단되고, 다시 시도를 트리거하는 데 실패한 미니 일괄 처리로 표시됩니다. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | 미니 일괄 처리가 실패하거나 시간 초과가 발생한 경우의 재시도 횟수입니다. 다시 시도에 모두 실패하면 미니 일괄 처리가 mini_batch_error_threshold 계산에 따라 계산하는 데 실패한 것으로 표시됩니다. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | 설정과 append_row_to 결합합니다. |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | 이 병렬 작업에서 무시할 수 있는 실패한 미니 일괄 처리의 수입니다. 실패한 미니 일괄 처리 수가 이 임계값보다 높으면 병렬 작업이 실패한 것으로 표시됩니다. “-1”은 기본 수로, 병렬 작업 중에 실패한 모든 미니 일괄 처리를 무시한다는 의미입니다. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set --allowed_failed_percent |
"allowed_failed_count"와 비슷하지만 이 설정은 미니 일괄 처리 실패 횟수 대신 실패한 미니 일괄 처리의 백분율을 사용합니다. 이 설정의 범위는 [0, 100]입니다. "100"은 기본 숫자입니다. 즉, 병렬 작업 중에 실패한 모든 미니 일괄 처리를 무시합니다. |
ParallelRunConfig.partition_keys | 개발 중입니다. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | 환경 변수 이름 및 값의 사전입니다. 이러한 환경 변수는 사용자 스크립트가 실행되는 프로세스에서 설정됩니다. |
ParallelRunStep.name | parallel_run_function.name | 생성된 병렬 작업 또는 구성 요소의 이름입니다. |
ParallelRunStep.inputs | parallel_run_function.inputs | 이 병렬에서 사용하는 입력의 받아쓰기입니다. |
-- | parallel_run_function.input_data | 병렬로 분할 및 처리할 데이터를 선언합니다. |
ParallelRunStep.output | parallel_run_function.outputs | 이 병렬 작업의 출력입니다. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | inputs 와 함께 정의됩니다. |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | 병렬 작업의 인수입니다. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | 동일한 입력이 지정된 경우 병렬에서 동일한 출력을 반환할지 여부를 지정합니다. |
다음 단계
자세한 내용은 다음 설명서를 참조하세요.