Mettre à niveau l’étape d’exécution parallèle vers le SDK v2
Dans le SDK v2, l’« étape d’exécution parallèle » est consolidée dans le concept de travail en tant que parallel job
. Le travail parallèle garde la même cible pour permettre aux utilisateurs d’accélérer l’exécution de leur travail en distribuant des tâches répétées sur de puissants clusters de calcul à plusieurs nœuds. En plus de l’étape d’exécution parallèle, le travail parallèle v2 offre des avantages supplémentaires :
- Interface flexible, qui permet à l’utilisateur de définir plusieurs entrées et sorties personnalisées pour votre travail parallèle. Vous pouvez les connecter à d’autres étapes pour consommer ou gérer leur contenu dans votre script d’entrée
- Simplifiez le schéma d’entrée, qui remplace
Dataset
comme entrée à l’aide du concept v2data asset
. Vous pouvez facilement utiliser vos fichiers locaux ou URI d’annuaire d’objets blob comme entrées dans le travail parallèle. - Des fonctionnalités plus puissantes sont en cours de développement uniquement dans le travail parallèle v2. Par exemple, reprenez le travail parallèle ayant échoué/annulé pour continuer à traiter les mini-lots défaillants ou non traités en réutilisant le résultat réussi pour enregistrer les efforts en double.
Pour mettre à niveau votre étape d’exécution parallèle v1 actuelle du kit SDK v1 vers la version v2, vous devez
- Utiliser
parallel_run_function
pour créer un travail parallèle en remplaçantParallelRunConfig
etParallelRunStep
dans la version v1. - Effectuez une mise à niveau de votre pipeline v1 vers la version v2. Appelez ensuite votre travail parallèle v2 en tant qu’étape dans votre pipeline v2. Consultez Comment effectuer une mise à niveau du pipeline v1 vers la version v2 pour plus d’informations sur la mise à niveau du pipeline.
Remarque : Le script d’entrée utilisateur est compatible entre l’étape d’exécution parallèle v1 et le travail parallèle v2. Vous pouvez donc continuer à utiliser le même entry_script.py lorsque vous effectuer une mise à niveau de votre travail d’exécution parallèle.
Cet article fournit une comparaison des scénarios dans le SDK v1 et le SDK v2. Dans les exemples suivants, nous allons créer un travail parallèle pour prédire les données d’entrée dans un travail de pipeline. Vous verrez comment créer un travail parallèle et comment l’utiliser dans un travail de pipeline pour le SDK v1 et le SDK v2.
Prérequis
- Préparer votre environnement SDK v2 : Installer le Kit de développement logiciel (SDK) Azure Machine Learning v2 pour Python
- Comprendre la base du pipeline SDK v2 : Comment créer un pipeline Azure Machine Learning avec le SDK Python v2
Créer une étape parallèle
Kit de développement logiciel (SDK) v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Utiliser l’étape parallèle dans le pipeline
Kit de développement logiciel (SDK) v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Mappage des fonctionnalités clés dans le SDK v1 et le SDK v2
Fonctionnalités dans le SDK v1 | Mappage approximatif dans le SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Sortie |
as_mount (jeu de données) | Input |
Configurations de travail parallèle et mappage des paramètres
Kit de développement logiciel (SDK) v1 | SDK v2 | Description |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Environnement dans lequel le travail de formation s’exécutera. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Script utilisateur qui sera exécuté en parallèle sur plusieurs nœuds. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Nombre de mini-lots ayant échoué qui peuvent être ignorés dans ce travail parallèle. Si le nombre de mini-lots ayant échoué est supérieur à ce seuil, le travail parallèle est marqué comme ayant échoué. « -1 » est le nombre par défaut, ce qui signifie ignorer tous les mini-lots ayant échoué pendant le travail parallèle. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Agrégez tous les retours de chaque exécution de mini-lot et sortez-le dans ce fichier. Peut faire référence à l’une des sorties du travail parallèle à l’aide de l’expression ${{outputs.<>output_name}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Nombre facultatif d’instances ou de nœuds utilisés par la cible de calcul. La valeur par défaut est de 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Parallélisme maximal dont chaque instance de calcul dispose. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Définissez la taille de chaque mini-lot pour diviser l’entrée. Si input_data est un dossier ou un ensemble de fichiers, ce nombre définit le nombre de fichiers de chaque mini-lot. Par exemple, 10, 100. Si input_data correspond à des données tabulaires issues de mltable , ce nombre définit la taille physique approximative de chaque mini-lot. L’unité par défaut est l’octet et la valeur peut accepter des chaînes comme 100 Ko, 100 Mo. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Chemin d’accès local ou distant pointant vers le code source. |
ParallelRunConfig.description | parallel_run_function.description | Description conviviale du parallèle |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Chaîne du nom du niveau de journalisation, définie dans « Logging ». Les valeurs possibles sont « WARNING », « INFO »et « DEBUG ». (facultatif, la valeur par défaut est « INFO »). Cette valeur peut être définie via PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Délai d’expiration en secondes pour l’exécution de la fonction run() personnalisée. Si le temps d’exécution est supérieur à ce seuil, le mini-lot est abandonné et marqué comme un mini-lot ayant échoué pour déclencher une nouvelle tentative. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | Nombre de nouvelles tentatives lors de l’échec ou de l’expiration du mini-lot. Si toutes les nouvelles tentatives ont échoué, le mini-lot est marqué comme ayant échoué par calcul mini_batch_error_threshold calculation. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Combinée avec le paramètre append_row_to . |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Nombre de mini-lots ayant échoué qui peuvent être ignorés dans ce travail parallèle. Si le nombre de mini-lots ayant échoué est supérieur à ce seuil, le travail parallèle est marqué comme ayant échoué. « -1 » est le nombre par défaut, ce qui signifie ignorer tous les mini-lots ayant échoué pendant le travail parallèle. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set --allowed_failed_percent |
Similaire à « allowed_failed_count », mais ce paramètre utilise le pourcentage de mini-lots ayant échoué au lieu du nombre d’échecs de mini-lots. La plage de ce paramètre est [0, 100]. « 100 » est le nombre par défaut, ce qui signifie ignorer tous les mini-lots ayant échoué pendant le travail parallèle. |
ParallelRunConfig.partition_keys | En cours de développement. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Dictionnaire des noms et valeurs des variables d’environnement. Ces variables d’environnement sont définies sur le processus où le script utilisateur est en cours d’exécution. |
ParallelRunStep.name | parallel_run_function.name | Nom du travail parallèle ou du composant créé. |
ParallelRunStep.inputs | parallel_run_function.inputs | Dictionnaire d’entrées utilisées par ce parallèle. |
-- | parallel_run_function.input_data | Déclarer les données à fractionner et à traiter avec le travail parallèle |
ParallelRunStep.output | parallel_run_function.outputs | Sorties de ce travail parallèle. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Définie avec inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Arguments de la tâche parallèle. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Spécifiez si le parallèle retourne la même sortie en fonction de la même entrée. |
Étapes suivantes
Pour plus d’informations, consultez cette documentation :