Partager via


Mettre à niveau l’étape d’exécution parallèle vers le SDK v2

Dans le SDK v2, l’« étape d’exécution parallèle » est consolidée dans le concept de travail en tant que parallel job. Le travail parallèle garde la même cible pour permettre aux utilisateurs d’accélérer l’exécution de leur travail en distribuant des tâches répétées sur de puissants clusters de calcul à plusieurs nœuds. En plus de l’étape d’exécution parallèle, le travail parallèle v2 offre des avantages supplémentaires :

  • Interface flexible, qui permet à l’utilisateur de définir plusieurs entrées et sorties personnalisées pour votre travail parallèle. Vous pouvez les connecter à d’autres étapes pour consommer ou gérer leur contenu dans votre script d’entrée
  • Simplifiez le schéma d’entrée, qui remplace Dataset comme entrée à l’aide du concept v2 data asset. Vous pouvez facilement utiliser vos fichiers locaux ou URI d’annuaire d’objets blob comme entrées dans le travail parallèle.
  • Des fonctionnalités plus puissantes sont en cours de développement uniquement dans le travail parallèle v2. Par exemple, reprenez le travail parallèle ayant échoué/annulé pour continuer à traiter les mini-lots défaillants ou non traités en réutilisant le résultat réussi pour enregistrer les efforts en double.

Pour mettre à niveau votre étape d’exécution parallèle v1 actuelle du kit SDK v1 vers la version v2, vous devez

  • Utiliser parallel_run_function pour créer un travail parallèle en remplaçant ParallelRunConfig et ParallelRunStep dans la version v1.
  • Effectuez une mise à niveau de votre pipeline v1 vers la version v2. Appelez ensuite votre travail parallèle v2 en tant qu’étape dans votre pipeline v2. Consultez Comment effectuer une mise à niveau du pipeline v1 vers la version v2 pour plus d’informations sur la mise à niveau du pipeline.

Remarque : Le script d’entrée utilisateur est compatible entre l’étape d’exécution parallèle v1 et le travail parallèle v2. Vous pouvez donc continuer à utiliser le même entry_script.py lorsque vous effectuer une mise à niveau de votre travail d’exécution parallèle.

Cet article fournit une comparaison des scénarios dans le SDK v1 et le SDK v2. Dans les exemples suivants, nous allons créer un travail parallèle pour prédire les données d’entrée dans un travail de pipeline. Vous verrez comment créer un travail parallèle et comment l’utiliser dans un travail de pipeline pour le SDK v1 et le SDK v2.

Prérequis

Créer une étape parallèle

  • Kit de développement logiciel (SDK) v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Utiliser l’étape parallèle dans le pipeline

  • Kit de développement logiciel (SDK) v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Mappage des fonctionnalités clés dans le SDK v1 et le SDK v2

Fonctionnalités dans le SDK v1 Mappage approximatif dans le SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Sortie
as_mount (jeu de données) Input

Configurations de travail parallèle et mappage des paramètres

Kit de développement logiciel (SDK) v1 SDK v2 Description
ParallelRunConfig.environment parallel_run_function.task.environment Environnement dans lequel le travail de formation s’exécutera.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Script utilisateur qui sera exécuté en parallèle sur plusieurs nœuds.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Nombre de mini-lots ayant échoué qui peuvent être ignorés dans ce travail parallèle. Si le nombre de mini-lots ayant échoué est supérieur à ce seuil, le travail parallèle est marqué comme ayant échoué.

« -1 » est le nombre par défaut, ce qui signifie ignorer tous les mini-lots ayant échoué pendant le travail parallèle.
ParallelRunConfig.output_action parallel_run_function.append_row_to Agrégez tous les retours de chaque exécution de mini-lot et sortez-le dans ce fichier. Peut faire référence à l’une des sorties du travail parallèle à l’aide de l’expression ${{outputs.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count Nombre facultatif d’instances ou de nœuds utilisés par la cible de calcul. La valeur par défaut est de 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Parallélisme maximal dont chaque instance de calcul dispose.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Définissez la taille de chaque mini-lot pour diviser l’entrée.

Si input_data est un dossier ou un ensemble de fichiers, ce nombre définit le nombre de fichiers de chaque mini-lot. Par exemple, 10, 100.

Si input_data correspond à des données tabulaires issues de mltable, ce nombre définit la taille physique approximative de chaque mini-lot. L’unité par défaut est l’octet et la valeur peut accepter des chaînes comme 100 Ko, 100 Mo.
ParallelRunConfig.source_directory parallel_run_function.task.code Chemin d’accès local ou distant pointant vers le code source.
ParallelRunConfig.description parallel_run_function.description Description conviviale du parallèle
ParallelRunConfig.logging_level parallel_run_function.logging_level Chaîne du nom du niveau de journalisation, définie dans « Logging ». Les valeurs possibles sont « WARNING », « INFO »et « DEBUG ». (facultatif, la valeur par défaut est « INFO »). Cette valeur peut être définie via PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Délai d’expiration en secondes pour l’exécution de la fonction run() personnalisée. Si le temps d’exécution est supérieur à ce seuil, le mini-lot est abandonné et marqué comme un mini-lot ayant échoué pour déclencher une nouvelle tentative.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries Nombre de nouvelles tentatives lors de l’échec ou de l’expiration du mini-lot. Si toutes les nouvelles tentatives ont échoué, le mini-lot est marqué comme ayant échoué par calcul mini_batch_error_threshold calculation.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Combinée avec le paramètre append_row_to.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Nombre de mini-lots ayant échoué qui peuvent être ignorés dans ce travail parallèle. Si le nombre de mini-lots ayant échoué est supérieur à ce seuil, le travail parallèle est marqué comme ayant échoué.

« -1 » est le nombre par défaut, ce qui signifie ignorer tous les mini-lots ayant échoué pendant le travail parallèle.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
Similaire à « allowed_failed_count », mais ce paramètre utilise le pourcentage de mini-lots ayant échoué au lieu du nombre d’échecs de mini-lots.

La plage de ce paramètre est [0, 100]. « 100 » est le nombre par défaut, ce qui signifie ignorer tous les mini-lots ayant échoué pendant le travail parallèle.
ParallelRunConfig.partition_keys En cours de développement.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Dictionnaire des noms et valeurs des variables d’environnement. Ces variables d’environnement sont définies sur le processus où le script utilisateur est en cours d’exécution.
ParallelRunStep.name parallel_run_function.name Nom du travail parallèle ou du composant créé.
ParallelRunStep.inputs parallel_run_function.inputs Dictionnaire d’entrées utilisées par ce parallèle.
-- parallel_run_function.input_data Déclarer les données à fractionner et à traiter avec le travail parallèle
ParallelRunStep.output parallel_run_function.outputs Sorties de ce travail parallèle.
ParallelRunStep.side_inputs parallel_run_function.inputs Définie avec inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Arguments de la tâche parallèle.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Spécifiez si le parallèle retourne la même sortie en fonction de la même entrée.

Étapes suivantes

Pour plus d’informations, consultez cette documentation :