Troubleshooting the ParallelRunStep
APPLIES TO: Python SDK azureml v1
In this article, you learn how to troubleshoot when you get errors using the ParallelRunStep class from the Azure Machine Learning SDK.
For general tips on troubleshooting a pipeline, see Troubleshooting machine learning pipelines.
Testing scripts locally
Your ParallelRunStep runs as a step in ML pipelines. You might want to test your scripts locally as a first step.
Entry script requirements
The entry script for a ParallelRunStep
must contain a run()
function and optionally contains an init()
function:
init()
: Use this function for any costly or common preparation for later processing. For example, use it to load the model into a global object. This function will be called only once at beginning of process.Note
If your
init
method creates an output directory, specify thatparents=True
andexist_ok=True
. Theinit
method is called from each worker process on every node on which the job is running.run(mini_batch)
: The function will run for eachmini_batch
instance.mini_batch
:ParallelRunStep
will invoke run method and pass either a list or pandasDataFrame
as an argument to the method. Each entry in mini_batch will be a file path if input is aFileDataset
or a pandasDataFrame
if input is aTabularDataset
.response
: run() method should return a pandasDataFrame
or an array. For append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful run of input element in the input mini-batch. Make sure that enough data is included in run result to map input to run output result. Run output will be written in output file and not guaranteed to be in order, you should use some key in the output to map it to input.Note
One output element is expected for one input element.
%%writefile digit_identification.py
# Snippets from a sample script.
# Refer to the accompanying digit_identification.py
# (https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines/parallel-run)
# for the implementation script.
import os
import numpy as np
import tensorflow as tf
from PIL import Image
from azureml.core import Model
def init():
global g_tf_sess
# Pull down the model from the workspace
model_path = Model.get_model_path("mnist")
# Construct a graph to execute
tf.reset_default_graph()
saver = tf.train.import_meta_graph(os.path.join(model_path, 'mnist-tf.model.meta'))
g_tf_sess = tf.Session()
saver.restore(g_tf_sess, os.path.join(model_path, 'mnist-tf.model'))
def run(mini_batch):
print(f'run method start: {__file__}, run({mini_batch})')
resultList = []
in_tensor = g_tf_sess.graph.get_tensor_by_name("network/X:0")
output = g_tf_sess.graph.get_tensor_by_name("network/output/MatMul:0")
for image in mini_batch:
# Prepare each image
data = Image.open(image)
np_im = np.array(data).reshape((1, 784))
# Perform inference
inference_result = output.eval(feed_dict={in_tensor: np_im}, session=g_tf_sess)
# Find the best probability, and add it to the result list
best_result = np.argmax(inference_result)
resultList.append("{}: {}".format(os.path.basename(image), best_result))
return resultList
If you have another file or folder in the same directory as your inference script, you can reference it by finding the current working directory. If you want to import your packages, you can also append your package folder to sys.path
.
script_dir = os.path.realpath(os.path.join(__file__, '..',))
file_path = os.path.join(script_dir, "<file_name>")
packages_dir = os.path.join(file_path, '<your_package_folder>')
if packages_dir not in sys.path:
sys.path.append(packages_dir)
from <your_package> import <your_class>
Parameters for ParallelRunConfig
ParallelRunConfig
is the major configuration for ParallelRunStep
instance within the Azure Machine Learning pipeline. You use it to wrap your script and configure necessary parameters, including all of the following entries:
entry_script
: A user script as a local file path that will be run in parallel on multiple nodes. Ifsource_directory
is present, use a relative path. Otherwise, use any path that's accessible on the machine.mini_batch_size
: The size of the mini-batch passed to a singlerun()
call. (optional; the default value is10
files forFileDataset
and1MB
forTabularDataset
.)- For
FileDataset
, it's the number of files with a minimum value of1
. You can combine multiple files into one mini-batch. - For
TabularDataset
, it's the size of data. Example values are1024
,1024KB
,10MB
, and1GB
. The recommended value is1MB
. The mini-batch fromTabularDataset
will never cross file boundaries. For example, if you have .csv files with various sizes, the smallest file is 100 KB and the largest is 10 MB. If you setmini_batch_size = 1MB
, then files with a size smaller than 1 MB will be treated as one mini-batch. Files with a size larger than 1 MB will be split into multiple mini-batches.Note
TabularDatasets backed by SQL cannot be partitioned. TabularDatasets from a single parquet file and single row group cannot be partitioned.
- For
error_threshold
: The number of record failures forTabularDataset
and file failures forFileDataset
that should be ignored during processing. If the error count for the entire input goes above this value, the job will be aborted. The error threshold is for the entire input and not for individual mini-batch sent to therun()
method. The range is[-1, int.max]
. The-1
part indicates ignoring all failures during processing.output_action
: One of the following values indicates how the output will be organized:summary_only
: The user script will store the output.ParallelRunStep
will use the output only for the error threshold calculation.append_row
: For all inputs, only one file will be created in the output folder to append all outputs separated by line.
append_row_file_name
: To customize the output file name for append_row output_action (optional; default value isparallel_run_step.txt
).source_directory
: Paths to folders that contain all files to execute on the compute target (optional).compute_target
: OnlyAmlCompute
is supported.node_count
: The number of compute nodes to be used for running the user script.process_count_per_node
: The number of worker processes per node to run the entry script in parallel. For a GPU machine, the default value is 1. For a CPU machine, the default value is the number of cores per node. A worker process will callrun()
repeatedly by passing the mini batch it gets. The total number of worker processes in your job isprocess_count_per_node * node_count
, which decides the max number ofrun()
to execute in parallel.environment
: The Python environment definition. You can configure it to use an existing Python environment or to set up a temporary environment. The definition is also responsible for setting the required application dependencies (optional).logging_level
: Log verbosity. Values in increasing verbosity are:WARNING
,INFO
, andDEBUG
. (optional; the default value isINFO
)run_invocation_timeout
: Therun()
method invocation timeout in seconds. (optional; default value is60
)run_max_try
: Maximum try count ofrun()
for a mini-batch. Arun()
is failed if an exception is thrown, or nothing is returned whenrun_invocation_timeout
is reached (optional; default value is3
).
You can specify mini_batch_size
, node_count
, process_count_per_node
, logging_level
, run_invocation_timeout
, and run_max_try
as PipelineParameter
, so that when you resubmit a pipeline run, you can fine-tune the parameter values. In this example, you use PipelineParameter
for mini_batch_size
and Process_count_per_node
and you will change these values when you resubmit another run.
CUDA devices visibility
For compute targets equipped with GPUs, the environment variable CUDA_VISIBLE_DEVICES
will be set in worker processes. In AmlCompute, you can find the total number of GPU devices in the environment variable AZ_BATCHAI_GPU_COUNT_FOUND
, which is set automatically. If you want each worker process to have a dedicated GPU, set process_count_per_node
equal to the number of GPU devices on a machine. Each worker process will assign a unique index to CUDA_VISIBLE_DEVICES
. If a worker process stops for any reason, the next started worker process will use the released GPU index.
If the total number of GPU devices is less than process_count_per_node
, the worker processes will be assigned GPU index until all have been used.
Given the total GPU devices is 2 and process_count_per_node = 4
as an example, process 0 and process 1 will have index 0 and 1. Process 2 and 3 won't have an environment variable. For a library using this environment variable for GPU assignment, process 2 and 3 won't have GPUs and won't try to acquire GPU devices. If process 0 stops, it will release GPU index 0. The next process, which is process 4, will have GPU index 0 assigned.
For more information, see CUDA Pro Tip: Control GPU Visibility with CUDA_VISIBLE_DEVICES.
Parameters for creating the ParallelRunStep
Create the ParallelRunStep by using the script, environment configuration, and parameters. Specify the compute target that you already attached to your workspace as the target of execution for your inference script. Use ParallelRunStep
to create the batch inference pipeline step, which takes all the following parameters:
name
: The name of the step, with the following naming restrictions: unique, 3-32 characters, and regex ^[a-z]([-a-z0-9]*[a-z0-9])?$.parallel_run_config
: AParallelRunConfig
object, as defined earlier.inputs
: One or more single-typed Azure Machine Learning datasets to be partitioned for parallel processing.side_inputs
: One or more reference data or datasets used as side inputs without need to be partitioned.output
: AnOutputFileDatasetConfig
object that represents the directory path at which the output data will be stored.arguments
: A list of arguments passed to the user script. Use unknown_args to retrieve them in your entry script (optional).allow_reuse
: Whether the step should reuse previous results when run with the same settings/inputs. If this parameter isFalse
, a new run will always be generated for this step during pipeline execution. (optional; the default value isTrue
.)
from azureml.pipeline.steps import ParallelRunStep
parallelrun_step = ParallelRunStep(
name="predict-digits-mnist",
parallel_run_config=parallel_run_config,
inputs=[input_mnist_ds_consumption],
output=output_dir,
allow_reuse=True
)
Debugging scripts from remote context
The transition from debugging a scoring script locally to debugging a scoring script in an actual pipeline can be a difficult leap. For information on finding your logs in the portal, see machine learning pipelines section on debugging scripts from a remote context. The information in that section also applies to a ParallelRunStep.
For example, the log file 70_driver_log.txt
contains information from the controller that launches the ParallelRunStep code.
Because of the distributed nature of ParallelRunStep jobs, there are logs from several different sources. However, two consolidated files are created that provide high-level information:
~/logs/job_progress_overview.txt
: This file provides a high-level info about the number of mini-batches (also known as tasks) created so far and number of mini-batches processed so far. At this end, it shows the result of the job. If the job failed, it will show the error message and where to start the troubleshooting.~/logs/sys/master_role.txt
: This file provides the principal node (also known as the orchestrator) view of the running job. Includes task creation, progress monitoring, the run result.
Logs generated from entry script using EntryScript helper and print statements will be found in following files:
~/logs/user/entry_script_log/<node_id>/<process_name>.log.txt
: These files are the logs written from entry_script using EntryScript helper.~/logs/user/stdout/<node_id>/<process_name>.stdout.txt
: These files are the logs from stdout (for example, print statement) of entry_script.~/logs/user/stderr/<node_id>/<process_name>.stderr.txt
: These files are the logs from stderr of entry_script.
For a concise understanding of errors in your script there is:
~/logs/user/error.txt
: This file will try to summarize the errors in your script.
For more information on errors in your script, there is:
~/logs/user/error/
: Contains full stack traces of exceptions thrown while loading and running entry script.
When you need a full understanding of how each node executed the score script, look at the individual process logs for each node. The process logs can be found in the sys/node
folder, grouped by worker nodes:
~/logs/sys/node/<node_id>/<process_name>.txt
: This file provides detailed info about each mini-batch as it's picked up or completed by a worker. For each mini-batch, this file includes:- The IP address and the PID of the worker process.
- The total number of items, successfully processed items count, and failed item count.
- The start time, duration, process time and run method time.
You can also view the results of periodical checks of the resource usage for each node. The log files and setup files are in this folder:
~/logs/perf
: Set--resource_monitor_interval
to change the checking interval in seconds. The default interval is600
, which is approximately 10 minutes. To stop the monitoring, set the value to0
. Each<node_id>
folder includes:os/
: Information about all running processes in the node. One check runs an operating system command and saves the result to a file. On Linux, the command isps
. On Windows, usetasklist
.%Y%m%d%H
: The sub folder name is the time to hour.processes_%M
: The file ends with the minute of the checking time.
node_disk_usage.csv
: Detailed disk usage of the node.node_resource_usage.csv
: Resource usage overview of the node.processes_resource_usage.csv
: Resource usage overview of each process.
How do I log from my user script from a remote context?
ParallelRunStep may run multiple processes on one node based on process_count_per_node. In order to organize logs from each process on node and combine print and log statement, we recommend using ParallelRunStep logger as shown below. You get a logger from EntryScript and make the logs show up in logs/user folder in the portal.
A sample entry script using the logger:
from azureml_user.parallel_run import EntryScript
def init():
"""Init once in a worker process."""
entry_script = EntryScript()
logger = entry_script.logger
logger.info("This will show up in files under logs/user on the Azure portal.")
def run(mini_batch):
"""Call once for a mini batch. Accept and return the list back."""
# This class is in singleton pattern and will return same instance as the one in init()
entry_script = EntryScript()
logger = entry_script.logger
logger.info(f"{__file__}: {mini_batch}.")
...
return mini_batch
Where does the message from Python logging
sink to?
ParallelRunStep sets a handler on the root logger, which sinks the message to logs/user/stdout/<node_id>/processNNN.stdout.txt
.
logging
defaults to INFO
level. By default, levels below INFO
won't show up, such as DEBUG
.
How could I write to a file to show up in the portal?
Files in logs
folder will be uploaded and show up in the portal.
You can get the folder logs/user/entry_script_log/<node_id>
like below and compose your file path to write:
from pathlib import Path
from azureml_user.parallel_run import EntryScript
def init():
"""Init once in a worker process."""
entry_script = EntryScript()
log_dir = entry_script.log_dir
log_dir = Path(entry_script.log_dir) # logs/user/entry_script_log/<node_id>/.
log_dir.mkdir(parents=True, exist_ok=True) # Create the folder if not existing.
proc_name = entry_script.agent_name # The process name in pattern "processNNN".
fil_path = log_dir / f"{proc_name}_<file_name>" # Avoid conflicting among worker processes with proc_name.
How to handle log in new processes?
You can spawn new processes in your entry script with subprocess
module, connect to their input/output/error pipes and obtain their return codes.
The recommended approach is to use the run()
function with capture_output=True
. Errors will show up in logs/user/error/<node_id>/<process_name>.txt
.
If you want to use Popen()
, you should redirect stdout/stderr to files, like:
from pathlib import Path
from subprocess import Popen
from azureml_user.parallel_run import EntryScript
def init():
"""Show how to redirect stdout/stderr to files in logs/user/entry_script_log/<node_id>/."""
entry_script = EntryScript()
proc_name = entry_script.agent_name # The process name in pattern "processNNN".
log_dir = Path(entry_script.log_dir) # logs/user/entry_script_log/<node_id>/.
log_dir.mkdir(parents=True, exist_ok=True) # Create the folder if not existing.
stdout_file = str(log_dir / f"{proc_name}_demo_stdout.txt")
stderr_file = str(log_dir / f"{proc_name}_demo_stderr.txt")
proc = Popen(
["...")],
stdout=open(stdout_file, "w"),
stderr=open(stderr_file, "w"),
# ...
)
Note
A worker process runs "system" code and the entry script code in the same process.
If no stdout
or stderr
specified, a subprocess created with Popen()
in your entry script will inherit the setting of the worker process.
stdout
will write to logs/sys/node/<node_id>/processNNN.stdout.txt
and stderr
to logs/sys/node/<node_id>/processNNN.stderr.txt
.
How do I write a file to the output directory, and then view it in the portal?
You can get the output directory from the EntryScript
class and write to it. To view the written files, in the step Run view in the Azure Machine Learning portal, select the Outputs + logs tab. Select the Data outputs link, and then complete the steps that are described in the dialog.
Use EntryScript
in your entry script like in this example:
from pathlib import Path
from azureml_user.parallel_run import EntryScript
def run(mini_batch):
output_dir = Path(entry_script.output_dir)
(Path(output_dir) / res1).write...
(Path(output_dir) / res2).write...
How can I pass a side input such as, a file or file(s) containing a lookup table, to all my workers?
User can pass reference data to script using side_inputs parameter of ParalleRunStep. All datasets provided as side_inputs will be mounted on each worker node. User can get the location of mount by passing argument.
Construct a Dataset containing the reference data, specify a local mount path and register it with your workspace. Pass it to the side_inputs
parameter of your ParallelRunStep
. Additionally, you can add its path in the arguments
section to easily access its mounted path.
Note
Use FileDatasets only for side_inputs.
local_path = "/tmp/{}".format(str(uuid.uuid4()))
label_config = label_ds.as_named_input("labels_input").as_mount(local_path)
batch_score_step = ParallelRunStep(
name=parallel_step_name,
inputs=[input_images.as_named_input("input_images")],
output=output_dir,
arguments=["--labels_dir", label_config],
side_inputs=[label_config],
parallel_run_config=parallel_run_config,
)
After that you can access it in your inference script (for example, in your init() method) as follows:
parser = argparse.ArgumentParser()
parser.add_argument('--labels_dir', dest="labels_dir", required=True)
args, _ = parser.parse_known_args()
labels_path = args.labels_dir
How to use input datasets with service principal authentication?
User can pass input datasets with service principal authentication used in workspace. Using such dataset in ParallelRunStep requires that dataset to be registered for it to construct ParallelRunStep configuration.
service_principal = ServicePrincipalAuthentication(
tenant_id="***",
service_principal_id="***",
service_principal_password="***")
ws = Workspace(
subscription_id="***",
resource_group="***",
workspace_name="***",
auth=service_principal
)
default_blob_store = ws.get_default_datastore() # or Datastore(ws, '***datastore-name***')
ds = Dataset.File.from_files(default_blob_store, '**path***')
registered_ds = ds.register(ws, '***dataset-name***', create_new_version=True)
How to Check Progress and Analyze it
This section is about how to check the progress of a ParallelRunStep job and check the cause of unexpected behavior.
How to check job progress?
Besides looking at the overall status of the StepRun, the count of scheduled/processed mini-batches and the progress of generating output can be viewed in ~/logs/job_progress_overview.<timestamp>.txt
. The file rotates on daily basis, you can check the one with the largest timestamp for the latest information.
What should I check if there is no progress for a while?
You can go into ~/logs/sys/errror
to see if there's any exception. If there is none, it's likely that your entry script is taking a long time, you can print out progress information in your code to locate the time-consuming part, or add "--profiling_module", "cProfile"
to the arguments
of ParallelRunStep
to generate a profile file named as <process_name>.profile
under ~/logs/sys/node/<node_id>
folder.
When will a job stop?
if not canceled, the job will stop with status:
- Completed. If all mini-batches have been processed and output has been generated for
append_row
mode. - Failed. If
error_threshold
inParameters for ParallelRunConfig
is exceeded, or system error occurred during the job.
Where to find the root cause of failure?
You can follow the lead in ~logs/job_result.txt
to find the cause and detailed error log.
Will node failure impact the job result?
Not if there are other available nodes in the designated compute cluster. The orchestrator will start a new node as replacement, and ParallelRunStep is resilient to such operation.
What happens if init
function in entry script fails?
ParallelRunStep has mechanism to retry for a certain time to give chance for recovery from transient issues without delaying the job failure for too long, the mechanism is as follows:
- If after a node starts,
init
on all agents keeps failing, we will stop trying after3 * process_count_per_node
failures. - If after job starts,
init
on all agents of all nodes keeps failing, we will stop trying if job runs more than 2 minutes and there're2 * node_count * process_count_per_node
failures. - If all agents are stuck on
init
for more than3 * run_invocation_timeout + 30
seconds, the job would fail because of no progress for too long.
What will happen on OutOfMemory? How can I check the cause?
ParallelRunStep will set the current attempt to process the mini-batch to failure status and try to restart the failed process. You can check ~logs/perf/<node_id>
to find the memory-consuming process.
Why do I have a lot of processNNN files?
ParallelRunStep will start new worker processes in replace of the ones exited abnormally, and each process will generate a processNNN
file as log. However, if the process failed because of exception during the init
function of user script, and that the error repeated continuously for 3 * process_count_per_node
times, no new worker process will be started.
Next steps
See these Jupyter notebooks demonstrating Azure Machine Learning pipelines
See the SDK reference for help with the azureml-pipeline-steps package.
View reference documentation for ParallelRunConfig class and documentation for ParallelRunStep class.
Follow the advanced tutorial on using pipelines with ParallelRunStep. The tutorial shows how to pass another file as a side input.