Поделиться через


Обновление шага параллельного выполнения до пакета SDK версии 2

В пакете SDK версии 2 "Шаг параллельного выполнения" объединяется в концепцию задания как parallel job. Параллельное задание позволяет пользователям ускорить выполнение задания, распределяя повторяющиеся задачи на мощных вычислительных кластерах с несколькими узлами. На вершине шага параллельного выполнения параллельное задание версии 2 обеспечивает дополнительные преимущества:

  • Гибкий интерфейс, позволяющий пользователю определять несколько пользовательских входных и выходных данных для параллельного задания. Вы можете подключить их к другим шагам, чтобы использовать или управлять их содержимым в скрипте записи.
  • Упрощение схемы ввода, которая заменяет Dataset входные данные с помощью концепции версии 2 data asset . Вы можете легко использовать локальные файлы или URI каталога BLOB-объектов в качестве входных данных для параллельного задания.
  • Более мощные функции разрабатываются только в параллельном задании версии 2. Например, возобновите параллельное задание с ошибкой или отменой, чтобы продолжить обработку неудачных или необработанных мини-пакетов, повторно выполнив успешный результат для сохранения повторяющихся усилий.

Чтобы обновить текущий шаг параллельного выполнения пакета SDK версии 1 до версии 2, вам потребуется выполнить следующие действия.

  • Используется parallel_run_function для создания параллельного задания, заменив ParallelRunConfig и ParallelRunStep в версии 1.
  • Обновите конвейер версии 1 до версии 2. Затем вызовите параллельное задание версии 2 в качестве шага в конвейере версии 2. Сведения об обновлении конвейера с версии 1 до версии 2 см. в статье об обновлении конвейера.

Примечание. Скрипт входа пользователя совместим между этапом параллельного выполнения версии 1 и параллельным заданием версии 2. Таким образом, вы можете использовать те же entry_script.py при обновлении задания параллельного выполнения.

В этой статье приводится сравнение сценариев в пакете SDK версии 1 и пакете SDK версии 2. В следующих примерах мы создадим параллельное задание для прогнозирования входных данных в задании конвейеров. Вы увидите, как создать параллельное задание и как использовать его в задании конвейера для пакета SDK версии 1 и пакета SDK версии 2.

Необходимые компоненты

Создание параллельного шага

  • Пакет SDK версии 1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • Пакет SDK версии 2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Использование параллельного шага в конвейере

  • Пакет SDK версии 1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • Пакет SDK версии 2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Сопоставление ключевых функций в пакете SDK версии 1 и пакете SDK версии 2

Функции пакета SDK версии 1 Грубое сопоставление в пакете SDK версии 2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Выходные данные
as_mount набора данных Входные данные

Сопоставление параллельных конфигураций заданий и параметров

Пакет SDK версии 1 Пакет SDK версии 2 Description
ParallelRunConfig.environment parallel_run_function.task.environment Среда, в которой будет выполняться задание обучения.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Пользовательский скрипт, который будет выполняться параллельно на нескольких узлах.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Количество неудачных мини-пакетов, которые можно игнорировать в этом параллельном задании. Если число неудачных мини-пакетов выше этого порогового значения, параллельное задание будет отмечено как неудачное.

"-1" — это номер по умолчанию, то есть игнорировать все сбои мини-пакета во время параллельного задания.
ParallelRunConfig.output_action parallel_run_function.append_row_to Агрегирование всех возвращается из каждого запуска мини-пакета и выводит его в этот файл. Может ссылаться на один из выходных данных параллельного задания с помощью выражения ${{выходных данных.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count Необязательное количество экземпляров или узлов, используемых целевым объектом вычислений. По умолчанию равен 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Максимальное параллелизм, которое имеет каждый вычислительный экземпляр.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Определите размер каждого мини-пакета для разделения входных данных.

Если input_data является папкой или набором файлов, это число определяет количество файлов для каждого мини-пакета. Например, 10, 100.

Если input_data является табличными данными из mltable, это число определяет прокси-размер физического размера для каждого мини-пакета. Единица по умолчанию — Байт, и значение может принимать строку, например 100 кб, 100 МБ.
ParallelRunConfig.source_directory parallel_run_function.task.code Локальный или удаленный путь, указывающий на исходный код.
ParallelRunConfig.description parallel_run_function.description Понятное описание параллели
ParallelRunConfig.logging_level parallel_run_function.log_level Строка имени уровня ведения журнала, которая определена в параметре "logging". Возможные значения: "WARNING" (Предупреждение), "INFO" (Информация) и "DEBUG" (Отладка). (необязательно, значение по умолчанию — INFO.) Это значение можно задать с помощью PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Время ожидания в секундах для выполнения пользовательской функции run(). Если время выполнения превышает это пороговое значение, мини-пакет будет прерван и помечен как сбой мини-пакет для активации повторных попыток.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_повторные попытки Количество повторных попыток при сбое мини-пакета или истечении времени ожидания. Если все повторные попытки завершаются ошибкой, мини-пакет будет помечен как не удалось подсчитать по mini_batch_error_threshold вычислению.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to В сочетании с append_row_to параметром.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Количество неудачных мини-пакетов, которые можно игнорировать в этом параллельном задании. Если число неудачных мини-пакетов выше этого порогового значения, параллельное задание будет отмечено как неудачное.

"-1" — это номер по умолчанию, то есть игнорировать все сбои мини-пакета во время параллельного задания.
ParallelRunConfig.allowed_failed_percent набор parallel_run_function.task.program_arguments
--allowed_failed_percent
Аналогично "allowed_failed_count", но этот параметр использует процент неудачных мини-пакетов вместо числа сбоев мини-пакета.

Диапазон этого параметра — [0, 100]. "100" — это номер по умолчанию, что означает игнорировать все неудачные мини-пакет во время параллельного задания.
ParallelRunConfig.partition_keys В процессе разработки.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Словарь имен и значений переменных среды. Эти переменные среды задаются для процесса, в котором выполняется пользовательский скрипт.
ParallelRunStep.name parallel_run_function.name Имя созданного параллельного задания или компонента.
ParallelRunStep.inputs parallel_run_function.inputs Диктовка входных данных, используемых этим параллелем.
-- parallel_run_function.input_data Объявите данные, которые будут разделены и обработаны параллельно
ParallelRunStep.output parallel_run_function.outputs Выходные данные этого параллельного задания.
ParallelRunStep.side_inputs parallel_run_function.inputs Определяется вместе с inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Аргументы параллельной задачи.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Укажите, будет ли параллель возвращать одинаковые выходные данные с одинаковыми входами.

Следующие шаги

Дополнительные сведения см. в документации: