Upgrade des Schritts zur parallelen Ausführung auf SDK v2
In SDK v2 wird „Schritt zur parallelen Ausführung“ als parallel job
in das Auftragskonzept integriert. Bei einem Parallelauftrag wird dasselbe Ziel beibehalten, sodass Benutzer die Ausführung ihrer Aufträge beschleunigen können, indem sie wiederkehrende Aufgaben auf leistungsstarke Computecluster mit mehreren Knoten verteilen. Zusätzlich zum Schritt zur parallelen Ausführung bietet ein Parallelauftrag in v2 zusätzliche Vorteile:
- Flexible Oberfläche, auf der Benutzer mehrere benutzerdefinierte Ein- und Ausgaben für Ihren Parallelauftrag festlegen können. Sie können sie mit anderen Schritten verbinden, um ihre Inhalte in Ihrem Einstiegsskript zu nutzen oder zu verwalten.
- Vereinfachen Sie das Eingabeschema, das
Dataset
als Eingabe durch das Konzeptdata asset
von v2 ersetzt. Sie können ganz einfach Ihre lokalen Dateien oder den URI des Blobverzeichnisses als Eingaben in einen Parallelauftrag verwenden. - Leistungsfähigere Features sind nur für Parallelaufträge in v2 in der Entwicklung. Setzen Sie beispielsweise den fehlgeschlagenen/abgebrochenen Parallelauftrag fort, um die fehlgeschlagenen oder nicht verarbeiteten Minibatches weiter zu verarbeiten, indem Sie das erfolgreiche Ergebnis wiederverwenden, um sich so doppelten Aufwand zu ersparen.
Gehen Sie zum Upgrade Ihres aktuellen Schritts zur parallelen Ausführung von SDK v1 auf v2 wie folgt vor:
- Verwenden Sie
parallel_run_function
, um einen Parallelauftrag zu erstellen, wobei SieParallelRunConfig
undParallelRunStep
in v1 ersetzen. - Upgraden Sie Ihre v1-Pipeline zu v2. Rufen Sie dann Ihren v2-Parallelauftrag als Schritt in Ihrer v2-Pipeline auf. Weitere Informationen zum Pipelineupgrade finden Sie unter Upgraden von Pipelines von v1 zu v2.
Hinweis: Das Einstiegsskript ist beim Schritt zur parallelen Ausführung von v1 und v2 kompatibel. Sie können also die gleiche Datei „entry_script.py“ verwenden, wenn Sie Ihren Auftrag zur parallelen Ausführung upgraden.
Dieser Artikel enthält einen Vergleich der Szenarien in SDK v1 und SDK v2. In den folgenden Beispielen erstellen wir einen Parallelauftrag, um Eingabedaten in einem Pipelineauftrag vorherzusagen. Sie erfahren, wie Sie einen Parallelauftrag erstellen und ihn in einem Pipelineauftrag für SDK v1 und SDK v2 verwenden.
Voraussetzungen
- Vorbereiten Ihrer SDK v2-Umgebung: Installieren des Azure Machine Learning SDK v2 für Python
- Verstehen der Grundlagen der SDK v2-Pipeline: Erstellen einer Azure Machine Learning-Pipeline mit dem Python-SDK v2
Erstellen des parallelen Schritts
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Verwenden des parallelen Schritts in der Pipeline
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Zuordnung der wichtigsten Funktionen in SDK v1 und SDK v2
Funktionalität im SDK v1 | Grobe Zuordnung in SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Ausgabe |
dataset as_mount | Input (Eingabe) |
Konfigurationen des Parallelauftrags und Zuordnung von Einstellungen
SDK v1 | SDK v2 | BESCHREIBUNG |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Umgebung, in der der Trainingsauftrag ausgeführt wird. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Das Benutzerskript, das parallel auf mehreren Knoten ausgeführt wird. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Die Anzahl der fehlerhaften Minibatches, die in diesem Parallelauftrag ignoriert werden könnten. Wenn die Anzahl der fehlerhaften Minibatches über diesem Schwellenwert liegt, wird der Parallelauftrag als fehlerhaft markiert. „-1“ ist der Standardwert, der bedeutet, dass alle fehlerhaften Minibatches bei Parallelaufträgen ignoriert werden. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Aggregieren Sie alle Rückgaben aus jedem ausgeführten Minibatch, und geben Sie sie in dieser Datei aus. Kann mithilfe des Ausdrucks ${{outputs.<output_name>}} auf eine der Ausgaben eines Parallelauftrags verweisen |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Optionale Anzahl von Instanzen oder Knoten, die vom Computeziel verwendet werden. Der Standardwert lautet 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Die maximale Parallelität der einzelnen Compute-Instanzen. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Definieren Sie die Größe der einzelnen Minibatches, um die Eingabe aufzuteilen. Wenn input_data ein Ordner oder eine Reihe von Dateien ist, bestimmt diese Zahl die Anzahl der Dateien für jeden Mini-Batch. Beispiel: 10, 100. Wenn es sich bei input_data um Tabellendaten aus mltable handelt, bestimmt diese Zahl die ungefähre physische Größe der einzelnen Minibatches. Die Standardeinheit ist Byte, und für den Wert ist eine Zeichenfolge wie 100 KB, 100 MB möglich. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Ein lokaler oder Remotepfad, der auf den Quellcode zeigt. |
ParallelRunConfig.description | parallel_run_function.description | Eine benutzerfreundliche Beschreibung des Parallelauftrags |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Eine Zeichenfolge mit dem Namen des Protokolliergrads, der in „logging“ definiert ist. Mögliche Werte sind „WARNING“, „INFO“ und „DEBUG“. (Optional, Standardwert ist INFO.) Dieser Wert kann über PipelineParameter festgelegt werden. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Das Timeout in Sekunden für die Ausführung der benutzerdefinierten run()-Funktion. Wenn die Ausführungszeit diesen Schwellenwert überschreitet, wird der Minibatch abgebrochen und als fehlerhafter Minibatch markiert, um eine Wiederholung auszulösen. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | Die Anzahl der Wiederholungsversuche, wenn ein Minibatch fehlerhaft ist oder ein Timeout auftritt. Wenn alle Wiederholungsversuche fehlerhaft sind, wird der Minibatch als fehlerhaft markiert und mit der Berechnung von mini_batch_error_threshold gezählt. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Kombiniert mit der Einstellung append_row_to . |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Die Anzahl der fehlerhaften Minibatches, die in diesem Parallelauftrag ignoriert werden könnten. Wenn die Anzahl der fehlerhaften Minibatches über diesem Schwellenwert liegt, wird der Parallelauftrag als fehlerhaft markiert. „-1“ ist der Standardwert, der bedeutet, dass alle fehlerhaften Minibatches bei Parallelaufträgen ignoriert werden. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments festgelegt --allowed_failed_percent |
Ähnlich wie allowed_failed_count, aber für diese Einstellung wird der Prozentsatz fehlerhafter Minibatches anstelle der Anzahl verwendet. Der Bereich dieser Einstellung ist [0, 100]. 100 ist der Standardwert, der bedeutet, dass alle fehlerhaften Minibatches bei Parallelaufträgen ignoriert werden. |
ParallelRunConfig.partition_keys | In der Entwicklung. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Ein Wörterbuch mit Umgebungsvariablennamen und Werten. Diese Umgebungsvariablen werden für den Prozess festgelegt, in dem das Benutzerskript ausgeführt wird. |
ParallelRunStep.name | parallel_run_function.name | Name des erstellten Parallelauftrags oder der erstellten Komponente. |
ParallelRunStep.inputs | parallel_run_function.inputs | Ein Wörterbuch mit Eingaben, die von diesem Parallelauftrag verwendet werden. |
-- | parallel_run_function.input_data | Deklarieren der Daten, die aufgeteilt und mit dem Parallelauftrag verarbeitet werden sollen |
ParallelRunStep.output | parallel_run_function.outputs | Die Ausgabe dieses Parallelauftrags. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Wird zusammen mit inputs definiert. |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Die Argumente der parallelen Aufgabe. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Geben Sie an, ob der Parallelauftrag bei gleicher Eingabe die gleiche Ausgabe liefern soll. |
Nächste Schritte
Weitere Informationen finden Sie in folgender Dokumentation: